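Building a simple distributed WebSocket cluster with Spring Boot + Netty, to be extended later into an instant-messaging chat system
A clustered setup combining Spring Cloud and Netty, with Nacos as the registry and Spring Cloud Gateway as the gateway
Series contents
Chapter 1: Setting up the project skeleton + integrating Netty for WebSocket + solving the distributed channel problem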
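Preface
I recently studied Netty and wanted to combine it with what I had learned earlier in a small project. Feedback from more experienced developers is welcome.
Live demo: wechat
Test accounts: 18200891282/123456, 18200891283/123456
I. Create a Maven parent project
A Maven multi-module (aggregator) project is used so that dependency versions can be managed in one place.
1. Create a Maven project in IDEA. Below is the parent pom, which mainly manages the dependencies for Spring Cloud, SSM and a few other components: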
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.linjiahao</groupId>
<artifactId>webChat</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>service</module>
<module>common</module>
<module>infrastructure</module>
</modules>
<name>webChat</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<java.version>1.8</java.version>
<onlien_education.version>0.0.1-SNAPSHOT</onlien_education.version>
<mybatis-plus.version>3.0.5</mybatis-plus.version>
<jodatime.version>2.10.1</jodatime.version>
<poi.version>3.17</poi.version>
<commons-fileupload.version>1.3.1</commons-fileupload.version>
<commons-io.version>2.6</commons-io.version>
<httpclient.version>4.5.1</httpclient.version>
<jwt.version>3.4.1</jwt.version>
<fastjson.version>1.2.76</fastjson.version>
<gson.version>2.8.2</gson.version>
<json.version>20170516</json.version>
<commons-dbutils.version>1.7</commons-dbutils.version>
<canal.client.version>1.1.0</canal.client.version>
<docker.image.prefix>zx</docker.image.prefix>
<cloud-alibaba.version>0.2.2.RELEASE</cloud-alibaba.version>
</properties>
<dependencyManagement>
<dependencies>
<!--Spring Cloud-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- mybatis-plus persistence layer -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<!-- date/time utilities -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${jodatime.version}</version>
</dependency>
<!--xls-->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>${poi.version}</version>
</dependency>
<!--xlsx-->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>${poi.version}</version>
</dependency>
<!-- file upload -->
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>${commons-fileupload.version}</version>
</dependency>
<!--commons-io-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<!--httpclient-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<!-- JWT -->
<dependency>
<groupId>com.auth0</groupId>
<artifactId>java-jwt</artifactId>
<version>${jwt.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>${json.version}</version>
</dependency>
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
<version>${commons-dbutils.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.client.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
II. The project is split into three submodules: common, infrastructure and service
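III. Setting up the environment
The middleware used so far is Nacos, RabbitMQ and Redis; MySQL is included as well because it will be needed later. You can install these on Windows, though a cloud server is better. Installation steps are not covered here; remember to open the required ports on the server.
Links:
Installing Java on CentOS: Centos安装Java - 简书
Installing Docker on CentOS: centos7安装Docker详细步骤(无坑版教程) - 云+社区 - 腾讯云
Installing MySQL with Docker on CentOS: CentOS7利用docker安装MySQL5.7 - 秦浩铖 - 博客园
Installing Redis with Docker on CentOS: CentOS7使用Docker安装Redis图文教程_命硬的博客-CSDN博客_centos docker redis
Installing Nacos on CentOS: centos 安装 nacos_wangooo的博客-CSDN博客_centos7 nacos安装
Installing RabbitMQ on CentOS: CentOS使用Docker安装RabbitMq - 简书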
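IV. Initializing the gateway
1. Spring Cloud Gateway supports the WebSocket protocol and is currently one of the most commonly used gateways in distributed systems. More business features will be added to it later, so the gateway gets its own module.
2. Create a new module named gateway under infrastructure. Its main pom dependencies are: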
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>infrastructure</artifactId>
<groupId>com.linjiahao</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>gateway</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--gson-->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<!-- service invocation (OpenFeign) -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
</dependencies>
<build>
<finalName>gateway</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
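One thing to note here: Spring Cloud Gateway is built on Netty + WebFlux, and WebFlux conflicts with spring-boot-starter-web, so make sure the gateway's Maven dependencies do not pull in spring-boot-starter-web.
Gateway project structure:
The classes in the gateway mainly handle CORS and provide a unified global error response. The application configuration is shown below; replace the Nacos address and port with your own. There is currently only one routing rule.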
server:
  port: 8201
spring:
  application:
    name: service-gateway
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
    gateway:
      discovery:
        locator:
          enabled: true
      routes:
        - id: nettyService
          uri: lb://nettyService
          predicates:
            - Path=/ws/**
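V. Initializing the common project
1. Redis is used throughout the project, so the first step is to wrap a Redis utility class that all modules can share.
The common pom: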
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>webChat</artifactId>
<groupId>com.linjiahao</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>common</artifactId>
<packaging>pom</packaging>
<modules>
<module>service-base</module>
</modules>
<dependencies>
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- commons-pool2, required for Redis integration on Spring Boot 2.x -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>provided</scope>
</dependency>
<!--mybatis-plus-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<scope>provided</scope>
</dependency>
<!-- lombok simplifies entity classes; the lombok IDE plugin must be installed -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>
2. Add a submodule of common named service-base. Project structure:
The service-base pom dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>common</artifactId>
<groupId>com.linjiahao</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>webChat-service-base</artifactId>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
</dependency>
<dependency>
<groupId>com.auth0</groupId>
<artifactId>java-jwt</artifactId>
</dependency>
</dependencies>
</project>
VI. Setting up the Netty WebSocket service
Now for the core service. Create a module named service-nettyOne under service. The main Netty dependency:
<dependencies>
<!-- netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
</dependencies>
Netty is integrated by starting it from the application's startup class: the startup class implements CommandLineRunner and overrides run() to start the Netty server.
1. NettyServer: configuration and startup of the Netty server
@Component
@Slf4j
public class NettyServer {
    @Value("${netty.port}")
    private int port;
    @Value("${netty.name}")
    private String name;
    @Value("${spring.cloud.nacos.discovery.server-addr}")
    private String nacosServer;
    @Resource
    private WebSockerHandler webSockerHandler;

    public void start() throws Exception {
        // bossGroup accepts incoming connections, workerGroup handles I/O on the accepted channels
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.option(ChannelOption.SO_BACKLOG, 1024);
            sb.group(bossGroup, workerGroup) // acceptor group first, then worker group
                    .channel(NioServerSocketChannel.class) // channel implementation to use
                    .localAddress(this.port) // port to listen on
                    .childHandler(new ChannelInitializer<SocketChannel>() { // runs for every accepted client connection
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // WebSocket starts as an HTTP request, so the HTTP codec is needed here too
                            ch.pipeline().addLast(new HttpServerCodec());
                            // handler that supports writing data in chunks
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                            // aggregates the parts of an HTTP message into one FullHttpRequest
                            ch.pipeline().addLast(new HttpObjectAggregator(8192));
                            // custom handler
                            ch.pipeline().addLast(webSockerHandler);
                        }
                    });
            ChannelFuture cf = sb.bind().sync(); // bind and block until binding completes
            log.info("{} started, listening on {}", NettyServer.class.getSimpleName(), cf.channel().localAddress());
            // obtain the Nacos naming service
            NamingService namingService = NamingFactory.createNamingService(nacosServer);
            // register this Netty server with the registry
            InetAddress address = InetAddress.getLocalHost();
            namingService.registerInstance(name, address.getHostAddress(), port);
            log.info("Registered with Nacos");
            cf.channel().closeFuture().sync(); // block until the server channel is closed
        } finally {
            workerGroup.shutdownGracefully().sync(); // release thread pool resources
            bossGroup.shutdownGracefully().sync();
        }
    }
}
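This mainly configures the listening port, the protocol, the codec handlers and the custom handler webSockerHandler.
Key point: the Netty service has to be clustered and load-balanced, so when the Netty server starts it also registers itself with the Nacos registry, and all requests then go through the gateway.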
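To make the idea concrete, here is a minimal in-memory sketch of what "register under one service name, let the gateway pick an instance" means. It does not use the Nacos API: `TinyRegistry`, `register` and `choose` are hypothetical names, and plain round-robin is an assumption for illustration; the load balancer behind `lb://` may use a different strategy.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical in-memory stand-in for a service registry; not the Nacos API. */
final class TinyRegistry {
    // service name -> registered "host:port" instances
    private final Map<String, List<String>> instances = new ConcurrentHashMap<>();
    // service name -> round-robin counter
    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    /** Each Netty node would call this once at startup, all under the same service name. */
    void register(String serviceName, String host, int port) {
        instances.computeIfAbsent(serviceName, k -> new CopyOnWriteArrayList<>())
                .add(host + ":" + port);
    }

    /** Conceptually what a load-balancing gateway does for each new connection. */
    String choose(String serviceName) {
        List<String> list = instances.get(serviceName);
        if (list == null || list.isEmpty()) {
            throw new IllegalStateException("no instance registered for " + serviceName);
        }
        int n = counters.computeIfAbsent(serviceName, k -> new AtomicInteger())
                .getAndIncrement();
        return list.get(Math.floorMod(n, list.size()));
    }
}
```

Because every node registers under the same name (nettyService in this article), consecutive WebSocket connections can land on different nodes, which is exactly why the channel-sharing problem in the next section appears.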
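2. The custom WebSockerHandler
A Netty channel for a client connection cannot be serialized, so clustering creates a problem: when users A and B are connected to different servers, a message from A cannot be delivered to B through the server A is connected to.
Solution: use Redis plus RabbitMQ publish/subscribe to share messages between nodes.
- Netty uses a DefaultChannelGroup to manage the currently connected channels; a channel can be looked up by its id.
- Each WebSocket connection has to be associated with a userId, so a custom handler binds the userId to the channel id before the connection is established and stores the pair in Redis.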
/**
 * @ClassName ChannelHandlerPool
 * @Description: Netty channel mapping
 * @Author linjiahao
 * @Date 23/12/2021
 **/
public class ChannelHandlerPool {
    // channel group holding the channels connected to this node
    public static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    // Redis hash key that stores the userId -> ChannelId mapping
    public static final String channelUserKey = "netty:user:channel";
}
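The routing idea can be sketched without any middleware. The following is only an in-memory simulation under stated assumptions: a shared map stands in for the Redis hash, a list of nodes stands in for the RabbitMQ fanout exchange, and channel ids are plain strings. `ClusterSketch` and all of its members are hypothetical names, not part of the project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory simulation: the shared map plays Redis, the node list plays the fanout exchange. */
final class ClusterSketch {
    // Stand-in for the Redis hash: userId -> channel id (as text)
    private final Map<String, String> userToChannel = new ConcurrentHashMap<>();
    // Stand-in for the fanout exchange: every node receives every published message
    private final List<Node> nodes = new ArrayList<>();

    Node newNode(String name) {
        Node node = new Node(name);
        nodes.add(node);
        return node;
    }

    final class Node {
        final String name;
        // Stand-in for DefaultChannelGroup: channel id -> messages written to that channel
        private final Map<String, List<String>> localChannels = new ConcurrentHashMap<>();

        private Node(String name) {
            this.name = name;
        }

        /** A user connects to this node; the mapping is written to the shared store. */
        void connect(String userId, String channelId) {
            localChannels.put(channelId, new ArrayList<>());
            userToChannel.put(userId, channelId);
        }

        /** Deliver locally when possible, otherwise broadcast to all nodes. */
        boolean send(String toUserId, String text) {
            String channelId = userToChannel.get(toUserId);
            if (channelId == null) {
                return false; // no mapping: the target user is not connected anywhere
            }
            if (deliverLocally(channelId, text)) {
                return true;
            }
            boolean delivered = false;
            for (Node node : nodes) {
                // every node gets a copy; only the node that owns the channel writes it
                delivered |= node.deliverLocally(channelId, text);
            }
            return delivered;
        }

        /** Returns true when the channel lives on this node and the text was written to it. */
        boolean deliverLocally(String channelId, String text) {
            List<String> inbox = localChannels.get(channelId);
            if (inbox == null) {
                return false;
            }
            inbox.add(text);
            return true;
        }

        /** Messages written to a local channel so far, or null if the channel is not on this node. */
        List<String> inbox(String channelId) {
            return localChannels.get(channelId);
        }
    }
}
```

The design choice mirrored here is that the channel itself never leaves its node: only the userId-to-channel-id mapping is shared, and the message travels to whichever node owns the channel.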
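Before a WebSocket connection is established, the client first sends an HTTP request that is then upgraded to the WebSocket protocol. During this HTTP request we need to verify that the connection is valid and record the Netty channel mapping described above; a Redis hash is used to store each user's channel id.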
/**
 * @ClassName WebSocketAuthHttpHanlder
 * @Description: authentication before the WebSocket handshake
 * @Author linjiahao
 * @Date 23/12/2021
 **/
@Component
@Slf4j
public class WebSocketAuthHttpHanlder {
    @Resource
    private RedisService redisService;
    @Value("${netty.url}")
    private String webSocketUrl;
    public final String token = "token";

    public String authHandler(ChannelHandlerContext ctx, FullHttpRequest message) {
        String uri = message.uri();
        log.info("URI to authenticate before connecting: {}", uri);
        // parse the query string and look up the "token" parameter by its exact name
        List<String> tokenValues = new QueryStringDecoder(uri).parameters().get(token);
        if (tokenValues == null || tokenValues.isEmpty() || tokenValues.get(0).isEmpty()) {
            return httpResponseResult(1);
        }
        String tokenValue = tokenValues.get(0);
        // verify the signature first: the userId inside an unverified token must not be trusted,
        // so nothing in Redis is touched when verification fails
        if (!JWTUtil.verify(tokenValue)) {
            return httpResponseResult(2);
        }
        String userId = JWTUtil.getUserId(tokenValue);
        log.info("channel id: {}, user id: {}", ctx.channel().id(), userId);
        // the ChannelId is handed to redisService as an object, not as a JSON string
        redisService.hset(ChannelHandlerPool.channelUserKey, userId, ctx.channel().id());
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                webSocketUrl, "WebSocket", false, 65536 * 10);
        // a handshaker belongs to one connection, so it is a local variable rather than a field of this shared bean
        WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(message);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), message);
        }
        return BaseConstant.NETTY_AUTH_PASS;
    }

    private String httpResponseResult(Integer code) {
        String result;
        switch (code) {
            case 1:
                result = JSON.toJSONString(ErrorMessage.USER_NOT_LOGIN);
                break;
            case 2:
                result = JSON.toJSONString(ErrorMessage.TOKEN_IS_NOT_VALIDATE);
                break;
            default:
                result = JSON.toJSONString(ErrorMessage.SYSTEM_EXCEPTION);
        }
        return result;
    }
}
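One thing to note here: the channel id must not be JSON-serialized, otherwise it cannot be used later to look up the channel. The redisService used here is a utility class wrapped in common.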
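Pulling the token out of the handshake URI is easy to get subtly wrong, for example when another parameter merely contains the word "token" or when the value is empty. A minimal sketch using only the JDK is shown below; `QueryParams` is a hypothetical helper name, not part of the project. Inside a Netty handler, Netty's own `QueryStringDecoder` covers the same need.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.Optional;

/** Hypothetical helper: reads a single query parameter by its exact name. */
final class QueryParams {
    /** Returns the decoded value of the named parameter, or empty if it is missing or blank. */
    static Optional<String> get(String uri, String name) {
        int q = uri.indexOf('?');
        if (q < 0 || q == uri.length() - 1) {
            return Optional.empty(); // no query string at all
        }
        for (String pair : uri.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // no key or no '=' in this pair
            }
            // compare the key exactly, so "mytoken" does not match "token"
            if (pair.substring(0, eq).equals(name)) {
                String value = decode(pair.substring(eq + 1));
                return value.isEmpty() ? Optional.empty() : Optional.of(value);
            }
        }
        return Optional.empty();
    }

    private static String decode(String s) {
        try {
            return URLDecoder.decode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }
}
```

For example, `QueryParams.get("/ws?token=abc.def.ghi", "token")` yields the token, while a URI without a query string yields an empty `Optional` instead of throwing.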
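Once users are associated with channels, simple messaging works, but only on a single node. To cover the cluster, RabbitMQ's publish/subscribe mode is used to broadcast to all Netty services: when the target channelId is not on the current server, the message is published through MQ and every Netty service consumes it.
The custom WebSockerHandler extends SimpleChannelInboundHandler and overrides channelActive(), channelInactive() and channelRead0(). Note that SimpleChannelInboundHandler requires a type parameter for the inbound message type!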
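@Component
@Slf4j
@ChannelHandler.Sharable
public class WebSockerHandler extends SimpleChannelInboundHandler<Object> {
    @Autowired
    private WebSocketAuthHttpHanlder authHttpHanlder;
    @Resource
    private RedisService redisService;
    @Resource
    private RabbitPublisher rabbitPublisher;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("Client connected, channel opened, id: {}", ctx.channel().id());
        ChannelHandlerPool.channelGroup.add(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("Client disconnected, channel closed, id: {}", ctx.channel().id());
        ChannelHandlerPool.channelGroup.remove(ctx.channel());
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object message) {
        if (message instanceof FullHttpRequest) {
            // the first request arrives as HTTP and is then upgraded to WebSocket; authenticate it here
            String authMessage = authHttpHanlder.authHandler(ctx, (FullHttpRequest) message);
            log.info("Authentication result: {}", authMessage);
            if (!authMessage.equals(BaseConstant.NETTY_AUTH_PASS)) {
                // no handshake has happened yet, so answer with a plain HTTP response instead of a WebSocket frame
                FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                        HttpResponseStatus.UNAUTHORIZED, Unpooled.copiedBuffer(authMessage, CharsetUtil.UTF_8));
                response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=UTF-8");
                response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
                ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
            }
        } else if (message instanceof TextWebSocketFrame) {
            // handle a message from the WebSocket client
            JSONObject jsonObject = JSON.parseObject(((TextWebSocketFrame) message).text());
            log.info("Received message: {}", jsonObject.toJSONString());
            String userId = jsonObject.getString("userId");
            String sendMessage = jsonObject.getString("message");
            Object channelId = redisService.hget(ChannelHandlerPool.channelUserKey, userId);
            if (channelId == null) {
                // no mapping in Redis for the target user; find(null) would throw
                ctx.channel().writeAndFlush(new TextWebSocketFrame("user is not online"));
                return;
            }
            Channel channel = ChannelHandlerPool.channelGroup.find((ChannelId) channelId);
            if (channel != null) {
                // the target channel is on this node: deliver directly
                channel.writeAndFlush(new TextWebSocketFrame(sendMessage));
            } else {
                // the target channel is on another node: broadcast through RabbitMQ.
                // The channel id is not put into the JSON payload; receivers look it up in Redis by userId.
                ChatMessage chatMessage = new ChatMessage();
                chatMessage.setUserId(userId);
                chatMessage.setMessage(sendMessage);
                rabbitPublisher.send(JSON.toJSONString(chatMessage));
            }
            ctx.channel().writeAndFlush(new TextWebSocketFrame("sent"));
        }
    }
}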
The token checked during authentication is generated with JWT:
@Slf4j
public class JWTUtil {
private static final String COSNT_JWT_SECRET = "linjiahao_~!@#$%^&*()_+";
// ****************** time constants, in seconds ************************//
public static final long COSNT_one_minute = 60;
public static final long CONST_half_hour = COSNT_one_minute * 30;
public static final long CONST_one_hour = CONST_half_hour * 2;
public static final long CONST_one_day = CONST_one_hour * 24;
public static final long CONST_one_week = CONST_one_day * 7;
public static final long CONST_half_month = CONST_one_week * 2;
// verify that the token's signature is valid and that it has not expired
public static boolean verify(String token) {
try {
JWTVerifier verifier = JWT.require(Algorithm.HMAC256(COSNT_JWT_SECRET)).build();
verifier.verify(token);
return true;
} catch (TokenExpiredException e) {
log.error("token has expired {}", e.getMessage());
return false;
} catch (JWTVerificationException e) {
log.error("token is invalid {}", e.getMessage());
return false;
}
}
// read the userId from the token (decodes only; the signature is not verified here)
public static String getUserId(String token) {
try {
DecodedJWT jwt = JWT.decode(token);
return jwt.getClaim("userId").asString();
} catch (JWTDecodeException e) {
log.error("error:{}", e.getMessage());
return "";
}
}
/**
* Compute an expiry time
* @param ttl number of seconds until expiry
* @return expiry timestamp in milliseconds
*/
public static long generateExpireTime(long ttl) {
return System.currentTimeMillis() + ttl * 1000;
}
// generate a token
public static String generateTokenWithUserId(String userId, long expireTime) {
try {
return JWT.create()
.withClaim("userId", userId)
.withExpiresAt(new Date(expireTime))
.sign(Algorithm.HMAC256(COSNT_JWT_SECRET));
} catch (Exception e) {
log.error("error:{}", e.getMessage());
return null;
}
}
public static void main(String[] args) {
String s = generateTokenWithUserId("1234567", generateExpireTime(CONST_one_hour));
System.out.println(s);
}
}
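For readers curious about what HMAC256 signing and verification amount to, here is a rough JDK-only sketch of the HS256 token layout (header.payload.signature). It is an illustration under assumptions, not a replacement for the java-jwt library used above: `Hs256Sketch` is a hypothetical name, the payload is passed in as a ready-made JSON string, and expiry is deliberately not checked.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical illustration of HS256 signing; not the java-jwt API. */
final class Hs256Sketch {
    private static final Base64.Encoder B64 = Base64.getUrlEncoder().withoutPadding();
    private static final String HEADER_JSON = "{\"typ\":\"JWT\",\"alg\":\"HS256\"}";

    /** Builds header.payload.signature for the given JSON payload. */
    static String sign(String payloadJson, String secret) {
        String header = B64.encodeToString(HEADER_JSON.getBytes(StandardCharsets.UTF_8));
        String payload = B64.encodeToString(payloadJson.getBytes(StandardCharsets.UTF_8));
        String signingInput = header + "." + payload;
        return signingInput + "." + B64.encodeToString(hmac(signingInput, secret));
    }

    /** Recomputes the signature and compares it in constant time. Does NOT check expiry. */
    static boolean signatureMatches(String token, String secret) {
        int firstDot = token.indexOf('.');
        int lastDot = token.lastIndexOf('.');
        if (firstDot <= 0 || firstDot == lastDot) {
            return false; // a token needs three dot-separated parts
        }
        byte[] expected = hmac(token.substring(0, lastDot), secret);
        byte[] actual;
        try {
            actual = Base64.getUrlDecoder().decode(token.substring(lastDot + 1));
        } catch (IllegalArgumentException e) {
            return false; // signature part is not valid base64url
        }
        return MessageDigest.isEqual(expected, actual);
    }

    private static byte[] hmac(String data, String secret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The sketch also shows why `getUserId` alone proves nothing: the payload is only base64url-encoded, so anyone can read or forge it, and only the signature check ties it to the server's secret.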
RabbitMQ is used in publish/subscribe mode:
Configuration: RabbitConfig
@Configuration
public class RabbitConfig {
    /**
     * Author: 张博
     * Date: 2018/3/5 10:48 AM
     * @apiNote defines the fanout (broadcast) exchange
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("netty-chat");
    }

    /**
     * @apiNote defines an auto-delete anonymous queue
     */
    @Bean
    public Queue autoDeleteQueue0() {
        return new AnonymousQueue();
    }

    /**
     * @apiNote defines a second auto-delete anonymous queue
     */
    @Bean
    public Queue autoDeleteQueue1() {
        return new AnonymousQueue();
    }

    /**
     * @param fanoutExchange the fanout exchange
     * @param autoDeleteQueue0 the auto-delete queue
     * @apiNote binds the queue to the fanout exchange
     * @return Binding
     */
    @Bean
    public Binding binding0(FanoutExchange fanoutExchange, Queue autoDeleteQueue0) {
        return BindingBuilder.bind(autoDeleteQueue0).to(fanoutExchange);
    }

    /**
     * @param fanoutExchange the fanout exchange
     * @param autoDeleteQueue1 the auto-delete queue
     * @apiNote binds the queue to the fanout exchange
     * @return Binding
     */
    @Bean
    public Binding binding1(FanoutExchange fanoutExchange, Queue autoDeleteQueue1) {
        return BindingBuilder.bind(autoDeleteQueue1).to(fanoutExchange);
    }
}
RabbitPublisher, the publisher:
@Component
public class RabbitPublisher {
@Autowired
private FanoutExchange fanoutExchange;
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String message) {
rabbitTemplate.convertAndSend(fanoutExchange.getName(), "", message);
}
}
RabbitReceiver, the consumer:
@Component
@Slf4j
public class RabbitReceiver {
    @Resource
    private RedisService redisService;

    @RabbitListener(queues = "#{autoDeleteQueue0.name}")
    public void receiver0(String message) {
        log.info("Received message from MQ: {}", message);
        ChatMessage chatMessage = JSON.parseObject(message, ChatMessage.class);
        Object channelId = redisService.hget(ChannelHandlerPool.channelUserKey, chatMessage.getUserId());
        if (channelId == null) {
            // the mapping is gone, so there is nothing to deliver
            return;
        }
        // only the node that actually holds this channel finds it in its local group
        Channel channel = ChannelHandlerPool.channelGroup.find((ChannelId) channelId);
        if (channel != null) {
            channel.writeAndFlush(new TextWebSocketFrame(chatMessage.getMessage()));
            log.info("Message delivered");
        }
    }
}
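Note: two queues are registered in MQ, but a Netty service does not need to listen on more than one. Listening on just one of them avoids delivering the same message twice.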
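The duplicate-delivery risk follows directly from how a fanout exchange works: every bound queue receives its own copy of each message. A minimal in-memory sketch of that behaviour, with `FanoutSketch` as a hypothetical stand-in rather than the Spring AMQP API:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a fanout exchange: every bound queue gets its own copy. */
final class FanoutSketch {
    private final List<List<String>> boundQueues = new ArrayList<>();

    /** Creates a new queue and binds it to this exchange. */
    List<String> bindNewQueue() {
        List<String> queue = new ArrayList<>();
        boundQueues.add(queue);
        return queue;
    }

    /** Copies the message into every bound queue, ignoring any routing key. */
    void publish(String message) {
        for (List<String> queue : boundQueues) {
            queue.add(message);
        }
    }
}
```

If one node bound two queues and consumed both, it would see each published message twice, once per queue, and would write it to the user's channel twice. One queue per node gives each node exactly one copy.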
Configuration of service-nettyOne:
spring:
  application:
    name: nettyOne
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/netty?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
    username: root
    password: 123456
    hikari:
      connection-test-query: SELECT 1
      connection-timeout: 600000
      idle-timeout: 500000
      max-lifetime: 540000
      minimum-idle: 10
      maximum-pool-size: 12
      pool-name: GuliHikariPool
  redis:
    database: 0 # database index
    host: 1.14.208.155
    port: 6379
    password: linjiahao
    timeout: 1000
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
server:
  port: 8211
netty:
  name: nettyService
  port: 8212
  url: ws://127.0.0.1:8212/ws
Simulating a Netty cluster:
Make a copy of the service-nettyOne project and change the Netty port in it.
A front-end WebSocket page for testing:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Local WebSocket test</title>
<meta name="robots" content="all" />
<meta name="keywords" content="local,websocket,test tool" />
<meta name="description" content="local,websocket,test tool" />
<style>
.btn-group{
display: inline-block;
}
</style>
</head>
<body>
<input type='text' value='Address, starting with ws://..' class="form-control" style='width:390px;display:inline'
id='wsaddr' />
<div class="btn-group" >
<button type="button" class="btn btn-default" onclick='addsocket();'>Connect</button>
<button type="button" class="btn btn-default" onclick='closesocket();'>Disconnect</button>
<button type="button" class="btn btn-default" onclick='$("#wsaddr").val("")'>Clear</button>
</div>
<div class="row">
<div id="output" style="border:1px solid #ccc;height:365px;overflow: auto;margin: 20px 0;"></div>
<input type="text" id='message' class="form-control" style='width:810px' placeholder='Message to send as JSON, e.g. {"userId":"1234567","message":"hello"}' onkeydown="en(event);">
<span class="input-group-btn">
<button class="btn btn-default" type="button" onclick="doSend();">Send</button>
</span>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.1.1.min.js"></script>
<script language="javascript" type="text/javascript">
function formatDate(now) {
var year = now.getFullYear();
var month = now.getMonth() + 1;
var date = now.getDate();
var hour = now.getHours();
var minute = now.getMinutes();
var second = now.getSeconds();
return year + "-" + (month = month < 10 ? ("0" + month) : month) + "-" + (date = date < 10 ? ("0" + date) : date) +
" " + (hour = hour < 10 ? ("0" + hour) : hour) + ":" + (minute = minute < 10 ? ("0" + minute) : minute) + ":" + (
second = second < 10 ? ("0" + second) : second);
}
var output;
var websocket;
function init() {
output = document.getElementById("output");
testWebSocket();
}
function addsocket() {
var wsaddr = $("#wsaddr").val();
if (wsaddr == '') {
alert("Please enter the WebSocket address");
return false;
}
StartWebSocket(wsaddr);
}
function closesocket() {
websocket.close();
}
function StartWebSocket(wsUri) {
websocket = new WebSocket(wsUri);
websocket.onopen = function(evt) {
onOpen(evt)
};
websocket.onclose = function(evt) {
onClose(evt)
};
websocket.onmessage = function(evt) {
onMessage(evt)
};
websocket.onerror = function(evt) {
onError(evt)
};
}
function onOpen(evt) {
writeToScreen("<span style='color:red'>Connected. You can send messages now!</span>");
}
function onClose(evt) {
writeToScreen("<span style='color:red'>WebSocket connection closed!</span>");
websocket.close();
}
function onMessage(evt) {
writeToScreen('<span style="color:blue">Server reply ' + formatDate(new Date()) + '</span><br/><span class="bubble">' +
evt.data + '</span>');
}
function onError(evt) {
writeToScreen('<span style="color: red;">Error:</span> ' + evt.data);
}
function doSend() {
var message = $("#message").val();
if (message == '') {
alert("Please enter a message to send first");
$("#message").focus();
return false;
}
if (typeof websocket === "undefined") {
alert("WebSocket is not connected yet, or the connection failed. Please check");
return false;
}
if (websocket.readyState == 3) {
alert("WebSocket is already closed, please reconnect");
return false;
}
console.log(websocket);
$("#message").val('');
writeToScreen('<span style="color:green">You sent ' + formatDate(new Date()) + '</span><br/>' + message);
websocket.send(message);
}
function writeToScreen(message) {
var div = "<div class='newmessage'>" + message + "</div>";
var d = $("#output");
var d = d[0];
var doScroll = d.scrollTop == d.scrollHeight - d.clientHeight;
$("#output").append(div);
if (doScroll) {
d.scrollTop = d.scrollHeight - d.clientHeight;
}
}
function en(event) {
// use the event passed in by the handler, falling back to window.event in old browsers
var evt = event || window.event;
if (evt && evt.keyCode == 13) {
doSend()
}
}
</script>
</html>
Final result:
Nacos shows the four services we started.
Accessing the Netty service through the gateway:
ws://127.0.0.1:8201/ws?token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2NDA1MDk1OTYsInVzZXJJZCI6IjEyMzQ1NiJ9.6zVmY-WvTJmhnrMKeD6K34KiLpyAS-MCON4wdi25dY0
This connection lands on the second node, service-nettyTwo.
ws://127.0.0.1:8201/ws?token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2NDA1MzA1MzMsInVzZXJJZCI6IjEyMzQ1NjcifQ.cdkv98W-W66fWFN5sEUbbMcAz-GJ5mqfNfNlQ3895vM
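This connection lands on the first node, service-nettyOne.
Sending a message: the two users' channels are now on different servers.
The message is delivered to the right user.
Summary
The above is a simple WebSocket service built on Netty that addresses clustering and load balancing. There is still a lot to optimize; the code will be improved as features are added. Suggestions are welcome.
Code: nettyChatCloud: a basic WebSocket service built with Spring Cloud Gateway + Nacos + Netty that solves the problem of channels not being shareable across a cluster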