1、依赖

<!-- netty依赖-->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.0.23.Final</version>
</dependency>

2、搭建 Netty 服务端

Netty服务器:NettyServer

public class NettyServer {

    public NettyServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        new EchoServer(8888).start(); // 启动
    }

    public void start() {
        // 负责连接请求
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 负责事件响应
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 服务器启动项
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 临时存放已完成三次握手的请求的队列的最大长度。
            // 如果未设置或所设置的值小于1,Java将使用默认值50。
            // 如果大于队列的最大长度,请求会被拒绝
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            // 保持长连接状态
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            //绑定线程池:handler是针对bossGroup,childHandler是针对workerHandler
            serverBootstrap.group(bossGroup, workerGroup)
                    // 选择nioChannel
                    .channel(NioServerSocketChannel.class)
                    // 绑定监听端口
                    .localAddress(this.port)
                    // 绑定客户端连接时候触发操作
                    .childHandler(new ChannelInitializer<SocketChannel>() { 
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            logger.info("新增客户端连接,IP:" + ch.localAddress().getHostName() + ",Port:" + ch.localAddress().getPort());
                            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 22, 2, 1, 0));
                            ch.pipeline().addLast(new ByteArrayEncoder());// 编码(发送数据)
                            ch.pipeline().addLast(new ByteArrayDecoder());// 解码(接受数据)
                            ch.pipeline().addLast(new NettyServerHandler()); // 客户端触发操作
                        }
                    });

            // 服务器异步创建绑定定
            ChannelFuture channelFuture = serverBootstrap.bind().sync();
            //该方法进行阻塞,等待服务端链路关闭之后继续执行。
            //这种模式一般都是使用Netty模块主动向服务端发送请求,然后最后结束才使用
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

Netty处理器:NettyServerHandler

