Akka-Actor模型-解决高并发的终极方案-入门篇(五)
ZealSinger 发布于 阅读:70 技术文档
消息可以通过路由器发送,以有效地将它们路由到目标参与者,称为其路由 ,其主要作用是:负载均衡+并行处理+动态调整+策略抽象
tell 和 routing的区别
直接使用actor.tell(msg, sender)
的问题:
-
单点瓶颈:所有消息集中到一个Actor,无法利用多核/多节点资源。
-
手动管理复杂:若需动态增减Actor,需自行维护列表并实现分发逻辑。
-
缺乏策略支持:如广播、分片等高级功能需要重复造轮子。
Router的优势:
-
抽象分发逻辑:内置策略(如
RoundRobinPool
)开箱即用。 -
透明扩展:通过配置即可增减Routee,无需修改业务代码。
-
与监管策略集成:Routee故障后,Router可自动重启或替换
入门案例
自己创建actor
// Main.kt
fun main() {
val system = ActorSystem.create("RouterSystem")
val masterActor = system.actorOf(Props.create(ClusterMasterActor::class.java))
repeat(100){masterActor.tell("Hello World${it}",ActorRef.noSender())} // 发送一百条消息
}
// masterActor
class ClusterMasterActor :AbstractActor(), RequiresMessageQueue<BoundedMessageQueueSemantics> {
// 用于路由的router对象
private var router:Router? = null
init {
val routees: MutableList<Routee> = mutableListOf() // 将要路由管理的子Actor放在一个集合中
for (i in 1..5) {
val r = context.actorOf(Props.create(ChildActor::class.java))
context.watch(r)
routees.add(ActorRefRoutee(r)) // 利用ActorRefRoutee将普通的ActorRef封装
}
router = Router(RoundRobinRoutingLogic(),routees) // 将所有的子Actor封装给route对象
}
override fun createReceive(): Receive {
return receiveBuilder()
.match(String::class.java,{
router?.route(it, sender);
})
.match(Terminated::class.java){
router = router?.removeRoutee(it.actor());
val r = context.actorOf(Props.create(ChildActor::class.java))
context.watch(r)
router = router?.addRoutee(ActorRefRoutee(r))
}
.build()
}
}
// child
class ChildActor: AbstractActor() {
override fun createReceive(): Receive {
return receiveBuilder()
.match(String::class.java){
println("${self.path().name()} 处理消息: $it") // 输出自己的路径 方便查看效果
}
.matchEquals("Arithmetic",{throw ArithmeticException("exception")})
.matchEquals("Null",{
println("抛出异常时间${System.currentTimeMillis()}")
throw NullPointerException("exception")
})
.matchEquals("Ill",{throw IllegalArgumentException("exception")})
.build()
}
}
可以看到 这里是使用的rout(message,sender)方法而不是tell方法
可以看到实际上是不同的Actor处理的每条消息,这个几个处理是负载均衡+并行处理的
需要注意,Route对象是final类型即不可变,所以其可以在Actor之外进行调用 ; RoutingLogic也是线程安全的
Route创建Actor
在上述案例中,我们是自己创建了childActor对象,然后将其封装成ctorRefRoutee对象放到集合中,最后一起给Router对象
但是事实上,Route对象本身具备创建Actor的能力,主要有Pool模式(创建为子角色,并在路由终止时将其从路由器中删除)和Group模式(一般针对远端的外部已经创建的Actor,且不会监视终止)
// Pool模式
// 定义 Worker Actor
class Worker : AbstractActor() {
override fun createReceive() = receiveBuilder()
.matchAny { msg ->
println("${self.path().name} 处理消息: $msg")
}
.build()
}
fun main() {
val system = ActorSystem.create("RouterSystem")
// 创建 Router,管理 3 个 Worker 子 Actor,使用轮询策略
val router: ActorRef = system.actorOf(
RoundRobinPool(3) // 3 个子 Actor
.props(Props.create(Worker::class.java)) // 绑定到 Worker
.withDispatcher("akka.actor.default-dispatcher"),
"poolRouter"
)
// 发送消息到 Router,自动分发
router.tell("任务1", ActorRef.noSender())
router.tell("任务2", ActorRef.noSender())
router.tell("任务3", ActorRef.noSender())
// 关闭系统(实际应用中不需要立即关闭)
Thread.sleep(1000)
system.terminate()
}
// 输出
$c 处理消息: 任务3
$b 处理消息: 任务
$a 处理消息: 任务1
// Pool创建rout的方式 我们这里展示的是代码的方式 其实哈可以通过conf配置文件的方式 如下
RoundRobinPool(3) .props(Props.create(Worker::class.java)) 可以转化为
akka.actor.deployment {
/parent/router1 {
router = round-robin-pool
nr-of-instances = 3
}
}
ActorRef router1 = getContext().actorOf(FromConfig.getInstance().props(Props.create(Worker.class)), "router1");
// Group模式 主要是跨节点联系已存在的Actor节点 不具备创建的能力
// Worker 定义同上
fun main() {
val system = ActorSystem.create("RouterSystem")
// 先手动创建 3 个 Worker 这里主要是模拟远端的已经存在三个Actor 用于对应下面的三个路径
val worker1 = system.actorOf(Props.create(Worker::class.java), "worker1")
val worker2 = system.actorOf(Props.create(Worker::class.java), "worker2")
val worker3 = system.actorOf(Props.create(Worker::class.java), "worker3")
// 定义这些 Worker 的路径
val paths = asList(
"/user/worker1",
"/user/worker2",
"/user/worker3"
)
// 创建 Group 路由
val router = system.actorOf(
RoundRobinGroup(paths).props(), // 传入路径集合 内部进行创建
"groupRouter"
)
// 发送消息
router.tell("广播消息", ActorRef.noSender())
Thread.sleep(1000)
system.terminate()
}
// group模式和pool模式一样,可以在配置文件中先写好然后利用FromConfig的方式进行加载
akka.actor.deployment {
/parent/router3 {
router = round-robin-group
routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
}
}
ActorRef router3 = getContext().actorOf(FromConfig.getInstance().props(), "router3");
在如上案例中任务1、任务2、任务3确实会被自动负载均衡并并行分发给三个由Route创建的三个Worker Actor
RoundRobinPool 和 RoundRobinGroup
上述案例代码中,用到的就是这两个,这两个标识轮询路由,即对于内部的Actor每个消息都是轮询进行的
RandomPool 和 RandomGroup
随机路由,每个消息都是随机路由到任何一个子Actor中
使用方式
// pool
ActorRef router6 =
getContext().actorOf(new RandomPool(5).props(Props.create(Worker.class)), "router6");
和
akka.actor.deployment {
/parent/router5 {
router = random-pool
nr-of-instances = 5
}
}
ActorRef router5 =
getContext().actorOf(FromConfig.getInstance().props(Props.create(Worker.class)), "router5");
// group也是类似
List<String> paths = Arrays.asList("/user/workers/w1", "/user/workers/w2", "/user/workers/w3");
ActorRef router8 = getContext().actorOf(new RandomGroup(paths).props(), "router8");
和
akka.actor.deployment {
/parent/router7 {
router = random-group
routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
}
}
ActorRef router7 = getContext().actorOf(FromConfig.getInstance().props(), "router7");
BalancingPool 平衡池
BalancingPool
是 Akka 中一种特殊的路由策略,其核心目标是动态平衡负载,通过共享同一个邮箱的多个 Routee(子 Actor)来实现高效的任务分配。以下是其核心特性和注意事项:
核心特性
-
动态负载均衡
-
工作窃取机制:空闲的 Routee 会从共享的邮箱中主动拉取消息处理,避免某些 Routee 过载。
-
共享邮箱:所有 Routee 共享同一个邮箱,消息不绑定到特定 Actor,实现任务自动平衡。
-
-
无状态 Routee
-
无独立身份:Routee 虽然有不同的名称,但不能直接通过路径访问(如
actorSelection
),因为它们可能处理任意消息。 -
状态必须包含在消息中:由于消息可能被任意 Routee 处理,无法依赖 Routee 内部状态,需将状态通过消息传递。
-
-
强制使用
BalancingDispatcher
-
线程协作:使用特殊调度器,确保所有 Routee 共享同一线程池,实现高效协作。
-
配置覆盖:即使 Routee 的 Props 中指定了其他 Dispatcher,也会被强制替换为
BalancingDispatcher
。
-
-
适用场景
-
无状态任务处理:例如计算密集型作业、无状态请求处理。
-
高吞吐量需求:当消息处理速度不一致时,自动平衡负载。
-
需要注意,我们只有BalancingPool而没有BalancingGroup,也就意味只能由Route自动创建子Actor而不能利用已有Actor
案例代码
// 定义无状态的 Worker
class StatelessWorker : AbstractActor() {
override fun createReceive() = receiveBuilder()
.matchAny { msg ->
println("${self.path().name()} 处理消息: $msg")
// 模拟耗时操作
Thread.sleep(100)
}
.build()
}
fun main() {
val system = ActorSystem.create("BalancingPoolDemo")
// 创建 BalancingPool Router,包含 3 个 Routee
val router: ActorRef = system.actorOf(
BalancingPool(3)
.props(Props.create(StatelessWorker::class.java)),
"balancingRouter"
)
// 发送 10 条消息,观察负载均衡
repeat(10) { i ->
router.tell("任务$i", ActorRef.noSender())
}
// 等待处理完成
Thread.sleep(1000)
system.terminate()
}
还可以为平衡池配置线程池参数
akka.actor.deployment {
/balancingRouter {
router = balancing-pool
nr-of-instances = 3
pool-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 3
core-pool-size-max = 3
}
}
}
}
但是需要注意,一般不会使用平衡池来发送广播消息,能发送,但是可能会导致多个子Actor均收到消息从而导致重复消费
router.tell(Broadcast("广播消息"), ActorRef.noSender())
SmallestMailboxPool 最小邮箱池
核心特性
-
路由策略:选择邮箱中待处理消息最少的 Routee(子 Actor),优先级顺序为:
-
空闲(未在处理消息)且邮箱为空的 Routee。
-
邮箱为空的 Routee。
-
邮箱中消息最少的 Routee。
-
远程 Routee(优先级最低,因无法准确获取邮箱大小)。
-
-
适用场景:
-
需要负载均衡且有状态的任务(如数据库连接池)。
-
Routee 处理消息耗时不均时,避免某些 Actor 过载。
-
-
限制:
-
无 Group 模式:无法直接引用现有 Actor(需动态管理邮箱状态)。
-
远程 Routee 优先级低:因无法准确感知远程 Actor 的邮箱状态。
-
案例代码
// 定义 Worker(可维护状态)
class StatefulWorker : AbstractActor() {
private var count = 0
override fun createReceive() = receiveBuilder()
.matchAny { msg ->
count++
println("${self.path().name()} 处理消息: $msg (累计处理: $count)")
Thread.sleep(100) // 模拟耗时操作
}
.build()
}
fun main() {
val system = ActorSystem.create("SmallestMailboxDemo")
// 创建 SmallestMailboxPool Router,包含 3 个 Routee
val router: ActorRef = system.actorOf(
SmallestMailboxPool(3)
.props(Props.create(StatefulWorker::class.java)),
"smallestMailboxRouter"
)
// 发送 10 条消息,观察负载均衡
repeat(10) { i ->
router.tell("任务$i", ActorRef.noSender())
}
Thread.sleep(1000)
system.terminate()
}
平衡池和最小邮箱池的区别
BalancingPool | SmallestMailboxPool | |
---|---|---|
负载均衡机制 | 共享邮箱,主动拉取消息 | 根据邮箱大小选择空闲 Actor |
Routee 状态 | 必须无状态 | 可以有状态 |
消息队列要求 | 必须线程安全(MultipleConsumerSemantics ) |
无特殊要求 |
适用场景 | 无状态高吞吐任务 | 需要状态维护或复杂交互的任务 |
BroadcastPool与 BroadcastGroup 广播池
默认会将消息广播给内部的所有子Actor
class StatelessWorker : AbstractActor() {
override fun createReceive() = receiveBuilder()
.matchAny { msg ->
println("${self.path().name()} 收到广播: $msg")
}
.build()
}
fun main() {
val system = ActorSystem.create("BroadcastDemo")
// 创建 BroadcastPool Router,包含 3 个子 Actor
val router: ActorRef = system.actorOf(
BroadcastPool(3)
.props(Props.create(StatelessWorker::class.java)),
"broadcastPoolRouter"
)
// 发送广播消息
router.tell("全局配置更新", ActorRef.noSender())
Thread.sleep(500)
system.terminate()
}
ScatterGatherFirstCompletedPool and ScatterGatherFirstCompletedGroup
广播并且等待子Actor回复,当第一个回复到达,并且返回结果
// 定义处理耗时任务的 Worker
class Worker : AbstractActor() {
override fun createReceive() = receiveBuilder()
.matchAny { msg ->
Thread.sleep((100..500).random().toLong()) // 模拟随机耗时
sender.tell("处理结果: $msg", self)
}
.build()
}
fun main() {
val system = ActorSystem.create("ScatterGatherDemo")
// 创建 ScatterGatherFirstCompletedPool,设置超时 1 秒
val router: ActorRef = system.actorOf(
ScatterGatherFirstCompletedPool(
3,
Duration.ofSeconds(1) // 超时时间
).props(Props.create(Worker::class.java)),
"scatterGatherRouter"
)
// 发送请求并等待第一个响应
// ask返回的是scale下的一个Future对象 需要使用onComplete对象来实现获取结果
Patterns.ask(router, "请求数据", 2000).onComplete({ println(it) }, ExecutionContext.fromExecutor(Executors.newCachedThreadPool())) // 输出Success(处理结果:请求数据)
system.terminate()
}
Group也是同理,针对的是已经创建的Actor,这里就不演示了
TailChoppingPool and TailChoppingGroup
TailChoppingRouter 将首先将消息发送到一个随机选择的路由,然后在短暂的延迟后发送到第二个路由(从剩余的路由中随机选择),依此类推。它等待第一个回复,然后返回并将其转发回原始发件人。其他回复将被丢弃
TailChoppingRouter
是 Akka 中一种智能容错路由策略,旨在通过分阶段尝试多个 Routee 来最小化请求延迟,尤其适用于对响应时间敏感且后端服务存在性能波动的场景
class Worker : AbstractActor() {
override fun createReceive() = receiveBuilder()
.matchAny { msg ->
Thread.sleep((50..200).random().toLong()) // 模拟随机处理时间
sender.tell("处理成功: $msg", self)
}
.build()
}
fun main() {
val system = ActorSystem.create("TailChoppingDemo")
// 配置 TailChoppingPool
val router = system.actorOf(
TailChoppingPool(
3, // 子 Actor 数量
// 需要注意下面三个时间 时间设置不好容易导致冲突从而Failure
Duration.ofMillis(30000), // 总超时时间
Duration.ofMillis(1000) // 尝试间隔
).props(Props.create(Worker::class.java)),
"tailChoppingRouter"
)
// 发送请求并等待响应
val result = Patterns.ask(router, "查询订单", 10000).onComplete({ println(it) }, ExecutionContext.fromExecutor(Executors.newCachedThreadPool()))
system.terminate()
}
ConsistentHashingPool and ConsistentHashingGroup
采用一致性Hash的路由策略
一致性Hash策略很常见的了,具体的原理就不说了,百度一下都能查到,主要是防止了普通Hash节点变化的时候导致的雪崩现象
特殊消息处理
正常情况下,大部分消息都是会通过Router根据规则路由到目标子Actor,但是某些特殊消息,会被Router自身解决而不是分发路由到子Actor处理
(1) 广播消息(Broadcast)
-
行为: 广播消息会被转发给 所有 Routee,无论路由策略如何。
-
语法:
router.tell(Broadcast("广播内容"), ActorRef.noSender())
-
适用场景: 需要所有子 Actor 同时处理同一消息(如全局配置更新)。
-
例外说明: 广播消息是 唯一一种会被所有类型路由(包括简单 Router)处理 的特殊消息。
(2) 终止消息(PoisonPill、Kill)
-
行为: 这些消息不会被转发给 Routee,而是由 路由 Actor 自身处理:
-
PoisonPill
:路由 Actor 会终止自己,并停止所有子 Actor。 -
Kill
:路由 Actor 会抛出ActorKilledException
(触发监管策略)。
-
-
错误用法示例:
// 错误!会导致路由 Actor 终止,而非子 Actor
router.tell(PoisonPill, ActorRef.noSender())
(3) 管理消息(GetRoutees、AddRoutee、RemoveRoutee)
-
行为:
路由 Actor 直接处理这些消息,动态调整 Routee 列表:
-
GetRoutees
:返回当前所有 Routee 的引用。 -
AddRoutee
:添加新的 Routee。 -
RemoveRoutee
:移除指定 Routee。
-
-
示例:
// 获取当前所有 Routee
router.tell(GetRoutees, sender)
// 添加新 Routee
val newWorker = system.actorOf(Props.create(Worker::class.java))
router.tell(AddRoutee(ActorRefRoutee(newWorker)), sender)
自定义路由器
当Akka自带的路由器不符合我们的需求,那么就可以自定义路由器
自定义路由器首先需要保证路由器的行为能和其余成熟路由器一样正常工作,路由器相对于普通Actor而言性能更高,也就意味者编写难度更大
可以看下如下参考代码,自定义Routing路由器,需要继承RoutingLogic类并且实现其方法select,主要职责是决定消息如何路由到Routees,需要保障线程安全,否则会问题很大
class CustomRoutingLogic(private val configParam: Int) : RoutingLogic() {
// 可复用内置策略
private val fallbackStrategy = RoundRobinRoutingLogic()
override fun select(message: Any, routees: IndexedSeq<Routee>): Routee {
return when {
routees.isEmpty() -> Routee.noRoutee
message is SpecialMessage -> handleSpecial(message, routees)
else -> fallbackStrategy.select(message, routees)
}
}
private fun handleSpecial(message: SpecialMessage, routees: IndexedSeq<Routee>): Routee {
// 自定义选择逻辑
val selected = (0 until configParam).map {
routees[message.hashCode() % routees.size]
}
return SeveralRoutees(selected)
}
}
然后是路由器的配置文件,实现方式:继承 GroupBase
或 Pool
类
class CustomRouterConfig(
private val routeePaths: List<String>,
private val logicParam: Int
) : GroupBase() {
// 从配置加载的构造函数
constructor(config: Config) : this(
config.getStringList("routees.paths"),
config.getInt("custom-param")
)
override fun getPaths(system: ActorSystem): Iterable<String> = routeePaths
override fun createRouter(system: ActorSystem): Router {
return Router(CustomRoutingLogic(logicParam))
}
override fun routerDispatcher(): String = Dispatchers.DefaultDispatcherId
}
自定义类和配置类都写完了,就可以准备使用,首先需要在配置文件application.conf中指定我们的配置类
akka.actor.deployment {
/myCustomRouter {
router = "com.example.CustomRouterConfig"
routees.paths = ["/user/worker1", "/user/worker2"]
custom-param = 3
}
}
然后就可以在代码中获取对应的路由器对象并且使用
// 编程式创建
val customRouter = system.actorOf(
CustomRouterConfig(paths, 3).props(),
"programmaticRouter"
)
// 配置驱动创建
val configuredRouter = system.actorOf(
FromConfig.getInstance().props(),
"configuredRouter"
)
FSM有限状态机
FSM有限状态机由AKKA Actor的抽象基类提供,专门用于简化具有清晰状态和状态驱动行为的 Actor 的实现 , 在Erlang中有者很好的描述实践
FSM可以描述成如下核心思想逻辑:State标识当前状态,Event标识事件,Actions标识对应的要执行的动作,第二个State标识新的状态,其核心思想就是将状态转化机内建到Actro模型中
State(S) x Event(E) -> Actions (A), State(S')
普通Actor模型的挑战和FSM解决方案
普通的Actor模型中,一个Actor的行为和状态都是通过接收到的消息Receive触发然后进行的,对于复杂的逻辑(多个状态,状态间的转化等等),这些逻辑往往需要散落在receive中的多个case或者说if-else这类控制语句中,导致代码冗长,嵌套,难以理解和维护,状态流转十分的不直观
而在FSM解决方案中,FSM-Actor将Actro明确的建模为一个有限状态机,强制你将系统分为了:
-
有限个离散状态(States):例如Idle,Processing,WaitingForResponse,Error
-
触发状态转化的事件(Event):通常就是指Actor接收到的消息
-
状态转化规则(Transitions):定义在哪个状态下接收哪种事件的时候,需要转化到某种状态
-
动作(Action):在进入某个状态;处于某个状态待定接收消息但不发生转换;退出某个状态的时候,需要进行的操作
-
状态数据(State Data):一个随状态变化而可能变化的数据结构,携带与当前状态相关的信息
在Java/Kotlin中通过AbstractFSM<S, D>来实现FSM-Actor,通过继承实现这个抽象类,可以帮助我们定义FSM-Actor
案例
本次案例项目代码功能为:通过FSM-Actor完成类似的作为流处理中的窗口操作
首先从AbstractFSM<S, D>上就可以看出来,需要两个泛型类型,这两个泛型类型分别代表啥呢?S其实就是State代表状态,D代表Data即状态数据
那么首先我们就需要有能作为state和data的类,一般这两个类我们定义为enum枚举类型
// states
enum State {
Idle,
Active
}
// state data
interface Data {}
enum Uninitialized implements Data {
Uninitialized
}
// Todo类其实有点感觉像行为 但是实际上是属于数据状态
final class Todo implements Data {
private final ActorRef target;
private final List<Object> queue;
public Todo(ActorRef target, List<Object> queue) {
this.target = target;
this.queue = queue;
}
public ActorRef getTarget() {
return target;
}
public List<Object> getQueue() {
return queue;
}
public String toString() {
return "Todo{" + "target=" + target + ", queue=" + queue + '}';
}
public Todo addElement(Object element) {
List<Object> nQueue = new LinkedList<>(queue);
nQueue.add(element);
return new Todo(this.target, nQueue);
}
public Todo copy(List<Object> queue) {
return new Todo(this.target, queue);
}
public Todo copy(ActorRef target) {
return new Todo(target, this.queue);
}
}
定义完成之后,我们需要定义一些FSM-Actor需要接收和处理的消息类型
// SetTarget 设置目标Actor
final class SetTarget {
private final ActorRef ref; // 目标Actor引用
}
// Queue 入队消息
final class Queue {
private final Object obj; // 要入队的对象
}
// Batch 批量消息(最终发送的,消息都会被封装为这个对象)
final class Batch {
private final List<Object> list; // 批量对象列表
}
// Flush 刷新指令
enum Flush { Flush }
那么到这里,我们可以来看看我们的FSM-Actor对象
public class Buncher extends AbstractFSM<State, Data> {
{
// startWith 函数原型如下 接收一个state状态名,一个stateData数据状态 也就代表该Actor初始化状态为传入的这两个状态
/*
public final void startWith(final S stateName, final D stateData, final Option<FiniteDuration> timeout) {
FSM.startWith$(this, stateName, stateData, timeout);
}
*/
startWith(Idle, Uninitialized);
/*
when(<name>[, stateTimeout = <timeout>])(stateFunction)
给定的名称必须是与AbstractFSM类的第一个类型参数类型兼容的对象。此对象用作哈希键,因此必须确保它正确实现equals和hashCode;尤其是它不能是可变的。最适合这些需求的是case对象。
如果给定stateTimeout参数,那么默认情况下,所有转换到该状态(包括保持)的操作都将接收该超时。使用显式超时启动转换可用于重写此默认值,有关详细信息,请参阅「Initiating Transitions」。在使用setStateTimeout(state, duration)进行操作处理期间,可以更改任何状态的状态超时。这将启用运行时配置,例如通过外部消息。
此when的含义是
当处于Idle状态的时候,收到SetTarget消息时:保持Idle状态,更新数据为Todo(包含目标Actor和空队列)
*/
when(
Idle,
matchEvent(
SetTarget.class,
Uninitialized.class,
(setTarget, uninitialized) ->
stay().using(new Todo(setTarget.getRef(), new LinkedList<>()))));
/*
状态转换钩子
关键行为:当从Active转换到Idle时
发送队列中的所有消息(作为Batch)给目标Actor
*/
onTransition(
matchState(
Active,
Idle,
() -> {
// reuse this matcher
final UnitMatch<Data> m =
UnitMatch.create(
matchData(
Todo.class,
todo ->
todo.getTarget().tell(new Batch(todo.getQueue()), getSelf())));
m.match(stateData());
})
.state(
Idle,
Active,
() -> {
/* Do something here */
}));
/*
当处于Active状态的时候触发
两种触发条件:收到Flush消息(手动刷新)和 1秒超时(自动刷新)
处理逻辑:
转移到Idle状态,重置队列(清空)
*/
when(
Active,
Duration.ofSeconds(1L),
matchEvent(
Arrays.asList(Flush.class, StateTimeout()),
Todo.class,
(event, todo) -> goTo(Idle).using(todo.copy(new LinkedList<>()))));
/*
未处理消息
处理Queue消息:将对象添加到队列,转移到Active状态
其他未处理消息:记录警告并保持状态
*/
whenUnhandled(
matchEvent(
Queue.class,
Todo.class,
(queue, todo) -> goTo(Active).using(todo.addElement(queue.getObj())))
.anyEvent(
(event, state) -> {
log()
.warning(
"received unhandled request {} in state {}/{}",
event,
stateName(),
state);
return stay();
}));
initialize();
}
}
启动转换
任何stateFunction
的结果都必须是下一个状态的定义,除非终止 FSM,如「」。状态定义可以是当前状态(如stay
指令所述),也可以是goto(state)
给出的不同状态。结果对象允许通过下面描述的修饰符进一步限定:
-
forMax(duration)
,此修饰符设置下一个状态的状态超时。这意味着计时器(timer
)启动,到期时向 FSM 发送StateTimeout
消息。此计时器在同时接收到任何其他消息时被取消;你可以依赖这样一个事实,即在干预消息之后将不会处理StateTimeout
消息。此修饰符还可用于重写为目标状态指定的任何默认超时。如果要取消默认超时,请使用Duration.Inf
。 -
using(data)
,此修饰符将旧状态数据替换为给定的新数据。如果你遵循上面的建议,这是唯一一个修改内部状态数据的地方。 -
replying(msg)
,此修饰符向当前处理的消息发送答复,否则不会修改状态转换。
需要注意的是,在when语句块中不能出现return语句
监视转换
内部监视
从案例代码来看,监视转换就是通过onTransition进行的
每次监控都是将动作与转化相关联,接收一对参数(oldState 和 newState ,即matchState这个方法参数所需要的入参lambda表达式,需要的两个参数就是分别为oldState,newState,标识监控从oldState到newState的转化)
外部监视
这个属于内部监听,即只有自己这个FSM-Actot本身能获取到状态的改变,如果外部Actor需要知道另外一个FSM-Actor的状态变化情况,可以通过发送消息SubscribeTransitionCallBack(actorRef)
来注册以获得状态转换的通知。命名的 Actor 将立即发送一条CurrentState(self, stateName)
消息,并在触发状态更改时接收Transition(actorRef, oldState, newState)
消息。
通过向 FSM Actor 发送UnsubscribeTransitionCallBack(actorRef)
,可以注销外部监控。
定时器
除了状态超时之外,FSM 还管理由String
名称标识的定时器(timers
)。你可以使用
setTimer(name, msg, interval, repeat)
其中msg
是将在持续时间interval
结束后发送的消息对象。如果repeat
为true
,则计时器按interval
参数给定的固定速率调度。在添加新计时器之前,任何具有相同名称的现有计时器都将自动取消。
计时器取消可以使用:
cancelTimer(name)
它保证立即工作,这意味着即使计时器已经启动并将其排队,也不会在调用后处理计划的消息。任何计时器的状态都可以通过以下方式获取:
isTimerActive(name)
这些命名的计时器补充状态超时,因为它们不受接收其他消息的影响。
文章标题:Akka-Actor模型-解决高并发的终极方案-入门篇(五)
文章链接:https://zealsinger.xyz/?post=20
本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明来自ZealSinger !
如果觉得文章对您有用,请随意打赏。
您的支持是我们继续创作的动力!

微信扫一扫

支付宝扫一扫