之前有分享过springBoot集成Websocket推送信息。今天主要是来继续分享升级版,这次是采用STOMP协议。用这个的好处有很多,比如可以屏蔽浏览器之间的差异,更方便对接消息中间件等。

一、协议理解

HTTP、WebSocket 等应用层协议,都是基于 TCP 协议来传输数据的。
HTTP不足在于它与服务器的全双工通信依靠轮询实现,对于需要从服务器主动发送数据的情境,会给服务器资源造成很大的浪费,WebSocket是针对HTTP在这种情况下的补充。
对于 WebSocket 来说,它必须依赖 HTTP 协议进行一次握手 ,握手成功后,数据就直接从 TCP 通道传输,与 HTTP 无关了。
WebSocket是一个完整的应用层协议,包含一套标准的 API 。
STOMP即Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互。 STOMP协议可以建立在WebSocket之上,也可以建立在其他应用层协议之上。

二、依赖包

<!-- websocket支持 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- 有说需要依赖这个的,我这里实际没有引入
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-messaging</artifactId>
</dependency>
-->

三、能力集成核心代码

websocket用户类


import java.security.Principal;

/**
 * @author zhengwen
 **/
public class WebSocketUser implements Principal {

    /**
     * 用户信息
     */
    private final String name;

    public WebSocketUser(String name) {
        this.name = name;
    }
    @Override
    public String getName() {
        return name;
    }
}

消息对象类
WebSocketMsgVo


import lombok.Data;

import java.time.LocalDateTime;

/**
 * websocket信息vo对象
 *
 * @author zhengwen
 **/
@Data
public class WebSocketMsgVo<T> {
    /**
     * 发送方
     */
    private String from;

    /**
     * 接收方
     */
    private String to;

    /**
     * 时间
     */
    private LocalDateTime time = LocalDateTime.now();

    /**
     * 平台来源
     */
    private String platform;
    /**
     * 主题通道
     */
    private String topicChannel;

    /**
     * 信息业务对象
     */
    private T data;
}

这里data定义为抽象类,由业务系统自行定义。因为我们这是提供能力,所以尽量不要固定死。

配置类


import com.easylinkin.bm.handler.MyHandshakeHandler;
import com.easylinkin.bm.interceptor.MyHandshakeInterceptor;
import com.easylinkin.bm.interceptor.WebSocketUserInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;

/**
 * websocket stomp协议配置类
 *
 * @author zhengwen
 **/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,
     * 也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs
     *
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        /*
         * 1. 将 /stomp/websocketJs路径注册为STOMP的端点,
         *    用户连接了这个端点后就可以进行websocket通讯,支持socketJs
         * 2. setAllowedOriginPatterns("*")表示可以跨域
         * 3. withSockJS()表示支持socktJS访问
         * 4. addInterceptors 添加自定义拦截器,这个拦截器是上一个demo自己定义的获取httpsession的拦截器
         * 5. addInterceptors 添加拦截处理,这里MyPrincipalHandshakeHandler 封装的认证用户信息
         */
        //配置客户端连接地址
        registry.addEndpoint("/stomp/websocketJS").setAllowedOriginPatterns("*").addInterceptors(new MyHandshakeInterceptor()).setHandshakeHandler(new MyHandshakeHandler()).withSockJS();
        /*
         * 添加多个端点
         * 它的实现类是WebMvcStompEndpointRegistry ,
         * addEndpoint是添加到WebMvcStompWebSocketEndpointRegistration的集合中,
         * 所以可以添加多个端点
         */
        registry.addEndpoint("/stomp/websocket");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 自定义调度器,用于控制心跳线程
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        // 线程池线程数,心跳连接开线程
        taskScheduler.setPoolSize(1);
        // 线程名前缀
        taskScheduler.setThreadNamePrefix("websocket-heartbeat-thread-");
        // 初始化
        taskScheduler.initialize();
        // 设置广播节点
        registry.enableSimpleBroker("/ad", "/device", "/pay", "/data", "/warn", "/alone").setHeartbeatValue(new long[]{10000, 10000})
                .setTaskScheduler(taskScheduler);
        // 客户端向服务端发送消息需有/app 前缀
        registry.setApplicationDestinationPrefixes("/app");
        // 指定用户发送(一对一)的前缀 /user/
        registry.setUserDestinationPrefix("/user");
    }

    /**
     * 配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间
     *
     * @param registration
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        /*
         * 1. setMessageSizeLimit 设置消息缓存的字节数大小 字节
         * 2. setSendBufferSizeLimit 设置websocket会话时,缓存的大小 字节
         * 3. setSendTimeLimit 设置消息发送会话超时时间,毫秒
         */
        registration.setMessageSizeLimit(10240)
                .setSendBufferSizeLimit(10240)
                .setSendTimeLimit(10000);
    }

    /**
     * 配置客户端入站通道拦截器
     * 设置输入消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
     *
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {

        /*
         * 配置消息线程池
         * 1. corePoolSize 配置核心线程池,当线程数小于此配置时,不管线程中有无空闲的线程,都会产生新线程处理任务
         * 2. maxPoolSize 配置线程池最大数,当线程池数等于此配置时,不会产生新线程
         * 3. keepAliveSeconds 线程池维护线程所允许的空闲时间,单位秒
         */
        registration.taskExecutor().corePoolSize(10)
                .maxPoolSize(20)
                .keepAliveSeconds(60);

        registration.interceptors(new WebSocketUserInterceptor());
    }

    /**
     * 设置输出消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
     *
     * @param registration
     */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        registration.taskExecutor().corePoolSize(10)
                .maxPoolSize(20)
                .keepAliveSeconds(60);
        //registration.interceptors(new WebSocketUserInterceptor());
    }


}

