Akka-Actor模型-解决高并发的终极方案-集群篇(一)
ZealSinger 发布于 阅读:71 技术文档
⭐️
集群的相关概念,我们有讲到过,现在我们来正式学习AKKA集群拓展下的相关API和使用
启动一个集群
集群肯定首先由一个节点开始跑起来,然后随着更多的节点加入慢慢成为了一个集群。所以集群的开端可以看作是一个“允许别人加入的Actor系统”
首先就得在配置上允许Cluster拓展,需要添加配置文件和依赖
首先是依赖
def versions = [
ScalaBinary: "2.13"
]
dependencies {
implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.10.5")
implementation "com.typesafe.akka:akka-cluster_${versions.ScalaBinary}"
}
然后是需要起一个配置文件application.config
akka {
actor {
provider = "cluster" // 启动模式 cluster集群模式 固定的为cluster即可
}
remote.artery {
canonical {
hostname = "127.0.0.1"
port = 2551 // 启动IP和端口
}
}
cluster {
seed-nodes = [
// 标记种子节点 种子节点是外部节点加入集群的中介节点 没有种子节点是不行的 而且可以有多个种子节点
// 一般而言 种子节点越多 集群容错越大
// 需要注意的是这个格式 akka:// 后面的系统名需要和启动节点的ActorSystem.create("....")保持一致才可以
"akka://ClusterSystem@127.0.0.1:2551"]
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}
之后 我们在这个项目中启动的服务 就成为了一个可以被别的节点加入的Actor Cluster
// Main.kt
fun main(args: Array<String>) {
// 这是正常的起一个Actor服务 然后由根Actor调用自定义Actor
val system = ActorSystem.create("ClusterSystem") // 这里就是配置文件中akka://后面的内容 需要相同
val firstActor = system.actorOf(
Props.create(ClusterMasterActor::class.java),
"firstActor"
)
}
// ClusterMasterActor.kt
class ClusterMasterActor():AbstractActor() {
var cluster: Cluster = Cluster.get(context.system) // 获取集群对象
// subscribe to cluster changes
override fun preStart() {
// actor订阅集群
/*
getSelf():
订阅事件的接收者(即当前 Actor),集群事件将发送到该 Actor 的邮箱。
ClusterEvent.initialStateAsEvents():
将集群的 当前状态(如已有成员)作为初始事件发送。例如,若集群已有节点 A 和 B,订阅后会立即收到 MemberUp 事件。
MemberEvent.class:
订阅所有成员状态变更事件(如节点加入 MemberUp、离开 MemberRemoved 这些消息就会发送给当前actor)。
UnreachableMember.class:
订阅节点不可达事件(如网络故障导致节点无法访问)
*/
cluster.subscribe(
self, initialStateAsEvents(), MemberEvent::class.java, UnreachableMember::class.java
)
}
// re-subscribe when restart
override fun postStop() {
// 当节点关闭的时候需要取消订阅该集群
cluster.unsubscribe(self)
}
override fun createReceive(): Receive {
return receiveBuilder()
// 自定义监听stringe类型的信息 方便后面测试
.match(String::class.java,{
println("收到来自:${context().sender()} 的消息:${it}")
})
.match(
MemberUp::class.java // MemberUp消息 是当新节点加入集群时触发的消息
) { mUp: MemberUp ->
println("Member is Up: {${mUp.member()}}")
}
.match(
UnreachableMember::class.java // UnreachableMember 当节点因网络问题不可达时触发
) { mUnreachable: UnreachableMember ->
println("Member detected as unreachable: {${mUnreachable.member()}}")
}
.match(
MemberRemoved::class.java // MemberRemoved 当节点被移除(主动退出或故障判定后)时触发。
) { mRemoved: MemberRemoved ->
println("Member is Removed: {${mRemoved.member()}}")
}
.match(
MemberEvent::class.java // MemberEvent 捕获所有未明确处理的成员事件
) { }
.build()
}
}
启动Main函数之后,如下可以看到集群创建成功且2551即当前actor加入成功
然后此时我们跑另外一个服务 代码如下
// 点击装订区域中的 <icon src="AllIcons.Actions.Execute"/> 图标。
fun main(args: Array<String>) {
val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2552").withFallback(ConfigFactory.load()) // 因为这里直接还是用的上面的那个配置文件 所以这里需要改一下端口号 可以直接在配置文件中改 我这里没有在文件中改了 在代码中通过ConfigFactory进行的修改 将端口修改为2552 因为2551上面已经再用了会冲突
val system: ActorSystem = ActorSystem.create("ClusterSystem", config) // 这里的名字必须和上面是一样的 才能保证是一个集群下
val actorRef = system.actorOf(Props.create(SecondActor::class.java), "secondActor")
actorRef.tell("Hello", ActorRef.noSender())//发送消息
}
class SecondActor(): AbstractActor() {
var cluster: Cluster = Cluster.get(context.system)
// subscribe to cluster changes
override fun preStart() {
// #subscribe
cluster.subscribe(
self, initialStateAsEvents(), MemberEvent::class.java, UnreachableMember::class.java
)
// #subscribe
}
// re-subscribe when restart
override fun postStop() {
cluster.unsubscribe(self)
}
override fun createReceive(): Receive {
return receiveBuilder()
.match(String::class.java,{
// 通过2552发给2551 实现跨端口发送
val actorSelection = context().actorSelection("akka://ClusterSystem@127.0.0.1:2551/user/firstActor")
actorSelection.tell(it,self)
})
.match(
MemberUp::class.java
) { mUp: MemberUp ->
println("Member is Up: {${mUp.member()}}")
}
.match(
UnreachableMember::class.java
) { mUnreachable: UnreachableMember ->
println("Member detected as unreachable: {${mUnreachable.member()}}")
}
.match(
MemberRemoved::class.java
) { mRemoved: MemberRemoved ->
println("Member is Removed: {${mRemoved.member()}}")
}
.match(
MemberEvent::class.java
) { }
.build()
}
}
启动后如下 右边为master,可以看到接收到了消息也接收到了集群中添加节点的消息,
通过JoinSeedNodes()/Join()加入集群
在上述案例中,无论是集群初始actor节点还是后来加入集群的节点,都得配置config文件,都得提前准备好配置文件中的种子节点信息,属于静态配置文件的方式加入
在Cluster对象中,提供了一个JoinSeedNodes()方法,允许在代码中动态的加入到集群中,该方法需要传入一个List<Address>类型的集合,这个集合就是种子节点的地址的集合,通过这个集合可以利用其中的种子节点的信息加入到集群中
// 2552Actor的代码这么写也是能成功加入的
fun main(args: Array<String>) {
val system: ActorSystem = ActorSystem.create("ClusterSystem")
val actorRef = system.actorOf(Props.create(SecondActor::class.java), "secondActor")
actorRef.tell("Hello", ActorRef.noSender())
}
class SecondActor(): AbstractActor() {
var cluster: Cluster = Cluster.get(context.system)
init {
cluster.joinSeedNodes(mutableListOf<Address>(AddressFromURIString.parse("akka://ClusterSystem@127.0.0.1:2551"))) // 通过这里进行加入种子节点信息,就不需要在配置文件中静态写死
}
// subscribe to cluster changes
override fun preStart() {
// #subscribe
cluster.subscribe(
self, initialStateAsEvents(), MemberEvent::class.java, UnreachableMember::class.java
)
// #subscribe
}
// re-subscribe when restart
override fun postStop() {
cluster.unsubscribe(self)
}
override fun createReceive(): Receive {
return receiveBuilder()
.match(String::class.java,{
val actorSelection = context.actorSelection("akka://ClusterSystem@127.0.0.1:2551/user/firstActor")
actorSelection.tell(it,self)
})
.match(
MemberUp::class.java
) { mUp: MemberUp ->
println("Member is Up: {${mUp.member()}}")
}
.match(
UnreachableMember::class.java
) { mUnreachable: UnreachableMember ->
println("Member detected as unreachable: {${mUnreachable.member()}}")
}
.match(
MemberRemoved::class.java
) { mRemoved: MemberRemoved ->
println("Member is Removed: {${mRemoved.member()}}")
}
.match(
MemberEvent::class.java
) { }
.build()
}
}
当然,这个是通过传入种子节点的集合来实现的,如果你只有一个种子节点或者就是想通过某个特定的种子节点加入,就可以使用join方法实现
cluster.join((AddressFromURIString.parse("akka://ClusterSystem@127.0.0.1:2551")))
这样也是可以的
订阅集群事件
在我们加入集群的一开始,就出现了一个cluster.subscribe()方法,该方法的主要作用就是订阅集群事件
可以来看看这个方法的函数原型
-
第一个参数代表要订阅的Actor
-
第二个参数代表订阅者在订阅集群事件时,是否接收当前集群状态的初始数据,以及以何种形式接收
该参数一般填如下两个:
ClusterEvent.initialStateAsEvents()
或者ClusterEvent.initialStateAsSnapshot()
前者代表接收当前集群状态信息,信息以消息的形式,根据不同操作对应不同的消息类型,法到接收者Actor的邮箱中。适合要当前状态和历史信息进行一一处理,需要将现有成员状态视为“事件流”处理(例如初始化时逐个处理节点),希望统一处理初始状态和动态变化(代码逻辑一致)的场景。
后者也是接收当前集群状态信息,但是信息是以一个
CurrentClusterState
对象的形式发送,也就是说所有的信息封装在了一个对象中,适用于一次性获取集群当前状态,避免处理大批量历史数据的场景 -
第三个参数代表订阅的消息类型,是一个可变参数,传入对应的需要知道的信息类型,就会将这些类型的信息一一发送or封装进行发送
public void subscribe(final ActorRef subscriber, final ClusterEvent.SubscriptionInitialStateMode initialStateMode, final Class<?>... to) {
this.subscribe(subscriber, initialStateMode, (Seq).MODULE$.wrapRefArray((Object[])to));
}
退出集群
退出集群有多种方式
首先是最优雅最推荐的是通过cluster对象的leave方法实现
cluster.leave(address)
通过调用 Cluster.leave(address)
方法,让节点主动触发退出流程。这个方法调用之后,首先会向集群广播发送离开意图,集群中其他的节点接收到意图之后会先将该节点标记为Removed,随后标记为Exiting状态,最终标记为Removed,其他节点接收到MemberRemoved事件消息,最后该节点停止所有ACTOR并且释放资源
利用tell发送Leave消息也是类似的效果
// 触发优雅离开
Cluster.get(system).manager.tell(Leave(cluster.selfMember().address))
搭配上CoordinatedShutdowm确保资源安全释放
CoordinatedShutdown(context.system).run(CoordinatedShutdown.ClusterLeavingReason)
除此之外,系统故障也会出现被动退出,但是自然是没那么好的,属于异常退出,则需要进行宕机处理Downing,一般而言,宕机处理我们使用AKKA提供了Split Brain Resolver处理,这个需要在conf文件中进行配置
# 配置 Split Brain Resolver
akka.cluster {
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
split-brain-resolver {
active-strategy = keep-majority
}
}
Node Roles 节点角色
并非所有的Node节点都需要执行相同的功能,根据不同的功能分为了不同的节点角色,可以通过节点角色来进行复杂的任务执行
首先是配置当前Actor的角色属性,通过conf配置文件进行配置
akka {
actor {
provider = "cluster"
}
remote.artery {
canonical {
hostname = "127.0.0.1"
port = 2552
}
}
cluster {
roles = ["roleOne"] // 这里进行配置
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:2551"]
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}
然后在代码中,可以通过cluster对象的hasRole方法判断是否具备某个角色
class SecondActor(): AbstractActor() {
var cluster: Cluster = Cluster.get(context.system)
init {
cluster.join((AddressFromURIString.parse("akka://ClusterSystem@127.0.0.1:2551")))
}
override fun preStart() {
cluster.subscribe(
self, initialStateAsEvents(), MemberEvent::class.java, UnreachableMember::class.java
)
}
// re-subscribe when restart
override fun postStop() {
cluster.unsubscribe(self)
}
override fun createReceive(): Receive {
return receiveBuilder()
.matchEquals("Leave",{
cluster.leave(cluster.selfAddress())
})
.match(String::class.java,{
val actorSelection = context.actorSelection("akka://ClusterSystem@127.0.0.1:2551/user/firstActor")
var selfMember = cluster.selfMember()
// 根据不同的角色发送不同的内容
if(selfMember.hasRole("roleOne")){
actorSelection.tell(it+"##hasRoleOne",self)
}else{
actorSelection.tell(it,self)
}
})
.match(
MemberUp::class.java
) { mUp: MemberUp ->
println("Member is Up: {${mUp.member()}}")
}
.match(
UnreachableMember::class.java
) { mUnreachable: UnreachableMember ->
println("Member detected as unreachable: {${mUnreachable.member()}}")
}
.match(
MemberRemoved::class.java
) { mRemoved: MemberRemoved ->
println("Member is Removed: {${mRemoved.member()}}")
}
.match(
MemberEvent::class.java
) { }
.build()
}
}
查看接收方的输出,效果如下
节点的状态及其流转
当一个Actor要加入到Cluster中,存在Joining ; Up ;Leaving ; Exiting ; Removed ; Down ; WeaklyUp七种状态
-
Joining
初始加入状态,节点已经连接到集群但是尚未被激活,无法正常参加到集群工作中
触发条件:当节点首次加入/连接到到集群的时候
-
Up
节点已经被激活,可以正常参与到集群工作中
触发条件:满足min-nr-of-members条件后可以由领导者激活触发
-
Leaving
节点已开始优雅关闭流程,即调用leave()方法,或者由管理命令触发
-
Exiting
节点已经完成工作交接,即将从集群中移除,由领导者确认leave离开请求后进行自动转换
-
Removed
节点已经完全从集群中移除(墓碑状态)。节点终止或者手动移除后会出现
-
Down
节点被标记为不可用,一般通过API显示标记或者故障检测(即节点确实出现了一些故障被集群检测到)
-
WeaklyUp
节点可达但是未满足最小成员条件数,这个一般是需要额外配置才会出现的状态
启用
allow-weakly-up-members
且节点可达但未满足 min-nr-of-members 条件
一般而言,其状态之间的流转如下
-
加入阶段:
-
节点启动 → 连接种子节点 → 发送
Join
命令 → 状态设为Joining
-
领导者广播新成员信息
-
-
激活阶段:
-
领导者检查配置的
min-nr-of-members
:-
全局条件:
akka.cluster.min-nr-of-members
-
角色条件:
akka.cluster.role.{role}.min-nr-of-members
-
-
同时满足所有条件后,将节点状态改为
Up
-
-
离开阶段:
-
节点调用
Cluster.leave()
→ 状态变为Leaving
-
领导者协调数据迁移 → 状态改为
Exiting
-
完成清理 → 状态变为
Removed
-
节点数量/角色数量控制集群启动
在上面节点状态中,我们其实很需要注意的就是Joining状态和Up状态,这两个状态是开发的时候最常接触的
然后在上面也说到了一个配置属性min-nr-of-members,通过添加这个配置,可以让我们的先加入的节点只是维持Joining状态,只有当Joining状态节点数量达到min-nr-of-members的时候才能激活为Up
案例如下
akka {
actor {
provider = "cluster"
}
remote.artery {
canonical {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
min-nr-of-members = 3 // 在集群主节点下加入配置 需要3个节点才能将joining升级为up
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:2551"]
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}
fun main(args: Array<String>) {
val system: ActorSystem = ActorSystem.create("ClusterSystem",ConfigFactory.load())
val actorRef = system.actorOf(Props.create(SecondActor::class.java), "secondActor")
actorRef.tell("Hello", ActorRef.noSender())
}
class SecondActor(): AbstractActor() {
var cluster: Cluster = Cluster.get(context.system)
init {
cluster.join((AddressFromURIString.parse("akka://ClusterSystem@127.0.0.1:2551")))
// cluster的registerOnMemberUp()方法 能注册一个回调逻辑 让该Actor从Joining状态变化为Up状态的时候触发
cluster.registerOnMemberUp({ println("Joining to Up")})
}
// subscribe to cluster changes
override fun preStart() {
// #subscribe
cluster.subscribe(
self, initialStateAsEvents(), MemberEvent::class.java, UnreachableMember::class.java
)
// #subscribe
}
// re-subscribe when restart
override fun postStop() {
cluster.unsubscribe(self)
}
override fun createReceive(): Receive {
return receiveBuilder()
.matchEquals("Leave",{
cluster.leave(cluster.selfAddress())
})
.match(String::class.java,{
val actorSelection = context.actorSelection("akka://ClusterSystem@127.0.0.1:2551/user/firstActor")
var selfMember = cluster.selfMember()
if(selfMember.hasRole("roleOne")){
actorSelection.tell(it+"##hasRoleOne",self)
}else{
actorSelection.tell(it,self)
}
})
.match(
MemberUp::class.java
) { mUp: MemberUp ->
println("Member is Up: {${mUp.member()}}")
}
.match(
UnreachableMember::class.java
) { mUnreachable: UnreachableMember ->
println("Member detected as unreachable: {${mUnreachable.member()}}")
}
.match(
MemberRemoved::class.java
) { mRemoved: MemberRemoved ->
println("Member is Removed: {${mRemoved.member()}}")
}
.match(
MemberEvent::class.java
) { }
.build()
}
}
我们首先只开始管理节点和SecondActor查看一下输出,可以看到SecondActor这边只有加入节点没有别的任何信息
然后我们此时在加入一个节点(2551节点开多个实例 改为2553端口就行) 如下可以看到 当2553加入的时候 打印了member信息也同时出发了我们上面的“Joining to Up”的逻辑,说明此时发生了状态的变化
registerOnMemberRemoved
在上面介绍到了registerOnMemberUp,这个就是一个当状态由joining到up的回调钩子,同理还有 registerOnMemberRemoved方法, 一般在该回调中进行一些清理,当当前成员状态更改为 'Removed' 或集群已关闭时,将调用该回调
文章标题:Akka-Actor模型-解决高并发的终极方案-集群篇(一)
文章链接:https://zealsinger.xyz/?post=22
本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明来自ZealSinger !
如果觉得文章对您有用,请随意打赏。
您的支持是我们继续创作的动力!

微信扫一扫

支付宝扫一扫