Akka-Actor模型-解决高并发的终极方案-入门篇(三)
ZealSinger 发布于 阅读:111 技术文档
⭐️
这里我们开始来介绍Akka的相关基础使用
依赖导入
maven源的那个仓库不是很清楚为啥我的报错用不了,解决这个问题我们可以直接使用Akka提供的源
// maven方式
repositories {
mavenCentral()
maven {
url "https://repo.akka.io/maven"
}
}
<properties>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-bom_${scala.binary.version}</artifactId>
<version>2.10.5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
// gradle方式 kts的gradle
plugins {
kotlin("jvm") version "1.9.0"
}
val kotlinVersion = "1.9.0"
val versions = mapOf(
"ScalaBinary" to "2.13"
)
group = "com.boss.test"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
maven {
url = uri("https://repo.akka.io/maven")
}
}
dependencies {
testImplementation(kotlin("test"))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
implementation(kotlin("reflect", kotlinVersion))
implementation(platform("com.typesafe.akka:akka-bom_${versions["ScalaBinary"]}:2.10.5"))
implementation("com.typesafe.akka:akka-actor_${versions["ScalaBinary"]}")
testImplementation("com.typesafe.akka:akka-testkit_${versions["ScalaBinary"]}")
}
tasks.test {
useJUnitPlatform()
}
kotlin {
jvmToolchain(17)
}
自定义Actor类
所谓创建一个自定义Actor,实际上就是实现抽象类AbstractActor
需要实现其抽象方法createReceive,该方法没有入参回一个 AbstractActor.Receive类型的参数,可以借用receiveBuilder这个工厂方法进行快速的构建
receiveBuilder这个工厂方法利用match方法进行消息类型的判断和处理逻辑定义
// Java
public class MyActor extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
public Receive createReceive() {
return receiveBuilder()
.match(
// 指定消息数据类型为String
String.class,
// 处理逻辑为一个lambda表达式,也可以是方法引用
// 处理逻辑为输出一段话
s -> {
log.info("Received String message: {}", s);
})
// matchAny即收到其他类型的即非String类型的消息都是走这个逻辑
.matchAny(o -> log.info("received unknown message"))
.build();
}
}
// kotlin
class MyActor:AbstractActor() {
// 没有入参 返回一个 AbstractActor.Receive类型的参数
// 该方法内定义的内容为 此Actor可以处理哪些消息以及如何处理这些消息的实现
// 至于返回的 AbstractActor.Receive对象的构建,可以通过Akka提供的receiveBuilder这个工厂方法进行快速的构建
override fun createReceive(): Receive {
// 这里用了kotlin的lambda相关的语法糖 看不懂的可以参考Java代码
return receiveBuilder().match(
String::class.java
) { println("receive string message:${it}") }.matchAny{ println("receive message type error:${it}")}.build()
}
}
// 方法引用的方式
class MyActor:AbstractActor() {
override fun createReceive(): Receive {
return receiveBuilder()
.match(String::class.java, this::handleStringMessage) // 使用方法引用
.matchAny(this::handleOtherMessage) // 使用方法引用
.build()
}
// 提取字符串消息处理逻辑
private fun handleStringMessage(message: String) {
println("receive string message: $message")
}
// 提取其他类型消息处理逻辑
private fun handleOtherMessage(message: Any) {
println("receive message type error: $message")
}
}
// 也可以这么写
class MyActor:AbstractActor() {
val dealStringMessage=fun(message : String){
println("receive string message:${message}")
}
val dealErrorTypeMessage=fun(message : Any?){
println("receive message error type :${message}")
}
// 没有入参 返回一个 AbstractActor.Receive类型的参数
// 该方法内定义的内容为 此Actor可以处理哪些消息以及如何处理这些消息的实现
// 至于返回的 AbstractActor.Receive对象的构建,可以通过Akka提供的receiveBuilder这个工厂方法进行快速的构建
override fun createReceive(): Receive {
return receiveBuilder().match(String::class.java, dealStringMessage).matchAny(dealErrorTypeMessage).build()
}
}
创建Actor对象
创建Actor对象不能直接采用new的方式,而是应该借助Props对象,每个Actor在Akka中都是通过Props来定义的,这样可以解耦Actor的配置和实例化。这样设计的好处是可以在不同的地方复用配置,并且保证Actor的创建符合规范,比如避免在构造函数中引入可变状态或副作用。
首先需要创建Props对象,有如下三种方法:第一种直接传入Actor的类对象(例如p1)其本质利用的是反射 ; 第二种传入Actor类对象且传入一定的参数,编译器会找对应的类的对应参数的构造方法(例如p2,p3) ; 第三种传入Actor类对象且写一个lambda表达式自定义构造方法(例如p4)
比较推荐方案二,利用反射+参数检测有参构造的形式,第一是编译时就能判断是否存在对应的参数的有参构造;第二就是不需要和方案三一样导致代码中出现过多的lambda块;第三是能方便进行序列化和逆序列化
fun main(args: Array<String>) {
val p1 = Props.create(MyActor::class.java)
val p2 = Props.create(MyActor::class.java,"")
val p3 = Props.create(MyActor::class.java,"name")
val p4 =
Props.create(MyActor::class.java, {
println("自定义构造逻辑")
MyActor("arg") }
)
}
// 对应的Java方法如下
Props props1 = Props.create(MyActor.class);
Props props2 =
Props.create(ActorWithArgs.class, () -> new ActorWithArgs("arg")); // careful, see below
Props props3 = Props.create(ActorWithArgs.class, "arg");
为每个Actor内部定义一个static静态的方法是一个不错的主意,有助于使得Props的创建更加贴近Actor的定义,也避免了使用Props.create(....)方法相关的陷阱
class MyActor():AbstractActor() {
companion object{
fun props(): Props {
return Props.create(MyActor::class.java)
}
}
override fun createReceive(): Receive {
return receiveBuilder()
.match(String::class.java, this::handleStringMessage) // 使用方法引用
.matchAny(this::handleOtherMessage) // 使用方法引用
.build()
}
// 提取字符串消息处理逻辑
private fun handleStringMessage(message: String) {
println("receive string message: $message")
}
// 提取其他类型消息处理逻辑
private fun handleOtherMessage(message: Any) {
println("receive message type error: $message")
}
}
fun main(args: Array<String>) {
val myActorProps = MyActor.props()
}
知道如何创建Props对象之后,就可以借助Props对象创建Actor对象
class FirstActor:AbstractActor() {
// 在一个actor内部创建另外一个actor,第二个参数为actor命名
//name 参数是可选的,但最好为您的 actors 命名,因为它用于日志消息和标识 actors,名称不得为空或以 $ 开头,但可以包含 URL 编码字符(例如,%20 表示空格)如果给定名称已被同一父级的另一个子级使用,则会引发 InvalidActorNameException
val childActor = context.actorOf(MyActor.props(),"child-MyActor")
override fun createReceive(): Receive {
return receiveBuilder().match(Int::class.java,{ println("Integer message:${it}")}).matchAny({ println("other type message")}).build()
}
}
可以发现childActor的数据类型是ActorRef,这个也和我们上面说到的理论知识对上了,实际上是封装成为了ActorRef对象,并且Actor在创建的时候就会自动被异步启动!
对 `actorOf` 的调用将返回 ActorRef 类型的实例。这是 actor 实例的句柄,也是与之交互的唯一方式。`ActorRef` 是不可变的,并且与它所代表的 Actor 具有一对一的关系。`ActorRef` 也是可序列化的和网络感知的。这意味着您可以序列化它,通过网络发送它,并在远程主机上使用它,并且它仍然会通过网络在原始节点上表示相同的 Actor。
在上述代码中,childActor 即 MyActor对象是FirstActor通过自己的上下文context创建的,所以MyActor是FirstActor的子Actor,两者存在监督关系,FirstActor会监督MyActor对象(没有特别声明,这里监督策略就是默认策略,遇到问题重启子Actor)
ActorSystem (整个系统)
│
└─ FirstActor (父Actor)
│
└─ myChild (子Actor, 类型为 MyActor)
Actor的生命周期
官方标准图如下
转化中文版本差不多如下
┌───────────────────────┐
│ 创建 Actor │
└──────────┬────────────┘
▼
┌───────────────────────┐
│ 分配实体路径 (Entity Path) │
│ - 路径保留 │
│ - 生成随机 UID │
└──────────┬────────────┘
▼
┌───────────────────────┐
│ 实例化 Actor (Actor Incarnation) │
│ - 创建 Mailbox │
│ - 调用 preStart() │
└──────────┬────────────┘
▼
┌───────────────────────┐
│ 运行中 (处理消息) │
│ - 通过 Mailbox 接收消息 │
└──────────┬────────────┘
│
│ 发生故障/需要重启
▼
┌───────────────────────┐
│ 重启 (Restart) │
│ - 旧实例调用 preRestart() │
│ - 销毁旧实例 │
│ - 创建新实例 │
│ - 新实例调用 postRestart()│
└──────────┬────────────┘
▼
┌───────────────────────┐
│ 运行中 (恢复处理) │
└──────────┬────────────┘
│
│ 显式停止 (Stop/PoisonPill)
▼
┌───────────────────────┐
│ 终止 (Stop) │
│ - 调用 postStop() │
│ - 发送 Terminated 消息 │
│ - 释放路径 (可复用) │
└───────────────────────┘
主要有几个重要的需要理解的点
-
Actor通过actorOf进行实例化之后,我们知道返回的是的ActorRef对象,整个系统会为其分配一个唯一路径和唯一UUID,只要这两个没有进行更换或者说被销毁,这个ActoRef就是一直可用的
-
ActorRef我们知道就是内部装了一个actor实例,但是该actor的更换不会导致ActorRef的路径和UUID进行更换or销毁,也就是说内部actor实例的销毁与重建不会影响外部ActorRef的使用,同理重启的过程也不会影响(重启的本质其实也就是对内部的旧actor实例销毁,创建新的actor实例)
-
主动调用context.stop方法或者发送PoisonPill可以终止Actor,这个终止就是会导致ActorRef的UUID和路径销毁更改,导致ActorRef不可用
//可以不需要更换ActorRef的情况 看不懂代码的可以先看下面API部分
class ParentActor : AbstractActor() {
val childRef = context.actorOf(Props.create(ChildActor::class), "child")
override fun createReceive(): Receive {
return receiveBuilder()
.matchEquals("triggerRestart") {
// 发送导致子 Actor 抛出异常的消息 子Actor异常默认策略是重启 所以发送完信息之后子Actor其实就被重启过了
childRef.tell("causeFailure", self)
// 重启后,childRef 仍有效
childRef.tell("Hello", self) // 消息会送达新实例
}
.build()
}
}
class ChildActor : AbstractActor() {
override fun createReceive(): Receive {
return receiveBuilder()
.matchEquals("causeFailure") {
throw RuntimeException("Restart triggered")
}
.matchAny {
println("Received message: $it")
}
.build()
}
}//需要更换ActorRef的情况 看不懂代码的可以先看下面API部分
class ParentActor : AbstractActor() {
var childRef = context.actorOf(Props.create(ChildActor::class), "child")
override fun createReceive(): Receive {
return receiveBuilder()
.matchEquals("killAndRecreate") {
// 终止旧子 Actor
context.stop(childRef)
// 重新创建同名子 Actor
childRef = context.actorOf(Props.create(ChildActor::class), "child")
// 必须使用新的 childRef
childRef.tell("New message", self)
}
.build()
}
}
class ChildActor : AbstractActor() {
override fun postStop() {
println("Child stopped")
}
override fun createReceive(): Receive {
return receiveBuilder()
.matchAny {
println("Received: $it")
}
.build()
}
}stop之后强行使用就会报日志was not delivered
生命周期相关方法
Actor的生命周期是通过Hooks体现和控制的,我们可以重写Hooks,从而实现对Actor生命周期的各环节的粒度控制
-
preStart():在构造函数之后调用
-
postStop():在重启和停止之前调用(我们已停止actor的消息就是通过这个方法重定向到ActorSystem的deadLetters的)
-
preRestart(reason,message):用于通知旧actor,包含重启的原因和异常消息,默认情况下停止所有子项且会调用postStop()
-
postRestart():默认情况下会调用preStart()
preRestart 和 postRestart 只在重启的时候才会被调用。它们默认调用了 preStart 和 postStop,但是调用它们的时候就不再直接调用 preStart 和 postStop 了。 这样我们就能够决定, 到底是只在 Actor 启动或停止的时候调用一次 preStart 和postStop,还是每次重启一个 Actor 的时候就调用 preStart 和postStop。
执行组件相关API
ActorSystem.create(String name)
在非Actor中使用和管理Actor,创建一个该类范围内的顶级Actor(/user)
因为所有的Actor都只能通过特定的方法进行创建和使用而不能直接new对象,如果直接new将会丧失Actor的功能从而变成一个简单的类对象
但是在非Actor不存在Actor上下文环境context,所以无法通过context调用actorOf()等相关方法创建Actor,所以必须首先创建一个顶级Actor然后再利用这个顶级Actor创建和管理别的Actor
fun main() {
// 1. 创建 ActorSystem
val system = ActorSystem.create("MySystem")
// 2. 创建顶级 Actor(属于 /user/firstActor)
val firstActor = system.actorOf(
Props.create(FirstActor::class.java),
"firstActor"
)
// 3. 向 Actor 发送消息
firstActor.tell("Hello", ActorRef.noSender())
}
createReceive()
一个Actor必带的一个方法,用于设置actor的初始行为,接收什么类型的消息以及如何进行处理。
如果当前actor行为和收到的消息部匹配,则会调用unhandled,默认情况下会调用UnhandlerMessage(message,sender,recipient)方法在actor系统的事件流上发布一个debug消息
除了上面“自定义Actor类”内容中说到的链式匹配规则,也可以拆分为小匹配规则也是可以的,可以避免长而臃肿的代码
override fun createReceive(): Receive {
val receiveBuilder = receiveBuilder()
receiveBuilder.matchEquals("a",{})
receiveBuilder.matchEquals("b",{})
receiveBuilder.match(Integer::class.java,{})
receiveBuilder.matchAny { }
return receiveBuilder.build()
}
一般更加推荐链式+方法应用而不是链式+lambda表达式,如下
static class WellStructuredActor extends AbstractActor {
public static class Msg1 {}
public static class Msg2 {}
public static class Msg3 {}
public Receive createReceive() {
return receiveBuilder()
.match(Msg1.class, this::receiveMsg1)
.match(Msg2.class, this::receiveMsg2)
.match(Msg3.class, this::receiveMsg3)
.build();
}
private void receiveMsg1(Msg1 msg) {
// actual work
}
private void receiveMsg2(Msg2 msg) {
// actual work
}
private void receiveMsg3(Msg3 msg) {
// actual work
}
}
在某些时候,ReceiveBuilder的逻辑匹配的验证会成为某些actor的瓶颈,则可以可以考虑通过UntypeAbstractActor而不是AbstractActor,UntypeAbstractActor需要实现onReceive方法,该方法接收参数为接收到的message信息,但是因为是无类型的,所以都是以Object/Any类型进行的接收,这也就比AbstractActor少了类型匹配的阶段,能更加高效
class MyUntypedActor:UntypedAbstractActor() {
override fun onReceive(msg: Any?) {
if(msg is String){
}else if(msg is Int){
}
}
}
tell()
最常用的发送消息的API,异步发送消息并且返回,高效且不一定需要目标actor回复
ask()
ask是异步但是返回的 CompletionStage对象,可以通过toCompletableFutrue方法转化为我们熟知的CompletableFuture对象,可能会需要通过get和join等阻塞式的获取结果,可能会导致出现同步代码,可以使用Futrue组合等方式实现完全的非阻塞和异步
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
import java.util.concurrent.CompletableFuture;
final Duration t = Duration.ofSeconds(5);
// using 1000ms timeout
CompletableFuture<Object> future1 =
ask(actorA, "request", Duration.ofMillis(1000)).toCompletableFuture();
// using timeout from above
CompletableFuture<Object> future2 = ask(actorB, "another request", t).toCompletableFuture();
CompletableFuture<Result> transformed =
future1.thenCombine(future2, (x, s) -> new Result((String) x, (String) s));
pipe(transformed, system.dispatcher()).to(actorC);
因为设置了超时事件,也有Futrue的返回类型,就需要目标actor必须进行回复,如果不回复则会超时认定为失败 ; 除此之外,因为ask的函数原型是
ask(actor,sender,timeout)
内部的这个actor其实是个临时actor,Akka 会隐式创建一个 临时 Actor 来接收回复,临时 Actor 的生命周期由超时时间控制,超时后自动销毁,而tell是actor.tell注定了这个tell不会是临时的,所以ask还会存在创建临时actor的开销
Akka为completionStage提供了一个thenRun的方法,当completionStage成功被响应的时候就会触发并执行thenRun中的逻辑,可以认为这是一个防止ask阻塞的方式
forward()
转发,目标actor . forward(messsage,getContext())
转发相当于当前actor是一个中介,会保留消息中的原始发送者的信息,如下
// Main
fun main(args: Array<String>) {
val system = ActorSystem.create("Main")
val firstActor = system.actorOf(Props.create(FirstActor::class.java),"first")
firstActor.tell(1, firstActor)
}
// MyActor
class MyActor():AbstractActor() {
companion object{
fun props(): Props {
return Props.create(MyActor::class.java)
}
}
override fun createReceive(): Receive {
return receiveBuilder()
.matchEquals("exception",{
throw RuntimeException("收到Exception Message")
})
.matchAny(this::handleOtherMessage) // 使用方法引用
.build()
}
// 提取其他类型消息处理逻辑
private fun handleOtherMessage(message: Any) {
println("receive message type: $message")
println(sender)
println("--------------------")
}
}
// FirstActor和SecondActor
class FirstActor:AbstractActor() {
val childActor = context.actorOf(Props.create(SecondActor::class.java),"child-Second")
override fun createReceive(): Receive {
return receiveBuilder().match(Integer::class.java,{
childActor.tell(it,self)
}).match(ActorIdentity::class.java,{
println(it.actorRef.get())
})
.matchAny({
println("other type message:${it}")
}).build()
}
}
class SecondActor:AbstractActor() {
val childActor = context.actorOf(MyActor.props(),"child-MyActor")
override fun createReceive(): Receive {
return receiveBuilder().match(Integer::class.java,{
childActor.forward(it,context)
}).match(ActorIdentity::class.java,{
println(it.actorRef.get())
})
.matchAny({
println("other type message:${it}")
}).build()
}
}
整个消息的流向如下,可以看到经过seconde之后,在MyActor中接收到的sender依旧是firstActor,说明保留了原始的发送者信息而没有覆盖(从froworad的第二个参数也可以猜测,getContext获取当前上下文环境,而不是直接传入的self,就是为了从context中获取原始的发送者信息)
getContext()
获取当前Actor的上下文信息,里面主要包含了
-
actorOf()方法:创建子Actor的工厂方法
-
当前actor所属的系统
-
children:当前actor的所有子Actor
-
lifecycle生命周期监控
-
热插拔堆栈
结合上述几个来一个小小的案例代码
// Main.kt 启动类
fun main(args: Array<String>) {
val system = ActorSystem.create("Main")
val firstActor = system.actorOf(Props.create(FirstActor::class.java))
firstActor.tell(1, ActorRef.noSender())
}
// MyActor.kt 作为子Actor 无实际用处
class MyActor():AbstractActor() {
companion object{
fun props(): Props {
return Props.create(MyActor::class.java)
}
}
override fun createReceive(): Receive {
return receiveBuilder()
.match(String::class.java, this::handleStringMessage) // 使用方法引用
.matchAny(this::handleOtherMessage) // 使用方法引用
.build()
}
// 提取字符串消息处理逻辑
private fun handleStringMessage(message: String) {
println("receive string message: $message")
}
// 提取其他类型消息处理逻辑
private fun handleOtherMessage(message: Any) {
println("receive message type error: $message")
}
}
// FiirstActor.kt
class FirstActor:AbstractActor() {
// 创建三个子Actor
val childActor = context.actorOf(MyActor.props(),"child-MyActor")
val childActor1 = context.actorOf(MyActor.props(),"child-MyActor1")
val childActor2 = context.actorOf(MyActor.props(),"child-MyActor2")
val childActor3 = context.actorOf(MyActor.props(),"child-MyActor3")
// 匹配消息以及处理策略
override fun createReceive(): Receive {
return receiveBuilder().match(Integer::class.java,{
println("Integer message:${it}")
printList(it)
}).matchAny({
println("other type message")
}).build()
}
// 利用context获取所有的子Actor
fun printList(message:Integer){
context.children.forEach {
println(it)
}
}
}
运行结果 同时也可以看到,即使Main逻辑全部结束了,整个系统还是在运行中的
getSelf()
获取当前actor对应的ActorRef对象,即获取自身的ActorRef对象
getSender()
获取最后收到的一个消息的sender对象即收到的最后一条消息的发送者
supervisorStrategy()
监督子Actor的策略,开发者可以自定义从而进行覆盖
watch()和unwatch()
生命周期监视器,能让一个Actor在另外一个Actor终止的时候收到通知(注意这里是终止,即stop,而不是临时故障重启),相当于一个stop监听器了可以简单理解为,收到一个Terminated消息,发送者固定为system的deadLetters
class FirstActor:AbstractActor() {
val childActor = context.actorOf(MyActor.props(),"child-MyActor")
val childActor2 = context.actorOf(MyActor.props(),"child-MyActor2")
override fun createReceive(): Receive {
return receiveBuilder().match(Integer::class.java,{
println("Integer message:${it}")
// context.watch()对两个子Actor的生命周期进行监听
context.watch(childActor)
context.watch(childActor2)
// exception消息导致childActor异常重启 但是重启不会触发Terminated消息
childActor.tell("exception", ActorRef.noSender())
context.stop(childActor2) // stop会触发Terminated消息
lastMessage()
}).matchAny({
println("other type message:${it}")
}).build()
}
// 打印最后一条收到的消息的发送者 可以看到是deadLetters
fun lastMessage(){
println("最后收到的一条消息发送者为:${context.sender}")
}
}
展示效果如下
需要注意的是,watch()机制即发送Terminated消息和注册终止的顺序无关,其底层其实就是分为了一个判断:如果当前时刻被监听的actor已经stop了那么立马发送一个Terminated消息,反之如果此时还没有stop则后续再监听触发
class FirstActor:AbstractActor() {
val childActor = context.actorOf(MyActor.props(),"child-MyActor")
val childActor2 = context.actorOf(MyActor.props(),"child-MyActor2")
override fun createReceive(): Receive {
return receiveBuilder().match(Integer::class.java,{
println("Integer message:${it}")
// 先将actor2进行stop 然后再注册监听 也会直接返回Terminated消息
context.stop(childActor2)
context.watch(childActor2)
lastMessage()
}).matchAny({
println("other type message:${it}")
}).build()
}
fun lastMessage(){
println("最后收到的一条消息发送者为:${context.sender}")
}
}
同时,因为注册时机和被监听actor的状态,注册多个watch的时候,会出现不同的情况,多次注册不能保证有多条消息,也不能保证只有一条(可能多条,也可以进行一定的重复消息整合机制从而减少重复消息)
利用context().unwatch()方法可以实现对actor监听的取消监听,哪怕已经生成了Terminated消息但是还未被消息的时候也是生效的(当前actor消息比较多,Terminated消息在邮箱中了但是还没有被消费,此时unwatch取消监听,就能让邮箱中这条已经存在的Terminated消息不被处理)
actorSelection()
通过Actor选择识别Actor
我们知道,每个Actor其实都有一个唯一的路径,该路径遵循了Actor链从子级到父级直接Actor系统的根来获得,并且他只有一个物理路径,如果监督链包含任何远程Supervisors,该路径可能会有所不同,系统使用这些路径来查找actors,例如当收到远程消息并且搜索recipient时,可以通过指定的绝对或者相对路径(逻辑或者物理)来查找其他的actor并且收到一个ActorSelection结果
因为是直接返回的ActorSelection对象,不是ActorRef对象,所以我们不能直接使用,而是利用其发送内置类型Identify消息然后通过getActorRef方法才能获取到其ActorRef对象
class FirstActor:AbstractActor() {
val childActor = context.actorOf(MyActor.props(),"child-MyActor")
val childActor2 = context.actorOf(MyActor.props(),"child-MyActor2")
override fun createReceive(): Receive {
return receiveBuilder().match(Integer::class.java,{
val selection: ActorSelection = context.actorSelection("akka://Main/user/first/child-MyActor")
selection.tell(Identify(1), self)
}).match(ActorIdentity::class.java,{
println(it.actorRef.get()) // it.actorRef底层自动解析上面的Identify类型的数据,返回actorSelection查询到的对应路径的actor的ActorRef,这里我们返回的是akka://Main/user/first/child-MyActor
})
.matchAny({
println("other type message:${it}")
}).build()
}
}
除了上述发Identify消息的方式,还可以通过resolveOne()这个方法,他会需要传一个超时时间timeOut,resolveOne内部会给目标路径发送一个Identify的消息,如果actor存在会回复消息ActorIdentify消息,其中就会包含ActorRef ; 如果不存在或者标识未在超时时间内完成或者响应失败,则会ActorNotFound异常
class FirstActor:AbstractActor() {
val childActor = context.actorOf(MyActor.props(),"child-MyActor")
val childActor2 = context.actorOf(MyActor.props(),"child-MyActor2")
override fun createReceive(): Receive {
return receiveBuilder().match(Integer::class.java,{
val selection: ActorSelection = context.actorSelection("akka://Main/user/first/child-MyActor")
val actorRefStage = selection.resolveOne(Timeout(3, TimeUnit.SECONDS))
actorRefStage.onComplete(
{
val get = it.get()
if(get!=null){
println(get)
}
}, ExecutionContext.global()
)
}).match(ActorIdentity::class.java,{
println(it.actorRef.get())
})
.matchAny({
println("other type message:${it}")
}).build()
}
}
setReceiveTimeout()
设置超时事件,传入一个Duration类型(这个Duration是scala包下的)的参数,表示时间间隔
设置之后,如果当前actor在设定时间内没有任何回复,系统会自动发送一个ReceiveTimeout类型的消息给当前Actor
可以用来做会话超时 ; 资源清理 ; 周期性任务等场景
package ActorBase
import akka.actor.*
import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit
class FirstActor:AbstractActor() {
private val childActor: ActorRef = context.actorOf(Props.create(MyActor::class.java),"child-Second")
init {
context.setReceiveTimeout(Duration.create(3, TimeUnit.SECONDS))
}
override fun createReceive(): Receive {
val receiveBuilder = receiveBuilder()
receiveBuilder.match(Integer::class.java,{
childActor.tell(it,self)
})
receiveBuilder.match(ReceiveTimeout::class.java){
println("超时消息~自动消息 ## ${it}")
}
receiveBuilder.matchAny { println("other type message ${it}") }
return receiveBuilder.build()
}
}
后续可以通过cancelReceiveTimeout取消超时消息通知,直接context.cancelReceiveTimeout()即可
计划消息startSinglerTimer()
如果你需要发送延迟消息,可以通过是继承AbstractActorWithTimers可以实现
其底层封装了Timer,利用Timer可以调用startSingleTimer发送延迟消息
startSingleTimer(timer的唯一标识key,发送的内容,延迟的时间)
案例代码如下
需要注意的是,可以看到我们的案例也是自己给自己发的延迟消息,因为其底层是计时器,计时器与拥有它的 actor 的生命周期绑定,因此在重新启动或停止时会自动取消。且 不是线程安全的,即它只能在拥有它的 actor 中使用
class FirstActor:AbstractActorWithTimers() {
private val childActor: ActorRef = context.actorOf(Props.create(MyActor::class.java),"child-Second")
override fun createReceive(): Receive {
val receiveBuilder = receiveBuilder()
receiveBuilder.match(Integer::class.java,{
println("准备发送延迟消息${System.currentTimeMillis()}")
timers.startSingleTimer("only-one-key","Delay Message",Duration.create(3, TimeUnit.SECONDS))
})
receiveBuilder.match(ReceiveTimeout::class.java){
println("超时消息~自动消息 ## ${it}")
}
// 延迟消息匹配到这里
receiveBuilder.matchEquals("Delay Message",{
println("收到了延迟消息:${System.currentTimeMillis()}")
})
receiveBuilder.matchAny { println("other type message ${it}") }
return receiveBuilder.build()
}
}
stop()
stop()方法在生命周期的内容提到过了,就是终止该actor会导致ActorRef的不可用,这里再说一点,当该Actor中stop之后,正在处理的消息会继续处理完毕之后再stop,但是邮箱中的消息不会进行处理了,默认情况下这些消息都会被发送给ActorSystem的daLetters中,当然,这个取决于邮箱的实现,也可以有自定义的处理方式
其底层的执行顺序是:暂停邮箱处理并向气压所有子项发送停止命令;处理来自子项的内部终止通知,当最后一个子项消失才会终止自身(终止自身的逻辑是:调用postStop;转存邮箱;在DeathWatch上发布Terminated消息告诉其主管)
上述执行顺序可以确保Acotor系统子树有序的将stop指令传播给到叶子从而终止,但是如果其中有一个actor没有响应或着卡主等情况,就会导致整个Stop过程被卡住
整个Actor完全停止之后就会执行钩子方法postStop()
PoisonPill 和 kill
PoisonPill称之为毒丸,他也具备杀死Actor的能力,但是不会立马杀死,而是作为一条消息发送到目标Actor的邮箱中,当Actor处理到这个消息的时候就会开始处理
victim.tell(akka.actor.PoisonPill.getInstance(), ActorRef.noSender());
Kill也可以作为一个消息来杀死Actor,和PoisonPill有点类似,区别在于Kill是会抛出ActorKieedException从而触发失败,接收消息的actor就会因为异常从而暂停进而询问监管者actor如何处理,这个就取决于配置了。也就是说,kill消息实质上是让目标抛出一个异常,从而主动执行我们的异常策略,默认情况下就是重启
victim.tell(akka.actor.Kill.getInstance(), ActorRef.noSender());
// expecting the actor to indeed terminate:
expectTerminated(Duration.ofSeconds(3), victim);
graceful stop() 优雅终止
见名知义,是一种优雅的stop actor的方式,其主要体现在:一开始只是给Acotr发送一个关闭的消息,等待目标Actor完全停止之后才会进行关闭,避免强行关闭 ;协调多个Actor的终止顺序防止资源竞争 ;执行清理操作,允许Actor停止前通过postStop完成资源释放,日志记录等任务
try {
// 发送自定义的 SHUTDOWN 消息,等待最多5秒
// 第一个参数为目标actor 第二个参数为等待时间 第三个为关闭消息 发送给目标actor代表即将关闭了赶进完成你的事情
//该方法返回的时候 postStop方法就被执行了
CompletionStage<Boolean> stopped = gracefulStop(actorRef, Duration.ofSeconds(5), Manager.SHUTDOWN);
// 阻塞等待结果,超时设为6秒(需大于gracefulStop的超时)
boolean isStopped = stopped.toCompletableFuture().get(6, TimeUnit.SECONDS);
if (isStopped) {
System.out.println("Actor已停止");
}
} catch (AskTimeoutException e) {
System.out.println("Actor未在5秒内停止");
}
但是需要注意的是,Actor的停止和Actor的名称注销是两个异步的过程,也就是说gracefulStop返回的时候但是Actor的名称还没有注销,还在被占用,此时就可能出问题(例如你在某处用这个名字命名一个新的actor),解决方案是尽量仅在监督者中重用名字,并且确保在Terminated消息之后才命名给新的Actor
become()和unbecome()
Akka通过become和unbecome支持Actor热插拔行为
什么叫热插拔呢,就是我们在自定义Actor的时候,createReceive()中定义了一套消息匹配规则,实际上这套匹配规则可以通过become方法和unbecome方法进行切换,从而灵活的搭配多种匹配规则和处理规则
// Main.kt
fun main(args: Array<String>) {
val system = ActorSystem.create("Main")
val firstActor = system.actorOf(Props.create(FirstActor::class.java),"first")
val firstActor2 = system.actorOf(Props.create(FirstActor::class.java),"first2")
firstActor.tell("1",firstActor)
Thread.sleep(3000)
firstActor.tell("A", firstActor)
firstActor.tell("1",firstActor)
}
//FirstActor.kt
class FirstActor:AbstractActorWithTimers() {
private val childActor: ActorRef = context.actorOf(Props.create(MyActor::class.java),"child-Second")
val fun1: Receive = receiveBuilder()
.match(String::class.java,{ println("this is fun1 message = ${it}")})
.build()
val fun2: Receive = receiveBuilder()
.match(String::class.java,{ println("this is fun1 message = ${it}")})
.build()
override fun createReceive(): Receive {
return receiveBuilder()
.matchEquals("A", {context.become(fun1)})
.matchEquals("B",{context.become(fun2)})
.matchAny({ println("other message=${it}")})
.build()
}
}
运行效果如下,第一次消息“1”是匹配到了默认的matchAny上,后来发送“A”消息之后,匹配规则切换到fun1,然后再次发送消息“1”就会匹配到fun1的match中,所以输出发送了变化
同理,如果换成了B也一样的道理
unbecome()则是取消成为,具体作用这个其实取决于become的模式,因为become其实还有第二个参数discardOld,一个布尔类型的参数,代表是否启动堆栈模式,默认情况即不设置的情况下为true,即不开启堆栈模式
// become的完整函数原型
// 当我们只传入一个参数 即如上述案例代码一样 第二个参数discardOld默认为true
default void become(final Receive behavior, final boolean discardOld) {
this.become(behavior.onMessage(), discardOld);
}
如果不开启堆栈模式,系统不会记录上一次的匹配规则,每次become都是替换当前的匹配规则,就会导致当前的匹配规则如果没有进行保存的话就会被完全覆盖丢失,unbecome的作用就是直接退回到默认的匹配规则 ; 反之如果设置为false,即开启堆栈模式,所有的become都会按照先进后出的栈结构依次放入,unbecome的作用也就变成了弹出当前栈顶匹配规则,下次就会按照当前栈顶下面那个的规则进行皮匹配
// Main.kt
fun main(args: Array<String>) {
val system = ActorSystem.create("Main")
val firstActor = system.actorOf(Props.create(FirstActor::class.java),"first")
val firstActor2 = system.actorOf(Props.create(FirstActor::class.java),"first2")
firstActor.tell("1",firstActor) // 原本的receive逻辑
firstActor.tell("A", firstActor) // 切换到fun1
firstActor.tell("M",firstActor) // 证明切换到了fun1 然后切换到了fun2
firstActor.tell("E",firstActor) // 证明切换到了fun2 如果是堆栈模式 则退回到fun1 ; 如果不是则回退到原本的receive逻辑
firstActor.tell("N",firstActor) // 查看退回到了fun1还是原始receive逻辑
}
// FirstActor.kt 不使用堆栈模式
class FirstActor:AbstractActorWithTimers() {
private val childActor: ActorRef = context.actorOf(Props.create(MyActor::class.java),"child-Second")
val fun1: Receive = receiveBuilder()
.matchEquals("M",{
println("this is M message = ${it}")
context.become(fun2)
})
.matchEquals("N",{ println("this is N message = ${it}")})
.build()
val fun2: Receive = receiveBuilder()
.matchEquals("E",{
println("this is E message = ${it}")
context.unbecome()
})
.matchEquals("F",{ println("this is F message = ${it}")})
.build()
override fun createReceive(): Receive {
return receiveBuilder()
.matchEquals("A", {context.become(fun1)})
.matchEquals("B",{ println("this is B message = ${it}")})
.matchAny({ println("other message=${it}")})
.build()
}
}
如上,所有的become都是默认的,即不开启堆栈,那么运行效果如下
如果将所有的become都显示设计为discareOld=false即开启堆栈模式,则输出如下
堆栈模式下的 unbecome 行为
假设初始状态为 X,并按以下顺序调用:
第一次调用:context.become(A, discardOld = false)
当前行为堆栈:[X, A]
第二次调用:context.become(B, discardOld = false)
当前行为堆栈:[X, A, B]
调用 unbecome 时:
若在 B 中调用 unbecome(),堆栈弹出 B,回退到前一个状态 A。
再次调用 unbecome(),堆栈弹出 A,回退到初始状态 X。
总结:unbecome 按后进先出(LIFO)的顺序回退到堆栈中的前一个状态,而不是直接回到初始状态
堆栈模式的开启与否都有一定的作用场景,需要进行一定的分析和选择,但是需要注意的是,如果开启堆栈模式,那么底层就会维护一个匹配规则栈,这个需要注意维护,否则会有内存泄漏的隐患
stash()的作用就是可以将消息暂时存储隐藏起来,然后当actor重启的时候(实际上是因为preRestart逻辑中会调用依次unstashAll()方法)或者显示调用unstashAll()方法,其实所谓的藏起来,可以理解为还是通过匹配规则匹配到了某个处理方式,只不过是这个处理规则中就是将其用一个集合存储了
案例代码如下
发送open之后主动调用unstashAll()方法,将前面的stash存储起来的数据全部释放放回邮箱中,因为我们的匹配规则换为了fun1,所以最后按照fun1中的逻辑处理输出
// Main.kt
fun main(args: Array<String>) {
val system = ActorSystem.create("Main")
val firstActor = system.actorOf(Props.create(FirstActor::class.java),"first")
firstActor.tell(1, ActorRef.noSender())
firstActor.tell(2, ActorRef.noSender())
firstActor.tell("open", ActorRef.noSender())
}
// First.kt
class FirstActor : AbstractActorWithStash() {
private val childActor: ActorRef = context.actorOf(Props.create(MyActor::class.java),"child-Second")
private val stashingBehavior: Receive = receiveBuilder()
.match(Integer::class.java) {
println("暂存 Integer 消息: $it")
stash()
}
.matchEquals("open") {
println("切换到 fun1 并处理暂存消息")
context.become(fun1)
unstashAll()
}
.matchEquals("exception",{
throw Exception("exception")
})
.matchAny {
println("其他消息: $it")
}
.build()
private val fun1: Receive = receiveBuilder()
.match(Integer::class.java) {
println("fun1 处理 Integer 消息: $it")
}
.build()
override fun createReceive(): Receive = stashingBehavior
}
// 输出
暂存 Integer 消息: 1
暂存 Integer 消息: 2
切换到 fun1 并处理暂存消息
fun1 处理 Integer 消息: 1
fun1 处理 Integer 消息: 2
除此之外,我们说到preStart方法也会调用unstashAll(),将Main.kt进行一定的修改如下
发送“exception”消息让子Actor发生异常从而触发重启,可以看到输出
首先是正常的1,2放入stash,然后触发异常后释放,将消息归还到邮箱中,因为处理规则没有变化,所以还是落到了stash中
fun main(args: Array<String>) {
val system = ActorSystem.create("Main")
val firstActor = system.actorOf(Props.create(FirstActor::class.java),"first")
firstActor.tell(1, ActorRef.noSender())
firstActor.tell(2, ActorRef.noSender())
firstActor.tell("exception", ActorRef.noSender())
}
文章标题:Akka-Actor模型-解决高并发的终极方案-入门篇(三)
文章链接:https://zealsinger.xyz/?post=17
本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明来自ZealSinger !
如果觉得文章对您有用,请随意打赏。
您的支持是我们继续创作的动力!

微信扫一扫

支付宝扫一扫