sebsocket的http握手拦截器
MyHandshakeInterceptor


import com.easylinkin.bm.vo.websocket.WebSocketUser;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import java.util.Map;

/**
 * @author zhengwen
 **/
@Slf4j
public class MyHandshakeInterceptor implements HandshakeInterceptor {
    /**
     * websocket握手之前
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        log.info("--websocket的http连接握手之前--");
        ServletServerHttpRequest req = (ServletServerHttpRequest) request;
        WebSocketUser user = null;
        //获取token认证
        String token = req.getServletRequest().getParameter("token");
        //解析token获取用户信息
        //鉴权,我的方法是,前端把token传过来,解析token,判断正确与否,return true表示通过,false请求不通过。
        //TODO 鉴权设置用户
        if (StringUtils.isNotBlank(token)) {
            user = new WebSocketUser(token);
        }

        //如果token认证失败user为null,返回false拒绝握手
        if (user == null) {
            return false;
        }
        //保存认证用户
        attributes.put("user", user);
        return true;
    }

    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {

    }
}

websocket的握手之后拦截器


import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

import java.security.Principal;
import java.util.Map;

/**
 * @author zhengwen
 **/
@Slf4j
public class MyHandshakeHandler extends DefaultHandshakeHandler {
    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        log.info("--websocket的http连接握手之后--");
        //设置认证用户
        return (Principal) attributes.get("user");
    }
}

websocket设置自定义的连接通道拦截器
WebSocketUserInterceptor


import com.easylinkin.bm.vo.websocket.WebSocketUser;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;

import java.util.Map;

/**
 * @author zhengwen
 **/
@Slf4j
public class WebSocketUserInterceptor implements ChannelInterceptor {



    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        log.info("--websocket信息发送后--");
        ChannelInterceptor.super.afterSendCompletion(message, channel, sent, ex);
    }


    /**
     * 获取包含在stomp中的用户信息
     */
    @SuppressWarnings("rawtypes")
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        log.info("--websocket信息发送前--");
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (accessor != null) {
            if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
                if (raw instanceof Map) {
                    Object nameObj = ((Map) raw).get("name");
                    if (nameObj != null) {
                        // 设置当前访问器的认证用户,或者做其他业务
                        WebSocketUser webSocketUser = new WebSocketUser(String.valueOf(nameObj));
                        accessor.setUser(webSocketUser);
                    }
                }
            }
        }
        return message;
    }
}

WebSocketStompController


import com.easylinkin.bm.core.Result;
import com.easylinkin.bm.service.WebSocketService;
import com.easylinkin.bm.vo.websocket.WebSocketMsgVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * websocket stomp协议controller
 *
 * @author zhengwen
 **/
@Slf4j
@RestController
@RequestMapping("/web/socket/stomp")
public class WebSocketStompController {

    @Autowired
    private WebSocketService webSocketService;

    /**
     * 发送信息 stomp
     *
     * @param webSocketMsgVo 信息对象vo
     * @return 统一出参
     */
    @PostMapping("/sendStompMsg")
    @MessageMapping("/sendStompMsg")
    public Result<?> sendStompMsg(@RequestBody WebSocketMsgVo webSocketMsgVo) {
        log.info("--发送信息--");
        return webSocketService.sendStompMsg(webSocketMsgVo);
    }

}

WebSocketService实现类


import com.alibaba.fastjson.JSON;
import com.easylinkin.bm.core.Result;
import com.easylinkin.bm.core.ResultGenerator;
import com.easylinkin.bm.service.WebSocketService;
import com.easylinkin.bm.vo.websocket.WebSocketMsgVo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * @author zhengwen
 **/
