今天我们分享springboot集成netty的过程:

1、jar包依赖,Netty服务端和客户端依赖的jar包一样:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.28.Final</version>
</dependency>
<!--  序列化jar -->
<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.42</version>
</dependency>

2、Netty服务端核心代码(boot启动类省略):

2.1、netty功能启动入口


import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * 类说明:服务端Handler的初始化
 * 交给Spring 托管,ServerBusiHandler用注入方式实例化后加入Netty的pipeline
 */
@Service//启动时扫描,即交给spring管理
public class ServerInit extends ChannelInitializer<SocketChannel> {

    @Autowired
    private ServerBusiHandler serverBusiHandler;

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        /*Netty提供的日志打印Handler,可以展示发送接收出去的字节*/
        //ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
        /*剥离接收到的消息的长度字段,拿到实际的消息报文的字节数组*/
        ch.pipeline().addLast("frameDecoder",
                new LengthFieldBasedFrameDecoder(65535,
                        0,2,0,
                        2));
        /*给发送出去的消息增加长度字段*/
        ch.pipeline().addLast("frameEncoder",
                new LengthFieldPrepender(2));
        /*反序列化,将字节数组转换为消息实体*/
        ch.pipeline().addLast(new KryoDecoder());
        /*序列化,将消息实体转换为字节数组准备进行网络传输*/
        ch.pipeline().addLast("MessageEncoder",
                new KryoEncoder());
        /*超时检测*/
        ch.pipeline().addLast("readTimeoutHandler",
                new ReadTimeoutHandler(50));
        /*登录应答*/
        ch.pipeline().addLast(new LoginAuthRespHandler());

        /*心跳应答*/
        ch.pipeline().addLast("HeartBeatHandler",
                new HeartBeatRespHandler());

        /*服务端业务处理*/
        ch.pipeline().addLast("ServerBusiHandler",
                serverBusiHandler);
    }
}

2.2、netty自定义配置入口 ServerInit


import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * 类说明:服务端Handler的初始化
 * 交给Spring 托管,ServerBusiHandler用注入方式实例化后加入Netty的pipeline
 */
@Service//启动时扫描,即交给spring管理
public class ServerInit extends ChannelInitializer<SocketChannel> {

    @Autowired
    private ServerBusiHandler serverBusiHandler;

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        /*Netty提供的日志打印Handler,可以展示发送接收出去的字节*/
        //ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
        /*剥离接收到的消息的长度字段,拿到实际的消息报文的字节数组*/
        ch.pipeline().addLast("frameDecoder",
                new LengthFieldBasedFrameDecoder(65535,
                        0,2,0,
                        2));
        /*给发送出去的消息增加长度字段*/
        ch.pipeline().addLast("frameEncoder",
                new LengthFieldPrepender(2));
        /*反序列化,将字节数组转换为消息实体*/
        ch.pipeline().addLast(new KryoDecoder());
        /*序列化,将消息实体转换为字节数组准备进行网络传输*/
        ch.pipeline().addLast("MessageEncoder",
                new KryoEncoder());
        /*超时检测*/
        ch.pipeline().addLast("readTimeoutHandler",
                new ReadTimeoutHandler(50));
        /*登录应答*/
        ch.pipeline().addLast(new LoginAuthRespHandler());

        /*心跳应答*/
        ch.pipeline().addLast("HeartBeatHandler",
                new HeartBeatRespHandler());

        /*服务端业务处理*/
        ch.pipeline().addLast("ServerBusiHandler",
                serverBusiHandler);
    }
}

2.3、netty处理业务的Handler入口 ServerBusiHandler 类:

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/**
 * @author Mark老师   享学课堂 https://enjoy.ke.qq.com
 * 类说明:业务处理类
 * channelRead0方法中有了实际的业务处理,负责具体的业务方法的调用
 *
 */
