系列文章目录

Springboot集成Netty

Springboot集成Rabbitmq

Springboot集成Retry

springboot集成websocket

Springboot集成Redis

springboot整合rabbitmq使用示例


前言

本文介绍了springboot集成netty的配置以及使用方式,同时提供一种方式解决常见的tcp粘包拆包问题。


提示:以下是本篇文章正文内容,下面案例可供参考

一、Netty是什么?

是一个高性能、高可靠性的基于NIO封装的网络应用框架

二、使用步骤

引入库

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
</dependency>

请求消息类

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageBean {
    /**
     * 数据长度
     */
    private Integer len;
    /**
     * 通讯数据
     */
    private byte[] content;
    public MessageBean(Object object){
        content =  JSONUtil.toJsonStr(object).getBytes(StandardCharsets.UTF_8);
        len = content.length;
    }
}

Netty配置

@Configuration
@EnableConfigurationProperties
public class NettyConfig {
    @Autowired
    MyNettyProperties myNettyProperties;

    /**
     * boss 线程池
     * 负责客户端连接
     * @return
     */
    @Bean
    public NioEventLoopGroup boosGroup(){
        return new NioEventLoopGroup(myNettyProperties.getBoss());
    }

    /**
     * worker线程池
     * 负责业务处理
     * @return
     */
    @Bean
    public NioEventLoopGroup workerGroup(){
        return  new NioEventLoopGroup(myNettyProperties.getWorker());
    }
    /**
     * 服务器启动器
     * @return
     */
    @Bean
    public ServerBootstrap serverBootstrap(){
        ServerBootstrap serverBootstrap  = new ServerBootstrap();
        serverBootstrap
                .group(boosGroup(),workerGroup())   // 指定使用的线程组
                .channel(NioServerSocketChannel.class) // 指定使用的通道
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,myNettyProperties.getTimeout()) // 指定连接超时时间
                .childHandler(new ServerHandler()); // 指定worker处理器
        return serverBootstrap;
    }

    /**
     * 客户端启动器
     * @return
     */
    @Bean
    public Bootstrap bootstrap(){
        // 新建一组线程池
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup(myNettyProperties.getBoss());
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                .group(eventExecutors)   // 指定线程组
                .option(ChannelOption.SO_KEEPALIVE, true)
                .channel(NioSocketChannel.class) // 指定通道
                .handler(new ClientHandler()); // 指定处理器
        return bootstrap;
    }
}

服务端

Netty服务端启动器
@Component
@Slf4j
public class ServerBoot {
    @Autowired
    ServerBootstrap serverBootstrap;
    @Resource
    NioEventLoopGroup boosGroup;
    @Resource
    NioEventLoopGroup workerGroup;
    @Autowired
    MyNettyProperties nettyProperties;

    /**
     * 开机启动
     * @throws InterruptedException
     */
    @PostConstruct
    public void start() throws InterruptedException {
        // 绑定端口启动
        serverBootstrap.bind(nettyProperties.getPort()).sync();
        serverBootstrap.bind(nettyProperties.getPortSalve()).sync();
        log.info("启动Netty多端口服务器: {},{}",nettyProperties.getPort(),nettyProperties.getPortSalve());
    }

