系列文章目录

第一章:搭建项目框架+集成netty搭建websocket+解决分布式channel问题

文章目录

目录

系列文章目录

文章目录

前言

一、创建一个maven父工程

二、项目划分了common,infrastructure,service三个子模块

 三、项目的环境搭建

四、gateway初始化

这里有个要注意的点:Spring Cloud Gateway 是使用 netty+webflux实现,且webflux与web是冲突的,所以我们要注意maven的依赖

 五、common项目初始化

六、netty-webscoket的搭建

1.nettyServer,netty服务的服务配置和加载

2.自定义WebSockerHandler

这里有个需要注意的点:channel的id是不要json序列化的,不然后续无法使用,这里的redisService是在common中封装的工具类

处理前的token采用jwt生成:

rabbitMq采用发布订阅的模式:

注意点:我们的mq中注册了连个队列,netty服务不需要监听多个队列,只需要其中一个,避免重复发送消息

模拟netty集群版:

 最后效果:

总结




前言

最近学习了netty相关知识,结合前面学过的知识融合一起做个小项目,大佬们欢迎给意见

体验地址:wechat

测试账户:18200891282/123456,18200891283/123456

一、创建一个maven父工程

为了方便所以使用了maven聚合工程,方便项目的依赖控制

1.用idea创建个maven项目,下面是父工程的pom,主要包括springcloud,ssm和一些组件的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.linjiahao</groupId>
    <artifactId>webChat</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>service</module>
        <module>common</module>
        <module>infrastructure</module>
    </modules>
    <name>webChat</name>
    <description>Demo project for Spring Boot</description>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <onlien_education.version>0.0.1-SNAPSHOT</onlien_education.version>
        <mybatis-plus.version>3.0.5</mybatis-plus.version>
        <jodatime.version>2.10.1</jodatime.version>
        <poi.version>3.17</poi.version>
        <commons-fileupload.version>1.3.1</commons-fileupload.version>
        <commons-io.version>2.6</commons-io.version>
        <httpclient.version>4.5.1</httpclient.version>
        <jwt.version>3.4.1</jwt.version>
        <fastjson.version>>1.2.76</fastjson.version>
        <gson.version>2.8.2</gson.version>
        <json.version>20170516</json.version>
        <commons-dbutils.version>1.7</commons-dbutils.version>
        <canal.client.version>1.1.0</canal.client.version>
        <docker.image.prefix>zx</docker.image.prefix>
        <cloud-alibaba.version>0.2.2.RELEASE</cloud-alibaba.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <!--Spring Cloud-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Hoxton.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${cloud-alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!--mybatis-plus 持久层-->
            <dependency>
                <groupId>com.baomidou</groupId>
                <artifactId>mybatis-plus-boot-starter</artifactId>
                <version>${mybatis-plus.version}</version>
            </dependency>
            <!--日期时间工具-->
            <dependency>
                <groupId>joda-time</groupId>
                <artifactId>joda-time</artifactId>
                <version>${jodatime.version}</version>
            </dependency>
            <!--xls-->
            <dependency>
                <groupId>org.apache.poi</groupId>
                <artifactId>poi</artifactId>
                <version>${poi.version}</version>
            </dependency>
            <!--xlsx-->
            <dependency>
                <groupId>org.apache.poi</groupId>
                <artifactId>poi-ooxml</artifactId>
                <version>${poi.version}</version>
            </dependency>
            <!--文件上传-->
            <dependency>
                <groupId>commons-fileupload</groupId>
                <artifactId>commons-fileupload</artifactId>
                <version>${commons-fileupload.version}</version>
            </dependency>
            <!--commons-io-->
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>${commons-io.version}</version>
            </dependency>

            <!--httpclient-->
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>${httpclient.version}</version>
            </dependency>
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>${gson.version}</version>
            </dependency>
            <!-- JWT -->
            <dependency>
                <groupId>com.auth0</groupId>
                <artifactId>java-jwt</artifactId>
                <version>${jwt.version}</version>
            </dependency>

            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.76</version>
            </dependency>

            <dependency>
                <groupId>org.json</groupId>
                <artifactId>json</artifactId>
                <version>${json.version}</version>

            </dependency>
            <dependency>
                <groupId>commons-dbutils</groupId>
                <artifactId>commons-dbutils</artifactId>
                <version>${commons-dbutils.version}</version>
            </dependency>

            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>${canal.client.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>


二、项目划分了common,infrastructure,service三个子模块

 

  1.  
    common模块主要是项目公用的组件工具类或者公用的基础config

  2. infrastructure模块是gateway网关或者其他中间件服务的

  3. service模块是业务服务的父工程

 三、项目的环境搭建

目前使用到的中间件有nacos,rabbitMq,redis后续还有需要所以加上mysql,自己可以在windows安装这些中间件,有云服务器最好。这里就不拓展安装的步骤,注意服务器开放端口

传送门:

centos安装Java:Centos安装Java - 简书

centos安装docker:centos7安装Docker详细步骤(无坑版教程) - 云+社区 - 腾讯云

centos中docker安装mysql:CentOS7利用docker安装MySQL5.7 - 秦浩铖 - 博客园

centos中docker安装redis:CentOS7使用Docker安装Redis图文教程_命硬的博客-CSDN博客_centos docker redis

centos中安装nacos:centos 安装 nacos_wangooo的博客-CSDN博客_centos7 nacos安装

centos中安装rabbitmq:CentOS使用Docker安装RabbitMq - 简书

四、gateway初始化

1.Spring Cloud Gateway支持websocket协议,而且目前分布式比较常用的网关,后续还有很多业务的拓展,所以单独把gateway拎出来为一个module

2.在infrastructure下新建一个module:gateway,pom依赖主要:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>infrastructure</artifactId>
        <groupId>com.linjiahao</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>gateway</artifactId>

    <dependencies>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-gateway</artifactId>
        </dependency>

        <!--gson-->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>

        <!--服务调用-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
    </dependencies>
    <build>
        <finalName>gateway</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

这里有个要注意的点:Spring Cloud Gateway 是使用 netty+webflux实现,且webflux与web是冲突的,所以我们要注意maven的依赖

gateway的项目结构:

以上主要是对跨域进行处理,还有全局错误统一返回,application的配置如下:naocs的注册ip和端口可以更换成自己的,路由规则目前只有一个

server:
  port: 8201
spring:
  application:
    name: service-gateway
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
    gateway:
      discovery:
        locator:
          enabled: true
      routes:
        - id: nettyService
          uri: lb://nettyService
          predicates:
            - Path=/ws/**

 五、common项目初始化

1.首先想到项目中使用到redis,所以封装一个全局使用的redis工具类

common的pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>webChat</artifactId>
        <groupId>com.linjiahao</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>common</artifactId>
    <packaging>pom</packaging>
    <modules>
        <module>service-base</module>
    </modules>


    <dependencies>
        <!-- redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- spring2.X集成redis所需common-pool2-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <scope>provided</scope>
        </dependency>
        <!--mybatis-plus-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <scope>provided </scope>
        </dependency>
        <!--lombok用来简化实体类:需要安装lombok插件-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided </scope>
        </dependency>
        <!-- redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

    </dependencies>

</project>

2.新增一个common的子模块service-base,项目结构

 service-base的pom依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>common</artifactId>
        <groupId>com.linjiahao</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>webChat-service-base</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1-jre</version>
        </dependency>
        <dependency>
            <groupId>com.auth0</groupId>
            <artifactId>java-jwt</artifactId>
        </dependency>
    </dependencies>


</project>

六、netty-webscoket的搭建

终于到重点服务的环节了,在service中创建一个service-nettyOne的module,主要的netty依赖:

 <dependencies>
        <!--  netty -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.36.Final</version>
        </dependency>

    </dependencies>

netty的集成采用在启动类中加载netty服务,启动类实现CommandLineRunner,重写run方法来启动netty服务

1.nettyServer,netty服务的服务配置和加载

@Component
@Slf4j
public class NettyServer {

    @Value("${netty.port}")
    private int port;

    @Value("${netty.name}")
    private String name;


    @Value("${spring.cloud.nacos.discovery.server-addr}")
    private String nacosServer;

    @Resource
    private WebSockerHandler webSockerHandler;


    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.option(ChannelOption.SO_BACKLOG, 1024);
            sb.group(group, bossGroup) // 绑定线程池
                    .channel(NioServerSocketChannel.class) // 指定使用的channel
                    .localAddress(this.port)// 绑定监听端口
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
                            ch.pipeline().addLast(new HttpServerCodec());
                            //以块的方式来写的处理器
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                            ch.pipeline().addLast(new HttpObjectAggregator(8192));
                            //添加自定义处理器
                            ch.pipeline().addLast(webSockerHandler);
                        }
                    });
            ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
            log.info(NettyServer.class + "已启动,正在监听: " + cf.channel().localAddress());
            //获取nacos服务
            NamingService namingService = NamingFactory.createNamingService(nacosServer);
            //将服务注册到注册中心
            InetAddress address = InetAddress.getLocalHost();
            namingService.registerInstance(name, address.getHostAddress(), Integer.valueOf(port));
            log.info("注册到nacos成功");
            cf.channel().closeFuture().sync(); // 关闭服务器通道
        } finally {
            group.shutdownGracefully().sync(); // 释放线程池资源
            bossGroup.shutdownGracefully().sync();
        }
    }

}

这里主要对netty指定了启动端口,协议,编码处理器,还有webSockerHandler自定义的处理器 

重点:netty服务需要集群化和负载均衡,所以我们启动netty服务的同时,要把自身注册到nacos注册中心,后续通过gateway统一请求

2.自定义WebSockerHandler

netty中每一个客户端连接的channel是无法序列化,所以当我们集群之后就会出现一个问题,A,B用户的channel不在同一台服务上,当A给B发送消息时,通过A所在的服务无法将消息送达到B。

解决方案:使用redis+rabbitmq发布订阅模式来完成消息的共享

  • netty中使用DefaultChannelGroup管理当前连接的channel,通过channel的id可以查找使用
  • 每一个websocket连接都需要关联userId,所以自定义handle来处理连接前绑定userId和channel的id存放在redis中
    /**
     * @ClassName ChannelHandlerPool
     * @Description: netty通道映射
     * @Author linjiahao
     * @Date 23/12/2021
     **/
    public class ChannelHandlerPool {
    
        //channelGroup通道组
        public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        //可以存储userId与ChannelId的映射表
        public static String channelUserKey = "netty:user:channel";
    
    
    }
    
    
    

websocket在连接前是先通过http请求再升级websocket协议的,我们需要在这个http前鉴权我们的连接是否有效和关联上面说的netty channel的数据,采用redis的hash表存放用户的通道id

/**
 * @ClassName WebSocketAuthHttpHanlder
 * @Description: 鉴权
 * @Author linjiahao
 * @Date 23/12/2021
 **/
@Component
@Slf4j
public class WebSocketAuthHttpHanlder {

    private WebSocketServerHandshaker handshaker;

    @Resource
    private RedisService redisService;

    @Value("${netty.url}")
    private String webSocketUrl;

    public final String token = "token";

    public String authHandler(ChannelHandlerContext ctx, FullHttpRequest message) {
        String uri =message.getUri();
        log.info("连接前鉴权的uri:{}",uri);
        List<String> list = Arrays.asList(uri.split("\\?"));
        Boolean isHaveToken = list.stream().filter(m -> m.contains(token)).findFirst().isPresent();
        if(!isHaveToken){
            return httpResponseResult(1);
        }
        List<String> params = Arrays.asList(list.get(1).split("&"));
        String authToken = params.stream().filter(m -> m.contains(token)).findFirst().get();
        String tokenValue = authToken.split("=")[1];
        String userId = JWTUtil.getUserId(tokenValue);
        if(!JWTUtil.verify(tokenValue)){
            redisService.hdel(ChannelHandlerPool.channelUserKey,userId);
            return httpResponseResult(2);
        }
        log.info("channel的id为:{},用户id为:{}",ctx.channel().id(),userId);
        redisService.hset(ChannelHandlerPool.channelUserKey, userId, ctx.channel().id());
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                webSocketUrl, "WebSocket", false,65536 * 10);
        handshaker = wsFactory.newHandshaker(message);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory
                    .sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), message);
        }
        return BaseConstant.NETTY_AUTH_PASS;
    }


    private String httpResponseResult(Integer code){
        String result = "";
        switch (code){
            case 1:
                result = JSON.toJSONString(ErrorMessage.USER_NOT_LOGIN);
                break;
            case 2:
                result = JSON.toJSONString(ErrorMessage.TOKEN_IS_NOT_VALIDATE);
                break;
            default:
                result = JSON.toJSONString(ErrorMessage.SYSTEM_EXCEPTION);
        }
        return result;
    }

}


