«

Akka-Actor模型-解决高并发的终极方案-入门篇

ZealSinger 发布于 阅读:117 技术文档


指南网站

Akka简介

Akka是在JVM上构建的,基于Actor模型的,高并发的,分布式的,容错应用的工具包和运行时,底层使用Scala编写,同时提供了面向Scala和Java的开发接口

Actor模型

在我们实现并发线程的通信方式中,其实总体就是分为两种方式:共享内存+消息传递,在大多数的开发语言中,采用的都是共享内存的形式,这也就存在一个很严重的数据竞争的问题,也是我们常见的八股的来源,所谓的数据并发问题。

而Actor模型,其实就是消息模型,每个Actor在同一时间处理最多一个消息,可以发送消息给其他的Actor,保证了单独写的原则,从而巧妙地避免了多线程地争夺,和共享数据的方式相比,消息传递机制最大的优点就是不会产生数据竞争,常见地方式有基于channel(例如golang)的消息传递和基于Actor(例如erlang)的消息传递

可以简单的理解,一个线程就是一个Actor,每个Actor中存储状态,行为,且每个Actor有且刚好仅有一个MailBox,MailBox相当于一个小型队列,用于存储Sender发送的消息,默认情况下是先进先出FIFO队列,也可以自行配置

每个Actor中不会有任何共享的数据,Actor之间通过消息的方式,一个Actor给另外一个Acotr发送消息,另外一个Acotr收到消息之后进行判断做出行为,且消息传递是异步的

image-20250521161805363

每个Actor = 状态(指actor对象的变量信息,状态由actor自身管理,避免并发环境下的锁和内存原子性问题) + 行为(指代actor中的计算逻辑,通过actor接收到的消息来改变actor的状态) + 消息/邮箱,通过不同的行为决策,可以作到任务分配和并发处理,任务拆解等

image-20250521162744309

一个系统中存在多个Actor相互发消息,但是即使同时有多个Acotr朝一个Actor发送消息,此目标Actor也只能一次处理一个消息,其余的消息就会存储到MailBox中,如果要实现并发处理,一个消息就该对应的多个目标Actor从而实现并发处理

image-20250521163347389

Akka的重要组成

快速入门

akka-guide/articles/qucikstart-akka-java.md at master · guobinhit/akka-guide

不是很清楚为何我电脑上跑不起来这个代码,但是不妨碍理解和分析

主要是三个类HelloWorldMain ; HelloWorldBot ;HelloWorld

// HelloWorldMain
public class HelloWorldMain extends AbstractBehavior<HelloWorldMain.SayHello> {
 

 public static void main(String[] args) throws Exception {
   
   final ActorSystem<SayHello> system =
       ActorSystem.create(HelloWorldMain.create(), "hello");

   system.tell(new HelloWorldMain.SayHello("World"));
   system.tell(new HelloWorldMain.SayHello("Akka"));
   

   Thread.sleep(3000);
   system.terminate();
}
 

 public static record SayHello(String name) {}

 public static Behavior<SayHello> create() {
   return Behaviors.setup(HelloWorldMain::new);
}

 private final ActorRef<HelloWorld.Greet> greeter;

 private HelloWorldMain(ActorContext<SayHello> context) {
   super(context);
   greeter = context.spawn(HelloWorld.create(), "greeter");
}

 @Override
 public Receive<SayHello> createReceive() {
   return newReceiveBuilder().onMessage(SayHello.class, this::onSayHello).build();
}

 private Behavior<SayHello> onSayHello(SayHello command) {
   ActorRef<HelloWorld.Greeted> replyTo =
       getContext().spawn(HelloWorldBot.create(3), command.name);
   greeter.tell(new HelloWorld.Greet(command.name, replyTo));
   return this;
}
}

// HelloWorld
public class HelloWorld extends AbstractBehavior<HelloWorld.Greet> {

 public static record Greet(String whom, ActorRef<Greeted> replyTo) {}
 public static record Greeted(String whom, ActorRef<Greet> from) {}

 public static Behavior<Greet> create() {
   return Behaviors.setup(HelloWorld::new);
}

 private HelloWorld(ActorContext<Greet> context) {
   super(context);
}

 @Override
 public Receive<Greet> createReceive() {
   return newReceiveBuilder().onMessage(Greet.class, this::onGreet).build();
}

 private Behavior<Greet> onGreet(Greet command) {
   getContext().getLog().info("Hello {}!", command.whom);
   command.replyTo.tell(new Greeted(command.whom, getContext().getSelf()));
   return this;
}
}

// HelloWorldBot
public class HelloWorldBot extends AbstractBehavior<HelloWorld.Greeted> {

 public static Behavior<HelloWorld.Greeted> create(int max) {
   return Behaviors.setup(context -> new HelloWorldBot(context, max));
}

 private final int max;
 private int greetingCounter;

 private HelloWorldBot(ActorContext<HelloWorld.Greeted> context, int max) {
   super(context);
   this.max = max;
}

 @Override
 public Receive<HelloWorld.Greeted> createReceive() {
   return newReceiveBuilder().onMessage(HelloWorld.Greeted.class, this::onGreeted).build();
}

 private Behavior<HelloWorld.Greeted> onGreeted(HelloWorld.Greeted message) {
   greetingCounter++;
   getContext().getLog().info("Greeting {} for {}", greetingCounter, message.from());
   if (greetingCounter == max) {
     return Behaviors.stopped();
  } else {
     message.from().tell(new HelloWorld.Greet(message.whom(), getContext().getSelf()));
     return this;
  }
}
}
// 最后输出
Hello World!
Greeting 1 for Actor[akka://hello/user/greeter#-123456789]
Hello World!
Greeting 2 for Actor[akka://hello/user/greeter#-123456789]
Hello World!
Greeting 3 for Actor[akka://hello/user/greeter#-123456789]
Hello Akka!
Greeting 1 for Actor[akka://hello/user/greeter#-123456789]
Hello Akka!
Greeting 2 for Actor[akka://hello/user/greeter#-123456789]
Hello Akka!
Greeting 3 for Actor[akka://hello/user/greeter#-123456789]
  1. 消息触发顺序

    • HelloWorldMain 收到 SayHello 消息后,创建 HelloWorldBot 并向 HelloWorld 发送 Greet

    • HelloWorld 处理 Greet 后,回复 GreetedHelloWorldBot,触发循环。

  2. 循环逻辑

    • HelloWorldBot 每收到一次 Greeted 消息,计数器加 1,并重新发送 GreetHelloWorld,直到计数器满 3 次后停止。

  3. 系统层级

    • Actor 之间通过父子关系形成树形结构,确保消息传递和监管的隔离性。

image.png

 

image-20250521185857483

image-20250521190255023

 

编程 Java