«

Akka-Actor模型-解决高并发的终极方案-入门篇(六)

ZealSinger 发布于 阅读:89 技术文档


Persistence持久化

基础概念

需要导入依赖
testImplementation "com.typesafe.akka:akka-persistence-testkit_${versions.ScalaBinary}"

AKKA Persistence能够使有状体的actor能够持久化,以便在actor重新启动的时候例如JVM崩溃后,由supervior或者手动停止-启动时,或者集群内部迁移的时候进行恢复

其背后的本质关键是:只存储状态变化的事件,而不是当前状态本身

核心相关API是

入门案例

如下 定义了两种消息类型,即 CmdEvt,分别表示命令和事件。ExamplePersistentActor 的状态ExampleState 中包含的持久化事件数据的列表。

// 消息类型一
class Cmd implements Serializable {
 private static final long serialVersionUID = 1L;
 private final String data;

 public Cmd(String data) {
   this.data = data;
}

 public String getData() {
   return data;
}
}

// 消息类型二
class Evt implements Serializable {
 private static final long serialVersionUID = 1L;
 private final String data;

 public Evt(String data) {
   this.data = data;
}

 public String getData() {
   return data;
}
}

class ExampleState implements Serializable {
 private static final long serialVersionUID = 1L;
 private final ArrayList<String> events;

 public ExampleState() {
   this(new ArrayList<>());
}

 public ExampleState(ArrayList<String> events) {
   this.events = events;
}

 public ExampleState copy() {
   return new ExampleState(new ArrayList<>(events));
}

 public void update(Evt evt) {
   events.add(evt.getData());
}

 public int size() {
   return events.size();
}

 @Override
 public String toString() {
   return events.toString();
}
}

class ExamplePersistentActor extends AbstractPersistentActor {

 private ExampleState state = new ExampleState();
 private int snapShotInterval = 1000;

 public int getNumEvents() {
   return state.size();
}

 @Override
 public String persistenceId() {
   return "sample-id-1";
}
   
   // 在这里定义接收Evt和SnapshotOffer消息来定义在恢复期间如何更新状态
// 即整个恢复过程的方法就是重写这个方法的逻辑
   // AKKA对于重启的actor会自动触发该方法
 @Override
 public Receive createReceiveRecover() {
   return receiveBuilder()
      .match(Evt.class, state::update)  // 依次将快照中的Evt事件通过这个进行处理,应用事件到状态
      .match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot()) // 恢复快照状态 就相当于拿到之前存的所有的Evt事件
      .build();
}

 @Override
 public Receive createReceive() {
   return receiveBuilder()
      .match(
           Cmd.class,
           c -> {
             final String data = c.getData();
             final Evt evt = new Evt(data + "-" + getNumEvents());
               // persist方法就是持久化事件的方法并为成功保留的事件执行事件处理程序。成功持久化的事件将在内部作为 触发事件处理程序执行的单个消息 发送回参与者,持久化事件的发送方默认是响应命令的发送方,这将允许事件处理程序回复命令的发送方 persist也能保证持久化Actor不会在persist调用和关联事件处理程序的执行之间收到进一步的命令
             persist(
                 evt,
                (Evt e) -> {
                   state.update(e); // 更新状态为最新的Evt
                   getContext().getSystem().getEventStream().publish(e);// 发布事件
                     // 这里是条件过滤 saveSnapshot(state.copy())保存当前快照 之所以进行条件控制 主要是为了防止每一次都进行状态的快照存储导致内存过大(属于IO操作开销很大)
                   if (lastSequenceNr() % snapShotInterval == 0 && lastSequenceNr() != 0)
                     saveSnapshot(state.copy());
                });
          })
      .matchEquals("print", s -> System.out.println(state))
      .build();
}
}

处理流程

 
 
 
 
 
满足条件
不满足
接收Cmd命令
创建Evt事件
持久化事件到日志
更新内存状态
检查序列号
保存状态快照
完成处理
 

恢复流程

 
 
 
 
 
 
Actor启动
加载最新快照
恢复快照状态
重放快照后的事件
更新状态
进入就绪状态
 

persist持久化时间,如果持久化失败,将会调用onPersistFailure(默认逻辑为记录错误),并且将actor无条件的停止 ; 如果事件在被持久化之前被拒绝,例如序列化错误,将会调用onPersistRejected(默认记录警告)

 

