redis订阅发布方式快速实现分布式websocket
文章主题WebSocket常用于做后台消息推送,也可以做简易的IM聊天,由于WebSocket中的Session没有实现序列化接口的,我们无法将session序列化实现分布式部署,今天就来记录一种通过redis订阅和发布实现分布式websocket通信的方案。实现原理首先我们讲的这种方式是利用redis订阅和发布模式来实现,大致过程:每个服务器记录连接,保存在内存当中当需要推送websocket消
文章主题
WebSocket常用于做后台消息推送,也可以做简易的IM聊天,由于WebSocket中的Session没有实现序列化接口的,我们无法将session序列化实现分布式部署,今天就来记录一种通过redis订阅和发布实现分布式websocket通信的方案。
实现原理
首先我们讲的这种方式是利用redis订阅和发布模式来实现,大致过程:
- 每个服务器记录连接,保存在内存当中
- 当需要推送websocket消息的时候,同时在redis发布一个消息
- 每个服务器订阅redis的消息,当监听到有消息时,每台服务器遍历自己内存当中的连接进行发送
这样我们就可以实现websocket的分布式部署,当然redis订阅和发布也可以用其他消息队列工具类实现。
实现过程
这里并不打算贴所有代码,只记录关键的一些代码,其余的可以网上查看相关资料,基于SpringBoot2.1.8实现。
pom文件引入redis和websocket
1 2 3 4 5 6 7 8 9 | <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!--websocket--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> |
写一个redis发布器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | @Component public class PublishService { @Autowired StringRedisTemplate redisTemplate; /** * 发布方法 * * @param channel 消息发布订阅 主题 * @param message 消息信息 */ public void publish(String channel, Object message) { redisTemplate.convertAndSend(channel, message); } } |
写一个redis监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | public class SubscribeListener implements MessageListener { /** * 订阅接收发布者的消息 */ @Override public void onMessage(Message message, byte[] pattern) { String msg = new String(message.getBody()); System.out.println(new String(pattern) + "接收消息:" + msg); //遍历本地内存当中的websocket连接... //拿到对应的websocket session就可以进行推送消息 } } |
配置下websocket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | @Configuration public class WebSocketConfig extends ServerEndpointConfig.Configurator{ @Override public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { // 主要为了能在websocket打开连接时获取httpsession和当前登陆用户,此处跟本文内容没有关系 HttpSession httpSession=(HttpSession) request.getHttpSession(); //存入httpsession sec.getUserProperties().put(HttpSession.class.getName(),httpSession); //存入当前用户 sec.getUserProperties().put("user", ShiroUtils.getCurrentUser()); super.modifyHandshake(sec, request, response); } /** * 自动注册使用了@ServerEndpoint注解声明的Websocket endpoint * @return */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } } |
配置redis消息发布订阅
1 2 3 4 5 6 7 8 9 10 11 12 13 | /** * redis 消息监听 用于websocket 分布式处理 * @param redisConnectionFactory * @return */ @Bean public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory){ RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); //设置订阅topic redisMessageListenerContainer.addMessageListener(new SubscribeListener(), new ChannelTopic("socket_topic")); return redisMessageListenerContainer; } |
再写一个简易的存储工具类,这个就基于ConcurrentHashMap就能实现,不记录了。
最后来看websocket ServerEndpoint 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | @OnOpen public void onOpen(Session session, EndpointConfig config) { log.debug("websocket:打开连接"); //将连接存入我们的缓存工具,这个工具就是简单的存储,代码自己写一个 CacheSessionMap.put("可以是sessionId,也可以实当前用户ID",session); } @OnClose public void onClose(Session session) { log.debug("websocket:关闭连接"); //关闭的连接,我们将其移除 CacheSessionMap.remove("可以是sessionId,也可以实当前用户ID"); } @OnMessage public void onMessage(Session session, String message) throws IOException { log.debug("websocket:消息来了"); //用我们之前写的redis消息发布器,将这个消息发布到redis publishService.publish("socket_topic", message); } |
关键代码就已经结束了,赶紧试试吧!
github代码地址:GitHub - liguoquan/redis-subscription-publishing: redis的消息订阅和发布
遇到问题(无法订阅)
redis可以成功发布消息,但是实现MessageListener的onMessage方式监听不到发布的消息。通过调试发现:RedisMessageListenerContainer这个bean根本就没有注入。原因在于:依赖spring-data-redis已经注入了RedisMessageListenerContainer对象,自己再去注入的类同名的对象则无法注入。解决方法是对bean进行显示命名,例如@Bean("bean1")。
更多推荐
所有评论(0)