Mina2框架解析一
Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步IO 默认使用的是JAVA NIO 作为底层支持)操作的编程模型。Mina 主要有1.x 和
Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步IO 默认使用的是JAVA NIO 作为底层支持)操作的编程模型。Mina 主要有1.x 和2.x 两个分支,这里我们讲解最新版本2.0,如果你使用的是Mina 1.x,那么可能会有一些功能并不适用。学习本文档,需要你已掌握JAVA IO、JAVA NIO、JAVASocket、JAVA 线程及并发库(java.util.concurrent.*)的知识。Mina 同时提供了网络通信的Server 端、Client 端的封装,无论是哪端,Mina 在整个网通通信结构中都处于如下的位置:可见Mina 的API 将真正的网络通信与我们的应用程序隔离开来,你只需要关心你要发送、接收的数据以及你的业务逻辑即可。同样的,无论是哪端,Mina 的执行流程如下所示:
(1.) IoService:这个接口在一个线程上负责套接字的建立,拥有自己的Selector,监听是否有连接被建立。
(2.) IoProcessor:这个接口在另一个线程上,负责检查是否有数据在通道上读写,也就是说它也拥有自己的Selector,这是与我们使用JAVA NIO 编码时的一个不同之处,通常在JAVA NIO 编码中,我们都是使用一个Selector,也就是不区分IoService与IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在IoService 上的过滤器,并在过滤器链之后调用IoHandler。
(3.) IoFilter:这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、数据的编码(write 方向)与解码(read 方向)等功能,其中数据的encode 与decode是最为重要的、也是你在使用Mina 时最主要关注的地方。
(4.) IoHandler:这个接口负责编写业务逻辑,也就是接收、发送数据的地方。
1. 简单的TCPServer:
(1.) 第一步:编写IoService
按照上面的执行流程,我们首先需要编写IoService,IoService 本身既是服务端,又是客户端,我们这里编写服务端,所以使用IoAcceptor 实现,由于IoAcceptor 是与协议无关的,因为我们要编写TCPServer,所以我们使用IoAcceptor 的实现NioSocketAcceptor,实际上底层就是调用java.nio.channels.ServerSocketChannel 类。当然,如果你使用了Apache 的APR 库,那么你可以选择使AprSocketAcceptor 作为TCPServer 的实现,据传说Apache APR库的性能比JVM 自带的本地库高出很多。那么IoProcessor 是由指定的IoService 内部创建并调用的,我们并不需要关心。
IoAcceptor acceptor=new NioSocketAcceptor();
acceptor.getSessionConfig().setReadBufferSize(2048);
acceptor.getSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE,10);
acceptor.bind(new InetSocketAddress(9123));
这段代码我们初始化了服务端的TCP/IP 的基于NIO 的套接字,然后调用IoSessionConfig设置读取数据的缓冲区大小、读写通道均在10 秒内无任何操作就进入空闲状态。
(2.) 第二步:编写过滤器
这里我们处理最简单的字符串传输,Mina 已经为我们提供了TextLineCodecFactory 编解码器工厂来对字符串进行编解码处理。
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),LineDelimeter.WINDOWS.getValue(), LineDelimiter. WINDOWS.getValue())));
这段代码要在acceptor.bind()方法之前执行,因为绑定套接字之后就不能再做这些准备工作了。这里先不用清楚编解码器是如何工作的,这个是后面重点说明的内容,这里你只需要清楚,我们传输的以换行符为标识的数据,所以使用了Mina 自带的换行符编解码器工厂。
(3.) 第三步:编写IoHandler
这里我们只是简单的打印Client 传说过来的数据。
public class MyIoHandler extends IoHandlerAdapter {
// 这里我们使用的SLF4J作为日志门面,至于为什么在后面说明。
private final static Logger log = LoggerFactory.getLogger(MyIoHandler.class);
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
String str = message.toString();
log.info("The message received is [" + str + "]");
if (str.endsWith("quit")) {
session.close(true);
return;
}
}
}
然后我们把这个IoHandler 注册到IoService:
acceptor.setHandler(new MyIoHandler());
当然这段代码也要在acceptor.bind()方法之前执行。然后我们运行MyServer 中的main 方法,你可以看到控制台一直处于阻塞状态,此时,我们用telnet 127.0.0.1 9123 访问,然后输入一些内容,当按下回车键,你会发现数据在Server 端被输出,但要注意不要输入中文,因为Windows 的命令行窗口不会对传输的数据进行UTF-8 编码。当输入quit 结尾的字符串时,连接被断开。这里注意你如果使用的操作系统,或者使用的Telnet 软件的换行符是什么,如果不清楚,可以删掉第二步中的两个红色的参数,使用TextLineCodec 内部的自动识别机制。
2. 简单的TCPClient:
这里我们实现Mina 中的TCPClient,因为前面说过无论是Server 端还是Client 端,在Mina中的执行流程都是一样的。唯一不同的就是IoService 的Client 端实现是IoConnector。
(1.) 第一步:编写IoService并注册过滤器
public class MyClient {
main方法:
IoConnector connector=new NioSocketConnector();
connector.setConnectTimeoutMillis(30000);
connector.getFilterChain().addLast("codec",
new ProtocolCodecFilter(
new TextLineCodecFactory(
Charset.forName("UTF-8"),
LineDelimiter.WINDOWS.getValue(),
LineDelimiter.WINDOWS.getValue()
)
)
);
connector.connect(new InetSocketAddress("localhost", 9123));
}
(2.) 第三步:编写IoHandler
public class ClientHandler extends IoHandlerAdapter {
private final static Logger LOGGER = LoggerFactory
.getLogger(ClientHandler.class);
private final String values;
public ClientHandler(String values) {
this.values = values;
}
@Override
public void sessionOpened(IoSession session) {
session.write(values);
}
}
注册IoHandler:
connector.setHandler(new ClientHandler(“你好!\r\n 大家好!”));
然后我们运行MyClient,你会发现MyServer 输出如下语句:
The message received is [你好!]
The message received is [大家好!]
我们看到服务端是按照收到两条消息输出的,因为我们用的编解码器是以换行符判断数据是否读取完毕的。
3. 介绍Mina的TCP的主要接口:
通过上面的两个示例,你应该对Mina 如何编写TCP/IP 协议栈的网络通信有了一些感性的认识。
(1.)IoService:
这个接口是服务端IoAcceptor、客户端IoConnector 的抽象,提供IO 服务和管理IoSession的功能,它有如下几个常用的方法:
A. TransportMetadata getTransportMetadata():
这个方法获取传输方式的元数据描述信息,也就是底层到底基于什么的实现,譬如:nio、apr 等。
B. void addListener(IoServiceListener listener):
这个方法可以为IoService 增加一个监听器,用于监听IoService 的创建、活动、失效、空闲、销毁,具体可以参考IoServiceListener 接口中的方法,这为你参与IoService 的生命周期提供了机会。
C. void removeListener(IoServiceListener listener):
这个方法用于移除上面的方法添加的监听器。
D. void setHandler(IoHandler handler):
这个方法用于向IoService 注册IoHandler,同时有getHandler()方法获取Handler。
E. Map
ConnectFuture future = connector.connect(new InetSocketAddress(
HOSTNAME, PORT));
// 等待是否连接成功,相当于是转异步执行为同步执行。
future.awaitUninterruptibly();
// 连接成功后获取会话对象。如果没有上面的等待,由于connect()方法是异步的,session
可能会无法获取。
session = future.getSession();
第二种:
ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));
future.addListener(new IoFutureListener<ConnectFuture>() {
@Override
public void operationComplete(ConnectFuture future) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
IoSession session = future.getSession();
System.out.println("++++++++++++++++++++++++++++");
}
});
System.out.println("*************");
为了更好的看清楚使用监听器是异步的,而不是像awaitUninterruptibly()那样会阻塞主线程的执行,我们在回调方法中暂停5 秒钟,然后输出+++,在最后输出。我们执行代码之后,你会发现首先输出(这证明了监听器是异步执行的),然后IoSession 对象Created,系统暂停5 秒,然后输出+++,最后IoSession 对象Opened,也就是TCP 连接建立。
4.日志配置:
前面的示例代码中提到了使用SLF4J 作为日志门面,这是因为Mina 内部使用的就是SLF4J,你也使用SLF4J 可以与之保持一致性。Mina 如果想启用日志跟踪Mina 的运行细节,你可以配置LoggingFilter 过滤器,这样你可
以看到Session 建立、打开、空闲等一系列细节在日志中输出,默认SJF4J 是按照DEBUG级别输出跟踪信息的,如果你想给某一类别的Mina 运行信息输出指定日志输出级别,可以调用LoggingFilter 的setXXXLogLevel(LogLevel.XXX)。
例:
LoggingFilter lf = new LoggingFilter();
lf.setSessionOpenedLogLevel(LogLevel.ERROR);
acceptor.getFilterChain().addLast("logger", lf);
这里IoSession 被打开的跟踪信息将以ERROR 级别输出到日志。
5.过滤器:
前面我们看到了LoggingFilter、ProtocolCodecFilter 两个过滤器,一个负责日志输出,一个负责数据的编解码,通过最前面的Mina 执行流程图,在IoProcessor 与IoHandler 之间可以有很多的过滤器,这种设计方式为你提供可插拔似的扩展功能提供了非常便利的方式,目前的Apache CXF、Apache Struts2 中的拦截器也都是一样的设计思路。Mina 中的IoFilter 是单例的,这与CXF、Apache Struts2 没什么区别。IoService 实例上会绑定一个DefaultIoFilterChainBuilder 实例,DefaultIoFilterChainBuilder 会把使用内部的EntryImpl 类把所有的过滤器按照顺序连在一起,组成一个过滤器链。
DefaultIoFilterChainBuilder 类如下常用的方法:
A. void addFirst(String name,IoFilter filter):
这个方法把过滤器添加到过滤器链的头部,头部就是IoProcessor 之后的第一个过滤器。同样的addLast()方法把过滤器添加到过滤器链的尾部。
B. void addBefore(String baseName,String name,IoFilter filter):
这个方法将过滤器添加到baseName 指定的过滤器的前面,同样的addAfter()方法把过滤器添加到baseName 指定的过滤器的后面。这里要注意无论是那种添加方法,每个过滤器的名字(参数name)必须是唯一的。
C. IoFilter remove(Stirng name):
这个方法移除指定名称的过滤器,你也可以调用另一个重载的remove()方法,指定要移除的IoFilter 的类型。
D. List getAll():
这个方法返回当前IoService 上注册的所有过滤器。默认情况下,过滤器链中是空的,也就是getAll()方法返回长度为0 的List,但实际Mina内部有两个隐藏的过滤器:HeadFilter、TailFilter,分别在List 的最开始和最末端,很明显,TailFilter 在最末端是为了调用过滤器链之后,调用IoHandler。但这两个过滤器对你来说是透明的,可以忽略它们的存在。编写一个过滤器很简单,你需要实现IoFilter 接口,如果你只关注某几个方法,可以继承IoFilterAdapter 适配器类。IoFilter 接口中主要包含两类方法,一类是与IoHandler 中的方法名一致的方法,相当于拦截IoHandler 中的方法,另一类是IoFilter 的生命周期回调方法,这些回调方法的执行顺序和解释如下所示:
(1.)init()在首次添加到链中的时候被调用,但你必须将这个IoFilter 用
ReferenceCountingFilter 包装起来,否则init()方法永远不会被调用。
(2.)onPreAdd()在调用添加到链中的方法时被调用,但此时还未真正的加入到链。
(3.)onPostAdd()在调用添加到链中的方法后被调,如果在这个方法中有异常抛出,则过滤器会立即被移除,同时destroy()方法也会被调用(前提是使用ReferenceCountingFilter包装)。
(4.)onPreRemove()在从链中移除之前调用。
(5.)onPostRemove()在从链中移除之后调用。
(6.)destory()在从链中移除时被调用,使用方法与init()要求相同。
无论是哪个方法,要注意必须在实现时调用参数nextFilter 的同名方法,否则,过滤器链的执行将被中断,IoHandler 中的同名方法一样也不会被执行,这就相当于Servlet 中的Filter 必须调用filterChain.doFilter(request,response)才能继续前进是一样的道理。
示例:
public class MyIoFilter implements IoFilter {
@Override
public void destroy() throws Exception {
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%�stroy");
}
@Override
public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception {
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%exceptionCaught");
nextFilter.exceptionCaught(session, cause);
}
@Override
public void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterClose");
nextFilter.filterClose(session);
}
@Override
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterWrite");
nextFilter.filterWrite(session, writeRequest);
}
@Override
public void init() throws Exception {
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%init");
}
@Override
public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived");
nextFilter.messageReceived(session, message);
}
@Override
public void messageSent(NextFilter nextFilter, IoSession session,WriteRequest writeRequest) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageSent");
nextFilter.messageSent(session, writeRequest);
}
@Override
public void onPostAdd(IoFilterChain parent, String name,NextFilter nextFilter) throws Exception {
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd");
}
@Override
public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostRemove");
}
@Override
public void onPreAdd(IoFilterChain parent, String name,NextFilter nextFilter) throws Exception {
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd");
}
@Override
public void onPreRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreRemove");
}
@Override
public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionClosed");
nextFilter.sessionClosed(session);
}
@Override
public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception {
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated");
nextFilter.sessionCreated(session);
}
@Override
public void sessionIdle(NextFilter nextFilter, IoSession session,IdleStatus status) throws Exception {
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionIdle");
nextFilter.sessionIdle(session, status);
}
@Override
public void sessionOpened(NextFilter nextFilter, IoSession session)throws Exception {
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened");
nextFilter.sessionOpened(session);
}
}
我们将这个拦截器注册到上面的TCPServer 的IoAcceptor 的过滤器链中的最后一个:
acceptor.getFilterChain().addLast(“myIoFilter”, new ReferenceCountingFilter(new MyIoFilter()));
这里我们将MyIoFilter 用ReferenceCountingFilter 包装起来,这样你可以看到init()、destroy()方法调用。我们启动客户端访问,然后关闭客户端,你会看到执行顺序如下所示:
init onPreAdd onPostAdd sessionCreated sessionOpened messageReceived filterClose sessionClosed onPreRemove onPostRemove destroy。
IoHandler 的对应方法会跟在上面的对应方法之后执行,这也就是说从横向(单独的看一个过滤器中的所有方法的执行顺序)上看,每个过滤器的执行顺序是上面所示的顺序;从纵向(方法链的调用)上看,如果有filter1、filter2 两个过滤器,sessionCreated()方法的执行顺序如下所示:
filter1-sessionCreated filter2-sessionCreated IoHandler-sessionCreated。
这里你要注意init、onPreAdd、onPostAdd 三个方法并不是在Server 启动时调用的,而是IoSession 对象创建之前调用的,也就是说IoFilterChain.addXXX()方法仅仅负责初始化过滤器并注册过滤器,但并不调用任何方法,包括init()初始化方法也是在IoProcessor 开始工作的时候被调用。IoFilter 是单例的,那么init()方法是否只被执行一次呢?这个是不一定的,因为IoFilter是被IoProcessor 调用的,而每个IoService 通常是关联多个IoProcessor,所以IoFilter的init()方法是在每个IoProcessor 线程上只执行一次。关于Mina 的线程问题,我们后面会详细讨论,这里你只需要清楚,init()与destroy()的调用次数与IoProceesor 的个数有关,假如一个IoService 关联了3 个IoProcessor,有五个并发的客户端请求,那么你会看到三次init()方法被调用,以后将不再会调用。Mina中自带的过滤器:
过滤器 说明
BlacklistFilter 设置一些IP 地址为黑名单,不允许访问。
BufferedWriteFilter 设置输出时像BufferedOutputStream 一样进行缓冲。
CompressionFilter 设置在输入、输出流时启用JZlib 压缩。
ConnectionThrottleFilter 这个过滤器指定同一个IP 地址(不含端口号)上的请求在多长的毫秒值内可以有一个请求,如果小于指定的时间间隔就有连续两个请求,那么第二个请求将被忽略(IoSession.close())。正如Throttle 的名字一样,调节访问的频率这个过滤器最好放在过滤器链的前面。
FileRegionWriteFilter 如果你想使用File 对象进行输出,请使用这个过滤器。要注意,你需要使用WriteFuture 或者在
messageSent() 方法中关闭File 所关联的FileChannel 通道。
StreamWriteFilter 如果你想使用InputStream 对象进行输出,请使用这个过滤器。要注意,你需要使用WriteFuture或者在messageSent()方法中关闭File 所关联的
FileChannel 通道。NoopFilter 这个过滤器什么也不做,如果你想测试过滤器链是否起作用,可以用它来测试。
ProfilerTimerFilter 这个过滤器用于检测每个事件方法执行的时间,所以最好放在过滤器链的前面。
ProxyFilter 这个过滤器在客户端使用ProxyConnector 作为实现时,会自动加入到过滤器链中,用于完成代理功能。
RequestResponseFilter 暂不知晓。
SessionAttributeInitializingFilter 这个过滤器在IoSession 中放入一些属性(Map),通常放在过滤器的前面,用于放置一些初始化的信息。
MdcInjectionFilter 针对日志输出做MDC 操作,可以参考LOG4J 的MDC、NDC 的文档。
WriteRequestFilter CompressionFilter、RequestResponseFilter 的基类,用于包装写请求的过滤器。
还有一些过滤器,会在各节中详细讨论,这里没有列出,譬如:前面的LoggingFilger 日志过滤器。
更多推荐
所有评论(0)