转载

Akka in action: actor model

引子

最近在学习 Akka ,因为Scala的新版本已将Actors迁移到Akka [ 1 ](教材就是那本烂到一定程度的  Akka in action ,排版字体代码示例一如既往的Manning式的糟糕)——至于为什么要学习Actors,大概是因为  红宝书 里提出了一些对异步编程组合子的设计要求。

Akka

官方定义:

Akka is a toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM.

做为一个消息驱动的响庆式工具集,Akka具有如下特性:

  • 高性能:单结点吞吐可达 5千万条消息/秒;1G堆内存可容纳250万个Actor
  • 简单:通过Actors/Future/EventStream一系列高度抽象,向开发者提供了简单的并发/分布式编程接口
  • 系统富于弹性:Akka对于错误的处理哲学是“Let it crash”,Actor生灭并不影响整体
  • 集群富于弹性:基于Gossip消息协议,可去中心化运转,CAP方面强调的是最终一致性
  • 易于扩展:可与play、slick、spray、Camel、spark 等各路框架无缝集成

同时,借助 Actor model,Akka实现了消息驱动,并为上层屏蔽了线程相关细节,提供了简洁的编程接口。

Actor model

Actor model 最早是由Carl Hewitt在1973定义,由Erlang OTP (Open Telecom Platform)  发扬光大,其消息传递更加符合面向对象的原始意图。Actor model属于并发计算模型 ,通过对Actors并发原语的定义和使用,来避免使用者直接接触多线程并发或线程池等基础概念。

传统的并发模型是基于多线程之间的共享内存,使用同步(锁)方法防止写争夺;而Actor使用的是消息模型,每个Actor在同一时间处理最多一个消息,可以发送消息给其他Actors,保证了 单写原则 ,从而避免了多线程写争夺。

关于这个Actors, InfoQ 上有一个很生动的解释:

你可以将Actors当作是一群人,他们互相之间不会面对面地交流,而只是通过邮件的方式进行沟通。

Akka in action: actor model

Actor URI

Actors之间进行通信时,它们通过Actor URI 来进行定位,格式如下:

Akka in action: actor model

akka.tcp://backend@0.0.0.0:2551/user/tooSimple 就是一个合法的Actors URI,它代表的意思是:这个Actor 运行在backend系统,走的是akka.tcp协议管道,本机2551端口,路径为根结点下的tooSimple结点

消息接收决策

Actors在收到消息时,它可以做出以下决策(可多选):

  • 发送消息
  • 对发送方进行响应
  • 进行其他核心操作

这个决策模型,在代码中看起来是这样的(根据消息的不同,会有不同的决策):

def receive = {
    case Withdraw(amount) => {
      if(this.balance > amount) {
        this.balance -= this.balance
      }
    }
    case Deposit(amount) => {
      this.balance += amount
    }
}

Actors 核心操作

  1. CREATE: 一个Actor可以创建另一个Actor,由于Actors是分层的结构,被创建的Actor将成为创建者的子节点
  2. SEND: 发送消息,形如 actor ! message
  3. BECOME:Actor在运行时的行为可以动态的改变(即指定下一次接收消息时,Actor的消息接收决策)
  4. SUPERVISE:  Actor可以监督他的子节点,并决定子结点出错时的处理策略

Actors 的生命周期

一个Actor的生命周期由如下事件组成(重启与否,由Actor的监督者/父结点来决定:

Akka in action: actor model

来看一个演示Actors生命周期的代码片断(LifeCycleHook是用于输出生命周期事件的Actor,TestActor是与它交互的Actor):

import akka.actor.{ActorSystem, Props, ActorLogging, Actor}
 
class LifeCycleHook extends Actor
  with ActorLogging {
  println("Constructor")
 
  override def preStart() {
    println("preStart")
  }
 
  override def preRestart(reason: Throwable,
                          message: Option[Any]): Unit = {
    println("preRestart")
    super.preRestart(reason, message)
  }
 
  override def postRestart(reason: Throwable): Unit = {
    println("postRestart")
    super.postRestart(reason)
  }
 
  override def postStop() {
    println("postStop")
  }
 
  def receive = {
    case "restart" => throw new IllegalStateException("force restart")
    case msg: AnyRef => println("Receive")
  }
}
 
class TestActor extends Actor {
  def receive = {
    case _ => println("Received")
  }
}
 
object app extends App {
  implicit val system = ActorSystem("my-system")
 
  val testActor = system.actorOf(Props[TestActor], "Sender")
  val testActorRef = system.actorOf(Props[LifeCycleHook], "LifeCycleHook")
  testActorRef ! "restart"
  Thread.sleep(2000)
 
  testActorRef.tell("msg", testActor)
  system.stop(testActorRef)
  Thread.sleep(1000)
  system.stop(testActor)
 
  system.shutdown()
}

它的输出:

Akka in action: actor model

小结

Actor model里只有消息,只关注和传递消息,没有共享数据结构,Locking-free,这也是Akka能提供高性能计算模型的原因

PS: Akka也有.NET平台的实现 => { http://getakka.net/ } 

原文  http://www.moye.me/2016/08/14/akka-in-action_actor-model/
正文到此结束
Loading...