«

Akka-Actor模型-解决高并发的终极方案-集群篇(一)

ZealSinger 发布于 阅读:71 技术文档


Akka Cluster(集群拓展下的API)⭐️

集群的相关概念,我们有讲到过,现在我们来正式学习AKKA集群拓展下的相关API和使用

启动一个集群

集群肯定首先由一个节点开始跑起来,然后随着更多的节点加入慢慢成为了一个集群。所以集群的开端可以看作是一个“允许别人加入的Actor系统”

首先就得在配置上允许Cluster拓展,需要添加配置文件和依赖

首先是依赖

def versions = [
ScalaBinary: "2.13"
]
dependencies {
implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.10.5")

implementation "com.typesafe.akka:akka-cluster_${versions.ScalaBinary}"
}

然后是需要起一个配置文件application.config

akka {
actor {
  provider = "cluster" // 启动模式 cluster集群模式 固定的为cluster即可
}
remote.artery {
  canonical {
    hostname = "127.0.0.1"
    port = 2551   // 启动IP和端口
  }
}

cluster {
  seed-nodes = [
  // 标记种子节点 种子节点是外部节点加入集群的中介节点 没有种子节点是不行的 而且可以有多个种子节点
  // 一般而言 种子节点越多 集群容错越大
  // 需要注意的是这个格式 akka:// 后面的系统名需要和启动节点的ActorSystem.create("....")保持一致才可以
    "akka://ClusterSystem@127.0.0.1:2551"]
   
  downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}

之后 我们在这个项目中启动的服务 就成为了一个可以被别的节点加入的Actor Cluster

// Main.kt
fun main(args: Array<String>) {
   // 这是正常的起一个Actor服务 然后由根Actor调用自定义Actor
   val system = ActorSystem.create("ClusterSystem") // 这里就是配置文件中akka://后面的内容 需要相同
   val firstActor = system.actorOf(
       Props.create(ClusterMasterActor::class.java),
       "firstActor"
  )
}


// ClusterMasterActor.kt
class ClusterMasterActor():AbstractActor() {
   var cluster: Cluster = Cluster.get(context.system) // 获取集群对象

   // subscribe to cluster changes
   override fun preStart() {
       // actor订阅集群
       /*
getSelf():
订阅事件的接收者(即当前 Actor),集群事件将发送到该 Actor 的邮箱。

ClusterEvent.initialStateAsEvents():
将集群的 当前状态(如已有成员)作为初始事件发送。例如,若集群已有节点 A 和 B,订阅后会立即收到 MemberUp 事件。

MemberEvent.class:
订阅所有成员状态变更事件(如节点加入 MemberUp、离开 MemberRemoved 这些消息就会发送给当前actor)。

UnreachableMember.class:
订阅节点不可达事件(如网络故障导致节点无法访问)
       */
       cluster.subscribe(
           self, initialStateAsEvents(), MemberEvent::class.java, UnreachableMember::class.java
      )
  }

   // re-subscribe when restart
   override fun postStop() {
       // 当节点关闭的时候需要取消订阅该集群
       cluster.unsubscribe(self)
  }

   override fun createReceive(): Receive {
       return receiveBuilder()
       // 自定义监听stringe类型的信息 方便后面测试
          .match(String::class.java,{
               println("收到来自:${context().sender()} 的消息:${it}")
          })
          .match(
               MemberUp::class.java  // MemberUp消息 是当新节点加入集群时触发的消息
          ) { mUp: MemberUp ->
               println("Member is Up: {${mUp.member()}}")
          }
          .match(  
               UnreachableMember::class.java  // UnreachableMember 当节点因网络问题不可达时触发
          ) { mUnreachable: UnreachableMember ->
               println("Member detected as unreachable: {${mUnreachable.member()}}")
          }
          .match(
               MemberRemoved::class.java  // MemberRemoved 当节点被移除(主动退出或故障判定后)时触发。
          ) { mRemoved: MemberRemoved ->
               println("Member is Removed: {${mRemoved.member()}}")
          }
          .match(
               MemberEvent::class.java  // MemberEvent 捕获所有未明确处理的成员事件
          ) { }
          .build()
  }

}

启动Main函数之后,如下可以看到集群创建成功且2551即当前actor加入成功

image-20250525190404640

然后此时我们跑另外一个服务 代码如下

// 点击装订区域中的 <icon src="AllIcons.Actions.Execute"/> 图标。
fun main(args: Array<String>) {
   val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2552").withFallback(ConfigFactory.load()) // 因为这里直接还是用的上面的那个配置文件 所以这里需要改一下端口号 可以直接在配置文件中改 我这里没有在文件中改了 在代码中通过ConfigFactory进行的修改 将端口修改为2552 因为2551上面已经再用了会冲突
   val system: ActorSystem = ActorSystem.create("ClusterSystem", config) // 这里的名字必须和上面是一样的 才能保证是一个集群下
   val actorRef = system.actorOf(Props.create(SecondActor::class.java), "secondActor")
   actorRef.tell("Hello", ActorRef.noSender())//发送消息
}

class SecondActor(): AbstractActor() {
   var cluster: Cluster = Cluster.get(context.system)

   // subscribe to cluster changes
   override fun preStart() {
       // #subscribe
       cluster.subscribe(
           self, initialStateAsEvents(), MemberEvent::class.java, UnreachableMember::class.java
      )
       // #subscribe
  }

   // re-subscribe when restart
   override fun postStop() {
       cluster.unsubscribe(self)
  }

   override fun createReceive(): Receive {
       return receiveBuilder()
          .match(String::class.java,{
               // 通过2552发给2551 实现跨端口发送
               val actorSelection = context().actorSelection("akka://ClusterSystem@127.0.0.1:2551/user/firstActor")
               actorSelection.tell(it,self)
          })
          .match(
               MemberUp::class.java
          ) { mUp: MemberUp ->
               println("Member is Up: {${mUp.member()}}")
          }
          .match(
               UnreachableMember::class.java
          ) { mUnreachable: UnreachableMember ->
               println("Member detected as unreachable: {${mUnreachable.member()}}")
          }
          .match(
               MemberRemoved::class.java
          ) { mRemoved: MemberRemoved ->
               println("Member is Removed: {${mRemoved.member()}}")
          }
          .match(
               MemberEvent::class.java
          ) { }
          .build()
  }

}

启动后如下 右边为master,可以看到接收到了消息也接收到了集群中添加节点的消息

image-20250525192949316

通过JoinSeedNodes()/Join()加入集群

在上述案例中,无论是集群初始actor节点还是后来加入集群的节点,都得配置config文件,都得提前准备好配置文件中的种子节点信息,属于静态配置文件的方式加入

在Cluster对象中,提供了一个JoinSeedNodes()方法,允许在代码中动态的加入到集群中,该方法需要传入一个List<Address>类型的集合,这个集合就是种子节点的地址的集合,通过这个集合可以利用其中的种子节点的信息加入到集群中

// 2552Actor的代码这么写也是能成功加入的
fun main(args: Array<String>) {
   val system: ActorSystem = ActorSystem.create("ClusterSystem")
   val actorRef = system.actorOf(Props.create(SecondActor::class.java), "secondActor")
   actorRef.tell("Hello", ActorRef.noSender())
}

class SecondActor(): AbstractActor() {
   var cluster: Cluster = Cluster.get(context.system)
   init {
       cluster.joinSeedNodes(mutableListOf<Address>(AddressFromURIString.parse("akka://ClusterSystem@127.0.0.1:2551"))) // 通过这里进行加入种子节点信息,就不需要在配置文件中静态写死
  }

   // subscribe to cluster changes
   override fun preStart() {
       // #subscribe
       cluster.subscribe(
           self, initialStateAsEvents(), MemberEvent::class.java, UnreachableMember::class.java
      )
       // #subscribe
  }

   // re-subscribe when restart
   override fun postStop() {
       cluster.unsubscribe(self)
  }

   override fun createReceive(): Receive {
       return receiveBuilder()
          .match(String::class.java,{
               val actorSelection = context.actorSelection("akka://ClusterSystem@127.0.0.1:2551/user/firstActor")
               actorSelection.tell(it,self)
          })
          .match(
               MemberUp::class.java
          ) { mUp: MemberUp ->
               println("Member is Up: {${mUp.member()}}")
          }
          .match(
               UnreachableMember::class.java
          ) { mUnreachable: UnreachableMember ->
               println("Member detected as unreachable: {${mUnreachable.member()}}")
          }
          .match(
               MemberRemoved::class.java
          ) { mRemoved: MemberRemoved ->
               println("Member is Removed: {${mRemoved.member()}}")
          }
          .match(
               MemberEvent::class.java
          ) { }
          .build()
  }

}

当然,这个是通过传入种子节点的集合来实现的,如果你只有一个种子节点或者就是想通过某个特定的种子节点加入,就可以使用join方法实现

cluster.join((AddressFromURIString.parse("akka://ClusterSystem@127.0.0.1:2551")))

这样也是可以的

订阅集群事件

在我们加入集群的一开始,就出现了一个cluster.subscribe()方法,该方法的主要作用就是订阅集群事件

可以来看看这个方法的函数原型

public void subscribe(final ActorRef subscriber, final ClusterEvent.SubscriptionInitialStateMode initialStateMode, final Class<?>... to) {
       this.subscribe(subscriber, initialStateMode, (Seq).MODULE$.wrapRefArray((Object[])to));
  }

退出集群

退出集群有多种方式

首先是最优雅最推荐的是通过cluster对象的leave方法实现

cluster.leave(address)

通过调用 Cluster.leave(address) 方法,让节点主动触发退出流程。这个方法调用之后,首先会向集群广播发送离开意图,集群中其他的节点接收到意图之后会先将该节点标记为Removed,随后标记为Exiting状态,最终标记为Removed,其他节点接收到MemberRemoved事件消息,最后该节点停止所有ACTOR并且释放资源

image-20250526105151933

利用tell发送Leave消息也是类似的效果

// 触发优雅离开
Cluster.get(system).manager.tell(Leave(cluster.selfMember().address))

搭配上CoordinatedShutdowm确保资源安全释放

CoordinatedShutdown(context.system).run(CoordinatedShutdown.ClusterLeavingReason)

除此之外,系统故障也会出现被动退出,但是自然是没那么好的,属于异常退出,则需要进行宕机处理Downing,一般而言,宕机处理我们使用AKKA提供了Split Brain Resolver处理,这个需要在conf文件中进行配置

# 配置 Split Brain Resolver
akka.cluster {
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
split-brain-resolver {
  active-strategy = keep-majority
}
}

Node Roles 节点角色

并非所有的Node节点都需要执行相同的功能,根据不同的功能分为了不同的节点角色,可以通过节点角色来进行复杂的任务执行

首先是配置当前Actor的角色属性,通过conf配置文件进行配置

akka {
actor {
  provider = "cluster"
}
remote.artery {
  canonical {
    hostname = "127.0.0.1"
    port = 2552
  }
}

cluster {
  roles = ["roleOne"] // 这里进行配置
  seed-nodes = [
    "akka://ClusterSystem@127.0.0.1:2551"]

  downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}

然后在代码中,可以通过cluster对象的hasRole方法判断是否具备某个角色

class SecondActor(): AbstractActor() {
   var cluster: Cluster = Cluster.get(context.system)
   init {
       cluster.join((AddressFromURIString.parse("akka://ClusterSystem@127.0.0.1:2551")))
  }

   override fun preStart() {
       cluster.subscribe(
           self, initialStateAsEvents(), MemberEvent::class.java, UnreachableMember::class.java
      )
  }

   // re-subscribe when restart
   override fun postStop() {
       cluster.unsubscribe(self)
  }

   override fun createReceive(): Receive {
       return receiveBuilder()
          .matchEquals("Leave",{
               cluster.leave(cluster.selfAddress())
          })
          .match(String::class.java,{
               val actorSelection = context.actorSelection("akka://ClusterSystem@127.0.0.1:2551/user/firstActor")
               var selfMember = cluster.selfMember()
               // 根据不同的角色发送不同的内容
               if(selfMember.hasRole("roleOne")){
                   actorSelection.tell(it+"##hasRoleOne",self)
              }else{
                   actorSelection.tell(it,self)
              }
          })
          .match(
               MemberUp::class.java
          ) { mUp: MemberUp ->
               println("Member is Up: {${mUp.member()}}")
          }
          .match(
               UnreachableMember::class.java
          ) { mUnreachable: UnreachableMember ->
               println("Member detected as unreachable: {${mUnreachable.member()}}")
          }
          .match(
               MemberRemoved::class.java
          ) { mRemoved: MemberRemoved ->
               println("Member is Removed: {${mRemoved.member()}}")
          }
          .match(
               MemberEvent::class.java
          ) { }
          .build()
  }
}

查看接收方的输出,效果如下

image-20250604142000431

节点的状态及其流转

当一个Actor要加入到Cluster中,存在Joining ; Up ;Leaving ; Exiting ; Removed ; Down ; WeaklyUp七种状态

一般而言,其状态之间的流转如下

 
1. 满足 min-nr-of-members
2. 显式调用 leave
3. 领导者确认
4. 完全退出
故障检测超时
恢复通信
永久故障
启用 weakly-up
满足 min-nr-of-members
Joining
Up
Leaving
Exiting
Removed
Down
WeaklyUp
 
  1. 加入阶段

    • 节点启动 → 连接种子节点 → 发送 Join 命令 → 状态设为 Joining

    • 领导者广播新成员信息

  2. 激活阶段

    • 领导者检查配置的 min-nr-of-members

      • 全局条件:akka.cluster.min-nr-of-members

      • 角色条件:akka.cluster.role.{role}.min-nr-of-members

    • 同时满足所有条件后,将节点状态改为 Up

  3. 离开阶段

    • 节点调用 Cluster.leave() → 状态变为 Leaving

    • 领导者协调数据迁移 → 状态改为 Exiting

    • 完成清理 → 状态变为 Removed

节点数量/角色数量控制集群启动

在上面节点状态中,我们其实很需要注意的就是Joining状态和Up状态,这两个状态是开发的时候最常接触的

然后在上面也说到了一个配置属性min-nr-of-members,通过添加这个配置,可以让我们的先加入的节点只是维持Joining状态,只有当Joining状态节点数量达到min-nr-of-members的时候才能激活为Up

案例如下

akka {
actor {
  provider = "cluster"
}
remote.artery {
  canonical {
    hostname = "127.0.0.1"
    port = 2551
  }
}

cluster {
  min-nr-of-members = 3 // 在集群主节点下加入配置 需要3个节点才能将joining升级为up
  seed-nodes = [
    "akka://ClusterSystem@127.0.0.1:2551"]

  downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}
fun main(args: Array<String>) {
   val system: ActorSystem = ActorSystem.create("ClusterSystem",ConfigFactory.load())
   val actorRef = system.actorOf(Props.create(SecondActor::class.java), "secondActor")
   actorRef.tell("Hello", ActorRef.noSender())
}

class SecondActor(): AbstractActor() {
   var cluster: Cluster = Cluster.get(context.system)
   init {
       cluster.join((AddressFromURIString.parse("akka://ClusterSystem@127.0.0.1:2551")))
       // cluster的registerOnMemberUp()方法 能注册一个回调逻辑 让该Actor从Joining状态变化为Up状态的时候触发
       cluster.registerOnMemberUp({ println("Joining to Up")})
  }

   // subscribe to cluster changes
   override fun preStart() {
       // #subscribe
       cluster.subscribe(
           self, initialStateAsEvents(), MemberEvent::class.java, UnreachableMember::class.java
      )
       // #subscribe
  }

   // re-subscribe when restart
   override fun postStop() {
       cluster.unsubscribe(self)
  }

   override fun createReceive(): Receive {
       return receiveBuilder()
          .matchEquals("Leave",{
               cluster.leave(cluster.selfAddress())
          })
          .match(String::class.java,{
               val actorSelection = context.actorSelection("akka://ClusterSystem@127.0.0.1:2551/user/firstActor")
               var selfMember = cluster.selfMember()
               if(selfMember.hasRole("roleOne")){
                   actorSelection.tell(it+"##hasRoleOne",self)
              }else{
                   actorSelection.tell(it,self)
              }
          })
          .match(
               MemberUp::class.java
          ) { mUp: MemberUp ->
               println("Member is Up: {${mUp.member()}}")
          }
          .match(
               UnreachableMember::class.java
          ) { mUnreachable: UnreachableMember ->
               println("Member detected as unreachable: {${mUnreachable.member()}}")
          }
          .match(
               MemberRemoved::class.java
          ) { mRemoved: MemberRemoved ->
               println("Member is Removed: {${mRemoved.member()}}")
          }
          .match(
               MemberEvent::class.java
          ) { }
          .build()
  }

}

我们首先只开始管理节点和SecondActor查看一下输出,可以看到SecondActor这边只有加入节点没有别的任何信息

image-20250604153708515

然后我们此时在加入一个节点(2551节点开多个实例 改为2553端口就行) 如下可以看到 当2553加入的时候 打印了member信息也同时出发了我们上面的“Joining to Up”的逻辑,说明此时发生了状态的变化

image-20250604153838144

registerOnMemberRemoved

在上面介绍到了registerOnMemberUp,这个就是一个当状态由joining到up的回调钩子,同理还有 registerOnMemberRemoved方法, 一般在该回调中进行一些清理,当当前成员状态更改为 'Removed' 或集群已关闭时,将调用该回调

Kotlin 编程 Java