@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class WebSocketServiceImpl implements WebSocketService {
    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @Override
    public Result<?> sendStompMsg(WebSocketMsgVo webSocketMsgVo) {
        String topicChannel = webSocketMsgVo.getTopicChannel();
        if (StringUtils.isNotBlank(topicChannel)) {
            topicChannel = "/" + topicChannel;
        }
        String message = JSON.toJSONString(webSocketMsgVo);
        String to = webSocketMsgVo.getTo();
        try {
            if (StringUtils.isNotBlank(to)) {
                //MD 不明原因用convertAndSendToUser不能收到,确认订阅没有问题
                //simpMessagingTemplate.convertAndSendToUser(to, topicChannel, message);
                simpMessagingTemplate.convertAndSend(topicChannel + "/" + to, message);
            } else {
                simpMessagingTemplate.convertAndSend(topicChannel, message);
            }
            return ResultGenerator.genSuccessResult();
        } catch (Exception e) {
            return ResultGenerator.genFailResult("发送失败");
        }

    }
}

这里一对一发送按道理应该用template的convertAndSendToUser方法,但是死活没效果。这里先用这种方式实现的。

四、测试html

<!DOCTYPE html>
<html>
  <head>
	<meta charset="UTF-8">
	<title>Insert title here</title>
	<link rel="stylesheet" href="http://cdn.static.runoob.com/libs/bootstrap/3.3.7/css/bootstrap.min.css">
	<script src="http://cdn.static.runoob.com/libs/jquery/2.1.1/jquery.min.js"></script>
	<script src="http://cdn.static.runoob.com/libs/bootstrap/3.3.7/js/bootstrap.min.js"></script>
	<script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
	<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
	
	<script type="text/javascript">
	    var userName = "zs";
		var sendTopic = "ad";
		//var subsc = '/ad';//广播
		//var subsc = '/user/' + userName + '/' + sendTopic;//一对一  /user/zs/ad
		//var subsc = "/user/zs/ad";
        var subsc = "/ad/zs";
	    // 建立连接对象(还未发起连接)
	    var socket = new SockJS("http://localhost:8099/stomp/websocketJS?token=zw");
	    // 获取 STOMP 子协议的客户端对象
	    var stompClient = Stomp.over(socket);
	    stompClient.debug = function(str) {
			console.log("DEBUG---->" + str);
		};
	    // 向服务器发起websocket连接并发送CONNECT帧
	    stompClient.connect({name:userName,token:userName},
	    	function connectCallback(frame) {
	            // 连接成功时(服务器响应 CONNECTED 帧)的回调方法
	            setMessageInnerHTML("连接成功");
				
				console.log("---订阅:" + subsc);
				stompClient.subscribe(subsc, function (res) {
					
					 console.log("----res:"+res);
	                re = JSON.parse(res.body);
	                console.log(re);
	                setMessageInnerHTML("")
	                setMessageInnerHTML("你接收到的消息为:" + re.data);
	            });
	        },
	        function errorCallBack(error) {
	            // 连接失败时(服务器响应 ERROR 帧)的回调方法
	            setMessageInnerHTML("连接失败");
	        }
	    );
	
	    //发送消息
	    function send() {
			
	        var message = $("#content").val();
			var msg = {
			  "data":message,
			  "topicChannel":sendTopic,
			  "to":"zs"
			};
	        var messageJson = JSON.stringify(msg);
			
	        stompClient.send("/app/sendStompMsg", {}, messageJson);
	        sendMessageInnerHTML("/app/sendStompMsg 你发送的消息:" + message);
	    }
	    
	    //将消息显示在网页上
	    function setMessageInnerHTML(innerHTML) {
	        $("#in").html(innerHTML + '<br/>');
	    }
	    
	    function sendMessageInnerHTML(innerHTML) {
	        $("#out").append(innerHTML + '<br/>');
	    }
	    
		$(function(){
			$("#btn").click(function(){
				send();
			});
			
		})
	</script>
 
  </head>
<body>
	<input id="content" class="form-control">
	<button id="btn" class="btn btn-info">发送</button>
	<div id="in"></div>
	<div id="out"></div>
</body>
</html>

上面的代码我就不多解释,我写了一些注释,上面也是找的博友的改改就开测了。

五、看效果

一对一发送
在这里插入图片描述
接口发送
在这里插入图片描述
六、总结
1、同事有springBoot直接集成websocket的,页面用的定时器做心跳重连,但是还是会出现被nginx断掉。这里用stomp是可以设置心跳的,用到项目上生产环境,让运维设置nginx放行ws协议的超时 + 设置指定url的超时限制,目前看是ok的。
2、controller上的注解标签很有迷惑性,postman请求的url很特别,可以自己去探索下。
3、stomp、webscoket的集成网上很多博友都有写,各有特色。我觉得写的不错的,可以看博友老郑来了的分享,还是我本家哦。
希望可以帮到到家。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