@Service
@ChannelHandler.Sharable//共享的单例
public class ServerBusiHandler
        extends SimpleChannelInboundHandler<MyMessage> {
    private static final Log LOG
            = LogFactory.getLog(ServerBusiHandler.class);

    @Autowired
    private RegisterService registerService;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessage msg)
            throws Exception {
        LOG.info(msg);
        MyMessage message = new MyMessage();
        MyHeader myHeader = new MyHeader();
        myHeader.setSessionID(msg.getMyHeader().getSessionID());
        myHeader.setType(MessageType.SERVICE_RESP.value());
        message.setMyHeader(myHeader);
        Map<String,Object> content = (HashMap<String,Object>)msg.getBody();
        /*方法所在类名接口名*/
        String serviceName = (String) content.get("siName");
        /*方法的名字*/
        String methodName = (String) content.get("methodName");
        /*方法的入参类型*/
        Class<?>[] paramTypes = (Class<?>[]) content.get("paraTypes");
        /*方法的入参的值*/
        Object[] args = (Object[]) content.get("args");
        /*从容器中拿到服务的Class对象*/
        Class serviceClass = registerService.getLocalService(serviceName);
        if(serviceClass == null){
            throw new ClassNotFoundException(serviceName+ " not found");
        }

        /*通过反射,执行实际的服务*/
        Method method = serviceClass.getMethod(methodName, paramTypes);
        boolean result  = (boolean)method.invoke(serviceClass.newInstance(),args);
        message.setBody(result);
        ctx.writeAndFlush(message);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx)
            throws Exception {
        LOG.info(ctx.channel().remoteAddress()+" 主动断开了连接!");
    }

}

3、客户端核心代码实现(boot启动类省略):

3.1、netty客户端启动入口

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 *类说明:rpc框架的客户端代理部分,交给Spring 托管
 * 1、动态代理的实现中,不再连接服务器,而是直接发送请求
 * 2、客户端网络部分的主体,包括Netty组件的初始化,连接服务器等
 */
@Service
public class RpcClientFrame implements Runnable{

    private static final Log LOG = LogFactory.getLog(RpcClientFrame.class);

    private ScheduledExecutorService executor = Executors
            .newScheduledThreadPool(1);
    private Channel channel;
    private EventLoopGroup group = new NioEventLoopGroup();

    /*是否用户主动关闭连接的标志值*/
    private volatile boolean userClose = false;
    /*连接是否成功关闭的标志值*/
    private volatile boolean connected = false;

    @Autowired
    private ClientInit clientInit;
    @Autowired
    private ClientBusiHandler clientBusiHandler;

    /*远程服务的代理对象,参数为客户端要调用的的服务*/
    public <T> T getRemoteProxyObject(final Class<?> serviceInterface) throws Exception {

        /*拿到一个代理对象,由这个代理对象通过网络进行实际的服务调用*/
        return (T)Proxy.newProxyInstance(serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                new DynProxy(serviceInterface,clientBusiHandler));
    }

    /*动态代理,实现对远程服务的访问*/
    private static class DynProxy implements InvocationHandler{
        private Class<?> serviceInterface;
        private ClientBusiHandler clientBusiHandler;

        public DynProxy(Class<?> serviceInterface, ClientBusiHandler clientBusiHandler) {
            this.serviceInterface = serviceInterface;
            this.clientBusiHandler = clientBusiHandler;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args)
                throws Throwable {
            Map<String,Object> content = new HashMap<>();
            content.put("siName",serviceInterface.getName());
            content.put("methodName",method.getName());
            content.put("paraTypes",method.getParameterTypes());
            content.put("args",args);
            return clientBusiHandler.send(content);
        }
    }

    public boolean isConnected() {
        return connected;
    }

    /*连接服务器*/
    public void connect(int port, String host) throws Exception {

        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(clientInit);
            // 发起异步连接操作
            ChannelFuture future = b.connect(
                    new InetSocketAddress(host, port)).sync();
            channel = future.sync().channel();
            /*连接成功后通知等待线程,连接已经建立*/
            synchronized (this){
                this.connected = true;
                this.notifyAll();
            }
            future.channel().closeFuture().sync();
        } finally {
            if(!userClose){/*非用户主动关闭,说明发生了网络问题,需要进行重连操作*/
                System.out.println("发现异常,可能发生了服务器异常或网络问题," +
                        "准备进行重连.....");
                //再次发起重连操作
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            TimeUnit.SECONDS.sleep(1);
                            try {
                                // 发起重连操作
                                connect(NettyConstant.REMOTE_PORT,
                                        NettyConstant.REMOTE_IP);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }else{/*用户主动关闭,释放资源*/
                channel = null;
                group.shutdownGracefully().sync();
                connected = false;
//                synchronized (this){
//                    this.connected = false;
//                    this.notifyAll();
//                }
            }
        }
    }

