使用redis做websocket分布式消息推送服务
websocket实现同一账户多点登录、websocket服务多节点部署推送方案。简单架构图用户A两个地方登录连接到两个websocketServer节点,用户B连接到2节点。消息生产者发布消息的时候应为map格式receive=userid,msg="msg....",将消息推送给对应的userid。1.该项目为springboot项目,先引入w......
应用场景说明:
由于redis并非专业的MQ中间件,消息的防丢失策略并不完整,存在丢失消息的可能。该方案为在再pc web管理平台的右下角弹出,显示新接收到的消息数,哪怕没有收到这个通知,也可以自己在消息中心看看。所以对可靠性要求不高。如果业务场景要求可靠性高,还是请使用专业的MQ中间件。该方案已在多个实际项目中运行。
流程架构:
websocket实现同一账户多点登录、websocket服务多节点部署推送方案。
简单架构图
假设用户A在两个地方登录,连接到两个websocketServer服务节点1和2,用户B连接到2节点。
websocketServer将websocket session保存在各自的Map<String,Session>中,key为userid,value为websocket Session。节点1保存了用户A的websocket session,节点2保存了用户A、B的websocket session。
消息生产者发布消息的时候为json格式,如:[{"receive"="userid_a","msg"="您有1个未读消息"},{"receive"="userid_b","msg"="您有3个未读消息"}],将消息发到redis的一个Channel,如showNewestMsg。
websocketServer中订阅redis的channel=showNewestMsg,收到消息后根据消息中receive冲map中找到对应的websocket session,发消息给客户端。
核心代码:
1.该项目为springboot项目,先引入jar包,由于是从实际项目中抽出来写的记录,可能还缺jar请自行导入。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--websocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 工具类 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.3.6</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
2.websocket配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* spring websocket组件初始化
* @author csf
*
*/
//war包启动tomcat7及以下版本要关闭@Configuration注解,否则将无法启动websocket服务
@Configuration
public class WebSocketConfig
{
@Bean
public ServerEndpointExporter serverEndpointExporter()
{
return new ServerEndpointExporter();
}
}
注意:war包启动tomcat7及以下版本要关闭@Configuration注解,否则将无法启动websocket服务。
3.websocket服务端实现
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.kingengine.plug.service.MessageService;
import cn.hutool.core.util.StrUtil;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
/**
* WebSocket服务类
* @author csf
* @date 2020年8月10日
*/
@ServerEndpoint("/websocket/{custId}")
@Component
public class WebSocketServer
{
@Resource
private MessageService messageService;
Logger log = LoggerFactory.getLogger(this.getClass());
// 当前在线连接数
private static int onlineCount = 0;
// 存放每个用户对应的WebSocket连接对象,key为custId_HHmmss,确保一个登录用户只建立一个连接
private static Map<String, Session> webSocketSessionMap = new ConcurrentHashMap<String, Session>();
// 与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
// 接收用户id
private String custId = "";
private static WebSocketServer webSocketServer;
// 通过@PostConstruct实现初始化bean之前进行的操作
@PostConstruct
public void init()
{
// 初使化时将已静态化的webSocketServer实例化
webSocketServer = this;
webSocketServer.messageService = this.messageService;
}
/**
* 连接建立成功调用的方法
* @param session 连接会话,由框架创建
* @param custId 用户id, 为处理用户多点登录都能收到消息,需传该格式custId_HHmmss
* @author csf
* @date 2020年8月10日
*/
@OnOpen
public void onOpen(Session session, @PathParam("custId") String custId)
{
if (!webSocketSessionMap.containsKey(custId))
{
this.session = session;
webSocketSessionMap.put(custId, session);
addOnlineCount(); // 在线数加1
log.info("有新连接[{}]接入,当前websocket连接数为:{}", custId, getOnlineCount());
}
this.custId = custId;
try
{
// 第一次建立连接,推送消息给客户端,只会执行一次。后续的新消息由com.kingengine.plug.redis.RedisReceiver接收到redis订阅消息推送
// 获取未读消息数
// 由于前端传进来的custId是有时间后缀的,查询时需要去掉后缀。
String qryCustId = custId.split("_")[0];
JSONObject unreadMsg = webSocketServer.messageService.getUnreadCount(qryCustId);
// 获取最新消息
/* JSONObject newMsg = webSocketServer.messageService.getNewestMsg(qryCustId);
// 发送消息
JSONArray msgArr = new JSONArray();
if (newMsg!=null)
{
msgArr.add(newMsg);
}*/
JSONArray msgArr = new JSONArray();
msgArr.add(unreadMsg);
sendMessage(custId, msgArr.toString());
}
catch (Exception e)
{
log.error("客户端连接websocket服务异常");
e.printStackTrace();
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(@PathParam("custId") String sessionKey)
{
if (webSocketSessionMap.containsKey(sessionKey))
{
try
{
webSocketSessionMap.get(sessionKey).close();
webSocketSessionMap.remove(sessionKey);
}
catch (IOException e)
{
log.error("连接[{}]关闭失败。", sessionKey);
e.printStackTrace();
}
subOnlineCount();
log.info("连接[{}]关闭,当前websocket连接数:{}", sessionKey, onlineCount);
}
}
/**
* 接收客户端发送的消息
* @param message 客户端发送过来的消息
* @param session websocket会话
*/
@OnMessage
public void onMessage(String message, Session session)
{
log.info("收到来自客户端" + custId + "的信息:" + message);
}
/**
* 连接错误时触发
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error)
{
try
{
session.close();
}
catch (IOException e)
{
log.error("发生错误,连接[{}]关闭失败。");
e.printStackTrace();
}
// log.error("websocket发生错误");
// error.printStackTrace();
}
/**
* 给指定的客户端推送消息,可单发和群发
* @param sessionKeys 发送消息给目标客户端sessionKey,多个逗号“,”隔开1234,2345...
* @param message
* @throws IOException
* @author csf
* @date 2020年8月11日
*/
public void sendMessage(String sessionKeys, String message)
{
if (StrUtil.isNotBlank(sessionKeys))
{
String[] sessionKeyArr = sessionKeys.split(",");
for (String key : sessionKeyArr)
{
try
{
// 可能存在一个账号多点登录
List<Session> sessionList = getLikeByMap(webSocketSessionMap, key);
for (Session session : sessionList)
{
session.getBasicRemote().sendText(message);
}
}
catch (IOException e)
{
e.printStackTrace();
continue;// 某个客户端发送异常,不影响其他客户端发送
}
}
}
else
{
log.info("sessionKeys为空,没有目标客户端");
}
}
/**
* 给当前客户端推送消息,首次建立连接时调用
*/
public void sendMessage(String message)
throws IOException
{
this.session.getBasicRemote().sendText(message);
}
/**
* 检查webSocket连接是否在线
* @param sesstionKey webSocketMap中维护的key
* @return 是否在线
*/
public static boolean checkOnline(String sesstionKey)
{
if (webSocketSessionMap.containsKey(sesstionKey))
{
return true;
}
else
{
return false;
}
}
/**
* 获取包含key的所有map值
* @param map
* @param keyLike
* @return
* @author csf
* @date 2020年8月13日
*/
private List<Session> getLikeByMap(Map<String, Session> map, String keyLike)
{
List<Session> list = new ArrayList<>();
for (String key : map.keySet())
{
if (key.contains(keyLike))
{
list.add(map.get(key));
}
}
return list;
}
public static synchronized int getOnlineCount()
{
return onlineCount;
}
public static synchronized void addOnlineCount()
{
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount()
{
WebSocketServer.onlineCount--;
}
}
4.redis消息订阅配置
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Configuration
@EnableCaching
public class RedisCacheConfig
{
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter)
{
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 可以添加多个 messageListener,配置不同的交换机
container.addMessageListener(listenerAdapter, new PatternTopic("showNewestMsg"));// 订阅最新消息频道
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(RedisReceiver receiver)
{
// 消息监听适配器
return new MessageListenerAdapter(receiver, "onMessage");
}
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory)
{
return new StringRedisTemplate(connectionFactory);
}
}
5.redis配置,直接放在springboot项目application.properties或application.yml中
# 数据库索引(默认为0)
spring.redis.database=0
spring.redis.host=192.168.1.100
spring.redis.port=6379
spring.redis.password=123456
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.pool.max-idle=8
# 连接池中的最小空闲连接
spring.redis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.timeout=5000
6.接收消息生产者发布的消息,推送给对应的客户端
import java.io.UnsupportedEncodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import com.kingengine.plug.websocket.WebSocketServer;
import cn.hutool.core.codec.Base64;
import cn.hutool.core.util.StrUtil;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
/**
* 消息监听对象,接收订阅消息
* @author csf
* @date 2020年8月13日
*/
@Component
public class RedisReceiver implements MessageListener
{
Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
WebSocketServer webSocketServer;
/**
* 处理接收到的订阅消息
*/
@Override
public void onMessage(Message message, byte[] pattern)
{
String channel = new String(message.getChannel());// 订阅的频道名称
String msg = "";
try
{
msg = new String(message.getBody(), "GBK");//注意与发布消息编码一致,否则会乱码
if (StrUtil.isNotBlank(msg)){
if ("showNewestMsg".endsWith(channel))// 最新消息
{
JSONObject json = JSONObject.fromObject(msg);
webSocketServer.sendMessage(json.get("receive"),json.get("msg"));
}else{
//TODO 其他订阅的消息处理
}
}else{
log.info("消息内容为空,不处理。");
}
}
catch (Exception e)
{
log.error("处理消息异常:"+e.toString())
e.printStackTrace();
}
}
}
7.消息发布测试
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import net.sf.json.JSONObject;
@RequestMapping("redis")
@RestController
public class RedisTestController
{
@Autowired
StringRedisTemplate template;
/**
* 发布消息测试
*@param userid
* @param msg
* @return
*/
@PostMapping("sendMessage")
public String sendMessage(String userid,String msg)
{
try
{
String newMessge=new String(msg.getBytes("GBK"),"GBK");
Map<String,String> map = new HashMap<String, String>();
map.put("receive", userid);
map.put("msg", newMessge);
template.convertAndSend("showNewestMsg",
JSONObject.fromObject(map).toString());
}
catch (UnsupportedEncodingException e)
{
e.printStackTrace();
}
return "消息发布成功!";
}
}
8.客户端代码
<!DOCTYPE html>
<html>
<head>
<title>WebSocket测试</title>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
</head>
<body>
<div>
来自服务端消息:
<p id="message"></p>
</div>
</body>
<script src="http://apps.bdimg.com/libs/jquery/1.6.4/jquery.min.js"></script>
<script>
let webSocketClient;
if (window.WebSocket)
{
let custid="132456_" + Math.random();//该参数会作为websocketServer中存储session的key,要保证唯一。
webSocketClient = new WebSocket("ws://127.0.0.1:8082/bootapp/websocket/" + custid);
//连通之后的回调事件
webSocketClient.onopen = function () {
webSocketClient.send("这里是地球,收到请回答。。。");
// webSocket.send('{"type":"1","data":"121"}');
};
//接收后台服务端的消息
webSocketClient.onmessage = function (evt) {
console.log("数据已接收:" + evt.data);
showMessage("未读消息:" + evt.data);
};
//连接关闭的回调事件
webSocketClient.onclose = function () {
alert("连接已关闭...");
};
}else{
alert("浏览器不支持websocket");
}
function showMessage(message) {
$("#message").html(message);
}
</script>
</html>
核心代码至此。
更多推荐
所有评论(0)