    /**
     * 关闭线程池
     */
    @PreDestroy
    public void close() throws InterruptedException {
        log.info("优雅得关闭Netty服务器");
        boosGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
Netty服务端主处理器
public class ServerHandler extends ChannelInitializer<SocketChannel> {
    /**
     * 初始化通道以及配置对应管道的处理器
     * @param socketChannel
     * @throws Exception
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new MessageDecodeHandler());
        pipeline.addLast(new MessageEncodeHandler());
        pipeline.addLast(new ServerListenerHandler());
    }
}


Netty服务端监听消息处理器
@Slf4j
@ChannelHandler.Sharable
public class ServerListenerHandler extends SimpleChannelInboundHandler<MessageBean> {

    /**
     * 客户端上线的时候调用
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("{}客户端连接进来了",ctx.channel().remoteAddress());
        ctx.fireChannelActive();
    }

    /**
     * 客户端掉线的时候调用
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("{}连接断开了",ctx.channel().remoteAddress());
        ctx.fireChannelInactive();
    }


    /**
     * 读取客户端信息
     * @param channelHandlerContext
     * @param messageBean
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageBean messageBean) throws Exception {
        String remoteAddress = channelHandlerContext.channel().remoteAddress().toString();
        log.info("来自客户端{}的消息{}", remoteAddress,new String(messageBean.getContent(), CharsetUtil.UTF_8));
        channelHandlerContext.writeAndFlush(new MessageBean("收到了客户端"+ remoteAddress + "的消息"));
    }

    /**
     * 异常发生时候调用
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("{}连接出异常了",ctx.channel().remoteAddress());
        log.error(ExceptionUtil.printStackTrace((Exception) cause));
        ctx.close();
    }
}

客户端

Netty客户端启动器
@Component
public class ClientBoot {
    @Autowired
    Bootstrap bootstrap;
    @Autowired
    MyNettyProperties myNettyProperties;

    /**
     * 主端口连接
     * @return
     * @throws InterruptedException
     */
    public Channel connect() throws InterruptedException {
        // 连接服务器
        ChannelFuture channelFuture = bootstrap.connect(myNettyProperties.getHost(), myNettyProperties.getPort()).sync();
        // 监听关闭
        Channel channel = channelFuture.channel();
        return channel;
    }
    /**
     * 备用端口连接
     * @return
     * @throws InterruptedException
     */
    public Channel connectSlave() throws InterruptedException {
        // 连接服务器
        ChannelFuture channelFuture = bootstrap.connect(myNettyProperties.getHost(), myNettyProperties.getPort()).sync();
        // 监听关闭
        Channel channel = channelFuture.channel();
        channel.closeFuture().sync();
        return channel;
    }

    /**
     * 发送消息到服务器端
     * @return
     */
    public void sendMsg(MessageBean messageBean) throws InterruptedException {
        connect().writeAndFlush(messageBean);
    }
}

Netty客户端监听消息处理器
@Slf4j
@ChannelHandler.Sharable
public class ClientListenerHandler extends SimpleChannelInboundHandler<MessageBean> {
    /**
     * 服务端上线的时候调用
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("{}连上了服务器",ctx.channel().remoteAddress());
    }

    /**
     * 服务端掉线的时候调用
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("{}断开了服务器",ctx.channel().remoteAddress());
        ctx.fireChannelInactive();
    }


    /**
     * 读取服务端消息
     * @param channelHandlerContext
     * @param messageBean
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageBean messageBean) throws Exception {
        log.info("来自服务端的消息:{}",new String(messageBean.getContent(), CharsetUtil.UTF_8));
        channelHandlerContext.channel().close();
    }

    /**
     * 异常发生时候调用
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("{}连接出异常了",ctx.channel().remoteAddress());
        log.error(ExceptionUtil.printStackTrace((Exception) cause));
        ctx.close();
    }
}

公共

Netty属性配置
@ConfigurationProperties(prefix = "netty")
@Data
@Configuration
public class MyNettyProperties {
    /**
     * boss线程数量 默认为cpu线程数*2
     */
    private Integer boss;
    /**
     * worker线程数量 默认为cpu线程数*2
     */
    private Integer worker;
    /**
     * 连接超时时间 默认为30s
     */
    private Integer timeout = 30000;
    /**
     * 服务器主端口 默认7000
     */
    private Integer port = 7000;
    /**
     * 服务器备用端口 默认70001
     */
    private Integer portSalve = 7001;
    /**
     * 服务器地址 默认为本地
     */
    private String host = "127.0.0.1";
}

编码器
public class MessageEncodeHandler extends MessageToByteEncoder<MessageBean> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MessageBean messageBean, ByteBuf byteBuf) throws Exception {
        byteBuf.writeInt(messageBean.getLen());
        byteBuf.writeBytes(messageBean.getContent());
    }
}

解码器
public class MessageDecodeHandler extends ByteToMessageDecoder {


    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        int len = byteBuf.readInt();
        byte[] content = new byte[len];
        byteBuf.readBytes(content);
        MessageBean messageBean = new MessageBean();
        messageBean.setContent(content);
        messageBean.setLen(len);
        list.add(messageBean);
    }
}


Netty开关注解

Netty客户服务端开关整合注解

@Import(ServerBoot.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EnableNettyServer {
}

Netty客户端开关注解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(ClientBoot.class)
public @interface EnableNettyClient {
}

Netty服务端开关注解

@Import(ServerBoot.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EnableNettyServer {
}

项目启动类开启netty注解

@SpringBootApplication
@Slf4j
@EnableNetty
public class SpringBootExampleApplication {
    public static void main(String[] args){
        SpringApplication.run(SpringBootExampleApplication.class, args);
    }
}

配置netty服务器相关信息

# netty配置
netty:
  boss: 1
  worker: 4
  timeout: 6000
  port: 7000
  portSalve: 7001
  host: 127.0.0.1

三、效果展示

浏览器访问: http://localhost:8080/netty/sendStudent?name=张三&age=20

[INFO ] 2022-05-12 14:33:29 [nioEventLoopGroup-2-1:131720:c.g.n.c.h.ClientListenerHandler:27] - /127.0.0.1:7000连上了服务器
[INFO ] 2022-05-12 14:33:29 [nioEventLoopGroup-4-1:131720:c.g.n.s.h.ServerListenerHandler:29] - /127.0.0.1:1823客户端连接进来了
[INFO ] 2022-05-12 14:33:29 [nioEventLoopGroup-4-1:131735:c.g.n.s.h.ServerListenerHandler:54] - 来自客户端/127.0.0.1:1823的消息{"name":"张三","age":20}
[INFO ] 2022-05-12 14:33:29 [nioEventLoopGroup-2-1:131735:c.g.n.c.h.ClientListenerHandler:50] - 来自服务端的消息:收到了客户端/127.0.0.1:1823的消息
[INFO ] 2022-05-12 14:33:29 [nioEventLoopGroup-4-1:131735:c.g.n.s.h.ServerListenerHandler:40] - /127.0.0.1:1823连接断开了
[INFO ] 2022-05-12 14:33:29 [nioEventLoopGroup-2-1:131735:c.g.n.c.h.ClientListenerHandler:37] - /127.0.0.1:7000断开了服务器

四、总结

本文在springboot中集成了netty,并通过一种自定义编解码器方式解决常见的tcp粘包拆包问题,整理成单独模块被其他springboot项目调用。

五、项目源码

使用示例源码地址:
https://gitee.com/teajoy/springboot-modules/tree/master/springboot-example
netty模块源码地址:
https://gitee.com/teajoy/springboot-modules/tree/master/springboot-netty

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