SpringBoot集成websocket能力(stomp)
序之前有分享过springBoot集成Websocket推送信息。今天主要是来继续分享升级版,这次是采用STOMP协议。用这个的好处有很多,比如可以屏蔽浏览器之间的差异,更方便对接消息中间件等。一、协议理解HTTP、WebSocket 等应用层协议,都是基于 TCP 协议来传输数据的。HTTP不足在于它与服务器的全双工通信依靠轮询实现,对于需要从服务器主动发送数据的情境,会给服务器资源造成很大的浪
序
之前有分享过springBoot集成Websocket推送信息。今天主要是来继续分享升级版,这次是采用STOMP协议。用这个的好处有很多,比如可以屏蔽浏览器之间的差异,更方便对接消息中间件等。
一、协议理解
HTTP、WebSocket 等应用层协议,都是基于 TCP 协议来传输数据的。
HTTP不足在于它与服务器的全双工通信依靠轮询实现,对于需要从服务器主动发送数据的情境,会给服务器资源造成很大的浪费,WebSocket是针对HTTP在这种情况下的补充。
对于 WebSocket 来说,它必须依赖 HTTP 协议进行一次握手 ,握手成功后,数据就直接从 TCP 通道传输,与 HTTP 无关了。
WebSocket是一个完整的应用层协议,包含一套标准的 API 。
STOMP即Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互。 STOMP协议可以建立在WebSocket之上,也可以建立在其他应用层协议之上。
二、依赖包
<!-- websocket支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- 有说需要依赖这个的,我这里实际没有引入
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</dependency>
-->
三、能力集成核心代码
websocket用户类
import java.security.Principal;
/**
* @author zhengwen
**/
public class WebSocketUser implements Principal {
/**
* 用户信息
*/
private final String name;
public WebSocketUser(String name) {
this.name = name;
}
@Override
public String getName() {
return name;
}
}
消息对象类
WebSocketMsgVo
import lombok.Data;
import java.time.LocalDateTime;
/**
* websocket信息vo对象
*
* @author zhengwen
**/
@Data
public class WebSocketMsgVo<T> {
/**
* 发送方
*/
private String from;
/**
* 接收方
*/
private String to;
/**
* 时间
*/
private LocalDateTime time = LocalDateTime.now();
/**
* 平台来源
*/
private String platform;
/**
* 主题通道
*/
private String topicChannel;
/**
* 信息业务对象
*/
private T data;
}
这里data定义为抽象类,由业务系统自行定义。因为我们这是提供能力,所以尽量不要固定死。
配置类
import com.easylinkin.bm.handler.MyHandshakeHandler;
import com.easylinkin.bm.interceptor.MyHandshakeInterceptor;
import com.easylinkin.bm.interceptor.WebSocketUserInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
/**
* websocket stomp协议配置类
*
* @author zhengwen
**/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
/**
* 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,
* 也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs
*
* @param registry
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
/*
* 1. 将 /stomp/websocketJs路径注册为STOMP的端点,
* 用户连接了这个端点后就可以进行websocket通讯,支持socketJs
* 2. setAllowedOriginPatterns("*")表示可以跨域
* 3. withSockJS()表示支持socktJS访问
* 4. addInterceptors 添加自定义拦截器,这个拦截器是上一个demo自己定义的获取httpsession的拦截器
* 5. addInterceptors 添加拦截处理,这里MyPrincipalHandshakeHandler 封装的认证用户信息
*/
//配置客户端连接地址
registry.addEndpoint("/stomp/websocketJS").setAllowedOriginPatterns("*").addInterceptors(new MyHandshakeInterceptor()).setHandshakeHandler(new MyHandshakeHandler()).withSockJS();
/*
* 添加多个端点
* 它的实现类是WebMvcStompEndpointRegistry ,
* addEndpoint是添加到WebMvcStompWebSocketEndpointRegistration的集合中,
* 所以可以添加多个端点
*/
registry.addEndpoint("/stomp/websocket");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 自定义调度器,用于控制心跳线程
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
// 线程池线程数,心跳连接开线程
taskScheduler.setPoolSize(1);
// 线程名前缀
taskScheduler.setThreadNamePrefix("websocket-heartbeat-thread-");
// 初始化
taskScheduler.initialize();
// 设置广播节点
registry.enableSimpleBroker("/ad", "/device", "/pay", "/data", "/warn", "/alone").setHeartbeatValue(new long[]{10000, 10000})
.setTaskScheduler(taskScheduler);
// 客户端向服务端发送消息需有/app 前缀
registry.setApplicationDestinationPrefixes("/app");
// 指定用户发送(一对一)的前缀 /user/
registry.setUserDestinationPrefix("/user");
}
/**
* 配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间
*
* @param registration
*/
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
/*
* 1. setMessageSizeLimit 设置消息缓存的字节数大小 字节
* 2. setSendBufferSizeLimit 设置websocket会话时,缓存的大小 字节
* 3. setSendTimeLimit 设置消息发送会话超时时间,毫秒
*/
registration.setMessageSizeLimit(10240)
.setSendBufferSizeLimit(10240)
.setSendTimeLimit(10000);
}
/**
* 配置客户端入站通道拦截器
* 设置输入消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
*
* @param registration
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
/*
* 配置消息线程池
* 1. corePoolSize 配置核心线程池,当线程数小于此配置时,不管线程中有无空闲的线程,都会产生新线程处理任务
* 2. maxPoolSize 配置线程池最大数,当线程池数等于此配置时,不会产生新线程
* 3. keepAliveSeconds 线程池维护线程所允许的空闲时间,单位秒
*/
registration.taskExecutor().corePoolSize(10)
.maxPoolSize(20)
.keepAliveSeconds(60);
registration.interceptors(new WebSocketUserInterceptor());
}
/**
* 设置输出消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
*
* @param registration
*/
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(10)
.maxPoolSize(20)
.keepAliveSeconds(60);
//registration.interceptors(new WebSocketUserInterceptor());
}
}
sebsocket的http握手拦截器
MyHandshakeInterceptor
import com.easylinkin.bm.vo.websocket.WebSocketUser;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
/**
* @author zhengwen
**/
@Slf4j
public class MyHandshakeInterceptor implements HandshakeInterceptor {
/**
* websocket握手之前
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
log.info("--websocket的http连接握手之前--");
ServletServerHttpRequest req = (ServletServerHttpRequest) request;
WebSocketUser user = null;
//获取token认证
String token = req.getServletRequest().getParameter("token");
//解析token获取用户信息
//鉴权,我的方法是,前端把token传过来,解析token,判断正确与否,return true表示通过,false请求不通过。
//TODO 鉴权设置用户
if (StringUtils.isNotBlank(token)) {
user = new WebSocketUser(token);
}
//如果token认证失败user为null,返回false拒绝握手
if (user == null) {
return false;
}
//保存认证用户
attributes.put("user", user);
return true;
}
@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
}
}
websocket的握手之后拦截器
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
import java.security.Principal;
import java.util.Map;
/**
* @author zhengwen
**/
@Slf4j
public class MyHandshakeHandler extends DefaultHandshakeHandler {
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
log.info("--websocket的http连接握手之后--");
//设置认证用户
return (Principal) attributes.get("user");
}
}
websocket设置自定义的连接通道拦截器
WebSocketUserInterceptor
import com.easylinkin.bm.vo.websocket.WebSocketUser;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import java.util.Map;
/**
* @author zhengwen
**/
@Slf4j
public class WebSocketUserInterceptor implements ChannelInterceptor {
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
log.info("--websocket信息发送后--");
ChannelInterceptor.super.afterSendCompletion(message, channel, sent, ex);
}
/**
* 获取包含在stomp中的用户信息
*/
@SuppressWarnings("rawtypes")
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
log.info("--websocket信息发送前--");
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (accessor != null) {
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
if (raw instanceof Map) {
Object nameObj = ((Map) raw).get("name");
if (nameObj != null) {
// 设置当前访问器的认证用户,或者做其他业务
WebSocketUser webSocketUser = new WebSocketUser(String.valueOf(nameObj));
accessor.setUser(webSocketUser);
}
}
}
}
return message;
}
}
WebSocketStompController
import com.easylinkin.bm.core.Result;
import com.easylinkin.bm.service.WebSocketService;
import com.easylinkin.bm.vo.websocket.WebSocketMsgVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* websocket stomp协议controller
*
* @author zhengwen
**/
@Slf4j
@RestController
@RequestMapping("/web/socket/stomp")
public class WebSocketStompController {
@Autowired
private WebSocketService webSocketService;
/**
* 发送信息 stomp
*
* @param webSocketMsgVo 信息对象vo
* @return 统一出参
*/
@PostMapping("/sendStompMsg")
@MessageMapping("/sendStompMsg")
public Result<?> sendStompMsg(@RequestBody WebSocketMsgVo webSocketMsgVo) {
log.info("--发送信息--");
return webSocketService.sendStompMsg(webSocketMsgVo);
}
}
WebSocketService实现类
import com.alibaba.fastjson.JSON;
import com.easylinkin.bm.core.Result;
import com.easylinkin.bm.core.ResultGenerator;
import com.easylinkin.bm.service.WebSocketService;
import com.easylinkin.bm.vo.websocket.WebSocketMsgVo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* @author zhengwen
**/
@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class WebSocketServiceImpl implements WebSocketService {
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
@Override
public Result<?> sendStompMsg(WebSocketMsgVo webSocketMsgVo) {
String topicChannel = webSocketMsgVo.getTopicChannel();
if (StringUtils.isNotBlank(topicChannel)) {
topicChannel = "/" + topicChannel;
}
String message = JSON.toJSONString(webSocketMsgVo);
String to = webSocketMsgVo.getTo();
try {
if (StringUtils.isNotBlank(to)) {
//MD 不明原因用convertAndSendToUser不能收到,确认订阅没有问题
//simpMessagingTemplate.convertAndSendToUser(to, topicChannel, message);
simpMessagingTemplate.convertAndSend(topicChannel + "/" + to, message);
} else {
simpMessagingTemplate.convertAndSend(topicChannel, message);
}
return ResultGenerator.genSuccessResult();
} catch (Exception e) {
return ResultGenerator.genFailResult("发送失败");
}
}
}
这里一对一发送按道理应该用template的convertAndSendToUser方法,但是死活没效果。这里先用这种方式实现的。
四、测试html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
<link rel="stylesheet" href="http://cdn.static.runoob.com/libs/bootstrap/3.3.7/css/bootstrap.min.css">
<script src="http://cdn.static.runoob.com/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="http://cdn.static.runoob.com/libs/bootstrap/3.3.7/js/bootstrap.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
<script type="text/javascript">
var userName = "zs";
var sendTopic = "ad";
//var subsc = '/ad';//广播
//var subsc = '/user/' + userName + '/' + sendTopic;//一对一 /user/zs/ad
//var subsc = "/user/zs/ad";
var subsc = "/ad/zs";
// 建立连接对象(还未发起连接)
var socket = new SockJS("http://localhost:8099/stomp/websocketJS?token=zw");
// 获取 STOMP 子协议的客户端对象
var stompClient = Stomp.over(socket);
stompClient.debug = function(str) {
console.log("DEBUG---->" + str);
};
// 向服务器发起websocket连接并发送CONNECT帧
stompClient.connect({name:userName,token:userName},
function connectCallback(frame) {
// 连接成功时(服务器响应 CONNECTED 帧)的回调方法
setMessageInnerHTML("连接成功");
console.log("---订阅:" + subsc);
stompClient.subscribe(subsc, function (res) {
console.log("----res:"+res);
re = JSON.parse(res.body);
console.log(re);
setMessageInnerHTML("")
setMessageInnerHTML("你接收到的消息为:" + re.data);
});
},
function errorCallBack(error) {
// 连接失败时(服务器响应 ERROR 帧)的回调方法
setMessageInnerHTML("连接失败");
}
);
//发送消息
function send() {
var message = $("#content").val();
var msg = {
"data":message,
"topicChannel":sendTopic,
"to":"zs"
};
var messageJson = JSON.stringify(msg);
stompClient.send("/app/sendStompMsg", {}, messageJson);
sendMessageInnerHTML("/app/sendStompMsg 你发送的消息:" + message);
}
//将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
$("#in").html(innerHTML + '<br/>');
}
function sendMessageInnerHTML(innerHTML) {
$("#out").append(innerHTML + '<br/>');
}
$(function(){
$("#btn").click(function(){
send();
});
})
</script>
</head>
<body>
<input id="content" class="form-control">
<button id="btn" class="btn btn-info">发送</button>
<div id="in"></div>
<div id="out"></div>
</body>
</html>
上面的代码我就不多解释,我写了一些注释,上面也是找的博友的改改就开测了。
五、看效果
一对一发送
接口发送
六、总结
1、同事有springBoot直接集成websocket的,页面用的定时器做心跳重连,但是还是会出现被nginx断掉。这里用stomp是可以设置心跳的,用到项目上生产环境,让运维设置nginx放行ws协议的超时 + 设置指定url的超时限制,目前看是ok的。
2、controller上的注解标签很有迷惑性,postman请求的url很特别,可以自己去探索下。
3、stomp、webscoket的集成网上很多博友都有写,各有特色。我觉得写的不错的,可以看博友老郑来了的分享,还是我本家哦。
希望可以帮到到家。
更多推荐
所有评论(0)