这里有个需要注意的点:channel的id是不要json序列化的,不然后续无法使用,这里的redisService是在common中封装的工具类

处理完用户和channel的关联,我们就可以简单进行通信了,仅限在单机版中,所以我使用rabbitmq的发布订阅模式来广播所有的netty服务,当我们需要发送的channelId不在本服务中,就将本次消息通过mq发布出去,每个netty服务去消费这个message

自定义WebSockerHandler继承SimpleChannelInboundHandler,重写channelActive(),channelInactive(),channelRead0()这三个方法,注意SimpleChannelInboundHandler是需要指定接收客户端的泛型的!

@Component
@Slf4j
@ChannelHandler.Sharable
public class WebSockerHandler extends SimpleChannelInboundHandler<Object> {

    @Autowired
    private WebSocketAuthHttpHanlder authHttpHanlder;

    @Resource
    private RedisService redisService;

    @Resource
    private RabbitPublisher rabbitPublisher;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("与客户端建立连接,通道开启,id:{}", ctx.channel().id());
        ChannelHandlerPool.channelGroup.add(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("与客户端断开连接,通道关闭,id:{}", ctx.channel().id());
        ChannelHandlerPool.channelGroup.remove(ctx.channel());
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object message) {
        if (message instanceof FullHttpRequest){
            //以http请求形式接入,但是走的是websocket,首次需要鉴权
            String authMessage = authHttpHanlder.authHandler(ctx, (FullHttpRequest) message);
            log.info("鉴权结果:{}",authMessage);
            if(!authMessage.equals(BaseConstant.NETTY_AUTH_PASS)){
                ctx.channel().writeAndFlush(new TextWebSocketFrame(authMessage));
                ctx.close();
            }
        }else if (message instanceof TextWebSocketFrame){
            //处理websocket客户端的消息
            JSONObject jsonObject = JSON.parseObject(((TextWebSocketFrame) message).text());
            log.info("接收到的消息为:{}",jsonObject.toJSONString());
            String userId = jsonObject.getString("userId");
            String sendMessage = jsonObject.getString("message");
            Object channleId = redisService.hget(ChannelHandlerPool.channelUserKey, userId);
            Channel channel = ChannelHandlerPool.channelGroup.find((ChannelId)channleId);
            if(channel!=null){
                channel.writeAndFlush(new TextWebSocketFrame(sendMessage));
            }else {
                ChatMessage chatMessage = new ChatMessage();
                chatMessage.setUserId(userId);
                chatMessage.setMessage(sendMessage);
                chatMessage.setChannelId(channleId);
                rabbitPublisher.send(JSON.toJSONString(chatMessage));
            }
            ctx.channel().writeAndFlush(new TextWebSocketFrame("发送成功"));
        }

    }



}

