• 假设有这样一个需求,前端需要实时提醒数据库进入的数据,或者监听数据库里面的数据里面有什么变动,需要后端及时的提醒前端消息,这里可以使用websocket实现后端主动推送数据给前端,可以用redis保存每个用户的session,然后分别给不同用户发送不同的消息,这个例子我用的是redis中的发布订阅的功能(subscribe,publish命令)
  • redis中可以用subscribe …(例如 subscribe channer) 订阅了channer这个频道,换句话说就是创建了这个频道,哪个用户订阅了channer这个频道,就可以接收到里面的数据,可以用publish…(例如:sublish channer “往channer频道存入数据”)往频道中发送数据
  • 这里是看别人的代码拉取下来学习看的,这里做一个笔记
  • 下面先看代码
  • 先把项目结构贴出来
    在这里插入图片描述
  • pom.xml文件的依赖
    <dependencies>
<!--        web应用-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
<!--        测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
<!--        操作redis第三方工具-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
<!--        websoket依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>
  • 项目启动的时候就加载,将订阅的频道给创建出来,就是相当于我已经订阅 了这三个频道了(订阅哪几个频道可以自己进行设置)
package com.demo.redisandwebsocket.config;

import com.demo.redisandwebsocket.listener.SubscribeListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;

import java.util.ArrayList;
import java.util.List;

/**
 * 注入spring容器中,在项目启动的时候就会加载,理解成这个类的对象已经创建好了
 * 放在spring容器当中了
 */
@Configuration //相当于xml中的beans
public class RedisConfig {
    /**
     * 需要手动注册RedisMessageListenerContainer加入IOC容器
     * @author lijt
     * @return
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅频道 的通道
        // 原来的写法   订阅了ZHOU这个频道
//        container.addMessageListener(new MessageListener(){
//            @Override
//            public void onMessage(Message message, byte[] pattern) {
//                String msg = new String(message.getBody());
//                System.out.println(new String(pattern) + "主题发布:" + msg);
//            }
//        }, new PatternTopic("ZHOU"));


        // 改成lambda表达式的写法,更加的简洁   订阅了ZHOU这个频道
//        container.addMessageListener((a,b) -> {
//            String msg = new String(a.getBody());
//            System.out.println(new String(b) + "主题发布:" + msg);
//        },new PatternTopic("ZHOU"));


        /**
         * 上面的有局限性 不能订阅多个频道
         * 这里订阅了三个频道ZHOU  DA  TOU
         */
        List<Topic> list = new ArrayList<>();
        list.add(new PatternTopic("ZHOU"));
        list.add(new PatternTopic("DA"));
        list.add(new PatternTopic("TOU"));
        container.addMessageListener(new SubscribeListener(),list);
        return container;
    }

}

  • 然后自定义SubscribeListener类,实现MessageListener,这个类是监听redis中的频道是否有数据进来了,如果有数据进来这个监听器就会监听到,并且触发里面onMessage方法
package com.demo.redisandwebsocket.listener;

import com.demo.redisandwebsocket.web.WebSocketServer;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

import javax.annotation.Resource;
import javax.websocket.Session;
import java.io.IOException;

/**
 *  描述:消息订阅监听类
 */
public class SubscribeListener implements MessageListener {

    @Resource
    private WebSocketServer webSocketServer;

    private Session session;
    public Session getSession() {   return session;  }
    public void setSession(Session session) {
        this.session = session;
    }

    /**
     * 接收发布者的消息
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String msg = new String(message.getBody());
        System.out.println(new String(pattern) + "主题发布:" + msg);
        if (null != session && session.isOpen()) {
            try {
                session.getBasicRemote().sendText(msg);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

  • 然后是WebSocketConfig类,这个类是初始化websocket的配置,也是初始化的时候将他对象注入到sping容器当中,因为websocket连接是基于http协议连接的,连接上以后就跟http协议没有关系了,然后就基于websocket进行连接了
package com.demo.redisandwebsocket.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * 首先要注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。
 * 要注意,如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,
 * 因为它将由容器自己提供和管理
 */
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
  • springUtils工具类,目前没有能力解释,希望有人帮我解释一下
package com.demo.redisandwebsocket.util;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.stereotype.Component;

@Component
public final class SpringUtils implements BeanFactoryPostProcessor {

    private static ConfigurableListableBeanFactory beanFactory; // Spring应用上下文环境

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        SpringUtils.beanFactory = beanFactory;
    }

    public static ConfigurableListableBeanFactory getBeanFactory() {
        return beanFactory;
    }

    /**
     * 获取对象
     *
     * @param name
     * @return Object 一个以所给名字注册的bean的实例
     * @throws org.springframework.beans.BeansException
     *
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException {
        return (T) getBeanFactory().getBean(name);
    }

    /**
     * 获取类型为requiredType的对象
     *
     * @param clz
     * @return
     * @throws org.springframework.beans.BeansException
     *
     */
    public static <T> T getBean(Class<T> clz) throws BeansException {
        T result = (T) getBeanFactory().getBean(clz);
        return result;
    }

    /**
     * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
     *
     * @param name
     * @return boolean
     */
    public static boolean containsBean(String name) {
        return getBeanFactory().containsBean(name);
    }