    @Override
    public void run() {
        try {
            connect(NettyConstant.REMOTE_PORT, NettyConstant.REMOTE_IP);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void close() {
        userClose = true;
        channel.close();
    }

    @PostConstruct
    public void startNet() throws InterruptedException {
        new Thread(this).start();
        while(!this.isConnected()){
            synchronized (this){
                this.wait();
            }
        }
        LOG.info("网络通信已准备好,可以进行业务操作了........");
    }

    @PreDestroy
    public void stopNet(){
        close();
    }

}

3.2、netty客户端自定义配置入口  ClientInit

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author Mark老师   享学课堂 https://enjoy.ke.qq.com
 * 类说明:客户端Handler的初始化
 * 交给Spring 托管,clientBusiHandler用注入方式实例化后加入Netty的pipeline
 */
@Service
public class ClientInit extends ChannelInitializer<SocketChannel> {

    @Autowired
    private ClientBusiHandler clientBusiHandler;

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        /*剥离接收到的消息的长度字段,拿到实际的消息报文的字节数组*/
        ch.pipeline().addLast("frameDecoder",
                new LengthFieldBasedFrameDecoder(65535,
                        0,2,0,
                        2));

        /*给发送出去的消息增加长度字段*/
        ch.pipeline().addLast("frameEncoder",
                new LengthFieldPrepender(2));

        /*反序列化,将字节数组转换为消息实体*/
        ch.pipeline().addLast(new KryoDecoder());
        /*序列化,将消息实体转换为字节数组准备进行网络传输*/
        ch.pipeline().addLast("MessageEncoder",
                new KryoEncoder());

        /*超时检测*/
        ch.pipeline().addLast("readTimeoutHandler",
                new ReadTimeoutHandler(10));

        /*发出登录请求*/
        ch.pipeline().addLast("LoginAuthHandler",
                new LoginAuthReqHandler());

        /*发出心跳请求*/
        ch.pipeline().addLast("HeartBeatHandler",
                new HeartBeatReqHandler());

        /*业务处理*/
        ch.pipeline().addLast("ClientBusiHandler",
                clientBusiHandler);
    }
}

3.3、netty通信中序列化的实现

序列化的Handler

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * 类说明:序列化的Handler
 */
public class KryoEncoder  extends MessageToByteEncoder<MyMessage> {

    @Override
    protected void encode(ChannelHandlerContext ctx, MyMessage message,
                          ByteBuf out) throws Exception {
        KryoSerializer.serialize(message, out);
        ctx.flush();
    }
}

反序列化的Handler

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * 类说明:反序列化的Handler
 */
public class KryoDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                          List<Object> out) throws Exception {
        Object obj = KryoSerializer.deserialize(in);
        out.add(obj);
    }
}

信息类的定义:

/**
 * 类说明:消息实体类
 */
public final class MyMessage {

    private MyHeader myHeader;

    private Object body;

    public final MyHeader getMyHeader() {
    	return myHeader;
    }

    public final void setMyHeader(MyHeader myHeader) {
    	this.myHeader = myHeader;
    }

    public final Object getBody() {
    	return body;
    }

    public final void setBody(Object body) {
    	this.body = body;
    }

    @Override
    public String toString() {
    	return "MyMessage [myHeader=" + myHeader + "][body="+body+"]";
    }
}

4、启动Netty服务端:

5、启动客户端

接下来就可以通信了,比如,客户端发送消息:
 

到此,springboot集成netty的过程基本结束,后期会详细分析其前后端交互过程,敬请期待。 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