处理前的token采用jwt生成:

@Slf4j
public class JWTUtil {

    private static final String COSNT_JWT_SECRET = "linjiahao_~!@#$%^&*()_+";

    // ****************** 时间/秒 ************************//
    public static final long COSNT_one_minute = 60;
    public static final long CONST_half_hour = COSNT_one_minute * 30;
    public static final long CONST_one_hour = CONST_half_hour * 2;
    public static final long CONST_one_day = CONST_one_hour * 24;
    public static final long CONST_one_week = CONST_one_day * 7;
    public static final long CONST_half_month = CONST_one_week * 2;

    // 校验 token是否正确
    public static boolean verify(String token) {
        try {
            JWTVerifier verifier = JWT.require(Algorithm.HMAC256(COSNT_JWT_SECRET)).build();
            verifier.verify(token);
            return true;
        } catch (TokenExpiredException e) {
            log.error("token已经过期 {}", e.getMessage());
            return false;
        } catch (JWTVerificationException e) {
            log.error("token无效 {}", e.getMessage());
            return false;
        }
    }


    // 从 token中获取用户的userId
    public static String getUserId(String token) {
        try {
            DecodedJWT jwt = JWT.decode(token);
            return jwt.getClaim("userId").asString();
        } catch (JWTDecodeException e) {
            log.error("error:{}", e.getMessage());
            return "";
        }
    }

