转载

【Akka】Akka入门编程实例

引言

这篇文章主要是第一次学习Akka编程,先试试水,探探坑,对Akka和SBT的使用有一个直观的了解,以几个简单的akka编程实例来说明akka的使用。希望在日后的学习和编程中,能有更多自己的体会和经验总结来分享。

Actor模型

Actor实例可以想象成是服务器上的Web服务,你无法控制,只能通过发送消息去请求执行任务或查询信息,而不能直接在Web服务中修改状态或者处理资源。通过发送不可改变的消息,虽然看上去有些限制,但是可以很简单安全的编写并发程序。

Actor系统的形象理解

一个actor是基于Actor系统的最小单元,就像面向对象系统中的对象实例一样,它也封装了状态和行为。我们无法窥探actor内部的信息,只能通过发送消息来请求状态信息(就像是问一个人,他感觉如何)。actor中有一个存放不可变状态信息的信箱。我们通过发送信息和actor进行通信,当actor收到信息之后,它会运用相关算法来处理具体的信息。

在一个应用程序中,多个actor构成了一套层级系统,像是一个家族或者一个商业组织。一个actor可以认为是一个商业组织的个人。一个actor有一个父亲,称为监督者(supervisor),还有好多孩子,可以认为,在一个商业组织中,主席(actor)下面有多个副主席,副主席也有很多下属随从。

Actor系统的最佳实践是“ 委派任务 ”,尤其是当actor的行为被阻塞的时候。可以想象,在实际商业活动中,主席将要做的工作分配给下面的几个副主席去分别执行,而副主席也会将子任务分配给自己的随从,直到该任务被下属们执行完毕。

处理故障

Actor模型的一个重要内容是处理故障。在工作工程中,如果出现错误或者抛出异常,actor和其子actor都将暂停,然后发送一条信息给监督者(supervisor)actor,报告出现故障的信号。根据工作任务和故障的性质,监督者actor将会作出几种选择:

  • 恢复下属actor,保留内部状态
  • 重启下属actor,清空状态
  • 终止下属actor
  • 上报故障

Hello,Actor实例

现在我用一个最简单的actor编程实例来介绍akka编程,先给出代码:

