Akka 和 μJavaActors

Akka和μJavaActorsμJavaActors均是java的Actor库,其中Akka提供了叫为完整的Actor开发框架,较为庞大,学习成本很高,μJavaActors 是一个轻量级Actor库,大约仅有1200行代码,比较适合入门。


一.Akka Demo
Akka是一个相当成熟、强大的库,github上download下的是Akka的源码,应该使用sbt构建的工程,如果没有使用sbt经验,想导出jar还挺不容易的,推荐Akka官网下载akka各个组件的jar去使用,简单介绍一下helloworld 级别Akka的demo。



1.Akka的主要组件



akka-actor.jar : Actor核心组件了,定义了Acotr核心类



akka-slf4f.jar : SLF4F Logger的支持,一个打log的组件,不用太关注



akka-remote.jar : Actor做远程调用的jar,类似RFC吧



akka-cluster : actor做集群管理组件



akka-camel.jar : 对Apache Camel 集成接口



scala-library-2.11.8.jar : akka核心应该是Scala写的,这个组件就是对akka的核心支持



Akka还有很多组件,不过对于hello world级的程序简单了解几个就ok了。工程是基于eclipse的,需要包含下面几个基础的组件:
akka-actor.jar : Actor核心组件了,定义了Acotr核心类
akka-slf4f.jar : SLF4F Logger的支持,一个打log的组件,不用太关注
scala-library-2.11.8.jar : akka核心应该是Scala写的,这个组件就是对akka的核心支持
config-1.3.0.jar



编写两个Actor:
package demo02;



import akka.actor.UntypedActor;
/*




  • UntypedAcotr是无类型Actor的一个抽象类,继承与核心类Actor
    */
    public class Greeter extends UntypedActor {



    public static enum Msg{
    GREET , DONE;
    }
    /**



    • 每个Actor必须实现OnReceive,当该Actor收到消息调用该方法
      */
      @Override
      public void onReceive(Object msg) throws Throwable {
      if(msg == Msg.GREET){
      System.out.println(“Hello world”);
      /**
      * 这里吐槽一下Akka对于发消息的设计,发送消息的设计竟然是:
      * receiver.tell(msg , sender)
      * 也许没理解akka设计的理念,但是正常人设计不应该是:
      * sender.tell(msg , receiver)
      * 汗……
      */
      getSender().tell(Msg.DONE, getSelf());
      }else{
      unhandled(msg);
      }



    }





}
package demo02;



import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;



public class HelloWorld extends UntypedActor {



@Override
public void preStart(){
final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class));
greeter.tell(Greeter.Msg.GREET, getSelf());
}


@Override
public void onReceive(Object msg) throws Throwable {

if(msg == Greeter.Msg.DONE){
getContext().stop(getSelf());
}else{
unhandled(msg);
}

}


}
下面是Main方法:
package demo02;



import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;



public class Main {



public static void main(String[] args) {
//ActorSystem 相当于ActorManager,管理各种Acor调度、线程管理等
ActorSystem system = ActorSystem.create("hello");
//创建一个HelloWorld 类型的Actor,在Actor启动前,会调preStart(),此时会想Greeter发消息
ActorRef actor = system.actorOf(Props.create(HelloWorld.class));
//添加结束终结Actor,当ActorSystem调Stop时,会向每个Actor发送Terminated消息
system.actorOf(Props.create(Terminator.class, actor), "terminator");

}
public static class Terminator extends UntypedActor{

private final LoggingAdapter log = Logging.getLogger(getContext().system(),this);
private ActorRef actorRef = null;

public Terminator(ActorRef ref){
System.out.println("Terminator Init !!!");
actorRef = ref;
getContext().watch(actorRef);
}

@Override
public void onReceive(Object msg) throws Throwable {
if (msg instanceof Terminated) {
log.info("{} has terminated, shutting down system", actorRef.path());
getContext().system().terminate();
} else {
unhandled(msg);
}

}

}


}
上面代码在akka的源码中sample都可以找到的

二.μJavaActors
μJavaActors 是一个十分轻量级的Actor库,实现核心的Actor调度,不涉及复杂的框架,简单分析一下它的源码吧



1.Actor核心接口



Actor:定义了一个标准的Actor应该具有行为



ActorManager:Actor管理器接口,提供线程管理,Actor调度等



Messager : Actor相互间传递传递的消息接口,当然附带的接口还有MessageEvent和MessageListener



简单引用作者对这个概念的描述:



Actor 是一个执行单元,一次处理一条消息。Actor 具有以下关键行为或特征:



