文章主题

WebSocket常用于做后台消息推送,也可以做简易的IM聊天,由于WebSocket中的Session没有实现序列化接口的,我们无法将session序列化实现分布式部署,今天就来记录一种通过redis订阅和发布实现分布式websocket通信的方案。

实现原理

首先我们讲的这种方式是利用redis订阅和发布模式来实现,大致过程:

  • 每个服务器记录连接,保存在内存当中
  • 当需要推送websocket消息的时候,同时在redis发布一个消息
  • 每个服务器订阅redis的消息,当监听到有消息时,每台服务器遍历自己内存当中的连接进行发送

这样我们就可以实现websocket的分布式部署,当然redis订阅和发布也可以用其他消息队列工具类实现。

实现过程

这里并不打算贴所有代码,只记录关键的一些代码,其余的可以网上查看相关资料,基于SpringBoot2.1.8实现。

pom文件引入redis和websocket

1
2
3
4
5
6
7
8
9
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--websocket-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

写一个redis发布器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class PublishService {
    @Autowired
    StringRedisTemplate redisTemplate;

    /**
     * 发布方法
     *
     * @param channel 消息发布订阅 主题
     * @param message 消息信息
     */
    public void publish(String channel, Object message) {
        redisTemplate.convertAndSend(channel, message);
    }
}

写一个redis监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SubscribeListener implements MessageListener {

    /**
     * 订阅接收发布者的消息
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String msg = new String(message.getBody());
        System.out.println(new String(pattern) + "接收消息:" + msg);
        //遍历本地内存当中的websocket连接...
        //拿到对应的websocket session就可以进行推送消息
    }
   
}

配置下websocket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator{
    @Override
    public void modifyHandshake(ServerEndpointConfig sec,
                                HandshakeRequest request, HandshakeResponse response) {
        // 主要为了能在websocket打开连接时获取httpsession和当前登陆用户,此处跟本文内容没有关系
        HttpSession httpSession=(HttpSession) request.getHttpSession();
        //存入httpsession
        sec.getUserProperties().put(HttpSession.class.getName(),httpSession);
        //存入当前用户
        sec.getUserProperties().put("user", ShiroUtils.getCurrentUser());
        super.modifyHandshake(sec, request, response);

    }

    /**
     * 自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

配置redis消息发布订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * redis 消息监听 用于websocket 分布式处理
 * @param redisConnectionFactory
 * @return
 */
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory){
    RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
    redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
    //设置订阅topic
    redisMessageListenerContainer.addMessageListener(new SubscribeListener(), new ChannelTopic("socket_topic"));
    return redisMessageListenerContainer;
}

再写一个简易的存储工具类,这个就基于ConcurrentHashMap就能实现,不记录了。

最后来看websocket ServerEndpoint 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@OnOpen
public void onOpen(Session session, EndpointConfig config) {
    log.debug("websocket:打开连接");
    //将连接存入我们的缓存工具,这个工具就是简单的存储,代码自己写一个
    CacheSessionMap.put("可以是sessionId,也可以实当前用户ID",session);

}

@OnClose
public void onClose(Session session) {
    log.debug("websocket:关闭连接");
    //关闭的连接,我们将其移除
    CacheSessionMap.remove("可以是sessionId,也可以实当前用户ID");
 
}

@OnMessage
public void onMessage(Session session, String message) throws IOException {
    log.debug("websocket:消息来了");
    //用我们之前写的redis消息发布器,将这个消息发布到redis
    publishService.publish("socket_topic", message);
}

关键代码就已经结束了,赶紧试试吧!

github代码地址:GitHub - liguoquan/redis-subscription-publishing: redis的消息订阅和发布

遇到问题(无法订阅)

redis可以成功发布消息,但是实现MessageListener的onMessage方式监听不到发布的消息。通过调试发现:RedisMessageListenerContainer这个bean根本就没有注入。原因在于:依赖spring-data-redis已经注入了RedisMessageListenerContainer对象,自己再去注入的类同名的对象则无法注入。解决方法是对bean进行显示命名,例如@Bean("bean1")。

转载地址:关于WebSocket分布式实现的一种方案 - IT浮云 - 一个专注技术分享的博客

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