websocket+redis动态订阅和动态取消订阅
原理websocket的订阅就是在前后端建立ws连接之后,前端通过发送一定格式的消息,后端解析出来去订阅或者取消订阅redis频道。订阅频道消息格式:{"cmd":"subscribe","topic":["topic_name"]}取消订阅格式{"cmd":"unsubscribe","topic":["topic_name"]}两个核心类,一个是redis的订阅监听类,一个是websock
原理
websocket的订阅就是在前后端建立ws连接之后,前端通过发送一定格式的消息,后端解析出来去订阅或者取消订阅redis频道。
订阅频道消息格式:
{
"cmd":"subscribe",
"topic":[
"topic_name"
]
}
模糊订阅格式
{
"cmd":"psubscribe",
"topic":[
"topic_name"
]
}
取消订阅格式
{
"cmd":"unsubscribe",
"topic":[
"topic_name"
]
}
两个核心类,一个是redis的订阅监听类,一个是websocket的发布订阅类。
redis订阅监听类
package com.curtain.core;
import com.curtain.config.GetBeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
import java.util.Arrays;
/**
* @Author Curtain
* @Date 2021/6/7 14:27
* @Description
*/
@Component
@Slf4j
public class RedisPubSub extends JedisPubSub {
private JedisPool jedisPool = GetBeanUtil.getBean(JedisPool.class);
private Jedis jedis;
//订阅
public void subscribe(String... channels) {
jedis = jedisPool.getResource();
try {
jedis.subscribe(this, channels);
} catch (Exception e) {
log.error(e.getMessage());
if (jedis != null)
jedis.close();
//遇到异常后关闭连接重新订阅
log.info("监听遇到异常,四秒后重新订阅频道:");
Arrays.asList(channels).forEach(s -> {log.info(s);});
try {
Thread.sleep(4000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
subscribe(channels);
}
}
//模糊订阅
public void psubscribe(String... channels) {
Jedis jedis = jedisPool.getResource();
try {
jedis.psubscribe(this, channels);
} catch (ArithmeticException e) {//取消订阅故意造成的异常
if (jedis != null)
jedis.close();
} catch (Exception e) {
log.error(e.getMessage());
if (jedis != null)
jedis.close();
//遇到异常后关闭连接重新订阅
log.info("监听遇到异常,四秒后重新订阅频道:");
Arrays.asList(channels).forEach(s -> {log.info(s);});
try {
Thread.sleep(4000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
psubscribe(channels);
}
}
public void unsubscribeAndClose(String... channels){
unsubscribe(channels);
if (jedis != null && !isSubscribed())
jedis.close();
}
public void punsubscribeAndClose(String... channels){
punsubscribe(channels);
if (jedis != null && !isSubscribed())
jedis.close();
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
log.info("subscribe redis channel:" + channel + ", 线程id:" + Thread.currentThread().getId());
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
log.info("psubscribe redis channel:" + pattern + ", 线程id:" + Thread.currentThread().getId());
}
@Override
public void onPMessage(String pattern, String channel, String message) {
log.info("receive from redis channal: " + channel + ",pattern: " + pattern + ",message:" + message + ", 线程id:" + Thread.currentThread().getId());
WebSocketServer.publish(message, pattern);
WebSocketServer.publish(message, channel);
}
@Override
public void onMessage(String channel, String message) {
log.info("receive from redis channal: " + channel + ",message:" + message + ", 线程id:" + Thread.currentThread().getId());
WebSocketServer.publish(message, channel);
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
log.info("unsubscribe redis channel:" + channel);
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
log.info("punsubscribe redis channel:" + pattern);
}
}
ps:
1.jedis监听redis频道的时候如果遇见异常会关闭连接导致后续没有监听该频道,所以这里在subscribe捕获到异常的时候会重新创建一个jedis连接订阅该redis频道。
webSocket订阅推送类
这个类会有两个ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>>
类型类变量,分别存储订阅
和模糊订阅
的信息。
外面一层的String对应的值是topic_name
,里面一层的String对应的值是sessionId
。前端发送过来的消息里面对应的这三类操作其实就是对这两个map里面的。
还有个ConcurrentHashMap<String, RedisPubSub>
类型的变量,存储的是事件-RedisPubSub
,便于取消订阅的时候找到监听该频道(事件)的RedisPubSub对象。
信息进行增加
或者删除
;后端往前端推送数据也会根据不同的topic_name推送到不同的订阅者这边。
package com.curtain.core;
import com.alibaba.fastjson.JSON;
import com.curtain.config.WebsocketProperties;
import com.curtain.service.Cancelable;
import com.curtain.service.impl.TaskExecuteService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Author Curtain
* @Date 2021/5/14 16:49
* @Description
*/
@ServerEndpoint("/ws")
@Component
@Slf4j
public class WebSocketServer {
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> webSocketMap = new ConcurrentHashMap<>();
/**
* 存放psub的事件
**/
private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> pWebSocketMap = new ConcurrentHashMap<>();
/**
* 存放topic(pattern)-对应的RedisPubsub
*/
private static volatile ConcurrentHashMap<String, RedisPubSub> redisPubSubMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
private String sessionId = "";
//要注入的对象
private static TaskExecuteService executeService;
private static WebsocketProperties properties;
private Cancelable cancelable;
@Autowired
public void setTaskExecuteService(TaskExecuteService taskExecuteService) {
WebSocketServer.executeService = taskExecuteService;
}
@Autowired
public void setWebsocketProperties(WebsocketProperties properties) {
WebSocketServer.properties = properties;
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
this.session = session;
this.sessionId = session.getId();
//构造推送数据
Map pubHeader = new HashMap();
pubHeader.put("name", "connect_status");
pubHeader.put("type", "create");
pubHeader.put("from", "pubsub");
pubHeader.put("time", new Date().getTime() / 1000);
Map pubPayload = new HashMap();
pubPayload.put("status", "success");
Map pubMap = new HashMap();
pubMap.put("header", pubHeader);
pubMap.put("payload", pubPayload);
sendMessage(JSON.toJSONString(pubMap));
cancelable = executeService.runPeriodly(() -> {
try {
if (cancelable != null && !session.isOpen()) {
log.info("断开连接,停止发送ping");
cancelable.cancel();
} else {
String data = "ping";
ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
session.getBasicRemote().sendPing(payload);
}
} catch (IOException e) {
e.printStackTrace();
}
}, properties.getPeriod());
}
@OnMessage
public void onMessage(String message) {
synchronized (session) {
Map msgMap = (Map) JSON.parse(message);
String cmd = (String) msgMap.get("cmd");
//订阅消息
if ("subscribe".equals(cmd)) {
List<String> topics = (List<String>) msgMap.get("topic");
//本地记录订阅信息
for (int i = 0; i < topics.size(); i++) {
String topic = topics.get(i);
log.info("============================subscribe-start============================");
log.info("sessionId:" + this.sessionId + ",开始订阅:" + topic);
if (webSocketMap.containsKey(topic)) {//有人订阅过了
webSocketMap.get(topic).put(this.sessionId, this);
} else {//之前还没人订阅过,所以需要订阅redis频道
ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>();
map.put(this.sessionId, this);
webSocketMap.put(topic, map);
new Thread(() -> {
RedisPubSub redisPubSub = new RedisPubSub();
//存入map
redisPubSubMap.put(topic, redisPubSub);
redisPubSub.subscribe(topic);
}).start();
}
log.info("sessionId:" + this.sessionId + ",完成订阅:" + topic);
log();
log.info("============================subscribe-end============================");
}
}
//psubscribe
if ("psubscribe".equals(cmd)) {
List<String> topics = (List<String>) msgMap.get("topic");
//本地记录订阅信息
for (int i = 0; i < topics.size(); i++) {
String topic = topics.get(i);
log.info("============================psubscribe-start============================");
log.info("sessionId:" + this.sessionId + ",开始模糊订阅:" + topic);
if (pWebSocketMap.containsKey(topic)) {//有人订阅过了
pWebSocketMap.get(topic).put(this.sessionId, this);
} else {//之前还没人订阅过,所以需要订阅redis频道
ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>();
map.put(this.sessionId, this);
pWebSocketMap.put(topic, map);
new Thread(() -> {
RedisPubSub redisPubSub = new RedisPubSub();
//存入map
redisPubSubMap.put(topic, redisPubSub);
redisPubSub.psubscribe(topic);
}).start();
}
log.info("sessionId:" + this.sessionId + ",完成模糊订阅:" + topic);
log();
log.info("============================psubscribe-end============================");
}
}
//取消订阅
if ("unsubscribe".equals(cmd)) {
List<String> topics = (List<String>) msgMap.get("topic");
//删除本地对应的订阅信息
for (String topic : topics) {
log.info("============================unsubscribe-start============================");
log.info("sessionId:" + this.sessionId + ",开始删除订阅:" + topic);
if (webSocketMap.containsKey(topic)) {
ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
map.remove(this.sessionId);
if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
webSocketMap.remove(topic);
redisPubSubMap.get(topic).unsubscribeAndClose(topic);
redisPubSubMap.remove(topic);
}
}
if (pWebSocketMap.containsKey(topic)) {
ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
map.remove(this.sessionId);
if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
pWebSocketMap.remove(topic);
redisPubSubMap.get(topic).punsubscribeAndClose(topic);
redisPubSubMap.remove(topic);
}
}
log.info("sessionId:" + this.sessionId + ",完成删除订阅:" + topic);
log();
log.info("============================unsubscribe-end============================");
}
}
}
}
@OnMessage
public void onPong(PongMessage pongMessage) {
try {
log.debug(new String(pongMessage.getApplicationData().array(), "utf-8") + "接收到pong");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
synchronized (session) {
log.info("============================onclose-start============================");
//删除订阅
Iterator iterator = webSocketMap.keySet().iterator();
while (iterator.hasNext()) {
String topic = (String) iterator.next();
ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
map.remove(this.sessionId);
if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
webSocketMap.remove(topic);
redisPubSubMap.get(topic).unsubscribeAndClose(topic);
redisPubSubMap.remove(topic);
}
}
//删除模糊订阅
Iterator iteratorP = pWebSocketMap.keySet().iterator();
while (iteratorP.hasNext()) {
String topic = (String) iteratorP.next();
ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
map.remove(this.sessionId);
if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
pWebSocketMap.remove(topic);
redisPubSubMap.get(topic).punsubscribeAndClose(topic);
redisPubSubMap.remove(topic);
}
}
log.info("sessionId:" + this.sessionId + ",断开连接:");
//debug
log();
log.info("============================onclose-end============================");
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
synchronized (session) {
log.info("============================onError-start============================");
log.error("用户错误,sessionId:" + session.getId() + ",原因:" + error.getMessage());
error.printStackTrace();
log.info("关闭错误用户对应的连接");
//删除订阅
Iterator iterator = webSocketMap.keySet().iterator();
while (iterator.hasNext()) {
String topic = (String) iterator.next();
ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
map.remove(this.sessionId);
if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
webSocketMap.remove(topic);
redisPubSubMap.get(topic).unsubscribeAndClose(topic);
redisPubSubMap.remove(topic);
}
}
//删除模糊订阅
Iterator iteratorP = pWebSocketMap.keySet().iterator();
while (iteratorP.hasNext()) {
String topic = (String) iteratorP.next();
ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
map.remove(this.sessionId);
if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
pWebSocketMap.remove(topic);
redisPubSubMap.get(topic).punsubscribeAndClose(topic);
redisPubSubMap.remove(topic);
}
}
log.info("完成错误用户对应的连接关闭");
//debug
log();
log.info("============================onError-end============================");
}
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) {
synchronized (session) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void publish(String msg, String topic) {
ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
if (map != null && map.values() != null) {
for (WebSocketServer webSocketServer : map.values())
webSocketServer.sendMessage(msg);
}
map = pWebSocketMap.get(topic);
if (map != null && map.values() != null) {
for (WebSocketServer webSocketServer : map.values())
webSocketServer.sendMessage(msg);
}
}
private void log() {
log.info("<<<<<<<<<<<完成操作后,打印订阅信息开始>>>>>>>>>>");
Iterator iterator1 = webSocketMap.keySet().iterator();
while (iterator1.hasNext()) {
String topic = (String) iterator1.next();
log.info("topic:" + topic);
Iterator iterator2 = webSocketMap.get(topic).keySet().iterator();
while (iterator2.hasNext()) {
String session = (String) iterator2.next();
log.info("订阅" + topic + "的sessionId:" + session);
}
}
log.info("<<<<<<<<<<<完成操作后,打印订阅信息结束>>>>>>>>>>");
}
}
项目地址
上面介绍了核心代码,下面是完整代码地址
https://github.com/Curtain-Wang/websocket-redis-subscribe.git
Update20220415
参考评论区老哥的建议,将redis订阅监听类
里面的subscribe
和psubscribe
方法调整如下:
//订阅
@Override
public void subscribe(String... channels) {
boolean done = true;
while (done){
Jedis jedis = jedisPool.getResource();
try {
jedis.subscribe(this, channels);
done = false;
} catch (Exception e) {
log.error(e.getMessage());
if (jedis != null)
jedis.close();
//遇到异常后关闭连接重新订阅
log.info("监听遇到异常,四秒后重新订阅频道:");
Arrays.asList(channels).forEach(s -> {log.info(s);});
try {
Thread.sleep(4000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
}
//模糊订阅
@Override
public void psubscribe(String... channels) {
boolean done = true;
while (done){
Jedis jedis = jedisPool.getResource();
try {
jedis.psubscribe(this, channels);
done = false;
} catch (Exception e) {
log.error(e.getMessage());
if (jedis != null)
jedis.close();
//遇到异常后关闭连接重新订阅
log.info("监听遇到异常,四秒后重新订阅频道:");
Arrays.asList(channels).forEach(s -> {log.info(s);});
try {
Thread.sleep(4000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
}
更多推荐
所有评论(0)