@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	// 是否登出操作
    private Boolean isLogout;

	//region 使用依赖注入
	private static NettyServerHandler nettyServerHandler;
	
    @Autowired
    MyService myService;

    public NettyServerHandler() {
    }

    @PostConstruct
    public void init() {
        nettyServerHandler = this;
    }
	//region 
 
    /**
	 * channel 通道 action 活跃
     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("客户端连接成功:" + ctx.channel().localAddress().toString() + " 通道已激活!");
    }
 
    /**
	 * channel 通道 action 不活跃
     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("客户端断开连接:" + ctx.channel().localAddress().toString() + " 通道不活跃!");
		// channel失效,从Map中移除
        NettyChannelMap.removeValue((SocketChannel) ctx.channel());
        // 关闭流
        ctx.close();
    }
 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		logger.info("服务器:收到客户端数据");
		try {
			byte[] msgByte = (byte[]) msg;
            // TODO 解析数据包
            DataPackage dataPackage = nettyServerHandler.myService.baseAnalysis(msgByte);
            // 唯一标识码            
            String uuid = dataPackage.getUuid();
			// 将channel保存到内存
            NettyChannelMap.add(uuid, (SocketChannel) channelHandlerContext.channelHandlerContext.channel());
			// TODO 其他操作
            nettyServerHandler.myService.analysis(msgByte);
			//返回应答
            NettyChannelMap.get(uuid).writeAndFlush(response_msg);
		 } finally {
            ReferenceCountUtil.release(msg);
        }
    }
	
	/**
     * 读取完毕客户端发送过来的数据之后的操作
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        logger.info("服务端:接收数据处理完毕");
        // 登出:写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。
        if (this.isLogout) {
            ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
 
    /**
     * 服务端发生异常的操作:可以做一些相应的处理,比如打印日志、关闭链接
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        logger.error("服务端:发生异常,异常信息:" + cause.getMessage());
    }
 
}
 

channel 缓存类

@Slf4j
public class NettyChannelMap {
    private static Map<String, SocketChannel> map = new ConcurrentHashMap<String, SocketChannel>();
    public static void add(String clientId, SocketChannel socketChannel) {
        map.put(clientId, socketChannel);
    }

    public static SocketChannel get(String clientId) {
        return map.get(clientId);
    }
    // 从内存中移除SocketChannel
    public static String removeValue(SocketChannel removeChannel) {
        for (Map.Entry entry : map.entrySet()) {
            if (entry.getValue() == removeChannel) {
                map.remove(entry.getKey());
                logger.info("服务端从内存中移除socketChannel对象:" + entry.getKey());
                return entry.getKey().toString();
            }
        }
        return null;
    }
}

启动类:NettyApplication

@SpringBootApplication
public class NettyApplication{
    public static void main(String[] args) {
        SpringApplication springApplication = new SpringApplication(NettyApplication.class);
        springApplication.run(args);
        // netty服务器启动
        try {
            new NettyServer(8888).start(); // 启动
        } catch (Exception e) {
            logger.error("netty服务器启动异常", e);
        }
    }
}

测试工具:mqtt客户端客户端工具.rar(通信猫+MQTTBox)_mqtt客户端,mqttbox下载-其它文档类资源-CSDN下载

附录1:常用的TCP粘包拆包解决方案

通过在消息头中定义长度字段来标示消息的总长度。

大多数的协议在协议头中都会携带长度字段,用于标识消息体或则整包消息的长度。LengthFieldBasedFrameDecoder通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息,只要传入正确的参数,就可以轻松解决“读半包”的问题。

解码:LengthFieldBasedFrameDecoder

  • maxFrameLength:发送数据包的最大长度
  • lengthFieldOffset:长度域的偏移量。长度域位于整个数据包字节数组中的开始下标。
  • lengthFieldLength:长度域的字节数长度。
  • lengthAdjustment:长度域的偏移量矫正。如果长度域的值,除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,那么,就需要进行矫正。矫正的值为:包长 - 长度域的值 – 长度域偏移 – 长度域长。
  • initialBytesToStrip:丢弃的起始字节数。丢弃处于此索引值前面的字节。
ch.pipeline().addLast("framer", new LengthFieldBasedFrameDecoder(1024, 0, 4, 0,4));

编码:LengthFieldPrepender

  • lengthFieldLength:长度属性的字节长度
  • lengthIncludesLengthFieldLength:false,长度字节不算在总长度中; true,算到总长度中
ch.pipeline().addLast("addLength", new LengthFieldPrepender(4, false));

在生成的数据包中添加一个长度字段,用于记录当前数据包的长度。

这里写图片描述

参考:[netty]--最通用TCP黏包解决方案:LengthFieldBasedFrameDecoder和LengthFieldPrepender_惜暮的博客-CSDN博客

 Netty粘包/半包问题解析_AnEra的博客-CSDN博客

附录2:无法使用@Autowired注入

@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    //region 使用依赖注入
    private static NettyServerHandler nettyServerHandler;
	
    @Autowired
    MyService myService;

    public NettyServerHandler() {}

    @PostConstruct
    public void init() {
        nettyServerHandler = this;
    }
    //region 
}

调用时: 

nettyServerHandler.myService.analysis(msgByte);

参考:Netty handler无法使用@Autowired注入bean_远方丶丶的博客-CSDN博客

附录3:netty 解码器

  • 一次解码器:ByteToMessageDecoder

io.netty.buffer.ByteBuf (原始数据流)-> io.netty.buffer.ByteBuf (用户数据)

内置解码器

LineBasedFrameDecoder
是一个特殊的分隔符解码器,该解码器使用的分隔符为:windows的r\n和类linux的\n。

DelimiterBasedFrameDecoder
是更通用的分隔符解码器,可支持多个分隔符,每个分隔符可为一个或多个字符。

FixedLengthFrameDecoder
按照固定长度frameLength解码出消息帧

LengthFieldBasedFrameDecoder
基于长度字段的消息帧解码器,该解码器可根据数据包中的长度字段动态的解码出消息帧。

  • 二次解码器:MessageToMessageDecoder

io.netty.buffer.ByteBuf (用户数据)-> Java Object
常用的二次解码器,json、Protobuf、xml等

编解码字符串的StringEncoder和StringDecoder

编解码对象的ObjectEncoder和ObjectDecoder

编解码字节码的ByteArrayEncoder和ByteArrayDecoder

编解码Protobuf的ProtobufEncoder和ProtobufDecoder

参考:netty的编解码器介绍_惜暮的博客-CSDN博客

Netty笔记(六)之netty中的解码器_jannals的博客-CSDN博客

附录4:检测空闲连接和超时处理

服务端、客户端添加IdleStateHandler心跳检测处理器,并添加自定义处理类实现userEventTriggered()方法作为超时事件的逻辑处理。

Netty服务器:

Netty的IdleStateHandler心跳机制主要是用来检测远端是否存活,如果不存活或活跃则对空闲连接进行处理避免资源的浪费。

server添加下面代码

//第一个参数设置未读时间,第二个参数设置为未写时间,第三个为都未进行操作的时间,单位秒
ch.pipeline().addLast(new IdleStateHandler(4, 8, 12));
//添加超时检查机制--事件消息捕获类
// 在处理器该userEventTriggered方法中去处理 IdleStateEvent(读 空闲,写空闲,读写空闲)
ch.pipeline().addLast(new HeartbeatServerHandle());

 IdleStateHandler的构造器:

  • readerIdleTimeSeconds:读超时。即当在指定的时间间隔内没有从 Channel 读取到数据时,会触发一个 READER_IDLE 的 IdleStateEvent 事件。
  • writerIdleTimeSeconds:写超时。即当在指定的时间间隔内没有数据写入到 Channel 时,会触发一个 WRITER_IDLE 的 IdleStateEvent 事件。
  • allIdleTimeSeconds:读/写超时。即当在指定的时间间隔内没有读或写操作时,会触发一个 ALL_IDLE 的 IdleStateEvent 事件。

这三个参数默认的时间单位是。 

处理空闲事件处理器:HeartbeatServerHandle 

@Slf4j
public class HeartbeatServerHandle extends SimpleChannelInboundHandler<String> {

    private static ConcurrentHashMap<String,Long> concurrentHashMap = new ConcurrentHashMap<>();

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // 判断事件是否是IdleStateEvent
        if (evt instanceof IdleStateEvent) {
            String evtState = null;
            String key = ctx.channel().id().asLongText();
            Long count = concurrentHashMap.getOrDefault(key, 0L);

            //将该事件消息强转为心跳事件
            IdleStateEvent idleStateHandler = (IdleStateEvent) evt;
            IdleState state = idleStateHandler.state();
            switch(state) {
                case READER_IDLE:
                    evtState = "读空闲";
                    break;
                case WRITER_IDLE:
                    evtState = "写空闲";
                    break;
                case ALL_IDLE:
                    evtState = "读写空闲";
                    count++;
                    break;
                default:
                    break;
            }
            log.info("userEventTriggered-evtState:{}", evtState);
            // 空闲计数达5次, 进行测试连接是否正常
            if (count > 2L){
                ctx.writeAndFlush("测试客户端是否能接收信息")
                    // 发送失败时关闭通道, 在或者可以在达到空闲多少次后, 进行关闭通道
                        .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);  
                concurrentHashMap.remove(key);
                return;
            }
            concurrentHashMap.put(key,count);
        } else {
            // 事件不是一个 IdleStateEvent 的话,就将它传递给下一个处理程序
            super.userEventTriggered(ctx, evt);
        }
    }
}

Netty客户端

设定IdleStateHandler心跳检测每四秒进行一次写检测,如果四秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每四秒向服务端发送一次消息

server添加下面代码

ch.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
ch.pipeline().addLast(new HeartBeatClientHandler());

 处理类

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent){
        IdleStateEvent event = (IdleStateEvent)evt;
         switch (e.state()) {
            case WRITER_IDLE:
                if (curTime<beatTime){
                    curTime++;
                    ctx.writeAndFlush("biubiu");
                }
         default:
            break;
    }
}

参考:Netty网络框架学习笔记-7((心跳)检测空闲连接以及超时) - 懵懵懂懂的猫 - 博客园

netty的编解码、粘包拆包问题、心跳检测机制原理_知识分子_的博客-CSDN博客

Netty学习(五)—IdleStateHandler心跳机制_zhenyutu的博客-CSDN博客_idlestatehandler

附录5:Keepalive

keepalive机制

TCP保活机制,就是为了保证连接的有效性,探测连接的对端是否存活的作用,在间隔一定的时间发探测包,根据回复来确认该连接是否有效。

Netty中直接提供了ChannelOption.SO_KEEPALIVE选项,将其传给ServerBootstrap.childOption方法,即可开启TCP Keepalive功能,配置好相关内核参数后,剩下的交给内核搞定。

如果一方已经关闭或异常终止连接,而另一方却不知道,我们将这样的TCP连接称为半打开的。TCP通过保活定时器(KeepAlive)来检测半打开连接。在高并发的网络服务器中,经常会出现漏掉socket的情况,对应的结果有一种情况就是出现大量的CLOSE_WAIT状态的连接。这个时候,可以通过设置KEEPALIVE选项来解决这个问题。

SO_KEEPALIVE

默认值:

Netty默认关闭该功能,即值为:false 。

启用该功能时,TCP会主动探测空闲连接的有效性。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右(默认的心跳间隔是7200s即2小时)上层没有任何数据传输的情况下,这套机制才会被激活。

ServerBootstrap sb = new ServerBootstrap();
// 保持长连接状态
sb.childOption(ChannelOption.SO_KEEPALIVE, true);

设置SO_KEEPALIVE选项来开启KEEPALIVE,然后通过TCP_KEEPIDLE、TCP_KEEPINTVL和TCP_KEEPCNT设置keepalive的开始时间、间隔、次数等参数。

TCP_KEEPIDLE:在TCP保活打开的情况下,最后一次数据交换到TCP发送第一个保活探测包的间隔,即允许的持续空闲时长,或者说每次正常发送心跳的周期,默认值为7200s(2h)
TCP_KEEPCNT: 在发送第一个保活探测包之后,没有接收到对方确认,继续发送保活探测包次数,默认值为9(次)
TCP_KEEPINTVL:在发送保活探测包之后,没有接收到对方确认,继续发送保活探测包的发送频率,默认值为75s

调整Keepalive参数

环境:centOS7(linux2.4以后),JDK 11,netty-all-4.1.66.Final(4.1.36以后版本)

ServerBootstrap sb = new ServerBootstrap();
// 配置TCP Keepalive参数,将Keepalive空闲时间设为120秒
sb.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPIDLE), 120);
sb.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPINTERVAL), 75);
sb.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPCOUNT), 9);

抓包结果: 

 抓包工具:Wireshark · Go Deep.

参考:

Netty——参数说明 - 曹伟雄 - 博客园

聊聊TCP Keepalive、Netty和Docker - eshizhan - 博客园

Netty有哪些配置TCP心跳参数的方法,都有什么区别? - 墨天轮

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