Test Actor

AKKA提供的专用的测试模块

首先需要导入依赖

testImplementation "com.typesafe.akka:akka-testkit_${versions.ScalaBinary}"

主要功能都包含在TestKit这个工具包内,Testkit 允许在受控但真实的环境中测试角色。环境的定义在很大程度上取决于手头的问题和打算测试的级别,范围从简单检查到完整的系统测试

public class TestKitSampleTest extends AbstractJavaTest {

 public static class SomeActor extends AbstractActor {
   ActorRef target = null;

   @Override
   public Receive createReceive() {
     return receiveBuilder()
        .matchEquals(
             "hello",
             message -> {
               getSender().tell("world", getSelf());
               if (target != null) target.forward(message, getContext());
            })
        .match(
             ActorRef.class,
             actorRef -> {
               target = actorRef;
               getSender().tell("done", getSelf());
            })
        .build();
  }
}

 static ActorSystem system;

 @BeforeClass
 public static void setup() {
   system = ActorSystem.create();
}

 @AfterClass
 public static void teardown() {
   TestKit.shutdownActorSystem(system);
   system = null;
}

   // test注解标识专门的测试方法
 @Test
 public void testIt() {
   new TestKit(system) {
    {
         // 正常的创建目标Actor
       final Props props = Props.create(SomeActor.class);
       final ActorRef subject = system.actorOf(props);
       final TestKit probe = new TestKit(system);
         // 正常发送消息 发送内容为probe.getRef的消息给subject
       subject.tell(probe.getRef(), getRef());
         // 期待在1s内接收到为done的消息
       expectMsg(Duration.ofSeconds(1), "done");

       // 验证代码逻辑
       within(
           Duration.ofSeconds(3),  // 设计该逻辑需要在3s内完成
          () -> {
               // 发送内容为hello的消息给subject
             subject.tell("hello", getRef());
               
               // 等待直到probe收到消息
             awaitCond(probe::msgAvailable);

               // 0秒内期待回复为world的回答
             expectMsg(Duration.ZERO, "world");
 
               //同上
             probe.expectMsg(Duration.ZERO, "hello");
               
               //对于回复进行断言判断 probe最后收到的消息发送者应该是当前测试探针
             Assert.assertEquals(getRef(), probe.getLastSender());

             // 等待3s剩下的时间
             expectNoMessage();
             return null;
          });
    }
  };
}
}

经典工作流如下

 
TestKitTested ActorTestProbe发送测试消息发送给依赖组件模拟响应返回结果验证结果TestKitTested ActorTestProbe
 

TestKit和TestProbe

Test Kit本质上是一个 测试环境容器 + 测试探针 的组合体,拥有测试相关的所有功能,共享主测试环境的消息队列,可以创建和管理其他的Actor

而Test Probe只是一个测试探针,只能充当消息的接收者即接收Actor,不具备创建和管理别的Actor的能力,并且是自己独立的消息队列

可以看看两者的使用常用

// TestKit
// 场景:作为主测试入口
new TestKit(system) {{
// 1. 创建被测Actor
ActorRef subject = system.actorOf(Props.create(Subject.class));
 
// 2. 创建辅助TestKit实例(作为第二观察点)
TestKit observer = new TestKit(system);
 
// 3. 复杂测试逻辑
subject.tell("start", getRef());
observer.expectMsg("started");
}};
//TestProbe
// 场景:模拟多个外部依赖
TestProbe dbProbe = new TestProbe(system, "db");
TestProbe apiProbe = new TestProbe(system, "api");

subject.tell(new ProcessData(dbProbe.ref(), apiProbe.ref()), ActorRef.noSender());

// 验证外部调用
dbProbe.expectMsgClass(SaveCommand.class);
apiProbe.expectMsgClass(FetchRequest.class);
  1. TestKit 实例 是:

    • 一个自包含的测试环境

    • 具备完整的测试能力

    • 适合作为主测试入口

    • 相对重量级

    • 一般用于复杂的场景

  2. TestProbe 是:

    • 轻量级消息观察器

    • 专注于消息接收和断言

    • 适合模拟依赖和多参与者场景

    • 资源效率更高

    • 如果只需要验证接收消息或者其他的比较简单的验证场景,就可以使用这个

Kotlin 编程 Java