一、WebSocket

1、什么是websocket

是HTML5的一种新的协议,WebSocket是真正实现了全双工通信的服务器向客户端的互联网技术,是单个TCP连接上进行全双工通信的协议。

2、与socket、HTTP的区别

与socket的区别

WebSocket拥有完整的应用层协议,包含一套标准的API
Socket是一组接口,是应用层与TCP/IP协议通信的中间软件抽象层

与HTTP的区别

http是短链接,请求之后会关闭连接。
WebSocket是长连接,只需要通过一次请求初始化连接,然后所有的请求和响应都是通过这个TCP连接进行通信。

二、代码应用

1、服务端

pom依赖
<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-websocket</artifactId>
		<version>2.7.5</version>
</dependency>
<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
</dependency>
拦截器

负责在握手之前,处理客户端的URL请求


import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

import java.util.Map;

@Slf4j
public class MyWebSocketInterceptor extends HttpSessionHandshakeInterceptor {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * 握手之前,做参数处理
     * @param request the current request
     * @param response the current response
     * @param wsHandler the target WebSocket handler
     * @param attributes the attributes from the HTTP handshake to associate with the WebSocket
     * session; the provided attributes are copied, the original map is not used.
     * @return
     * @throws Exception
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        logger.info("[MyWebSocketInterceptor#BeforeHandshake] Request from " + request.getRemoteAddress().getHostString());
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request;
            String id = serverHttpRequest.getServletRequest().getParameter("id");
            attributes.put("id", id);
        }
        return super.beforeHandshake(request, response, wsHandler, attributes);
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
        logger.info("[MyWebSocketInterceptor#afterHandshake] Request from " + request.getRemoteAddress().getHostString());
    }
}

Handler

负责websocket的消息处理、关闭等操作


import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class MyWebsocketHandler extends AbstractWebSocketHandler {

    public static final Map<String, WebSocketBean> webSocketBeanMap;

    /**
     * 仅用用于标识客户端编号
     */
    private static final AtomicInteger clientIdMaker;

    static {
        webSocketBeanMap = new ConcurrentHashMap<>();
        clientIdMaker = new AtomicInteger(0);
    }


    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        // 当WebSocket连接正式建立后,将该Session加入到Map中进行管理
        Map<String, Object> attributes = session.getAttributes();
        WebSocketBean webSocketBean = new WebSocketBean();
        webSocketBean.setWebSocketSession(session);
        webSocketBean.setClientId(clientIdMaker.getAndIncrement());
        webSocketBean.setId(attributes.get("id").toString());
        webSocketBeanMap.put(session.getId(), webSocketBean);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        //当连接关闭后,从Map中移除session实例
        webSocketBeanMap.remove(session.getId());
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        //传输过程中出现了错误
        if (session.isOpen()) {
            session.close();
        }
        webSocketBeanMap.remove(session.getId());
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        //处理接收到的消息
        log.info("Received message from client[ID:" + webSocketBeanMap.get(session.getId()).getClientId() +
                "]; Content is [" + message.getPayload() + "].");
        TextMessage textMessage = new TextMessage("pong");
        session.sendMessage(textMessage);
    }
}

实体

负责接受参数,可要可不要


import lombok.Data;
import org.springframework.web.socket.WebSocketSession;

@Data
public class WebSocketBean {
    private WebSocketSession webSocketSession;
    private int clientId;
    private String Id;
}
配置类

负责开启websocket、加载处理器、拦截器


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
@EnableWebSocket
public class WebSocketConfiguration implements WebSocketConfigurer {


