Akka-Actor模型-解决高并发的终极方案-入门篇
ZealSinger 发布于 阅读:117 技术文档
Akka简介
Akka是在JVM上构建的,基于Actor模型的,高并发的,分布式的,容错应用的工具包和运行时,底层使用Scala编写,同时提供了面向Scala和Java的开发接口
Actor模型
在我们实现并发线程的通信方式中,其实总体就是分为两种方式:共享内存+消息传递,在大多数的开发语言中,采用的都是共享内存的形式,这也就存在一个很严重的数据竞争的问题,也是我们常见的八股的来源,所谓的数据并发问题。
而Actor模型,其实就是消息模型,每个Actor在同一时间处理最多一个消息,可以发送消息给其他的Actor,保证了单独写的原则,从而巧妙地避免了多线程地争夺,和共享数据的方式相比,消息传递机制最大的优点就是不会产生数据竞争,常见地方式有基于channel(例如golang)的消息传递和基于Actor(例如erlang)的消息传递
可以简单的理解,一个线程就是一个Actor,每个Actor中存储状态,行为,且每个Actor有且刚好仅有一个MailBox,MailBox相当于一个小型队列,用于存储Sender发送的消息,默认情况下是先进先出FIFO队列,也可以自行配置
每个Actor中不会有任何共享的数据,Actor之间通过消息的方式,一个Actor给另外一个Acotr发送消息,另外一个Acotr收到消息之后进行判断做出行为,且消息传递是异步的
每个Actor = 状态(指actor对象的变量信息,状态由actor自身管理,避免并发环境下的锁和内存原子性问题) + 行为(指代actor中的计算逻辑,通过actor接收到的消息来改变actor的状态) + 消息/邮箱,通过不同的行为决策,可以作到任务分配和并发处理,任务拆解等
一个系统中存在多个Actor相互发消息,但是即使同时有多个Acotr朝一个Actor发送消息,此目标Actor也只能一次处理一个消息,其余的消息就会存储到MailBox中,如果要实现并发处理,一个消息就该对应的多个目标Actor从而实现并发处理
Akka的重要组成
-
akka-actors
akka的核心,一个用于并发和分发的模型,没有线程原语的痛苦
-
akka-stream
一种直观而安全的方式来实现异步,非阻塞的回压流处理
-
akka-http
现代的,快速的,异步的,流的HTTP服务器和客户端
-
akka-sharding
根据用户的身份,在集群中分配参与者
-
akka-cluster
通过多个节点上分布我们的系统来获取弹性
-
Distributed Data
最终一致性,高度读取和写入可用,低延迟数据
-
Akka Persistence
为参与者的事件包允许他们在重启之后达到相同的状态
-
Akka Management
云系统上允许Akka系统的拓展(k8s,aws...)
-
Alpakka
Akka流连接器用于集成其他技术
快速入门
不是很清楚为何我电脑上跑不起来这个代码,但是不妨碍理解和分析
主要是三个类HelloWorldMain ; HelloWorldBot ;HelloWorld
// HelloWorldMain
public class HelloWorldMain extends AbstractBehavior<HelloWorldMain.SayHello> {
public static void main(String[] args) throws Exception {
final ActorSystem<SayHello> system =
ActorSystem.create(HelloWorldMain.create(), "hello");
system.tell(new HelloWorldMain.SayHello("World"));
system.tell(new HelloWorldMain.SayHello("Akka"));
Thread.sleep(3000);
system.terminate();
}
public static record SayHello(String name) {}
public static Behavior<SayHello> create() {
return Behaviors.setup(HelloWorldMain::new);
}
private final ActorRef<HelloWorld.Greet> greeter;
private HelloWorldMain(ActorContext<SayHello> context) {
super(context);
greeter = context.spawn(HelloWorld.create(), "greeter");
}
public Receive<SayHello> createReceive() {
return newReceiveBuilder().onMessage(SayHello.class, this::onSayHello).build();
}
private Behavior<SayHello> onSayHello(SayHello command) {
ActorRef<HelloWorld.Greeted> replyTo =
getContext().spawn(HelloWorldBot.create(3), command.name);
greeter.tell(new HelloWorld.Greet(command.name, replyTo));
return this;
}
}
// HelloWorld
public class HelloWorld extends AbstractBehavior<HelloWorld.Greet> {
public static record Greet(String whom, ActorRef<Greeted> replyTo) {}
public static record Greeted(String whom, ActorRef<Greet> from) {}
public static Behavior<Greet> create() {
return Behaviors.setup(HelloWorld::new);
}
private HelloWorld(ActorContext<Greet> context) {
super(context);
}
public Receive<Greet> createReceive() {
return newReceiveBuilder().onMessage(Greet.class, this::onGreet).build();
}
private Behavior<Greet> onGreet(Greet command) {
getContext().getLog().info("Hello {}!", command.whom);
command.replyTo.tell(new Greeted(command.whom, getContext().getSelf()));
return this;
}
}
// HelloWorldBot
public class HelloWorldBot extends AbstractBehavior<HelloWorld.Greeted> {
public static Behavior<HelloWorld.Greeted> create(int max) {
return Behaviors.setup(context -> new HelloWorldBot(context, max));
}
private final int max;
private int greetingCounter;
private HelloWorldBot(ActorContext<HelloWorld.Greeted> context, int max) {
super(context);
this.max = max;
}
public Receive<HelloWorld.Greeted> createReceive() {
return newReceiveBuilder().onMessage(HelloWorld.Greeted.class, this::onGreeted).build();
}
private Behavior<HelloWorld.Greeted> onGreeted(HelloWorld.Greeted message) {
greetingCounter++;
getContext().getLog().info("Greeting {} for {}", greetingCounter, message.from());
if (greetingCounter == max) {
return Behaviors.stopped();
} else {
message.from().tell(new HelloWorld.Greet(message.whom(), getContext().getSelf()));
return this;
}
}
}
// 最后输出
Hello World!
Greeting 1 for Actor[akka://hello/user/greeter#-123456789]
Hello World!
Greeting 2 for Actor[akka://hello/user/greeter#-123456789]
Hello World!
Greeting 3 for Actor[akka://hello/user/greeter#-123456789]
Hello Akka!
Greeting 1 for Actor[akka://hello/user/greeter#-123456789]
Hello Akka!
Greeting 2 for Actor[akka://hello/user/greeter#-123456789]
Hello Akka!
Greeting 3 for Actor[akka://hello/user/greeter#-123456789]
-
消息触发顺序:
-
HelloWorldMain
收到SayHello
消息后,创建HelloWorldBot
并向HelloWorld
发送Greet
。 -
HelloWorld
处理Greet
后,回复Greeted
给HelloWorldBot
,触发循环。
-
-
循环逻辑:
-
HelloWorldBot
每收到一次Greeted
消息,计数器加 1,并重新发送Greet
给HelloWorld
,直到计数器满 3 次后停止。
-
-
系统层级:
-
Actor 之间通过父子关系形成树形结构,确保消息传递和监管的隔离性。
-
文章标题:Akka-Actor模型-解决高并发的终极方案-入门篇
文章链接:https://zealsinger.xyz/?post=15
本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明来自ZealSinger !
如果觉得文章对您有用,请随意打赏。
您的支持是我们继续创作的动力!

微信扫一扫

支付宝扫一扫