Akka-Actor模型-解决高并发的终极方案-入门篇(六)
ZealSinger 发布于 阅读:89 技术文档
基础概念
需要导入依赖
testImplementation "com.typesafe.akka:akka-persistence-testkit_${versions.ScalaBinary}"
AKKA Persistence能够使有状体的actor能够持久化,以便在actor重新启动的时候例如JVM崩溃后,由supervior或者手动停止-启动时,或者集群内部迁移的时候进行恢复
其背后的本质关键是:只存储状态变化的事件,而不是当前状态本身
核心相关API是
-
AbstractPersistenActor 定义持久化Actor就需要实现这个类。能够将事件持久化到日志中,并可以线程安全的进行相关操作,可以用于实现命令和事件源的actor,当参与者启动or重启的时候,日志消息将重放到该参与者,以便他能从这些日志消息中恢复状态
-
AbstractPersistentActorWithAtLeastOnceDelivery
至少一次传递语义的消息发送到目标,即使在发送方or接收方JVM崩溃的情况下
-
AsyncWriterJournal
日志存储发送到持久Actor的消息序列,应用程序可以控制记录哪些消息,以及Actor接收哪些消息而不记录,且日志的存储后端是可插拔的
入门案例
如下 定义了两种消息类型,即 Cmd
和 Evt
,分别表示命令和事件。ExamplePersistentActor
的状态
是 ExampleState
中包含的持久化事件数据的列表。
// 消息类型一
class Cmd implements Serializable {
private static final long serialVersionUID = 1L;
private final String data;
public Cmd(String data) {
this.data = data;
}
public String getData() {
return data;
}
}
// 消息类型二
class Evt implements Serializable {
private static final long serialVersionUID = 1L;
private final String data;
public Evt(String data) {
this.data = data;
}
public String getData() {
return data;
}
}
class ExampleState implements Serializable {
private static final long serialVersionUID = 1L;
private final ArrayList<String> events;
public ExampleState() {
this(new ArrayList<>());
}
public ExampleState(ArrayList<String> events) {
this.events = events;
}
public ExampleState copy() {
return new ExampleState(new ArrayList<>(events));
}
public void update(Evt evt) {
events.add(evt.getData());
}
public int size() {
return events.size();
}
@Override
public String toString() {
return events.toString();
}
}
class ExamplePersistentActor extends AbstractPersistentActor {
private ExampleState state = new ExampleState();
private int snapShotInterval = 1000;
public int getNumEvents() {
return state.size();
}
@Override
public String persistenceId() {
return "sample-id-1";
}
// 在这里定义接收Evt和SnapshotOffer消息来定义在恢复期间如何更新状态
// 即整个恢复过程的方法就是重写这个方法的逻辑
// AKKA对于重启的actor会自动触发该方法
@Override
public Receive createReceiveRecover() {
return receiveBuilder()
.match(Evt.class, state::update) // 依次将快照中的Evt事件通过这个进行处理,应用事件到状态
.match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot()) // 恢复快照状态 就相当于拿到之前存的所有的Evt事件
.build();
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
Cmd.class,
c -> {
final String data = c.getData();
final Evt evt = new Evt(data + "-" + getNumEvents());
// persist方法就是持久化事件的方法并为成功保留的事件执行事件处理程序。成功持久化的事件将在内部作为 触发事件处理程序执行的单个消息 发送回参与者,持久化事件的发送方默认是响应命令的发送方,这将允许事件处理程序回复命令的发送方 persist也能保证持久化Actor不会在persist调用和关联事件处理程序的执行之间收到进一步的命令
persist(
evt,
(Evt e) -> {
state.update(e); // 更新状态为最新的Evt
getContext().getSystem().getEventStream().publish(e);// 发布事件
// 这里是条件过滤 saveSnapshot(state.copy())保存当前快照 之所以进行条件控制 主要是为了防止每一次都进行状态的快照存储导致内存过大(属于IO操作开销很大)
if (lastSequenceNr() % snapShotInterval == 0 && lastSequenceNr() != 0)
saveSnapshot(state.copy());
});
})
.matchEquals("print", s -> System.out.println(state))
.build();
}
}
处理流程
恢复流程
persist持久化时间,如果持久化失败,将会调用onPersistFailure(默认逻辑为记录错误),并且将actor无条件的停止 ; 如果事件在被持久化之前被拒绝,例如序列化错误,将会调用onPersistRejected(默认记录警告)
Test Actor
AKKA提供的专用的测试模块
首先需要导入依赖
testImplementation "com.typesafe.akka:akka-testkit_${versions.ScalaBinary}"
主要功能都包含在TestKit这个工具包内,Testkit 允许在受控但真实的环境中测试角色。环境的定义在很大程度上取决于手头的问题和打算测试的级别,范围从简单检查到完整的系统测试
public class TestKitSampleTest extends AbstractJavaTest {
public static class SomeActor extends AbstractActor {
ActorRef target = null;
public Receive createReceive() {
return receiveBuilder()
.matchEquals(
"hello",
message -> {
getSender().tell("world", getSelf());
if (target != null) target.forward(message, getContext());
})
.match(
ActorRef.class,
actorRef -> {
target = actorRef;
getSender().tell("done", getSelf());
})
.build();
}
}
static ActorSystem system;
public static void setup() {
system = ActorSystem.create();
}
public static void teardown() {
TestKit.shutdownActorSystem(system);
system = null;
}
// test注解标识专门的测试方法
public void testIt() {
new TestKit(system) {
{
// 正常的创建目标Actor
final Props props = Props.create(SomeActor.class);
final ActorRef subject = system.actorOf(props);
final TestKit probe = new TestKit(system);
// 正常发送消息 发送内容为probe.getRef的消息给subject
subject.tell(probe.getRef(), getRef());
// 期待在1s内接收到为done的消息
expectMsg(Duration.ofSeconds(1), "done");
// 验证代码逻辑
within(
Duration.ofSeconds(3), // 设计该逻辑需要在3s内完成
() -> {
// 发送内容为hello的消息给subject
subject.tell("hello", getRef());
// 等待直到probe收到消息
awaitCond(probe::msgAvailable);
// 0秒内期待回复为world的回答
expectMsg(Duration.ZERO, "world");
//同上
probe.expectMsg(Duration.ZERO, "hello");
//对于回复进行断言判断 probe最后收到的消息发送者应该是当前测试探针
Assert.assertEquals(getRef(), probe.getLastSender());
// 等待3s剩下的时间
expectNoMessage();
return null;
});
}
};
}
}
经典工作流如下
TestKit和TestProbe
Test Kit本质上是一个 测试环境容器 + 测试探针 的组合体,拥有测试相关的所有功能,共享主测试环境的消息队列,可以创建和管理其他的Actor
而Test Probe只是一个测试探针,只能充当消息的接收者即接收Actor,不具备创建和管理别的Actor的能力,并且是自己独立的消息队列
可以看看两者的使用常用
// TestKit
// 场景:作为主测试入口
new TestKit(system) {{
// 1. 创建被测Actor
ActorRef subject = system.actorOf(Props.create(Subject.class));
// 2. 创建辅助TestKit实例(作为第二观察点)
TestKit observer = new TestKit(system);
// 3. 复杂测试逻辑
subject.tell("start", getRef());
observer.expectMsg("started");
}};
//TestProbe
// 场景:模拟多个外部依赖
TestProbe dbProbe = new TestProbe(system, "db");
TestProbe apiProbe = new TestProbe(system, "api");
subject.tell(new ProcessData(dbProbe.ref(), apiProbe.ref()), ActorRef.noSender());
// 验证外部调用
dbProbe.expectMsgClass(SaveCommand.class);
apiProbe.expectMsgClass(FetchRequest.class);
-
TestKit
实例 是:-
一个自包含的测试环境
-
具备完整的测试能力
-
适合作为主测试入口
-
相对重量级
-
一般用于复杂的场景
-
-
TestProbe
是:-
轻量级消息观察器
-
专注于消息接收和断言
-
适合模拟依赖和多参与者场景
-
资源效率更高
-
-
文章标题:Akka-Actor模型-解决高并发的终极方案-入门篇(六)
文章链接:https://zealsinger.xyz/?post=21
本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明来自ZealSinger !
如果觉得文章对您有用,请随意打赏。
您的支持是我们继续创作的动力!

微信扫一扫

支付宝扫一扫