    /**
     * 生成过期时间
     * @param ttl 多少秒后过期
     * @return 过期时间的时间戳,ms
     */
    public static long generateExpireTime(long ttl) {
        return System.currentTimeMillis() + ttl * 1000;
    }

    // 生成 token
    public static String generateTokenWithUserId(String userId, long expireTime) {
        try {
            return JWT.create()
                    .withClaim("userId", userId)
                    .withExpiresAt(new Date(expireTime))
                    .sign(Algorithm.HMAC256(COSNT_JWT_SECRET));
        } catch (Exception e) {
            log.error("error:{}", e.getMessage());
            return null;
        }
    }

    public static void main(String[] args) {
        String s = generateTokenWithUserId("1234567", generateExpireTime(CONST_one_hour));
        System.out.println(s);
    }

}

rabbitMq采用发布订阅的模式:

配置:RabbitConfig,

@Configuration
public class RabbitConfig {


    /**
     * 创建人:张博
     * 时间:2018/3/5 上午10:48
     * @apiNote 定义自动删除匿名队列
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("netty-chat");
    }

    /**
     * @apiNote 定义自动删除匿名队列
     */
    @Bean
    public Queue autoDeleteQueue0() {
        return new AnonymousQueue();
    }

    /**
     * @apiNote 定义自动删除匿名队列
     */
    @Bean
    public Queue autoDeleteQueue1() {
        return new AnonymousQueue();
    }

