On pp 590-593 of Programming in Scala this is discussed in more detail: basically the react method never returns normally (it terminates with an exception) and therefore its call stack does not need to be preserved. You can think of it as looping forever.
在Scala编程的第590-593页中,将更详细地讨论这一点:基本上,react方法永远不会正常返回(它以异常终止),因此不需要保留其调用堆栈。你可以把它想象成永远循环。
https://www.brianstorti.com/the-actor-model/
https://www.jianshu.com/p/449850aa8e82
CPU的工艺制程和发热稳定性之间难以取舍,取而代之的策略则是增加核心数量,目前家用电脑四核已经非常常见,服务器更是达到了32核64线程。为了有效地利用多核CPU,我们在代码层面就应该考虑到并发性。十几年的痛苦开发经历告诉我们,threads并不是获取并发性的好方法,往往会带来难以查找的bug,但是不用害怕,今天我们有很多其他方法来获得易用的并发性,比如我们接下来介绍的Actor模型。
模型 Model
Actor模型是一个概念模型,用于处理并发计算。它定义了一系列系统组件应该如何动作和交互的通用规则,最著名的使用这套规则的编程语言是Erlang。这篇文章更关注模型本身而不是它在不同语言的实现。
Actors
一个Actor指的是一个最基本的计算单元。它能接收一个消息并且基于其执行计算。
这个理念很像面向对象语言,一个对象接收一条消息(方法调用),然后根据接收的消息做事(调用了哪个方法)。
Actors一大重要特征在于actors之间相互隔离,它们并不互相共享内存。这点区别于上述的对象。也就是说,一个actor能维持一个私有的状态,并且这个状态不可能被另一个actor所改变。
聚沙成塔
One ant is no ant, one actor is no actor.
光有一个actor是不够的,多个actors才能组成系统。在actor模型里每个actor都有地址,所以它们才能够相互发送消息。
Actors有邮箱
只得指明的一点是,尽管许多actors同时运行,但是一个actor只能顺序地处理消息。也就是说其它actors发送了三条消息给一个actor,这个actor只能一次处理一条。所以如果你要并行处理3条消息,你需要把这条消息发给3个actors。
消息异步地传送到actor,所以当actor正在处理消息时,新来的消息应该存储到别的地方。Mailbox就是这些消息存储的地方。
Actors通过异步消息沟通,在处理消息之前消息被存放在Mailbox中
Actors做什么
当一个actor接收到消息后,它能做如下三件事中的一件:
Create more actors; 创建其他actors
Send messages to other actors; 向其他actors发送消息
Designates what to do with the next message. 指定下一条消息到来的行为
前两件事比较直观,第三件却很有意思。
我之前说过一个actor能维持一个私有状态。「指定下一条消息来到做什么」意味着可以定义下条消息来到时的状态。更清楚地说,就是actors如何修改状态。
设想有一个actor像计算器,它的初始状态是数字0。当这个actor接收到add(1)消息时,它并不改变它原本的状态,而是指定当它接收到下一个消息时,状态会变为1。
容错 Fault tolerance
Erlang 引入了「随它崩溃」的哲学理念,这部分关键代码被监控着,监控者的唯一职责是知道代码崩溃后干什么(如将这个单元代码重置为正常状态),让这种理念成为可能的正是actor模型。
每段代码都运行在process中,process是erlang称呼actor的方式。这个process完全独立,意味着它的状态不会影响其他process。我们有个supervisor,实际上它只是另一个process(所有东西都是actor),当被监控的process挂了,supervisor这个process会被通知并对此进行处理。这就让我们能创建「自愈」系统了。如果一个actor到达异常状态并崩溃,无论如何,supervisor都可以做出反应并尝试把它变成一致状态,这里有很多策略,最常见的是根据初始状态重启actor。
分布式 Distribution
另一个关于actor模型的有趣方面是它并不在意消息发送到的actor是本地的或者是另外节点上的。
转念一想,如果actor只是一些代码,包含了一个mailbox和一个内部状态,actor只对消息做出响应,谁会关注它运行在哪个机器上呢?只要我们能让消息到达就行了。这允许我们基于许多计算机上构建系统,并且恢复其中任何一台。
进一步了解
这里是一个快速的概念模型回顾,其中的概念被运用到许多知名语言和库中,比如Erlang和Elixir, Akka (for the JVM) 和 Celluloid (for Ruby)。
如果你想更深入了解actor及其背后的原理,你可以进一步阅读下面书籍和文章:
Seven Concurrency Models in Seven Weeks: When Threads Unravel
Programming Elixir
Elixir in Action
Actor模型
为什么Actor模型是高并发事务的终极解决方案?
这个视频比较完整地讨论了Actor的概念模型。
The Actor Model (everything you wanted to know)
Scala或Erlang的进程信箱都是一种Actor模型,也有Java的专门的Actor模型,这里是几种Actor模型比较明白了Actor模型原理,使用Disruptor这样无锁队列也可以自己实现Actor模型,让一个普通对象与外界的交互调用通过Disruptor消息队列实现,比如LMAX架构就是这样实现高频交易,从2009年成功运行至今,被Martin Fowler推崇。
Actor模型作为Akka中最核心的概念,所以Actor在Akka中的组织结构也至关重要,本文主要介绍Akka中Actor系统。
Actor系统
Actor作为一种封装状态和行为的对象,总是需要一个系统去统一的组织和管理它们,在Akka中即为ActorSystem,其实这非常容易理解,好比一个公司,每个员工都可以看成一个Actor,它们有自己的职位和职责,但是我们需要把员工集合起来,统一进行管理和分配任务,所以我们需要一个相应的系统进行管理,好比这里的ActorSystem对Actor进行管理一样。
ActorSystem的主要功能
ActorSystem主要有以下三个功能:
管理调度服务
配置相关参数
日志功能
1.管理调度服务
ActorSystem的的精髓在于将任务分拆,直到一个任务小到可以被完整处理,然后将其委托给Actor进行处理,所以ActorSystem最核心的一个功能就是管理和调度整个系统的运行,好比一个公司的管理者,他需要制定整个公司的发展计划,还需要将工作分配给相应的工作人员去完成,保障整个公司的正确运转,其实这里也体现了软件设计中的分而治之,Actor中的核心思想也是这样。
ActorSystem模型例子:
ActorSystem模型例子
上图是一个简单的开发协作的过程,我觉得这个例子应该可以清晰的表达Akka中Actor的组织结构,当然不仅于此。主要有以下几个特点:
Akka中Actor的组织是一种树形结构
每个Actor都有父级,有可能有子级当然也可能没有
父级Actor给其子级Actor分配资源,任务,并管理其的生命状态(监管和监控)
Actor系统往往有成千上万个Actor,使用树形机构来组织管理Actor是非常适合的。
而且Akka天生就是分布式,你可以向一个远程的Actor发送消息,但你需要知道这个Actor的具体位置在哪,这时候你就会发现,树形结构对于确定一个Actor的路径来说是非常有利(比如Linux的文件存储),所以我觉得Actor用树形结构组织可以说是再完美不过了。
2.根据配置创建环境
一个完善的ActorSystem必须有相关的配置信息,比如使用的日志管理,不同环境打印的日志级别,拦截器,邮箱等等,Akka使用Typesafe配置库,这是一个非常强大的配置库,后续我也准备写一篇后续文章,大家尽请期待哈。
下面用一个简单的例子来说明一下ActorSystem会根据配置文件内容去生成相应的Actor系统环境:
1.首先我们按照默认配置打印一下系统的日志级别,搭建Akka环境请看我上一篇文章:Akka系列(一):Akka简介与Actor模型
val actorSystem = ActorSystem(“robot-system”)
println(s”the ActorSystem logLevel is ${actorSystem.settings.LogLevel}”)复制代码
运行结果:
the ActorSystem logLevel is INFO复制代码
可以看出ActorSystem默认的日志输出级别是INFO。
2.现在我们在application.conf里配置日志的输出级别:
akka {
loglevel = “DEBUG”
}复制代码
运行结果:
[DEBUG] [03/26/2017 12:07:12.434] [main] [EventStream(akka://robot-system)] logger log1-Logging$DefaultLogger started
[DEBUG] [03/26/2017 12:07:12.436] [main] [EventStream(akka://robot-system)] Default Loggers started
the ActorSystem logLevel is DEBUG复制代码
可以发现我们ActorSystem的日志输出级别已经变成了DEBUG。
这里主要是演示ActorSystem可以根据配置文件的内容去加载相应的环境,并应用到整个ActorSystem中,这对于我们配置ActorSystem环境来说是非常方便的。
3.日志功能
有很多人可能会疑惑,日志不应该只是记录程序运行状态和排除错误的嘛,怎么在Akka中会变得至关重要,Akka拥有高容错机制,这无疑需要完善的日志记录才能使Actor出错后能及时做出相应的恢复策略,比如Akka中的持久化,具体相应的一些作用我可能会在后续写相应章节的时候提到。
Actor引用,路径和地址
有了上面的知识,这里了解Actor引用,路径和地址就容易多了。
什么时Actor引用?
Actor引用是ActorRef的子类,每个Actor有唯一的ActorRef,Actor引用可以看成是Actor的代理,与Actor打交道都需要通过Actor引用,Actor引用可以帮对应Actor发送消息,也可以接收消息,向Actor发送消息其实是将消息发送到Actor对应的引用上,再由它将消息投寄到具体Actor的信箱中,所以ActorRef在整个Actor系统是一个非常重要的角色。
如何获得Actor引用?
直接创建Actor
查找已经存在的Actor
1.获得ActorRef
看我上一篇文章的同学对这种方式获得Actor引用应该是比较了解,这里我会具体演示一下获得ActorRef的几种方式:
假定现在由这么一个场景:老板嗅到了市场上的一个商机,准备开启一个新项目,他将要求传达给了经理,经理根据相应的需求,来安排适合的的员工进行工作。
这个例子很简单,现在我们来模拟一下这个场景:
1.首先我们来创建一些消息:
trait Message {
val content: String
}
case class Business(content: String) extends Message {}
case class Meeting(content: String) extends Message {}
case class Confirm(content: String, actorPath: ActorPath) extends Message {}
case class DoAction(content: String) extends Message {}
case class Done(content: String) extends Message {}复制代码
2.我们来创建一家公司,这里就是ActorSystem的化身:
val actorSystem = ActorSystem(“company-system”) //首先我们创建一家公司
//创建Actor得到ActorRef的一种方式,利用ActorSystem.actorOf
val bossActor = actorSystem.actorOf(Props[BossActor], “boss”) //公司有一个Boss
bossActor ! Business(“Fitness industry has great prospects”) //从市场上观察到健身行业将会有很大的前景复制代码
3.这里我们会创建几种角色,比如上面Boss,这里我们还有Manager,Worker,让我们来看看吧:
class BossActor extends Actor {
val log = Logging(context.system, this)
implicit val askTimeout = Timeout(5 seconds)
import context.dispatcher
var taskCount = 0
def receive: Receive = {
case b: Business =>
log.info(“I must to do some thing,go,go,go!”)
println(self.path.address)
//创建Actor得到ActorRef的另一种方式,利用ActorContext.actorOf
val managerActors = (1 to 3).map(i =>
context.actorOf(Props[ManagerActor], s”manager${i}”)) //这里我们召唤3个主管
//告诉他们开会商量大计划
managerActors foreach {
_ ? Meeting(“Meeting to discuss big plans”) map {
case c: Confirm =>
//为什么这里可以知道父级Actor的信息?
//熟悉树结构的同学应该知道每个节点有且只有一个父节点(根节点除外)
log.info(c.actorPath.parent.toString)
//根据Actor路径查找已经存在的Actor获得ActorRef
//这里c.actorPath是绝对路径,你也可以根据相对路径得到相应的ActorRef
val manager = context.actorSelection(c.actorPath)
manager ! DoAction(“Do thing”)
}
}
case d: Done => {
taskCount += 1
if (taskCount == 3) {
log.info(“the project is done, we will earn much money”)
context.system.terminate()
}
}
}
}
class ManagerActor extends Actor {
val log = Logging(context.system, this)
def receive: Receive = {
case m: Meeting =>
sender() ! Confirm(“I have receive command”, self.path)
case d: DoAction =>
val workerActor = context.actorOf(Props[WorkerActor], “worker”)
workerActor forward d
}
}
class WorkerActor extends Actor {
val log = Logging(context.system, this)
def receive: Receive = {
case d: DoAction =>
log.info(“I have receive task”)
sender() ! Done(“I hava done work”)
}
}复制代码
光看这段代码可能不那么容易理解,这里我会画一个流程图帮助你理解这段程序:
程序流程图:
程序流程图
看了上面的流程图对程序应该有所了解了,过多的解释我这里就不讲解了,可以看注释,或者下载源代码自己去跑一跑。源码链接
这里主要是有两个知识点:
创建Actor获得ActorRef的两种方式
根据Actor路径获得ActorRef
前一个知识点应该比较清晰了,具体来说说第二个。
2.Actor路径与地址
熟悉类Unix系统的同学应该对路径这个概念很熟悉了。ActorSystem中的路径也很类似,每个ActorSystem都有一个根守护者,用/表示,在根守护者下有一个名user的Actor,它是所有system.actorOf()创建的父Actor,所以我们程序中bossActor的路径为:
/user/boss
地址顾名思义是Actor所在的位置,为什么要有地址这一个概念,这就是Akka强大的理念了,Akka中所有的东西都是被设计为在分布式环境下工作的,所以我们可以向任意位置的Actor发送消息(前提你得知道它在哪),这时候地址的作用就显现出来来,首先我们可以根据地址找到Actor在什么位置,再根据路径找到具体的Actor,比如我们示例程序中bossActor,它的完整位置是
akka://company-system/user/boss
可以发现它的地址是
akka://company-system
其中akka代表纯本地的,Akka中默认远程Actor的位置一般用akka.tcp或者akka.udp开头,当然你也可以使用第三方插件,Akka的远程调用我也会专门写一篇文章。
如果你期待更多的actor,或者程序中actor的数量随着输入的增加而增加,那么定义每个actor对应一个线程的工作方式将会带来巨大的开销:不仅每个JVM线程的执行堆栈需要内存——这部分堆栈内存通常是预分配的——每条JVM线程都还与底层操作系统的进程对应。对于不同的平台,进程间上下文的切换(CPU对进程的切换),cpu在内核模式和用户模式切换,这些都是昂贵的开销。
为了允许在JVM中有许多actor,你可以使得你的actor是基于事件的。基于事件的actor可以被实现成事件处理器(event handlers),并非线程,因此更轻量级,而不象线程的兄弟般一样重量级。既然基于消息的actor将不直接绑定到Java线程上,那么基于事件的actor就可以在一个拥有较少数量工作线程的线程池上工作。典型的,这样一个线程池应该包含和系统处理器数量一样多的工作线程。这样做可以最大化系统的并行性能,使得线程池中线程占用的内存数、系统进程间上下文切换这些开销达到最小
5.1 Events vs. threads (事件和线程的对比)
基于事件的actor对程序员来讲并非是完全透明的。这是因为基于事件的编程与基于线程的编程遵循着不同的规范。典型的actor就是花费很久的时间等待事件到来(然后处理…),而基于事件的actor和基于线程的actor的关键区别就是:基于事件的actor具有等待的策略。
基于线程的actor通过对一个对象调用wait()方法使得这个actor对应的线程持有锁并开始等待,当其他线程对相同的对象调用了notify()或者notifyAll()之后,持有锁的线程便恢复运行。(这是基本的原理,实际中的等待策略会稍微复杂些,因为线程在等待时可能被中断。)相反,基于事件的actor在actor运行时会注册一个事件处理器(event-handler)。注册后,actor的计算逻辑就完成了——之前运行这部分计算逻辑的线程响应的也就完成了任务,并且可以被调度出去运行其他任务了,如果没有其他事情可做的话该线程便进入睡眠状态。之后,当一个感兴趣的事件被触发时——即一个发送给该actor的并且被匹配上的事件到达时,actor的运行时调度器便调度该actor的事件处理器(event-handler)在线程池上执行,此时之前注册的事件处理器就会恢复运行,并处理该事件。在这种工作方式下,基于事件的actor就与底层JVM线程解除耦合了。
5.2 Making actors event-based: react (使得actor以基于事件的方式工作:react)
由于基于事件的actor和基于线程的actor在他们的等待策略上有很大的不同,把直截了当的把基于线程的actor转换成基于事件的actor。到目前为止我们看到的基于线程的actor都使用receive方法来等待一条到达该actor的邮箱的并且符合匹配规则的消息,要把它转换成基于事件的actor,只要把所有调用receive方法的地方都使用react方法替换。receive和react方法都接收一堆消息匹配的语句块作为输入参数,这些语句块负责处理匹配上的消息。
虽然使用react代替receive仅仅是简单的代码改变,但是在程序中这两者的使用非常不同,接下来的例子将揭示这些区别。
Using react to wait for messages(使用react等待消息)
下面的代码展示了一个方法构建了一个actor链,并且返回第一个actor。在actor链中的每个actor都使用react来等待一个叫做 ‘Die的消息。当它收到了这样一个消息,actor会检查它是否是此actor链中最后一个actor(如果是最后一个actor则next==null),如果不是最后一个actor,则给actor链中的下一个actor发送’Die消息,然后等待’Ack消息,当’Ack消息到达时,在自己终止之前给自己发送’Die消息的发送者发送’Ack响应,之后自己终止。如果是最后一个actor,则直接发送’Ack消息给发送者。注意,我们把最原始的发送’Die消息的发送者保存在本地变量from中,以至于可以在下面嵌套的react中引用。
def buildChain(size: Int, next: Actor): Actor = {
val a = actor {
react {
case ‘Die =>
val from = sender
if(next != null) {
next ! ‘Die
react {
case ‘Ack => from ! ‘Ack
}
} else from ! ‘Ack
}
}
if(size > 0) buildChain(size - 1, a)
else a
}
我们把buildChain方法放进一个具有main函数的对象中,如下面的代码所示。我们把命令行中的第一个参数存储在numActors变量中,这个变量用来控制actor链的长度,仅仅为了好玩,我们标记了时间来看它花费多久来建立并且销毁一个单元素的actor链。在调用了buildChain之后,我们立即给链中的第一个actor发送了一条’Die消息。
def main(args: Array[String]) {
val numActors = args(0).toInt
val start = System.currentTimeInMillis
buildChain(numActors, null) ! ‘Die
receive {
case ‘Ack =>
val end = System.currentTimeInMillis
println(“Took “ + (end - start) + “ ms”)
}
}
当每个actor给链中的下一个actor发送’Die消息后等待’Ack时,将会发生什么呢?当’Ack消息收到后,它会向链中前一个actor传播’Ack,然后自己终止。链中第一个actor是最后一个收到’Ack消息的。当main方法中的receive操作收到’Ack消息开始处理时,链中的所有actor都终止了。
How many actors are too many?(多少actor才算多?)
使用react接收消息的actor比起通常JVM的线程相比轻量多了。下面我们就看看actor到底有多轻量级,我们将用尽所有的JVM内存创建一个actor链,然后我们通过替换把react替换成receive来和基于线程的actor链比较一下。
但是,首先能创建多少基于事件的actor?创建他们需要花费多长时间?在一个测试系统中,创建并销毁1000个actor花费了115ms,然而创建并销毁10000个actor花费了540ms。创建50万个actor花费了6323ms,但是创建100万个actor花费的时间稍微长些,在不增加JVM(Java HotSpot(TM) Server VM 1.6.0)的堆内存大小时,花费大概26秒。
下面我们尝试一下基于线程的actor。既然我们将要创建非常多的线程,我们应该配置actor的运行时环境以避免不合理的开销。
Configuring the actor run-time’s thread pool(配置actor的运行时线程池)
既然我们将用基于线程的actor创建很多线程,那么提前创建好这些线程然后再给actor使用将更高效。此外我们可以调整actor内部的运行时线程池来优化actor的执行。Scala的运行时环境允许根据在receive(每个receive块都需要自己的线程)中被阻塞的actor,动态改变线程池的大小,但是调整线程池大小会非常耗时,由于线程池没有为大量的调整线程池大小操作进行优化。
内部线程池通过两个JVM属性配置,分别是 actors.corePoolSize 和actors.maxPoolSize。第一个属性是用来设置线程池初始化时的大小,第二个属性是用来限制线程池线程数量的上限。
为了最小化调整线程池线程数量所花费的时间,我们把这两个属性都设置成程序实际需要的线程数。比如,当用1000个基于线程的actor运行我们的actor链的例子时,把 actors.corePoolSize 设置为1000,把 actors.maxPoolSize 设置成1010,这样设置使得调整线程池大小的开销保持较低的状态。
设置了这些属性后,花费了12秒创建并销毁1000个基于线程的actor。创建并销毁2000个基于线程的actor花费了超过97秒。创建并销毁3000个actor,JVM抛出了java.lang.OutOfMemoryError。
就像这个简单的例子所证明的,基于事件的actor比基于线程的actor轻量多了。下面的章节如何使用有效的使用基于事件的actor编程。
Using react effectively(有效的使用react)
正像我们上面提到的,使用react等待消息的actor是以基于事件的方式工作。在这种工作方式下,actor等待消息时并不阻塞底层的工作线程,而是将reactor的模式匹配语句块注册成事件处理器。这个事件处理器会在actor的运行时环境中当匹配到的消息到达此actor时被调用。在actor进入睡眠状态前,事件处理器一直被保留着,这就是事件处理器的全部。特别的,当actor运行时,调用堆栈被当前的线程维护,当actor暂停时,调用堆栈就被丢弃。这种工作方式允许运行时系统释放底层的线程,以便此线程能够被其他actor重用。通过在比较小数量的线程上运行大量的基于事件的actor,CPU上下文切换以及与线程绑定的actor所需的资源消耗都显著降低了。
当基于事件的actor被暂停时,当前线程的调用堆栈被丢弃,这种工作模式在基于事件actor的编程模型中具有重要的后果:那就是调用react方法将不会正常返回。react,就像其他的Scala或者Java方法一样,当它执行时仅仅当它的全部调用堆栈可用时才可以正常的返回。但是基于事件的actor调用完毕后没有调用堆栈可用,因此调用react方法根本就不返回。
react方法不再返回,这意味着不再有任何代码紧跟在react方法之后。既然react不返回,跟在react方法之后的代码将不会执行。因此调用react方法必须总是基于事件actor在结束之前做的最后一件事。
既然actor的主要工作就是处理它所感兴趣的消息,并且react定义了基于事件的actor的消息处理机制,你可能认为react将总是最后一件事情、甚至仅仅是最后一件actor要做的事情。然而,有时候很方便的接连执行多个react调用。在这些情况下,你可以顺序嵌套react调用,就像之前的buildChain代码中展示的一样。
作为另一种选择,你可以定义一个递归方法依次返回多个react。比如,你可以扩展我们简单的链actor的例子,让actor等到固定数量的‘Die消息后终止。我们可以通过通过把链actor的处理消息的代码替换为一个waitFor方法,如下代码所示。waitFor方法预先测试,决定该actor是应该终止退出(if n == 0)还是继续等待消息。程序逻辑还和之前一样。区别仅仅是在把每个消息发送给from之后,我们添加了一个递归调用waitFor。
def waitFor(n: Int): Unit = if(n > 0) {
react {
case ‘Die =>
val from = sender
if(next != null) {
next ! ‘Die
react {
case ‘Ack => from ! ‘Ack; waitFor(n - 1)
}
} else {from ! ‘Ack; waitFor(n - 1)}
}
}
Recursive methods with react(使用react的递归方法)
看到上面的代码,你可能关注以这种方式递用递归方法可能会很快导致堆栈溢出,不过好消息是react方法与递归配合的非常好。无论任何时候恢复调用react方法时,都是由于actor的邮箱中收到了匹配的消息,此时会创建一个计算任务并提交到actor的内部线程池等待执行。
执行这个任务的线程在调用堆栈上除了有线程池工作线程的基本逻辑之外没有太多的其他东西。其结果就是在调用堆栈上每一次调用react方法执行都如同空执行一样轻松。像waitFor这种递归方法的调用堆栈,因此不会因为反复调用react而增长太多。
Composing react-based code with combinators(使用组合器把基于react的代码组合起来)
有时候很难或者不可能为定序的多个react使用递归方法,当使用react重用类或者方法时就会遇到这种情况。从重用的本质上讲,被重用的组件在构建之后应该不能再改动,尤其是我们不能进行侵略性的改变,比如我们用递归的方式为上面的例子代码添加一个迭代方法。本节将讲解几种基于react代码的重用方式。
def sleep(delay: Long) {
register(time, delay, self)
react {
case ‘Awake => //OK, Continue
}
}
比如,假设我们的项目中包含如上所示的sleep方法。此方法使用了定时器服务(代码中未列出来)注册了当前的actor:self,定时器会在指定的延时delay之后被唤醒。定时器会用 ‘Awake 消息通知注册的actor。为了提高效率,sleep方法用react等待 ‘Awake 消息,这样做可以使得处于睡眠状态的actor不需要消耗JVM的线程资源。
使用上面的sleep方法,调用完react后,总要求执行一些东西。因为我们要重用方法,所以我们就不能简单的到代码前面在react代码体中插入一些逻辑。相反的,我们需要一种方式把sleep方法和在收到了 ‘Awake 消息之后执行的代码结合起来,而不改变sleep方法的实现。
我们所需要的功能正好是Actor对象的控制流组合器所提供的。这个组合器允许把公共的通讯模式用一种相对简单且简洁的方式表达出来。最基本的组合器就是andThen。andThen组合器将两块代码组合到一起,互相在对方之后运行,即使第一块代码调用了react。
下面的代码展示了如何在调用了sleep方法之后使用andThen执行代码。andThen的使用如同操作符一样,被嵌在两块代码之间。第一块代码的最后一个操作调用了sleep,在sleep内部最后又调用了react。
actor {
val period = 1000
{
//sleep之前的代码
sleep(period)
andThen {
//唤醒之后的代码
}
}
}
注意,sleep函数的参数period在andThen代码块之外声明。这样做是可以的,因为这两块代码块都是闭包,闭包能够在他们的运行上下文中捕获变量。第二块代码会在第一块代码结束后运行,即便第一块代码的sleep方法内有react方法调用。然而,注意第二块代码是被actor执行的最后一块代码。andThen的使用并没有改变react方法不会返回的事实,andThen的作用仅仅是将两块代码顺序组合起来而已。
另外一个有用的组合器就是loopWhile。就像它的名字建议的那样,如果提供的条件为真的话,那么输入的闭包代码将会持续循环执行。多亏了Scala灵活的语法,loopWhile感觉就像语言原生的语法一样。下面的代码展示了一个变动的actor链的例子,这个例子使用loopWhile等待多个 ‘Die 消息。loopWhile有两个代码块,分别是条件(n > 0)和循环体,这两者都是闭包,因为这两者都访问了本地变量n。注意,循环体中最顶层的react自从最开始的例子以来一直都没有改变过。循环体也可以被抽象成一个方法,loopWhile在这两种情况下都能正常工作。
def buildChain(size: Int, next: Actor, waitNum: Int): Actor = {
val a = actor {
var n = waitNum
loopWhile (n > 0) {
n -= 1
react {
case ‘Die =>
val from = sender
if (next != null) {
next ! ‘Die
react {case ‘Ack => from ! ‘Ack}
} else from ! ‘Ack
}
}
}
if (size > 0) buildChain(size - 1, a, waitNum)
else a
}
Akka在Flink 0.9版本中已经开始采用。使用Akka,所有远程过程调用过程都实现为异步消息。这主要影响组件JobManager、TaskManager和JobClient。将来,可能还会有更多的组件被转换为actor模型,允许它们发送和处理异步消息。
actor只有一个处理线程,该线程轮询actor的邮箱并连续处理接收到的消息。作为已处理消息的结果,actor可以更改其内部状态、发送新消息或生成新actor。如果actor的内部状态是从其处理线程中独占操纵的,那么就不需要确保actor的状态线程安全。尽管单个actor本质上是顺序的,但是由多个actor组成的系统具有高度并发性和可伸缩性,因为处理线程在所有actor之间共享。这种共享也是为什么永远不应该从actor线程内调用阻塞调用的原因。这样的调用将阻止线程被其他actor用于处理他们自己的消息。
多个Actor系统可以在单个机器上共存。如果Actor系统是用RemoteActorRefProvider启动的,那么可以从可能驻留在远程计算机上的另一个Actor系统访问它。Actor系统自动识别actor消息是发给生活在同一Actor系统或远程Actor系统中的actor的。在本地通信的情况下,使用共享存储器有效地传输消息。在远程通信的情况下,通过网络堆栈发送消息。
所有actor都按层次结构组织。每个新创建的actor将其创建的actor作为父节点分配。层次结构用于监督。每位家长负责监督其子女。如果其中一个子节点出现错误,则通知他。如果actor能够解决问题,那么他可以恢复或重新启动他的孩子。如果问题超出了它的处理范围,它可以将错误升级到自己的父母。升级错误仅仅意味着当前层次之上的层次结构层现在负责解决问题。有关Akka的监督和监测的细节可以在这里找到。
系统创建的第一个actor由系统提供的监护actor/用户监控。这里将深入解释角色层次结构。有关Actor系统的更多信息,请参阅这里。
Flink系统由三个必须通信的分布式组件组成:JobClient、JobManager和TaskManager。JobClient从用户那里获取Flink作业并将其提交给JobManager。然后,JobManager负责编排作业执行。首先,它分配所需的资源量,这主要包括任务管理器上的执行槽。
在资源分配之后,JobManager将作业的各个任务部署到相应的TaskManager。在接收到任务时,TaskManager生成执行任务的线程。诸如开始计算或完成计算之类的状态更改被发送回JobManager。基于这些状态更新,JobManager将指导作业执行直到完成。一旦作业完成,它的结果将被发送回JobClient,JobClient告诉用户有关它的信息。作业执行过程如下图所示。
在这里插入图片描述
在执行任何Flink作业之前,必须启动一个JobManager和一个或多个TaskManager。然后,TaskManager通过向JobManager发送RegisterTaskManager消息在JobManager注册。JobManager通过AcknowledgeRegisting消息确认注册成功。如果任务管理器已经在JobManager注册,因为发送了多个RegisterTaskManager消息,JobManager将返回AlreadyRegistered消息。如果注册被拒绝,则JobManager将回复RefuseRegisting消息。
通过向作业管理器发送带有相应的作业图的SubmitJob消息,作业被提交给作业管理器。在接收到JobGraph之后,JobManager从JobGraph中创建一个ExecutionGraph,它用作分布式执行的逻辑表示。ExecutionGraph包含有关必须部署到TaskManager才能执行的任务的信息。
JobManager的调度器负责分配TaskManager上的可用的执行slot。在TaskManager上分配执行slot之后,将带有执行任务所需的所有信息的SubmitTask消息发送到相应的TaskManager。TaskOperationResult确认任务部署成功。部署并运行提交的作业的源之后,作业提交也被认为是成功的。JobManager通过发送带有相应作业ID的成功消息向JobClient通知该状态。
在TaskManagers上运行的单个任务的状态更新通过UpdateTaskExecutionState消息发送回JobManager。通过这些更新消息,可以更新ExecutionGraph以反映执行的当前状态。
JobManager还充当数据源的输入分割分配器。它负责将工作分布到所有TaskManager,以便尽可能保留数据位置。为了动态平衡负载,任务在完成对旧输入的处理之后请求新的输入分离。这个请求是通过向JobManager发送RequestNextInputSplit来实现的。JobManager响应NextInputSplit消息。如果没有更多的输入分隔,则消息中包含的输入分隔为空。
任务被懒部署到任务管理器。这意味着,只在其生产者之一完成生产某些数据之后才部署消耗数据的任务。一旦生产者这样做了,它将向JobManager发送ScheduleOrUpdateConsumers消息。这些消息表示消费者现在可以读取新产生的数据。如果消耗任务尚未运行,则会将其部署到TaskManager。
5.JobClient
JobClient表示分布式系统的面向用户的组件。它用于与JobManager通信,因此它负责提交Flink作业、查询提交的作业的状态以及接收当前运行的作业的状态消息。
JobClient也是通过消息进行通信的actor。有两个与作业提交相关的消息:SubmitJobDetached和SubmitJobWait。第一条消息提交作业,并且从接收任何状态消息和最终作业结果中去寄存器。如果希望以fire和forget 方式将作业提交到Flink集群,则分离模式非常有用。
SubmitJobWait消息向JobManager提交作业,并注册以接收该作业的状态消息。在内部,这是通过生成助手角色来完成的,助手角色用作状态消息的接收器。一旦作业终止,JobManager将带有持续时间和累加器结果的JobResultSuccess发送到派生的助手actor 。在接收到此消息后,助手actor 将消息转发给客户端,客户端最初发出SubmitJobWait消息,然后终止。
在actor可以和另一个actor通信之前,它必须检索ActorRef。查找此操作还需要超时。为了在actor未启动时使系统快速失败,将查找超时设置为比常规超时更小的值。如果遇到查找超时,可以通过配置中的“akka.lookup.timeout”增加查找时间。
Akka的另一个特点是它设置了最大消息大小的限制。这是因为它保留了相同大小的串行化缓冲区,并且不想浪费内存。如果因为消息超出了最大大小而遇到传输错误,则可以通过配置中的“akka.framesize”增加帧大小。
Flink使用Akka的DeathWatch机制检测出故障组件。DeathWatch允许actor观看其他actor,即使它们没有其他actor监督到,甚至存活在在不同的Actor系统里。一旦被观察到的actor挂掉或无法再访问,将向观察的actor发送终止消息。因此,在接收到这样的消息后,系统可以采取措施来处理它。在内部,DeathWatch被实现为心跳和故障检测器,该故障检测器基于心跳间隔、心跳暂停和故障阈值来估计actor何时可能挂掉。可以通过在配置中设置“akka.watch.heartbeat.interval”值来控制心跳间隔。可接受的心跳暂停可以通过“akka.watch.heartbeat.pause”指定。心跳暂停应该是心跳间隔的倍数,否则丢失的心跳将直接触发DeathWatch。故障阈值可以通过“akka.watch.threshold”指定,并且它有效地控制故障检测器的灵敏度。
在Flink中,JobManager监视所有已注册的任务管理器,TaskManager监视JobManager。这样,两个组件都知道其他组件何时不再可访问。JobManager通过将相应的TaskManager标记为dead来作出反应,从而防止将来任务部署到它。此外,它会使当前在这个任务管理器上运行的所有任务失败,并重新安排它们在另一个TaskManager上的执行。如果TaskManager仅仅因为临时连接丢失而被标记为已死,那么一旦重新建立了连接,它就可以简单地在JobManager中重新注册自己。
任务管理器还监视JobManager。此监视允许TaskManager在检测到失败的JobManager时通过使当前运行的所有任务失败而进入清洁状态。此外,如果触发的死亡仅由网络拥塞或连接丢失引起,TaskManager将尝试重新连接到JobManager。
8.未来发展
目前,只有三个组件(JobClient、JobManager和TaskManager)作为参与者实现。为了更好地利用并发性同时提高可伸缩性,可以考虑将更多的组件实现为actor。一个有希望的actor可以是ExecutionGraph,其单独的ExecutionVertices或者甚至相关联的Execution对象都可以作为actor实现。这样的细粒度actor模型具有这样的优点,即状态更新可以直接发送到相应的Execution对象。通过这种方式,JobManager将明显地从单点通信中解脱出来。