服务器推送消息到前端实现页面数据实时刷新-分布式Vue+Websocket

背景

项目上有个新的需求,需要在系统数据发生改变时,前端页面要实时刷新页面数据。

简单的方案一:

  最简单的方式就是直接在前端页面使用定时器定时刷新数据。

  这个方案除非是定时的时间设置很短,否则还是会存在页面刷新不及时的情况。但是如果定时时间设置得过短,一旦客户端使用量变多,整个系统的请求数量会变的非常多,需要消耗许多服务器资源。故放弃这个方案。

方案二:

  服务端推送的方式,通过使用Websocket,进入页面时,前端就与服务端建立起socket通道,当系统数据发生改变时,在服务端选中需要刷新的页面的socket会话,主动发送消息到前端,通知前端重新请求数据。

  这个方案能达到实时刷新的需求,但考虑到的是客户端数量增长上来,建立的socket太多,会不会对占用太多的服务器资源。然后经过自己开发环境的简单测试,建立几千个socket对服务器资源消耗不大,就暂时决定用这个方案了。最终效果如何还是要 看生产环境的表现的。

实现

前端

前端使用的是vue框架

在methods中添加websocket相关函数

    data(){
        return {
            ws:null
        }
    },
    
    methods: {
     websocket() {
                //建立socket通道
                //process.env.VUE_APP_URL为服务端地址
                //code为自定义的参数,主要是用于服务端标识当前会话是哪个用户
                this.ws = new WebSocket(
                    'ws:' +
                        process.env.VUE_APP_URL +
                        '/websocket?identity=identity'
                );
                //socket连接成功后的回调函数
                this.ws.onopen = () => {
                    console.log('websocket连接成功!');
                    //若项目中没有使用nginx转发请求则忽略这步
                    //设置定时器,每过55秒传一次数据
                    //以防长时间不通信被nginx自动关闭会话
                    //也可以通过修改nginx配置文件延长关闭时间
                    setInterval(() => {
                        this.keepAlive(ws);
                    }, 55000);
                };
                //接收来自服务端消息的回调函数
                //fluseData() 为自定义的数据刷新函数
                this.ws.onmessage = evt => {
                    console.log('已接收来自后台的消息:', evt);
                    // 刷新数据
                    this.fluseData();
                };
                //关闭socket的回调函数
                this.ws.onclose = function() {
                    // 关闭 websocket
                    console.log('连接已关闭...');
                };
                // 路由跳转时结束websocket链接
                this.$router.afterEach(function() {
                    this.ws.close();
                });
            },
        //持续向后台发送消息,用于维护socket通道不被nginx关闭  
        keepAlive(webSocket) {
            if (webSocket) {
                if (webSocket.readyState == webSocket.OPEN) {
                    webSocket.send('');
                }
            }
        }
    }

在页面加载时调用函数建立socket连接

mounted() {
    this.websocket();
}

在页面关闭或销毁时关闭socket

beforeDestroy() {
        this.ws.close();
    },

后端

引入jar包

    <dependency>
		<groupId>org.apache.tomcat.embed</groupId>
		<artifactId>tomcat-embed-websocket</artifactId>
		<version>9.0.38</version>
	</dependency>

设置websocket配置类


@Configuration
public class MyWebsocketConfig implements ServerApplicationConfig {
    @Override
    public Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> set) {
        return null;
    }

    @Override
    public Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> set) {
        return set;
    }


}

设置MyWebsocket


@ServerEndpoint(value = "/websocket")
@Component
@Slf4j
public class MyWebsocket {

    /**
     * 存放每个客户端对应的Session对象。
     */
    private static Map<String,Session> socketMap = new ConcurrentHashMap<>();


