最近在学习 Akka ,因为Scala的新版本已将Actors迁移到Akka [ 1 ](教材就是那本烂到一定程度的 Akka in action ,排版字体代码示例一如既往的Manning式的糟糕)——至于为什么要学习Actors,大概是因为 红宝书 里提出了一些对异步编程组合子的设计要求。
官方定义:
Akka is a toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM.
做为一个消息驱动的响庆式工具集,Akka具有如下特性:
同时,借助 Actor model,Akka实现了消息驱动,并为上层屏蔽了线程相关细节,提供了简洁的编程接口。
Actor model 最早是由Carl Hewitt在1973定义,由Erlang OTP (Open Telecom Platform) 发扬光大,其消息传递更加符合面向对象的原始意图。Actor model属于并发计算模型 ,通过对Actors并发原语的定义和使用,来避免使用者直接接触多线程并发或线程池等基础概念。
传统的并发模型是基于多线程之间的共享内存,使用同步(锁)方法防止写争夺;而Actor使用的是消息模型,每个Actor在同一时间处理最多一个消息,可以发送消息给其他Actors,保证了 单写原则 ,从而避免了多线程写争夺。
关于这个Actors, InfoQ 上有一个很生动的解释:
你可以将Actors当作是一群人,他们互相之间不会面对面地交流,而只是通过邮件的方式进行沟通。
Actors之间进行通信时,它们通过Actor URI 来进行定位,格式如下:
如 akka.tcp://backend@0.0.0.0:2551/user/tooSimple 就是一个合法的Actors URI,它代表的意思是:这个Actor 运行在backend系统,走的是akka.tcp协议管道,本机2551端口,路径为根结点下的tooSimple结点
Actors在收到消息时,它可以做出以下决策(可多选):
这个决策模型,在代码中看起来是这样的(根据消息的不同,会有不同的决策):
def receive = {
case Withdraw(amount) => {
if(this.balance > amount) {
this.balance -= this.balance
}
}
case Deposit(amount) => {
this.balance += amount
}
}
actor ! message 一个Actor的生命周期由如下事件组成(重启与否,由Actor的监督者/父结点来决定:
来看一个演示Actors生命周期的代码片断(LifeCycleHook是用于输出生命周期事件的Actor,TestActor是与它交互的Actor):
import akka.actor.{ActorSystem, Props, ActorLogging, Actor}
class LifeCycleHook extends Actor
with ActorLogging {
println("Constructor")
override def preStart() {
println("preStart")
}
override def preRestart(reason: Throwable,
message: Option[Any]): Unit = {
println("preRestart")
super.preRestart(reason, message)
}
override def postRestart(reason: Throwable): Unit = {
println("postRestart")
super.postRestart(reason)
}
override def postStop() {
println("postStop")
}
def receive = {
case "restart" => throw new IllegalStateException("force restart")
case msg: AnyRef => println("Receive")
}
}
class TestActor extends Actor {
def receive = {
case _ => println("Received")
}
}
object app extends App {
implicit val system = ActorSystem("my-system")
val testActor = system.actorOf(Props[TestActor], "Sender")
val testActorRef = system.actorOf(Props[LifeCycleHook], "LifeCycleHook")
testActorRef ! "restart"
Thread.sleep(2000)
testActorRef.tell("msg", testActor)
system.stop(testActorRef)
Thread.sleep(1000)
system.stop(testActor)
system.shutdown()
}
它的输出:
Actor model里只有消息,只关注和传递消息,没有共享数据结构,Locking-free,这也是Akka能提供高性能计算模型的原因
PS: Akka也有.NET平台的实现 => { http://getakka.net/ }