SpringBoot实现WebSocket心跳机制
SpringBoot实现WebSocket心跳检测,检测数据写入CSV文件
·
一、WebSocket
1、什么是websocket
WebSocket是HTML5引入的一种新协议,它在单个TCP连接上实现了真正的全双工通信,服务器可以主动向客户端推送数据。
2、与socket、HTTP的区别
与socket的区别
WebSocket拥有完整的应用层协议,包含一套标准的API
Socket是一组接口,是应用层与TCP/IP协议通信的中间软件抽象层
与HTTP的区别
HTTP是短连接,一次请求响应完成之后通常会关闭连接。
WebSocket是长连接,只需要通过一次请求初始化连接,然后所有的请求和响应都是通过这个TCP连接进行通信。
二、代码应用
1、服务端
pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
拦截器
负责在握手之前,处理客户端的URL请求
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
import java.util.Map;
/**
 * Handshake interceptor that extracts the {@code id} query parameter from the
 * client's handshake request and exposes it as a handshake attribute.
 */
@Slf4j
public class MyWebSocketInterceptor extends HttpSessionHandshakeInterceptor {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Runs before the handshake: logs the caller's host and, for servlet requests,
     * copies the {@code id} request parameter into {@code attributes}.
     *
     * @param request    the current request
     * @param response   the current response
     * @param wsHandler  the target WebSocket handler
     * @param attributes the attributes from the HTTP handshake to associate with the
     *                   WebSocket session
     * @return whatever the superclass {@code beforeHandshake} returns
     * @throws Exception propagated from the superclass implementation
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        logger.info("[MyWebSocketInterceptor#BeforeHandshake] Request from {}", request.getRemoteAddress().getHostString());
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request;
            String id = serverHttpRequest.getServletRequest().getParameter("id");
            // Only store the id when the client actually supplied one, so that
            // downstream code never sees an "id" key mapped to null.
            if (id != null) {
                attributes.put("id", id);
            }
        }
        return super.beforeHandshake(request, response, wsHandler, attributes);
    }

    /**
     * Runs after the handshake; only logs the caller's host.
     *
     * @param request   the current request
     * @param response  the current response
     * @param wsHandler the target WebSocket handler
     * @param ex        an exception raised during the handshake, or {@code null} if none
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
        logger.info("[MyWebSocketInterceptor#afterHandshake] Request from {}", request.getRemoteAddress().getHostString());
    }
}
Handler
负责websocket的消息处理、关闭等操作
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class MyWebsocketHandler extends AbstractWebSocketHandler {
public static final Map<String, WebSocketBean> webSocketBeanMap;
/**
* 仅用用于标识客户端编号
*/
private static final AtomicInteger clientIdMaker;
static {
webSocketBeanMap = new ConcurrentHashMap<>();
clientIdMaker = new AtomicInteger(0);
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 当WebSocket连接正式建立后,将该Session加入到Map中进行管理
Map<String, Object> attributes = session.getAttributes();
WebSocketBean webSocketBean = new WebSocketBean();
webSocketBean.setWebSocketSession(session);
webSocketBean.setClientId(clientIdMaker.getAndIncrement());
webSocketBean.setId(attributes.get("id").toString());
webSocketBeanMap.put(session.getId(), webSocketBean);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
//当连接关闭后,从Map中移除session实例
webSocketBeanMap.remove(session.getId());
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
//传输过程中出现了错误
if (session.isOpen()) {
session.close();
}
webSocketBeanMap.remove(session.getId());
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
//处理接收到的消息
log.info("Received message from client[ID:" + webSocketBeanMap.get(session.getId()).getClientId() +
"]; Content is [" + message.getPayload() + "].");
TextMessage textMessage = new TextMessage("pong");
session.sendMessage(textMessage);
}
}
实体
负责保存每个连接的会话及其参数(接收到的id等),可要可不要
import lombok.Data;
import org.springframework.web.socket.WebSocketSession;
/**
 * Per-connection data holder: the WebSocket session plus the identifiers
 * associated with it. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class WebSocketBean {
    /** The underlying WebSocket session. */
    private WebSocketSession webSocketSession;

    /** Sequential number assigned to the client on the server side. */
    private int clientId;

    /**
     * Identifier supplied by the client. Named in lowerCamelCase; the generated
     * accessors are still {@code getId()} / {@code setId(String)}.
     */
    private String id;
}
配置类
负责开启websocket、加载处理器、拦截器
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration that switches on WebSocket support and wires the
 * handler and handshake interceptor to the {@code /websocket} endpoint.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfiguration implements WebSocketConfigurer {

    /**
     * Exposes the handshake interceptor as a bean.
     *
     * @return a new {@link MyWebSocketInterceptor}
     */
    @Bean
    public MyWebSocketInterceptor myWebSocketInterceptor() {
        MyWebSocketInterceptor interceptor = new MyWebSocketInterceptor();
        return interceptor;
    }

    /**
     * Exposes a {@link ServerEndpointExporter} bean.
     * NOTE(review): no {@code @ServerEndpoint} classes are visible in this file;
     * confirm that this exporter is actually required.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }

    /**
     * Maps the {@code /websocket} path to {@link MyWebsocketHandler} and attaches
     * the handshake interceptor that pre-processes the request.
     *
     * @param registry the registry to add the handler mapping to
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        MyWebsocketHandler handler = new MyWebsocketHandler();
        registry.addHandler(handler, "/websocket")
                .addInterceptors(myWebSocketInterceptor());
    }
}
2、客户端
pom依赖
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.1</version>
</dependency>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.1</version>
</dependency>
核心配置
# Directory the exported CSV files are written to
save.path=C:\\Users\\LZKJ\\Desktop\\1
# WebSocket URLs to monitor, separated by ';'
monitor.websocket.address= ws://127.0.0.1:9090/websocket?id=1;ws://127.0.0.1:9091/websocket?id=2;ws://127.0.0.1:9092/websocket?id=3
# Monitor names, separated by ';' and matched to the URLs above by position;
# each name is also used as part of the exported CSV file name
monitor.name=WebScoket1;WebScoket2;WebScoket3
# Delay between two heartbeat rounds, in seconds
monitor.interval=5
Client代码
import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.enums.ReadyState;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * WebSocket client used for heartbeat monitoring of one endpoint. It records an
 * in-memory log entry for every received message and every error, and tries to
 * re-establish the connection when it is closed.
 */
@Slf4j
public class HardWareMonitorWebSocketClient extends WebSocketClient {
    /** Collected monitoring records; each map holds url, monitorName, monitorTime and msg. */
    private final List<Map<String, String>> logList = new CopyOnWriteArrayList<>();

    /** Name of the monitored target, copied into every log record. */
    private final String monitorName;

    /**
     * Creates the client and immediately starts connecting.
     *
     * @param serverUri   the WebSocket URI to connect to
     * @param monitorName the name recorded with every log entry
     */
    public HardWareMonitorWebSocketClient(URI serverUri, String monitorName) {
        super(serverUri);
        this.monitorName = monitorName;
        // Establish the initial connection.
        this.connect();
    }

    /** Called when the connection is opened; intentionally does nothing. */
    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        // nothing to do
    }

    /**
     * Records a "service healthy" entry for every message received from the server.
     *
     * @param s the received message (its content is not inspected)
     */
    @Override
    public void onMessage(String s) {
        logList.add(buildLogMap("服务正常"));
        log.info("{} 发送心跳信息成功", this.uri.toString());
    }

    /**
     * Tries to bring the connection back: connects if the client has not connected
     * yet, reconnects if it is closed, and does nothing in any other state.
     *
     * <p>Failures are deliberately not propagated (reconnecting is best effort),
     * but they are logged at debug level instead of being silently dropped.
     *
     * @param i close code (unused)
     * @param s close reason (unused)
     * @param b whether the close was initiated remotely (unused)
     */
    @Override
    public void onClose(int i, String s, boolean b) {
        try {
            ReadyState state = this.getReadyState();
            if (state == ReadyState.NOT_YET_CONNECTED) {
                this.connect();
            } else if (state == ReadyState.CLOSED) {
                this.reconnect();
            }
        } catch (Exception e) {
            log.debug("{} 重连异常", this.uri, e);
        }
    }

    /**
     * Records a "service down" entry whenever the client reports an error.
     *
     * @param e the reported error
     */
    @Override
    public void onError(Exception e) {
        log.info("{} 服务宕机:", this.uri.toString());
        logList.add(buildLogMap("服务宕机"));
    }

    /**
     * Builds one log record stamped with the current time.
     *
     * @param msg the status text to store under the {@code msg} key
     * @return a map with the keys url, monitorName, monitorTime and msg
     */
    private Map<String, String> buildLogMap(String msg) {
        Map<String, String> map = new HashMap<>();
        map.put("url", this.uri.toString());
        map.put("monitorName", monitorName);
        map.put("monitorTime", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
        map.put("msg", msg);
        return map;
    }

    /**
     * Returns the live list of collected log records (not a copy).
     *
     * @return the collected log records
     */
    public List<Map<String, String>> getLogList() {
        return logList;
    }

    /**
     * Removes the given records from the collected log.
     *
     * @param removeList the records to remove
     */
    public void clearLogList(List<Map<String, String>> removeList) {
        this.logList.removeAll(removeList);
    }
}
心跳检测启动类
启动、关闭心跳检测
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
/**
 * Starts heartbeat monitoring once the application has started, using the
 * values configured under {@code monitor.*} and {@code save.path}.
 */
@Component
public class Monitor implements CommandLineRunner {
    /** The Spring-managed monitor, so that start and stop act on the same instance. */
    private final MonitorUtil monitorUtil;

    @Value("${monitor.interval}")
    private int monitorInterval;

    @Value("${monitor.websocket.address}")
    private String monitorWebSocketAddress;

    @Value("${monitor.name}")
    private String monitorName;

    @Value("${save.path}")
    private String savePath;

    /**
     * @param monitorUtil the {@link MonitorUtil} bean to start
     */
    public Monitor(MonitorUtil monitorUtil) {
        this.monitorUtil = monitorUtil;
    }

    /**
     * Starts the heartbeat monitoring with the configured settings.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        monitorUtil.start(monitorInterval, monitorWebSocketAddress, monitorName, savePath);
    }
}
心跳检测工具类
定时心跳检测、导出CSV文件
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.text.csv.CsvWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.io.File;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@Slf4j
@Component
public class MonitorUtil{
private static boolean canEveryDayExportCSV = false;
private static Map<String, monitorWebSocketClient> monitorWebSocketMap;
private ScheduledFuture<?> monitorAliveTask = null;
private static int monitorInterval;
private static String monitorWebSocketAddress;
private static String monitorName;
private static String savePath;
// 创建任务队列 10 为线程数量
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
public void start(int monitorInterval, String monitorWebSocketAddress, String monitorName, String savePath) {
monitorInterval = monitorInterval;
monitorWebSocketAddress = monitorWebSocketAddress;
monitorName = monitorName;
savePath = savePath;
// 监控硬件 websocket
if (StringUtils.hasText(monitorWebSocketAddress) && StringUtils.hasText(monitorName)) {
// 初始化监控map
monitorWebSocketMap = new ConcurrentHashMap<>();
String[] monitorWebSocketAddressArray = monitorWebSocketAddress.split(";");
String[] monitorNameArray = monitorName.split(";");
for (int i = 0; i < monitorWebSocketAddressArray.length; i++) {
// 初始化监控map,key为硬件名称,value为硬件socket连接对象
if(StringUtils.hasText(monitorNameArray[i])){
monitorWebSocketMap.put(monitorNameArray[i], new monitorWebSocketClient(URI.create(monitorWebSocketAddressArray[i]), monitorNameArray[i]));
}
}
log.info("监控数据:{}", monitorWebSocketMap);
// 执行任务 立即执行,间隔时间由配置monitor.interval决定
monitorAliveTask = scheduledExecutorService.scheduleWithFixedDelay(monitorAliveRunnable(), 0, monitorInterval, TimeUnit.SECONDS);
// 每天24点导出数据开关
canEveryDayExportCSV = true;
}
}
public String stop(boolean needSave) {
if (!needSave){
return "not need save";
}
// 暂停心跳检测任务
monitorAliveTask.cancel(true);
// 关闭每天00:10:00 导出CSV 定时任务
canEveryDayExportCSV = false;
// 导出csv文件
exportCSV(true,null,true);
return "";
}
/**
* 导出监测记录到CSV文件
* @param exportAll true 导出全部,false 导出exportDate指定的日期
* @param exportDate 指定导出数据的日期
* @param closeWebSocket true 关闭webSocket连接,false不关闭
*/
private void exportCSV(boolean exportAll,Date exportDate,boolean closeWebSocket){
// 导出全部,日期就写今天
if(exportAll){
exportDate = new Date();
}
for (Map.Entry<String, monitorWebSocketClient> entry : monitorWebSocketMap.entrySet()) {
// 文件名称 年月日+硬件名称+hardwareMonitor.csv
String csvFileName = DateUtil.format(exportDate, "yyyyMMdd") + "-" + entry.getKey() + "-" + "-monitor.csv";
String filePath = savePath + File.separator + csvFileName;
log.info("导出的文件地址:{}",filePath);
File file = new File(filePath);
// 文件存在,就追加数据,不存在,使用覆盖数据
boolean fileExists = file.exists();
try (CsvWriter csvWriter = new CsvWriter(file, StandardCharsets.UTF_8, fileExists)) {
if (!fileExists) {
csvWriter.write(new String[]{"监测地址", "检测名称", "监测时间", "监测信息"});
}
Date finalExportDate = exportDate;
// 获取需要导出的数据 exportAll为true导出全部,false,导出日期和exportDate相同的数据
List<Map<String, String>> logWriteCSVList = entry.getValue().getLogList().stream().filter(logMap -> exportAll || DateUtil.isSameDay(finalExportDate, DateUtil.parse(logMap.get("monitorTime")))).collect(Collectors.toList());
for (Map<String, String> logMap : logWriteCSVList) {
csvWriter.write(new String[]{logMap.get("url"), logMap.get("monitorName"), logMap.get("monitorTime"), logMap.get("msg")});
}
// 清空数据-将写入csv文件的数据删除
entry.getValue().clearLogList(logWriteCSVList);
if(closeWebSocket){
// 关闭socket连接
entry.getValue().close();
}
} catch (Exception e) {
log.error("导出CSV文件异常:", e);
}
}
}
/**
* 每天00:10:00导出前一天数据
*/
@Scheduled(cron = "0 10 0 * * ?")
private void exportCSVTask(){
if(canEveryDayExportCSV){
try {
Date date = DateUtil.offsetDay(new Date(),-1);
log.info("定时任务导出监控数据:{}",DateUtil.format(date,"yyyy-MM-dd HH:mm:ss"));
// 导出前一天的数据
exportCSV(false,date,false);
} catch (Exception e) {
log.error("每天定时导出数据失败:",e);
}
}
}
/**
* 定时任务-发送websocket心跳
*/
private Runnable monitorAliveRunnable(){
return ()->{
for (Map.Entry<String, monitorWebSocketClient> entry : monitorWebSocketMap.entrySet()) {
try {
entry.getValue().send("ping");
} catch (Exception e) {
// 发送不了消息,连接异常,调用重连方法
entry.getValue().onClose(0, null, true);
}
}
};
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)