    public MyWebsocket() {
    }


    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        try {
            String key = getKey(session);
            if(key != null){
                // 存放 session
                socketMap.put(key, session);
                sendMessageToUser(key, "后台建立成功!key:"+ key);
                log.info("连接成功");
            }else{
                log.error("socket链接session保存失败!当前key为空");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 根据session获取key,自定义,用于标示每个session
     * @param session
     * @return
     */
    public String getKey(Session session){
        String key = null;
        try {
            String identity = String.valueOf(session.getRequestParameterMap().get("identity").get(0));
            key = identity
        } catch (Exception e) {
            log.error("根据session获取key失败:",e.getMessage());
            e.printStackTrace();
        }
        return key;
    }

    /**
     * 根据identity获取key
     * @return
     */
    public String getKey(String identity) {
        return identity;
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session) {
        try {
            String key = getKey(session);
            if(key != null){
                socketMap.remove(key);
            }
            log.info("有一连接关闭 ");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        String key = getKey(session);
        log.info("收到客户端:" + key + "的消息:" + message + " ,当前socket数量:" + socketMap.size());
    }


    /**
     * 发生错误时调用
     */
    @OnError
    public void onError(Session session, Throwable error) {
        onClose(session);
        log.error("websocket连接发生错误",e.getMessage());
    }

    /**
     * 发送信息给指定用户
     * @param key
     * @param message
     * @return
     */
    public boolean sendMessageToUser(String key) {
        if(key == null){
            return false;
        }
        Session session = socketMap.get(key);
        if(session==null) {
            return false;
        }
        if (!session.isOpen()) {
            return false;
        }
        try {
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            log.info("发送消息失败----->{}",e.getMessage());
        }
        return true;
    }
}

  由于socket的session是不支持序列化的,所以不能将session存在redis,在线上有多台服务器的情况下就无法共享session。

  所以这里采用redis的消息订阅功能,当有一台服务器监听到系统数据发生变更,需要向前端发送消息时,会向redis发送消息,然后每台服务器的websocket那边会收到redis的 消息,检查自己拥有的session是否有满足相关条件的,若满足条件则向前端发送消息。

redis配置

redis消息监听

@Configuration
public class RedisPublishConfig {
    /**
     * redis消息监听器容器
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("testChannel"));
        return container;
    }

    /**
     * 消息监听器适配器
     * @param redisMsg
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapter(RedisMsg redisMsg) {
        return new MessageListenerAdapter(redisMsg, "receiveMessage");
    }
}

redis消息接收接口


@Component
public interface RedisMsg {
    /**
     * 接收信息
     * @param message
     */
    public void receiveMessage(String message);
}

在MyWebsocket中继承RedisMsg接口

public class MyWebsocket implements RedisMsg 

在MyWebsocket中实习RedisMsg接口的receiveMessage函数


    /**
     * 广播接收信息
     * @param message
     */
    @Override
    public void receiveMessage(String message) {
        JSONObject jsonObject = JSONObject.parseObject(message);
        String identity = jsonObject.getString("identity");
        String news = jsonObject.getString("message");
        if(StringUtils.isNotBlank(identity)){
            String key = getKey(identity);
            sendMessageToUser(key, news);
        }else {
            socketMap.forEach((k, v) -> {
                sendMessageToUser(k, news);
            });
        }
    }

MyWebsocket类的完整代码


@ServerEndpoint(value = "/websocket")
@Component
@Slf4j
public class MyWebsocket implements RedisMsg {
    private static CopyOnWriteArraySet<Session> sessionSet=new CopyOnWriteArraySet<>();
    // 这里使用静态,让 service 属于类
    private static RedisTemplate redisTemplate;

    // 注入的时候,给类的 service 注入
    @Autowired
    public void setRedisTemplate(RedisTemplate redisTemplate) {
        MyWebsocket.redisTemplate = redisTemplate;
    }


    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
     */
    private static Map<String,Session> socketMap = new ConcurrentHashMap<>();


    public MyWebsocket() {
    }


    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        try {
            String key = getKey(session);
            if(key != null){
                // 存放 session
                socketMap.put(key, session);
                sendMessageToUser(key, "后台建立成功!key:"+ key + " ,当前socket数量:" + socketMap.size());
                log.info("连接成功");
            }else{
                log.error("socket链接session保存失败!当前key为空");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 根据session获取key
     * @param session
     * @return
     */
    public String getKey(Session session){
        String key = null;
        try {
            String token = String.valueOf(session.getRequestParameterMap().get("token").get(0));
            String pageType = String.valueOf(session.getRequestParameterMap().get("pageType").get(0));
            Map<String, String> loginContext = SsoStoreManageUtil.getInstance().get(token);
            if(loginContext != null){
                String userId = loginContext.get("userId");
                key = pageType + '_' + userId;
            }
        } catch (Exception e) {
            log.error("根据session获取key失败:",e.getMessage());
            e.printStackTrace();
        }
        return key;
    }

    /**
     * 根据页面类型和用户id获取key
     * @param pageType
     * @param userId
     * @return
     */
    public String getKey(String pageType,String userId) {
        return pageType + '_' + userId;
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session) {
        try {
            String key = getKey(session);
            if(key != null){
                socketMap.remove(key);
            }
            log.info("有一连接关闭 ,当前socket数量:" + socketMap.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        String key = getKey(session);
        log.info("收到客户端:" + key + "的消息:" + message + " ,当前socket数量:" + socketMap.size());
    }


    /**
     * 发生错误时调用
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误",error.getMessage());
        onClose(session);
        error.printStackTrace();
    }

    /**
     * 发送信息给指定用户
     * @param key
     * @param message
     * @return
     */
    public boolean sendMessageToUser(String key, String message) {
        if(key == null){
            return false;
        }
        Session session = socketMap.get(key);
        if(session==null) {
            return false;
        }
        if (!session.isOpen()) {
            return false;
        }
        try {
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            log.info("发送消息失败----->{}",e.getMessage());
        }
        return true;
    }

    /**
     * 广播接收信息
     * @param message
     */
    @Override
    public void receiveMessage(String message) {
        JSONObject jsonObject = JSONObject.parseObject(message);
        String identity = jsonObject.getString("identity");
        String news = jsonObject.getString("message");
        if(StringUtils.isNotBlank(identity)){
            String key = getKey(identity);
            sendMessageToUser(key, news);
        }else {
            socketMap.forEach((k, v) -> {
                sendMessageToUser(k, news);
            });
        }
    }
}

业务场景下的使用

public class Test{

    @Autowired
    RedisTemplate redisTemplate;

    
    public void webSocketTest()  {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("identity","");
        jsonObject.put("message","");
        //	 业务逻辑
        //广播消息到各个订阅者 testChannel
        redisTemplate.convertAndSend("testChannel", jsonObject.toJSONString());

    }
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