akka介绍
1、概述Akka是JAVA虚拟机JVM平台上构建高并发、分布式和容错应用的工具包和运行时。Akka用Scala语言写成,同时提供了Scala和JAVA的开发接口。Akka处理并发的方法基于Actor模型。在Akka里,Actor之间通信的唯一机制就是消息传递。Akka框架支持两种语言Java和Scala。Akka是一个运行时与编程模型一致的系统。2、Akka中的Actor是什么Actor本质上就是
1、概述
Akka是JAVA虚拟机JVM平台上构建高并发、分布式和容错应用的工具包和运行时。Akka用Scala语言写成,同时提供了Scala和JAVA的开发接口。
Akka处理并发的方法基于Actor模型。在Akka里,Actor之间通信的唯一机制就是消息传递。Akka框架支持两种语言Java和Scala。Akka是一个运行时与编程模型一致的系统。
2、Akka中的Actor是什么
Actor本质上就是接收消息并采取行动处理消息的对象。它从消息源中解耦出来,只负责正确识别接收到的消息类型,并采取相应的行动。
收到一条消息之后,一个Actor可能会采取以下一个或多个行动:
执行一些本身的操作(例如进行计算、持久化数据、调用外部的Web服务等);
把消息或衍生消息转发给另外一个Actor;
实例化一个新的Actor并把消息转发给它。
或者,如果这个Actor认为合适的话,可能会完全忽略这条消息(也就是说,它可能选择不响应)。
3、Akka使用场景
任何需要高吞吐率和低延迟的系统都可以使用Akka。
Actor使你能够进行服务失败管理(监管者),负载管理(缓和策略、超时和隔离),水平和垂直方向上的可扩展性(增加cpu核数和/或增加更多的机器)管理。
4、准备工作
Akka要求你安装了 Java 1.6或更高版本。
下载
下载Akka有几种方法。你可以下载包含微内核的完整发布包(包含所有的模块). 或者也可以从Maven或sbt从Akka Maven仓库下载对akka的依赖。
模块
Akka的模块化做得非常好,它为不同的功能提供了不同的Jar包。
akka-actor-2.0.jar – 标准Actor, 有类型Actor,等等
akka-remote-2.0.jar – 远程Actor
akka-slf4j-2.0.jar – SLF4J事件处理监听器
akka-testkit-2.0.jar – 用于测试Actor的工具包
akka-kernel-2.0.jar – Akka微内核,可运行一个基本的最小应用服务器
akka--mailbox-2.0.jar – Akka可容错邮箱
要查看每个Akka模块的jar包依赖见 依赖 章节. 虽然不重要不过akka-actor 没有外部依赖 (除了 scala-library.jar JAR包).
使用发布版:http://akka.io/downloads,下载发布包并解压.
5、maven配置
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.10</artifactId>
<version>2.1.2</version>
</dependency>
6、akka使用
(1)作为一个库,以普通jar包的形式放在classpath上,或放到web应用中的 WEB-INF/lib位置
(2)作为一个独立的应用程序,使用 Microkernel(微内核),自己有一个main类来初始化Actor系统
将Akka作为一个库。如果你是编写web应用,你估计要使用这种方式。通过添加更多的模块,可以有多种以库的形式使用Akka的方式。
7、akka的Java API
(1)Creating Actors
/*
Creating Actors
xtending the UntypedActor class and implementing the onReceive method
*/
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class MyUntypedActor extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@override
public void onReceive(Object message) throws Exception {
if (message instanceof String)
log.info("Received String message: {}", message);
else
unhandled(message);
}
}
UntypedActor只定义了一个抽象方法,就是上面提到的onReceive(Objectmessage), 用来实现actor的行为。
如果当前actor的行为与收到的消息不匹配,则会调用unhandled方法, 它的缺省实现是向actor系统的事件流中发布一条 akka.actor.UnhandledMessage(message, sender, recipient)。
另外,它还包括:
getSelf(); //代表本actor的 ActorRef
getSender(); //代表最近收到的消息的发送actor,通常用于下面将讲到的回应消息中
supervisorStrategy(); //用户可重写它来定义对子actor的监管策略
getContext(); //暴露actor和当前消息的上下文信息
(2)Send messages
tell 意思是“fire-and-forget”, 异步发送一个消息并立即返回。这是发送消息的推荐方式。不会阻塞地等待消息。它拥有最好的并发性和可扩展性。ask 异步发送一条消息并返回一个 Future代表一个可能的回应。需要采用Future的处理模式。
public class MyReceivedTimeoutUntypedActor extends UntypedActor {
public MyReceivedTimeoutUntypedActor() {
getContext().setReceiveTimeout(Duration.parse("30 seconds"));
}
public void onReceive(Object message) {
if (message.equals("Hello")) {
getSender().tell("Hello world");
} else if (message == Actors.receiveTimeout()) {
throw new RuntimeException("received timeout");
} else {
unhandled(message);
}
}
}
如果服务端处理消息时发生了异常而导致没有给客户端回应,那么客户端收到的结果将会收到Timeout的Failure:Failure(akka.pattern.AskTimeoutException: Timed out)。可以将异常捕获用Failure封装异常发给客户端:actor.tell(new akka.actor.Status.Failure(e))。
Future的onComplete, onResult, 或 onTimeout 方法可以用来注册一个回调,以便在Future完成时得到通知。从而提供一种避免阻塞的方法。
Future<Object> future = Patterns.ask(queryActor, param, timeout);
future.onComplete(new OnComplete<Object>() {
public void onComplete(Throwable failure, Object result) {
if (failure != null) {
// time out exception occurs
if (failure.getMessage() != null) {
logger.error(failure.getMessage(), failure);
}
// mark as timeout error
res.resume(Response.ok().entity(result).build());
} else {
res.resume(Response.ok().entity(result).build());
}
logger.info("query cost " + stopwatch.elapsedTime(TimeUnit.MILLISECONDS) + " query is " + param);
}
}, actorSystem.dispatcher());
更多推荐
所有评论(0)