每个 actor 有一个 name,该名称在每个 ActorManager 中必须是惟一的。
每个 actor 属于一个 category;类别是一种向一组 actor 中的一个成员发送消息的方式。一个 actor 一次只能属于一个类别。
只要 ActorManager 可以提供一个执行 actor 的线程,系统就会调用 receive()。为了保持最高效率,actor 应该迅速处理消息,而不要进入漫长的等待状态(比如等待人为输入)。
willReceive() 允许 actor 过滤潜在的消息主题。
peek() 允许该 actor 和其他 actor 查看是否存在挂起的消息(或许是为了选择主题)。
remove() 允许该 actor 和其他 actor 删除或取消任何尚未处理的消息。
getMessageCount() 允许该 actor 和其他 actor 获取挂起的消息数量。
getMaxMessageCount() 允许 actor 限制支持的挂起消息数量;此方法可用于预防不受控制地发送。
大部分程序都有许多 actor,这些 actor 常常具有不同的类型。actor 可在程序启动时创建或在程序执行时创建(和销毁)。本文中的 actor 包 包含一个名为 AbstractActor 的抽象类,actor 实现基于该类。



ActorManager 是一个 actor 管理器。它负责向 actor 分配线程(进而分配处理器)来处理消息。ActorManager 拥有以下关键行为或特征:



createActor() 创建一个 actor 并将它与此管理器相关联。
startActor() 启动一个 actor。
detachActor() 停止一个 actor 并将它与此管理器断开。
send()/broadcast() 将一条消息发送给一个 actor、一组 actor、一个类别中的任何 actor 或所有 actor。
在大部分程序中,只有一个 ActorManager,但如果您希望管理多个线程和/或 actor 池,也可以有多个 ActorManager。此接口的默认实现是 DefaultActorManager。



消息 是在 actor 之间发送的消息。Message 是 3 个(可选的)值和一些行为的容器:



source 是发送 actor。
subject 是定义消息含义的字符串(也称为命令)。
data 是消息的任何参数数据;通常是一个映射、列表或数组。参数可以是要处理和/或其他 actor 要与之交互的数据。
subjectMatches() 检查消息主题是否与字符串或正则表达式匹配。
μJavaActors 包的默认消息类是 DefaultMessage。



ActorManager其实只要简单浏览一下μJavaActors源码就可以理解Actor设计思路啦,主要分析一下ActorManager中的Actor调度源码:
public class ActorRunnable implements Runnable {
public boolean hasThread;
public AbstractActor actor;



    public void run() {
// logger.trace("procesNextActor starting");
int delay = 1;
while (running) {
try {
if (!procesNextActor()) {
// logger.trace("procesNextActor waiting on actor");
// sleep(delay * 1000);
synchronized (actors) {
// TOOD: adjust this delay; possible parameter
// we want to minizmize overhead (make bigger);
// but it has a big impact on message processing
// rate (makesmaller)
// actors.wait(delay * 1000);
actors.wait(100);
}
delay = Math.max(5, delay + 1);
} else {
delay = 1;
}
} catch (InterruptedException e) {
} catch (Exception e) {
logger.error("procesNextActor exception", e);
}
}
// logger.trace("procesNextActor ended");
}

protected boolean procesNextActor() {
boolean run = false, wait = false, res = false;
actor = null;
synchronized (actors) {
for (String key : runnables.keySet()) {
actor = runnables.remove(key);
break;
}
}
if (actor != null) {
// first run never started
run = true;
actor.setHasThread(true);
hasThread = true;
try {
actor.run();
} finally {
actor.setHasThread(false);
hasThread = false;
}
} else {
synchronized (actors) {
for (String key : waiters.keySet()) {
actor = waiters.remove(key);
break;
}
}
if (actor != null) {
// then waiting for responses
wait = true;
actor.setHasThread(true);
hasThread = true;
try {
res = actor.receive();
if (res) {
incDispatchCount();
}
} finally {
actor.setHasThread(false);
hasThread = false;
}
}
}
// if (!(!run && wait && !res) && a != null) {
// logger.trace("procesNextActor %b/%b/%b: %s", run, wait, res, a);
// }
return run || res;
}
} ActorMgr中有一个线程队列维护了一些ActorRunnable对象,每个ActorRunnable对象有都在无线循环调度Actor,这也就简单使得每个Actor在不同的线程中执行。当然此时会有个问题,如果有一些Actor出现资源竞争会不会出现问题,答案肯定是会的。Actor仅仅是抽象了线程调度问题并给出了一下Actor的原则,并不能完全避免资源竞争现象的出现,只能说准守Actor模式规范,,当然也可以用redis去做公共内存块,避免直接的全局资源读写。

Category algorithm