    /**
     * @param fanoutExchange 广播交换器
     * @param autoDeleteQueue0 自动删除队列
     * @apiNote 把队列绑定到广播交换器
     * @return Binding
     */
    @Bean
    public Binding binding0(FanoutExchange fanoutExchange, Queue autoDeleteQueue0) {
        return BindingBuilder.bind(autoDeleteQueue0).to(fanoutExchange);
    }

    /**
     * @param fanoutExchange 广播交换器
     * @param autoDeleteQueue1 自动删除队列
     * @apiNote 把队列绑定到广播交换器
     * @return Binding
     */
    @Bean
    public Binding binding1(FanoutExchange fanoutExchange, Queue autoDeleteQueue1) {
        return BindingBuilder.bind(autoDeleteQueue1).to(fanoutExchange);
    }

}

rabbitPublisher发布者:

@Component
public class RabbitPublisher {

    @Autowired
    private FanoutExchange fanoutExchange;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String message) {
        rabbitTemplate.convertAndSend(fanoutExchange.getName(), "", message);
    }
}
RabbitReceiver消费:
public class RabbitReceiver {

    @Resource
    private RedisService redisService;

    @RabbitListener(queues = "#{autoDeleteQueue0.name}")
    public void receiver0(String message) {
        log.info("监听到的消息为:{}",message);
        ChatMessage chatMessage = JSON.parseObject(message, ChatMessage.class);
        Object channleId = redisService.hget(ChannelHandlerPool.channelUserKey, chatMessage.getUserId());
        Channel channel = ChannelHandlerPool.channelGroup.find((ChannelId)channleId);
        if(channel!=null){
            channel.writeAndFlush(new TextWebSocketFrame(chatMessage.getMessage()));
            log.info("消费成功");
        }
    }

}

注意点:我们的mq中注册了连个队列,netty服务不需要监听多个队列,只需要其中一个,避免重复发送消息

service-nettyOne的配置:

spring:
  application:
    name: nettyOne
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/netty?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
    username: root
    password: 123456
    hikari:
      connection-test-query: SELECT 1
      connection-timeout: 600000
      idle-timeout: 500000
      max-lifetime: 540000
      minimum-idle: 10
      maximum-pool-size: 12
      pool-name: GuliHikariPool
  redis:
    database: 0 # 数据库索引
    host: 1.14.208.155
    port: 6379
    password: linjiahao
    timeout: 1000
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
server:
  port: 8211
netty:
  name: nettyService
  port: 8212
  url: ws://127.0.0.1:8212/ws

模拟netty集群版:

我们复制一个service-nettyOne的工程的,修改里面的netty启动端口

方便测试的前端websocket:
 