    /**
     * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
     *
     * @param name
     * @return boolean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
        return getBeanFactory().isSingleton(name);
    }

    /**
     * @param name
     * @return Class 注册对象的类型
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
        return getBeanFactory().getType(name);
    }

    /**
     * 如果给定的bean名字在bean定义中有别名,则返回这些别名
     *
     * @param name
     * @return
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
        return getBeanFactory().getAliases(name);
    }

}
  • 这里就是将消息发送到前端的websocket的类了,里面有四个方法(四个注解)
package com.demo.redisandwebsocket.web;

import com.demo.redisandwebsocket.listener.SubscribeListener;
import com.demo.redisandwebsocket.util.SpringUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,
 * 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
 * 使用springboot的唯一区别是要@Component声明下,而使用独立容器是由容器自己管理websocket的,但在springboot中连容器都是spring管理的。
 * 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
 */
@Component
@ServerEndpoint("/websocket/server")
public class WebSocketServer {
    /**
     * 因为@ServerEndpoint不支持注入,所以使用SpringUtils获取IOC实例
     */
    private RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class);

    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
     private static  AtomicInteger onlineCount=new AtomicInteger(0);
     //concurrent包的线程安全Set,用来存放每个客户端对应的webSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
     private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();

     //与某个客户端的连接会话,需要通过它来给客户端发送数据
     private Session session;

     private SubscribeListener subscribeListener;

    /**
     * 连接建立成功调用的方法
     * @param session  可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    @OnOpen
    public void onOpen(Session session){
        this.session = session;
        webSocketSet.add(this);     //加入set中
        addOnlineCount();           //在线数加1
        System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());
        subscribeListener = new SubscribeListener();
        subscribeListener.setSession(session);
        List<Topic> list = new ArrayList<>();
        list.add(new PatternTopic("ZHOU"));
        list.add(new PatternTopic("DA"));
        list.add(new PatternTopic("TOU"));
        //设置订阅topic
        redisMessageListenerContainer.addMessageListener(subscribeListener, list);
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() throws IOException {
        webSocketSet.remove(this);  //从set中删除
        subOnlineCount();           //在线数减1
        redisMessageListenerContainer.removeMessageListener(subscribeListener);
        System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     * @param message 客户端发送过来的消息
     * @param session 可选的参数
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("来自客户端的消息:" + message);
        //群发消息
        for(WebSocketServer item: webSocketSet){
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
                continue;
            }
        }
    }

    /**
     * 发生错误时调用
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error){
        System.out.println("发生错误");
        error.printStackTrace();
    }

    /**
     * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
     * @param message
     * @throws IOException
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    public   int getOnlineCount() {
        return onlineCount.get();
    }

    public   void addOnlineCount() {
        WebSocketServer.onlineCount.getAndIncrement();
    }

    public   void subOnlineCount() {
        WebSocketServer.onlineCount.getAndDecrement();
    }

}

  • 把前端也贴出来
在这里插入代码片<!doctype html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="utf-8"></meta>
    <title>websocket</title>
</head>
<h4>
使用redis订阅消息和websocket实现消息推送
</h4>
<br/>
<h5>收到的订阅消息:</h5>
<div id="message_id"></div>
</body>
<script type="text/javascript">
    var websocket = null;
    //当前浏览前是否支持websocket
    if("WebSocket" in window){
        var url = "ws://localhost:8080/demo/websocket/server";
        websocket = new WebSocket(url);
    }else{
        alert("浏览器不支持websocket");
    }

    websocket.onopen = function(event){
        setMessage("打开连接");
    }

    websocket.onclose = function(event){
        setMessage("关闭连接");
    }

    websocket.onmessage = function(event){
        setMessage(event.data);
    }

    websocket.onerror = function(event){
        setMessage("连接异常");
    }

    //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
    window.onbeforeunload = function(){
        closeWebsocket();
    }

    //关闭websocket
    function closeWebsocket(){
        //3代表已经关闭
        if(3!=websocket.readyState){
            websocket.close();
        }else{
            alert("websocket之前已经关闭");
        }
    }

    //将消息显示在网页上
    function setMessage(message){
        document.getElementById('message_id').innerHTML += message + '<br/>';
    }

</script>
</html>

如果我推送消息到ZHOU或者DA或者TOU,前端就会立马的收到消息
有兴趣的人可以用这个做一个聊天室,因为都是实时提醒的

  • 后面贴出效果图
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
  • 这里需要注意一下,如果在项目初始化的时候并没有创建的频道,消息是推送不进去的
    在这里插入图片描述
  • 这里顺便把配置文件贴出来application.properties
server.port=8080
server.servlet.context-path=/demo
server.tomcat.uri-encoding=utf-8

#reids config
spring.redis.host=127.0.0.1
spring.redis.port=6379

spring.redis.jedis.pool.max-active=30

启动项目访问localhost:8080/demo/websocket.html就可以测试了
到这里就已经结束了
这个是一个redis中的(消息的发布/订阅)和websocket的消息推送整合

源码:https://gitee.com/zhou-datou/redis-websocket

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