importakka.actor.Actor importakka.actor.ActorSystem importakka.actor.Props  classHelloActorextendsActor{ defreceive = { case"hello"=> println("hello back to you.") case_ => println("huh?")  } }  objectTest1_HelloActorextendsApp{ // actor need an ActorSystem valsystem = ActorSystem("HelloSystem") // create and start the actor valhelloActor = system.actorOf(Props[HelloActor], name="helloActor") // send two messages  helloActor ! "hello"  helloActor ! "what" // shutdown the actor system  system.shutdown } 

代码注解:

  • Actor由HelloActor定义
  • HelloActor的行为有receive方法定义实现,其中使用了模式匹配表达式
  • HelloActor接收字符串 hello 作为消息,做出相应打印动作
  • Test1_HelloActor的object用来测试actor
  • ActorSystem接收一个name参数,并且通过 system.actorOf 创建actor实例
  • 创建Actor实例名为helloActor,其构造函数没有参数
  • Actor创建后自动运行,不需调用start或者run方法
  • 通过 ! 方法来发送消息

ActorSystem

一个actor system是actors的层级集团,分享公共配置信息(比如分发器dispatchers,部署deployments,远程功能remote capabilities,地址addresses)。它同时也是创建和查询actors的入口。ActorSystem是为你的应用程序分配线程资源的结构。

ActorRef

当你调用 ActorSystemactorOf 方法时,将创建并返回一个 ActorRef 的实例:

def actorOf(props: Props, name: String): ActorRef

这个引用用来处理actor,你可以将其看做是处理实际actor的代理人(broker)或包装外观(facade)。ActorRef防止你破坏Actor模型,比如直接处理Actor实例,或直接修改Actor实例中的变量。所以只能通过给actor发送消息方式来执行任务,这种“袖手旁观(不干涉,hands-off)”的方法帮助巩固适宜的编程实践。

ActorRef有以下特点:

  • 它是不可变的
  • 它与actor实体是一对一的关系
  • 它是可序列化的,网络可感知的。这使得你可以在网络环境中传送一个ActorRef

Actor之间的通信实例

下面给出的是两个actor实例相互发送消息进行通信的PingPong示例:

importakka.actor._  caseobjectPingMessage caseobjectPongMessage caseobjectStartMessage caseobjectStopMessage  classPing(pong: ActorRef)extendsActor{ varcount =0 defincrementAndPrint {count +=1; println(s"$count:ping")} defreceive = { caseStartMessage =>  incrementAndPrint  pong ! PongMessage casePingMessage =>  incrementAndPrint if(count >99) {  sender ! StopMessage  println("ping stopped")  context.stop(self)  } else  sender ! PongMessage case_ => println("Ping got unexpected information")  } }  classPongextendsActor{ varcount =0 defreceive = { caseStopMessage =>  println("pong stopped")  context.stop(self) casePongMessage =>  count += 1  println(s"$count:pong")  sender ! PingMessage case_ => println("Pong got unexpected information")  } }  objectPingPangTestextendsApp{ valsystem = ActorSystem("PingPongTest") valpongActor = system.actorOf(Props[Pong], name="pong") valpingActor = system.actorOf(Props(newPing(pongActor)),  name = "ping")  pingActor ! StartMessage } 

代码注释:

  • 创建 ActorSystem 之后;
  • 创建 Pong 的actor实例(pongActor对象其实是 ActorRef 的实例);
  • 之后创建 Ping 的actor实例,其构造函数接受 ActorRef 参数;
  • 通过给 pingActor 发送一个 StartMessage 消息来启动pingActor和pongActor的具体动作;
  • Ping Actor和 Pong Actor通过PingMessage和PongMessage相互发送消息, sender 用来引用消息发送源Actor;
  • Ping 通过计数,知道进行了100次消息的发送之后,发送StopMessage来终止actor。分别调用自己的 context.stop 方法来结束

启动Actor

在ActorSystem层面,通过调用 system.actorOf 方法来创建actors;在actor内部,通过调用 context.actorOf 方法来创建子actor。

下面给出一个ParentChild示例:

importakka.actor._  caseclassCreateChild(name: String) caseclassName(name: String)  classChildextendsActor{ varname ="No name" overridedefpostStop: Unit = {  println(s"D'oh! They killed me ($name): ${self.path}")  } defreceive = { caseName(name) =>this.name = name case_ => println(s"Child $name got message.")  } }  classParentextendsActor{ defreceive = { caseCreateChild(name) => // Parent creates a new Child here  println(s"Parent about to create Child ($name) ...") valchild = context.actorOf(Props[Child], name=s"$name")  child ! Name(name) case_ => println(s"Parent got some other message.")  } }  objectParentChildDemoextendsApp{ valactorSystem = ActorSystem("ParentChildTest") valparent = actorSystem.actorOf(Props[Parent], name="Parent")  // send messages to Parent to create to child actors  parent ! CreateChild("XiaoMing")  parent ! CreateChild("XiaoLiang")  Thread.sleep(500)  // lookup XiaoMing, the kill it  println("Sending XiaoMing a PoisonPill ... ") valxiaoming = actorSystem.actorSelection("/user/Parent/XiaoMing")  xiaoming ! PoisonPill  println("XiaoMing was killed")   Thread.sleep(5000)  actorSystem.shutdown } 

打印结果:

Parent about to create Child (XiaoMing) ... Parent about to create Child (XiaoLiang) ... Sending XiaoMing a PoisonPill ... XiaoMing was killed D'oh! They killed me (XiaoMing): akka://ParentChildTest/user/Parent/XiaoMing D'oh! They killed me (XiaoLiang): akka://ParentChildTest/user/Parent/XiaoLiang 

终止Actor

在ActorSystem层面,通过 system.stop(actorRef) 来终止一个actor;在actor内部,使用 context.stop(actorRef) 来结束一个actor。

如果当前有正在处理的消息,对该消息的处理将在actor被终止之前完成,但是邮箱中的后续消息将不会被处理。缺省情况下这些消息会被送到 ActorSystem的 dead letter mailbox , 但是这取决于邮箱的实现。

actor终止的相关处理

actor的终止分两步: 第一步actor将停止对邮箱的处理,向所有子actor发送终止命令,然后处理来自子actor的终止消息直到所有的子actor都完成终止, 最后终止自己(调用postStop,销毁邮箱,向DeathWatch发布Terminated,通知其监管者)。这个过程保证actor系统中的子树以一种有序的方式终止,将终止命令传播到叶子结点并收集它们回送的确认消息给被终止的监管者。如果其中某个actor没有响应(i.e.由于处理消息用了太长时间以至于没有收到终止命令), 整个过程将会被阻塞。

在 ActorSystem.shutdown被调用时, 系统根监管actor会被终止,以上的过程将保证整个系统的正确终止。

postStop() hook是在actor被完全终止以后调用的。这是为了清理资源:

overridedefpostStop() = { // 关闭文件或数据库连接 } 

PoisonPill和gracefulStop

还有其他两种方式,发送 PoisonPill 消息或者使用 gracefulStop 终止。

你也可以向actor发送 akka.actor.PoisonPill 消息,这个消息处理完成后actor会被终止。PoisonPill与普通消息一样被放进队列,因此会在已经入队列的其它消息之后被执行。

如果你想等待终止过程的结束,或者组合若干actor的终止次序,可以使用gracefulStop。下面给出gracefulStop的代码示例:

importakka.actor._ importakka.pattern.gracefulStop importscala.concurrent.{Await, ExecutionContext, Future} importscala.concurrent.duration._ importscala.language.postfixOps  caseobjectTestActorStop  classTestActorextendsActor{ defreceive = { caseTestActorStop =>  context.stop(self) case_ => println("TestActor got message")  } overridedefpostStop {println("TestActor: postStop")} }  objectGracefulStopTestextendsApp{ valsystem = ActorSystem("GracefulStopTest") valtestActor = system.actorOf(Props[TestActor], name="TestActor") // try to stop the actor graceful try{ valstopped: Future[Boolean] = gracefulStop(testActor,2seconds, TestActorStop)  Await.result(stopped, 3seconds)  println("testActor was stopped")  } catch{ casee: akka.pattern.AskTimeoutException => e.printStackTrace  } finally{  system.shutdown  } } 

gracefulStop(actorRef, timeout) 将返回一个Future实例,当目标actor有处理相关终止动作的消息时,会执行成功。

上面示例中,通过发送TestActorStop消息来终止actor;如果没有处理终止的工作,当超过2s后,Future抛出 akka.pattern.AskTimeoutException 异常。默认情况下,gracefulStop将发送PoisonPill消息。

Kill消息

当深入Akka actors,我们将认识监督者策略(supervisor strategies)概念。当实现了监督者策略,向actor发送一个 Kill 消息,这可以用来重新启动actor。如果使用默认的监督者策略,Kill消息将终止目标actor。

下面是示例代码:

importakka.actor._  classNumber5extendsActor{ defreceive = { case_ => println("Number 5 got a message")  } overridedefpreStart { println("Number 5 is alive")} overridedefpostStop { println("Number 5::postStop called")} overridedefpreRestart(reason: Throwable, message: Option[Any]): Unit = {  println("Number 5::preRestart called")  } overridedefpostRestart(reason: Throwable): Unit = {  println("Number 5::postRestart called")  } }  objectKillTestextendsApp{ valsystem = ActorSystem("KillTestSystem") valnumber5 = system.actorOf(Props[Number5], name="Number5")  number5 ! "hello"  number5 ! Kill  system.shutdown } 

打印的信息:

Number5isalive Number5gotamessage [ERROR][01/17/2016 19:20:09.342][KillTestSystem-akka.actor.default-dispatcher-3][akka://KillTestSystem/user/Number5]Kill(akka.actor.ActorKilledException) Number5::postStopcalled 

转载请注明作者Jason Ding及其出处

Github博客主页(http://jasonding1354.github.io/)

GitCafe博客主页(http://jasonding1354.gitcafe.io/)

CSDN博客(http://blog.csdn.net/jasonding1354)

简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)

Google搜索jasonding1354进入我的博客主页

原文  http://jasonding1354.github.io/2016/01/15/Scala/【Akka】Akka入门编程实例/
正文到此结束
Loading...