<!DOCTYPE html>
<html lang="en">
	<head>
		<meta charset="utf-8" />
		<meta http-equiv="X-UA-Compatible" content="IE=edge">
		<meta name="viewport" content="width=device-width, initial-scale=1">
		<title>本地websocket测试</title>
		<meta name="robots" content="all" />
		<meta name="keywords" content="本地,websocket,测试工具" />
		<meta name="description" content="本地,websocket,测试工具" />
		<style>
			.btn-group{
				display: inline-block;
			}
		</style>
	</head>
	<body>
		<input type='text' value='通信地址, ws://开头..' class="form-control" style='width:390px;display:inline'
		 id='wsaddr' />
		<div class="btn-group" >
			<button type="button" class="btn btn-default" onclick='addsocket();'>连接</button>
			<button type="button" class="btn btn-default" onclick='closesocket();'>断开</button>
			<button type="button" class="btn btn-default" onclick='$("#wsaddr").val("")'>清空</button>
		</div>
		<div class="row">
			<div id="output" style="border:1px solid #ccc;height:365px;overflow: auto;margin: 20px 0;"></div>
			<input type="text" id='message' class="form-control" style='width:810px' placeholder="待发信息" onkeydown="en(event);">
			<span class="input-group-btn">
				<button class="btn btn-default" type="button" onclick="doSend();">发送</button>
			</span>
			</div>
		</div>
	</body>		
		
		<script src="https://code.jquery.com/jquery-3.1.1.min.js"></script>
		<script language="javascript" type="text/javascript">
			function formatDate(now) {
				var year = now.getFullYear();
				var month = now.getMonth() + 1;
				var date = now.getDate();
				var hour = now.getHours();
				var minute = now.getMinutes();
				var second = now.getSeconds();
				return year + "-" + (month = month < 10 ? ("0" + month) : month) + "-" + (date = date < 10 ? ("0" + date) : date) +
					" " + (hour = hour < 10 ? ("0" + hour) : hour) + ":" + (minute = minute < 10 ? ("0" + minute) : minute) + ":" + (
						second = second < 10 ? ("0" + second) : second);
			}
			var output;
			var websocket;
 
			function init() {
				output = document.getElementById("output");
				testWebSocket();
			}
 
			function addsocket() {
				var wsaddr = $("#wsaddr").val();
				if (wsaddr == '') {
					alert("请填写websocket的地址");
					return false;
				}
				StartWebSocket(wsaddr);
			}
 
			function closesocket() {
				websocket.close();
			}
 
			function StartWebSocket(wsUri) {
				websocket = new WebSocket(wsUri);
				websocket.onopen = function(evt) {
					onOpen(evt)
				};
				websocket.onclose = function(evt) {
					onClose(evt)
				};
				websocket.onmessage = function(evt) {
					onMessage(evt)
				};
				websocket.onerror = function(evt) {
					onError(evt)
				};
			}
 
			function onOpen(evt) {
				writeToScreen("<span style='color:red'>连接成功,现在你可以发送信息啦!!!</span>");
			}
 
			function onClose(evt) {
				writeToScreen("<span style='color:red'>websocket连接已断开!!!</span>");
				websocket.close();
			}
 
			function onMessage(evt) {
				writeToScreen('<span style="color:blue">服务端回应&nbsp;' + formatDate(new Date()) + '</span><br/><span class="bubble">' +
					evt.data + '</span>');
			}
 
			function onError(evt) {
				writeToScreen('<span style="color: red;">发生错误:</span> ' + evt.data);
			}
 
			function doSend() {
				var message = $("#message").val();
				if (message == '') {
					alert("请先填写发送信息");
					$("#message").focus();
					return false;
				}
				if (typeof websocket === "undefined") {
					alert("websocket还没有连接,或者连接失败,请检测");
					return false;
				}
				if (websocket.readyState == 3) {
					alert("websocket已经关闭,请重新连接");
					return false;
				}
				console.log(websocket);
				$("#message").val('');
				writeToScreen('<span style="color:green">你发送的信息&nbsp;' + formatDate(new Date()) + '</span><br/>' + message);
				websocket.send(message);
			}
 
			function writeToScreen(message) {
				var div = "<div class='newmessage'>" + message + "</div>";
				var d = $("#output");
				var d = d[0];
				var doScroll = d.scrollTop == d.scrollHeight - d.clientHeight;
				$("#output").append(div);
				if (doScroll) {
					d.scrollTop = d.scrollHeight - d.clientHeight;
				}
			}
 
 
			function en(event) {
				var evt = evt ? evt : (window.event ? window.event : null);
				if (evt.keyCode == 13) {
					doSend()
				}
			}
		</script>
 
</html>

 最后效果:

 nacos中有我们启动的四个服务

通过gateway来访问我们的netty服务:

ws://127.0.0.1:8201/ws?token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2NDA1MDk1OTYsInVzZXJJZCI6IjEyMzQ1NiJ9.6zVmY-WvTJmhnrMKeD6K34KiLpyAS-MCON4wdi25dY0

 连接到第二个节点service-nettyTwo

 ws://127.0.0.1:8201/ws?token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2NDA1MzA1MzMsInVzZXJJZCI6IjEyMzQ1NjcifQ.cdkv98W-W66fWFN5sEUbbMcAz-GJ5mqfNfNlQ3895vM

 连接到第一个节点service-nettyOne

发送消息:目前两个用户所在channel不在同一个服务中

 消息成功的发送的对应的用户上

总结

以上就是简单的通过netty搭建webscoket,解决了集群和负载均衡的问题,还是有很多去优化的点,后续在拓展功能上完善代码,希望大家多多提出意见:

代码传送门:nettyChatCloud: 采用springclopud gateway+nacos+netty搭建基础的websocket服务,解决集群无法共享channel的问题

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