服务器推送消息到前端实现页面数据实时刷新-分布式Vue+Websocket
背景
项目上有个新的需求,需要在系统数据发生改变时,前端页面要实时刷新页面数据。
简单的方案一:
最简单的方式就是直接在前端页面使用定时器定时刷新数据。
这个方案除非是定时的时间设置很短,否则还是会存在页面刷新不及时的情况。但是如果定时时间设置得过短,一旦客户端使用量变多,整个系统的请求数量会变的非常多,需要消耗许多服务器资源。故放弃这个方案。
方案二:
服务端推送的方式,通过使用Websocket,进入页面时,前端就与服务端建立起socket通道,当系统数据发生改变时,在服务端选中需要刷新的页面的socket会话,主动发送消息到前端,通知前端重新请求数据。
这个方案能达到实时刷新的需求,但考虑到的是客户端数量增长上来,建立的socket太多,会不会对占用太多的服务器资源。然后经过自己开发环境的简单测试,建立几千个socket对服务器资源消耗不大,就暂时决定用这个方案了。最终效果如何还是要 看生产环境的表现的。
实现
前端
前端使用的是vue框架
在methods中添加websocket相关函数
data(){
return {
ws:null
}
},
methods: {
websocket() {
//建立socket通道
//process.env.VUE_APP_URL为服务端地址
//code为自定义的参数,主要是用于服务端标识当前会话是哪个用户
this.ws = new WebSocket(
'ws:' +
process.env.VUE_APP_URL +
'/websocket?identity=identity'
);
//socket连接成功后的回调函数
this.ws.onopen = () => {
console.log('websocket连接成功!');
//若项目中没有使用nginx转发请求则忽略这步
//设置定时器,每过55秒传一次数据
//以防长时间不通信被nginx自动关闭会话
//也可以通过修改nginx配置文件延长关闭时间
setInterval(() => {
this.keepAlive(ws);
}, 55000);
};
//接收来自服务端消息的回调函数
//fluseData() 为自定义的数据刷新函数
this.ws.onmessage = evt => {
console.log('已接收来自后台的消息:', evt);
// 刷新数据
this.fluseData();
};
//关闭socket的回调函数
this.ws.onclose = function() {
// 关闭 websocket
console.log('连接已关闭...');
};
// 路由跳转时结束websocket链接
this.$router.afterEach(function() {
this.ws.close();
});
},
//持续向后台发送消息,用于维护socket通道不被nginx关闭
keepAlive(webSocket) {
if (webSocket) {
if (webSocket.readyState == webSocket.OPEN) {
webSocket.send('');
}
}
}
}
在页面加载时调用函数建立socket连接
mounted() {
this.websocket();
}
在页面关闭或销毁时关闭socket
beforeDestroy() {
this.ws.close();
},
后端
引入jar包
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-websocket</artifactId>
<version>9.0.38</version>
</dependency>
设置websocket配置类
@Configuration
public class MyWebsocketConfig implements ServerApplicationConfig {
@Override
public Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> set) {
return null;
}
@Override
public Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> set) {
return set;
}
}
设置MyWebsocket
@ServerEndpoint(value = "/websocket")
@Component
@Slf4j
public class MyWebsocket {
/**
* 存放每个客户端对应的Session对象。
*/
private static Map<String,Session> socketMap = new ConcurrentHashMap<>();
public MyWebsocket() {
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
try {
String key = getKey(session);
if(key != null){
// 存放 session
socketMap.put(key, session);
sendMessageToUser(key, "后台建立成功!key:"+ key);
log.info("连接成功");
}else{
log.error("socket链接session保存失败!当前key为空");
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 根据session获取key,自定义,用于标示每个session
* @param session
* @return
*/
public String getKey(Session session){
String key = null;
try {
String identity = String.valueOf(session.getRequestParameterMap().get("identity").get(0));
key = identity
} catch (Exception e) {
log.error("根据session获取key失败:",e.getMessage());
e.printStackTrace();
}
return key;
}
/**
* 根据identity获取key
* @return
*/
public String getKey(String identity) {
return identity;
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session) {
try {
String key = getKey(session);
if(key != null){
socketMap.remove(key);
}
log.info("有一连接关闭 ");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
String key = getKey(session);
log.info("收到客户端:" + key + "的消息:" + message + " ,当前socket数量:" + socketMap.size());
}
/**
* 发生错误时调用
*/
@OnError
public void onError(Session session, Throwable error) {
onClose(session);
log.error("websocket连接发生错误",e.getMessage());
}
/**
* 发送信息给指定用户
* @param key
* @param message
* @return
*/
public boolean sendMessageToUser(String key) {
if(key == null){
return false;
}
Session session = socketMap.get(key);
if(session==null) {
return false;
}
if (!session.isOpen()) {
return false;
}
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.info("发送消息失败----->{}",e.getMessage());
}
return true;
}
}
由于socket的session是不支持序列化的,所以不能将session存在redis,在线上有多台服务器的情况下就无法共享session。
所以这里采用redis的消息订阅功能,当有一台服务器监听到系统数据发生变更,需要向前端发送消息时,会向redis发送消息,然后每台服务器的websocket那边会收到redis的 消息,检查自己拥有的session是否有满足相关条件的,若满足条件则向前端发送消息。
redis配置
redis消息监听
@Configuration
public class RedisPublishConfig {
/**
* redis消息监听器容器
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("testChannel"));
return container;
}
/**
* 消息监听器适配器
* @param redisMsg
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter(RedisMsg redisMsg) {
return new MessageListenerAdapter(redisMsg, "receiveMessage");
}
}
redis消息接收接口
@Component
public interface RedisMsg {
/**
* 接收信息
* @param message
*/
public void receiveMessage(String message);
}
在MyWebsocket中继承RedisMsg接口
public class MyWebsocket implements RedisMsg
在MyWebsocket中实习RedisMsg接口的receiveMessage函数
/**
* 广播接收信息
* @param message
*/
@Override
public void receiveMessage(String message) {
JSONObject jsonObject = JSONObject.parseObject(message);
String identity = jsonObject.getString("identity");
String news = jsonObject.getString("message");
if(StringUtils.isNotBlank(identity)){
String key = getKey(identity);
sendMessageToUser(key, news);
}else {
socketMap.forEach((k, v) -> {
sendMessageToUser(k, news);
});
}
}
MyWebsocket类的完整代码
@ServerEndpoint(value = "/websocket")
@Component
@Slf4j
public class MyWebsocket implements RedisMsg {
private static CopyOnWriteArraySet<Session> sessionSet=new CopyOnWriteArraySet<>();
// 这里使用静态,让 service 属于类
private static RedisTemplate redisTemplate;
// 注入的时候,给类的 service 注入
@Autowired
public void setRedisTemplate(RedisTemplate redisTemplate) {
MyWebsocket.redisTemplate = redisTemplate;
}
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
*/
private static Map<String,Session> socketMap = new ConcurrentHashMap<>();
public MyWebsocket() {
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
try {
String key = getKey(session);
if(key != null){
// 存放 session
socketMap.put(key, session);
sendMessageToUser(key, "后台建立成功!key:"+ key + " ,当前socket数量:" + socketMap.size());
log.info("连接成功");
}else{
log.error("socket链接session保存失败!当前key为空");
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 根据session获取key
* @param session
* @return
*/
public String getKey(Session session){
String key = null;
try {
String token = String.valueOf(session.getRequestParameterMap().get("token").get(0));
String pageType = String.valueOf(session.getRequestParameterMap().get("pageType").get(0));
Map<String, String> loginContext = SsoStoreManageUtil.getInstance().get(token);
if(loginContext != null){
String userId = loginContext.get("userId");
key = pageType + '_' + userId;
}
} catch (Exception e) {
log.error("根据session获取key失败:",e.getMessage());
e.printStackTrace();
}
return key;
}
/**
* 根据页面类型和用户id获取key
* @param pageType
* @param userId
* @return
*/
public String getKey(String pageType,String userId) {
return pageType + '_' + userId;
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session) {
try {
String key = getKey(session);
if(key != null){
socketMap.remove(key);
}
log.info("有一连接关闭 ,当前socket数量:" + socketMap.size());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
String key = getKey(session);
log.info("收到客户端:" + key + "的消息:" + message + " ,当前socket数量:" + socketMap.size());
}
/**
* 发生错误时调用
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误",error.getMessage());
onClose(session);
error.printStackTrace();
}
/**
* 发送信息给指定用户
* @param key
* @param message
* @return
*/
public boolean sendMessageToUser(String key, String message) {
if(key == null){
return false;
}
Session session = socketMap.get(key);
if(session==null) {
return false;
}
if (!session.isOpen()) {
return false;
}
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.info("发送消息失败----->{}",e.getMessage());
}
return true;
}
/**
* 广播接收信息
* @param message
*/
@Override
public void receiveMessage(String message) {
JSONObject jsonObject = JSONObject.parseObject(message);
String identity = jsonObject.getString("identity");
String news = jsonObject.getString("message");
if(StringUtils.isNotBlank(identity)){
String key = getKey(identity);
sendMessageToUser(key, news);
}else {
socketMap.forEach((k, v) -> {
sendMessageToUser(k, news);
});
}
}
}
业务场景下的使用
public class Test{
@Autowired
RedisTemplate redisTemplate;
public void webSocketTest() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("identity","");
jsonObject.put("message","");
// 业务逻辑
//广播消息到各个订阅者 testChannel
redisTemplate.convertAndSend("testChannel", jsonObject.toJSONString());
}
}
更多推荐