    @Bean
    public MyWebSocketInterceptor myWebSocketInterceptor(){
        return new MyWebSocketInterceptor();
    }

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    /**
     * 注册handler 不同的请求,前往不同的handler
     * 添加拦截器,做握手前的数据处理
     * @param registry
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new MyWebsocketHandler(),"/websocket").addInterceptors(myWebSocketInterceptor());
    }
}

2、客户端

pom依赖
<dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
</dependency>
<dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-all</artifactId>
      <version>5.5.1</version>
</dependency>
<dependency>
      <groupId>org.java-websocket</groupId>
      <artifactId>Java-WebSocket</artifactId>
      <version>1.5.1</version>
</dependency>
核心配置
save.path=C:\\Users\\LZKJ\\Desktop\\1
monitor.websocket.address= ws://127.0.0.1:9090/websocket?id=1;ws://127.0.0.1:9091/websocket?id=2;ws://127.0.0.1:9092/websocket?id=3
monitor.name=WebScoket1;WebScoket2;WebScoket3
monitor.interval=5
Client代码
import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.enums.ReadyState;
import org.java_websocket.handshake.ServerHandshake;

import java.net.URI;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

@Slf4j
public class HardWareMonitorWebSocketClient extends WebSocketClient {

    private List<Map<String,String>> logList;
    private final String monitorName;

    public HardWareMonitorWebSocketClient(URI serverUri, String monitorName) {
        super(serverUri);
        this.monitorName = monitorName;
        logList = new CopyOnWriteArrayList<>();
        // 初始化建立连接
        this.connect();
    }


    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        //log.info("{} websocket开启中", this.uri.toString());
    }

    @Override
    public void onMessage(String s) {
        logList.add(buildLogMap("服务正常"));
        log.info("{} 发送心跳信息成功", this.uri.toString());
    }

    @Override
    public void onClose(int i, String s, boolean b) {
        // log.info("{} 服务端连接关闭,重连中。。。。。", this.uri.toString());
        // logList.add(buildLogMap("硬件宕机,重连中......"));
        try {
            if (this.getReadyState() != ReadyState.OPEN) {
                if (this.getReadyState() == ReadyState.NOT_YET_CONNECTED) {
                    if (this.isClosed()) {
                        this.reconnect();
                    } else {
                        this.connect();
                    }
                } else if (this.getReadyState() == ReadyState.CLOSED) {
                    this.reconnect();
                }
            }
        } catch (Exception e) {
            // log.error("重连异常",e);
        }
    }

    @Override
    public void onError(Exception e) {
        log.info("{} 服务宕机:", this.uri.toString());
        logList.add(buildLogMap("服务宕机"));
    }

    private Map<String,String> buildLogMap(String msg){
        Map<String,String> map = new HashMap<>();
        map.put("url",this.uri.toString());
        map.put("monitorName",monitorName);
        map.put("monitorTime", DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
        map.put("msg",msg);
        return map;
    }

    public List<Map<String, String>> getLogList(){
        return logList;
    }

    public void clearLogList(List<Map<String, String>> removeList){
        this.logList.removeAll(removeList);
    }
}
心跳检测启动类

启动、关闭心跳检测


import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class Monitor implements CommandLineRunner {

    @Value("${monitor.interval}")
    private int monitorInterval;

    @Value("${monitor.websocket.address}")
    private String monitorWebSocketAddress;

    @Value("${monitor.name}")
    private String monitorName;

    @Value("${save.path}")
    private String savePath;

    @Override
    public void run(String... args) throws Exception {
        new MonitorUtil().start(monitorInterval,monitorWebSocketAddress,monitorName,savePath);
    }
}

心跳检测工具类

定时心跳检测、导出CSV文件


import cn.hutool.core.date.DateUtil;
import cn.hutool.core.text.csv.CsvWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.io.File;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

@Slf4j
@Component
public class MonitorUtil{

    private static boolean canEveryDayExportCSV = false;
    private static Map<String, monitorWebSocketClient> monitorWebSocketMap;
    private ScheduledFuture<?> monitorAliveTask = null;

    private static int monitorInterval;

    private static String monitorWebSocketAddress;

    private static String monitorName;
    private static String savePath;

    // 创建任务队列 10 为线程数量
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);

    public void start(int monitorInterval, String monitorWebSocketAddress, String monitorName, String savePath) {
        monitorInterval = monitorInterval;
        monitorWebSocketAddress = monitorWebSocketAddress;
        monitorName = monitorName;
        savePath = savePath;
        // 监控硬件 websocket
        if (StringUtils.hasText(monitorWebSocketAddress) && StringUtils.hasText(monitorName)) {
            // 初始化监控map
            monitorWebSocketMap = new ConcurrentHashMap<>();
            String[] monitorWebSocketAddressArray = monitorWebSocketAddress.split(";");
            String[] monitorNameArray = monitorName.split(";");
            for (int i = 0; i < monitorWebSocketAddressArray.length; i++) {
                // 初始化监控map,key为硬件名称,value为硬件socket连接对象
                if(StringUtils.hasText(monitorNameArray[i])){
                    monitorWebSocketMap.put(monitorNameArray[i], new monitorWebSocketClient(URI.create(monitorWebSocketAddressArray[i]), monitorNameArray[i]));
                }
             }
            log.info("监控数据:{}", monitorWebSocketMap);
            // 执行任务 立即执行,间隔时间由配置monitor.interval决定
            monitorAliveTask = scheduledExecutorService.scheduleWithFixedDelay(monitorAliveRunnable(), 0, monitorInterval, TimeUnit.SECONDS);
            // 每天24点导出数据开关
            canEveryDayExportCSV = true;
        }
    }

    public String stop(boolean needSave) {
        if (!needSave){
            return "not need save";
        }
        // 暂停心跳检测任务
        monitorAliveTask.cancel(true);
        // 关闭每天00:10:00 导出CSV 定时任务
        canEveryDayExportCSV = false;
        // 导出csv文件
        exportCSV(true,null,true);
        return "";
    }

    /**
     * 导出监测记录到CSV文件
     * @param exportAll true 导出全部,false 导出exportDate指定的日期
     * @param exportDate 指定导出数据的日期
     * @param closeWebSocket true 关闭webSocket连接,false不关闭
     */
    private void exportCSV(boolean exportAll,Date exportDate,boolean closeWebSocket){
        // 导出全部,日期就写今天
        if(exportAll){
            exportDate = new Date();
        }
        for (Map.Entry<String, monitorWebSocketClient> entry : monitorWebSocketMap.entrySet()) {
            // 文件名称 年月日+硬件名称+hardwareMonitor.csv
            String csvFileName = DateUtil.format(exportDate, "yyyyMMdd") + "-" + entry.getKey() + "-" + "-monitor.csv";
            String filePath = savePath + File.separator + csvFileName;
            log.info("导出的文件地址:{}",filePath);
            File file = new File(filePath);
            // 文件存在,就追加数据,不存在,使用覆盖数据
            boolean fileExists = file.exists();
            try (CsvWriter csvWriter = new CsvWriter(file, StandardCharsets.UTF_8, fileExists)) {
                if (!fileExists) {
                    csvWriter.write(new String[]{"监测地址", "检测名称", "监测时间", "监测信息"});
                }
                Date finalExportDate = exportDate;
                // 获取需要导出的数据 exportAll为true导出全部,false,导出日期和exportDate相同的数据
                List<Map<String, String>> logWriteCSVList = entry.getValue().getLogList().stream().filter(logMap -> exportAll || DateUtil.isSameDay(finalExportDate, DateUtil.parse(logMap.get("monitorTime")))).collect(Collectors.toList());
                for (Map<String, String> logMap : logWriteCSVList) {
                    csvWriter.write(new String[]{logMap.get("url"), logMap.get("monitorName"), logMap.get("monitorTime"), logMap.get("msg")});
                }
                // 清空数据-将写入csv文件的数据删除
                entry.getValue().clearLogList(logWriteCSVList);
                if(closeWebSocket){
                    // 关闭socket连接
                    entry.getValue().close();
                }
            } catch (Exception e) {
                log.error("导出CSV文件异常:", e);
            }
        }
    }

    /**
     * 每天00:10:00导出前一天数据
     */
    @Scheduled(cron = "0 10 0 * * ?")
    private void exportCSVTask(){
        if(canEveryDayExportCSV){
            try {
                Date date = DateUtil.offsetDay(new Date(),-1);
                log.info("定时任务导出监控数据:{}",DateUtil.format(date,"yyyy-MM-dd HH:mm:ss"));
                // 导出前一天的数据
                exportCSV(false,date,false);
            } catch (Exception e) {
                log.error("每天定时导出数据失败:",e);
            }
        }
    }

    /**
     * 定时任务-发送websocket心跳
     */
    private Runnable monitorAliveRunnable(){
        return ()->{
            for (Map.Entry<String, monitorWebSocketClient> entry : monitorWebSocketMap.entrySet()) {
                try {
                    entry.getValue().send("ping");
                } catch (Exception e) {
                    // 发送不了消息,连接异常,调用重连方法
                    entry.getValue().onClose(0, null, true);
                }
            }
        };
    }

}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