前言

提示:这里可以添加本文要记录的大概内容:

本文记录如何在 Spring Boot 中使用 WebSocket:搭建服务端、编写 Web 页面客户端,以及用 Spring Boot 作为客户端并实现断线重连。


提示:以下是本篇文章正文内容,下面案例可供参考

一、先创建好 Spring Boot 项目

二、使用步骤

1.使用maven引入依赖

代码如下:

<!-- WebSocket support for the server side -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Lombok: supplies the @Slf4j annotation used by the server class -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<!-- Apache Commons Lang: supplies StringUtils used by the server class -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.8.1</version>
</dependency>

2.创建服务端

创建WebSocketServer

package com.panbl.websocketdemo.websocket;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
 * websocket的处理类。
 * 作用相当于HTTP请求
 * 中的controller
 */
@Component
@Slf4j
@ServerEndpoint("/ws/{userId}")
public class WebSocketServer {

    /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
    private static int onlineCount = 0;
    /**concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。*/
    private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    private static ConcurrentHashMap<String, List<WebSocketServer>> devWebSocketMap = new ConcurrentHashMap<>();
    /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
    private Session session;
    /**接收userId*/
    private String userId = "";

    /**
     * 连接建立成
     * 功调用的方法
     */
    @OnOpen
    public void onOpen(Session session,@PathParam("userId") String userId) {
        this.session = session;
        //userId = session.getId()+"_"+userId; //拼接
        this.userId=userId;
        if(webSocketMap.containsKey(userId)){
            webSocketMap.remove(userId);
            //加入set中
            webSocketMap.put(userId,this);
        }else{
            //加入set中
            webSocketMap.put(userId,this);
            //在线数加1
            addOnlineCount();
        }
        log.info("用户连接:"+userId+",当前在线用户为:" + getOnlineCount());
        sendMessage("{\"status\":0,\"msg\":\"连接成功\"}");
    }

    /**
     * 连接关闭
     * 调用的方法
     */
    @OnClose
    public void onClose() {
        if(webSocketMap.containsKey(userId)){
            webSocketMap.remove(userId);
            //从set中删除
            subOnlineCount();
        }
        log.info("用户退出:"+userId+",当前在线用户为:" + getOnlineCount());
    }

    /**
     * 收到客户端消
     * 息后调用的方法
     * @param message
     * 客户端发送过来的消息
     **/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户消息:"+userId+",报文:"+message);
        //可以群发消息
        //消息保存到数据库、redis
        if(StringUtils.isNotBlank(message)){
            try {

            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }


    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {

        log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
        error.printStackTrace();
    }

    /**
     * 实现服务
     * 器主动推送
     */
    public void sendMessage(String message) {
        try {
            this.session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     *发送自定
     *义消息
     **/
    public static void sendInfo(String message, String userId) {
        log.info("发送消息到:"+userId+",报文:"+message);
        if(StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)){
            webSocketMap.get(userId).sendMessage(message);
        }else{
            log.error("用户"+userId+",不在线!");
        }
    }

    /**
     * 获得此时的
     * 在线用户数
     * @return
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * 在线人
     * 数加1
     */
    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    /**
     * 在线人
     * 数减1
     */
    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }

}


创建WebSocketConfig

package com.lst.cabinet.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Spring configuration class for the WebSocket server side.
 *
 * @author Panbanglin
 * @create 2022/4/11
 */
@Configuration
public class WebSocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} as a Spring bean.
     *
     * <p>NOTE(review): presumably this is what registers the
     * {@code @ServerEndpoint}-annotated classes with the container; confirm
     * against the Spring documentation for the deployment mode in use.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}

以上为服务端代码。


3.创建客户端-web版本

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>websocket通讯</title>
    <script>
        // The currently open WebSocket, if any.
        let socket;

        // Reads the current value of the input with the given id.
        function fieldValue(id) {
            return document.getElementById(id).value;
        }

        // Opens a WebSocket for the user id typed into the #userId input,
        // closing any socket that is already open first.
        function openSocket() {
            // Encode the id so characters such as "/" or "?" cannot alter the endpoint path.
            const socketUrl = "ws://localhost:8080/ws/" + encodeURIComponent(fieldValue("userId"));
            console.log(socketUrl);
            if (socket != null) {
                socket.close();
                socket = null;
            }
            socket = new WebSocket(socketUrl);
            // Connection opened.
            socket.onopen = function () {
                console.log("websocket已打开");
            };
            // Message received from the server; front-end handling starts here.
            socket.onmessage = function (msg) {
                console.log(msg.data);
            };
            // Connection closed.
            socket.onclose = function () {
                console.log("websocket已关闭");
            };
            // An error occurred.
            socket.onerror = function () {
                console.log("websocket发生了错误");
            };
        }

        // Sends the target user id and the text as a JSON message over the open socket.
        function sendMessage() {
            if (!socket || socket.readyState !== WebSocket.OPEN) {
                console.log("websocket未连接,无法发送消息");
                return;
            }
            // JSON.stringify escapes quotes and backslashes that manual string
            // concatenation would turn into invalid JSON.
            const payload = JSON.stringify({
                toUserId: fieldValue("toUserId"),
                contentText: fieldValue("contentText")
            });
            socket.send(payload);
            console.log(payload);
        }
    </script>
</head>
<body>
<p>【socket开启者的ID信息】:</p>
<div><input id="userId" name="userId" type="text" value="3000000842"></div>
<p>【客户端向服务器发送的内容】:</p>
<div><input id="toUserId" name="toUserId" type="text" value="20">
    <input id="contentText" name="contentText" type="text" value="hello websocket"></div>
<p>【操作】:</p>
<div><a style="color: red" onclick="openSocket()">开启socket</a></div>
<p>【操作】:</p>
<div><a onclick="sendMessage()">发送消息</a></div>
</body>

</html>

web版连接演示

在这里插入图片描述
web页面客户端连接演示
后端服务器演示

4.SpringBoot作为客户端 带断线重连

新建SpringBoot项目

<!-- Dependencies for using WebSocket as a client -->
<!-- Lombok: supplies the @Slf4j annotation used by the client classes -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<!-- Hutool utility library; NOTE(review): not referenced by the client code shown in this article -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.7.22</version>
</dependency>
<!-- Java-WebSocket: supplies the org.java_websocket client base class -->
<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.5.2</version>
</dependency>

1.创建MyWebSocketClient

package com.panbl.springbootwebsocketclient.websocket;

import com.panbl.springbootwebsocketclient.tool.ByteUtils;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.drafts.Draft;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Map;

/**
 * @author PanBangLin
 * @version 1.0
 * @date 2022/4/19 22:38
 */
@Slf4j
public class MyWebSocketClient extends org.java_websocket.client.WebSocketClient {



    public MyWebSocketClient(URI serverUri) {
        super(serverUri);
    }

    public MyWebSocketClient(URI serverUri, Draft protocolDraft) {
        super(serverUri, protocolDraft);
    }

    public MyWebSocketClient(URI serverUri, Draft protocolDraft, Map<String, String> httpHeaders, int connectTimeout) {
        super(serverUri, protocolDraft, httpHeaders, connectTimeout);
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        log.info("[websocket] 连接成功");
        //devOrder.subscribeDev("33255773800487108280");
    }

    @Override
    public void onMessage(String message) {
        log.info("[websocket] 收到消息={}", message);
    }

    @Override
    public void onMessage(ByteBuffer bytes) {
        log.info("[websocket] 收到消息={}", ByteUtils.getString(bytes));

    }

    @Override
    public void onClose(int i, String s, boolean b) {
        log.info("[websocket] 退出连接");
    }

    @Override
    public void onError(Exception e) {
        e.printStackTrace();
        log.info("[websocket] 连接错误={}", e.getMessage());
    }
}

2.新建工具类解析ByteBuffer 数据 ByteUtils

package com.cw.devsocket.Tool;

import org.springframework.stereotype.Component;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;

/**
 * Conversions between {@link String} and {@link ByteBuffer}, always using UTF-8.
 *
 * @author Panbanglin
 * @create 2022/4/19
 */
public class ByteUtils {

    /**
     * Converts a string to a {@link ByteBuffer} holding its UTF-8 bytes.
     *
     * <p>The charset is given explicitly so the result does not depend on the
     * platform default and always round-trips through {@link #getString(ByteBuffer)}.
     *
     * @param str the string to encode; must not be {@code null}
     * @return a buffer wrapping the UTF-8 encoding of {@code str}
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public static ByteBuffer getByteBuffer(String str)
    {
        return ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Decodes the remaining bytes of a {@link ByteBuffer} as UTF-8.
     *
     * <p>Decoding runs on a read-only duplicate, so the position of
     * {@code buffer} is left untouched and the same buffer can be decoded
     * repeatedly with the same result.
     *
     * @param buffer the buffer to decode; may be {@code null}
     * @return the decoded text, or an empty string if {@code buffer} is
     *         {@code null} or its content is not valid UTF-8
     */
    public static String getString(ByteBuffer buffer)
    {
        if (buffer == null)
        {
            return "";
        }
        try
        {
            // A fresh decoder reports malformed input as an exception rather than
            // substituting replacement characters.
            CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
            return decoder.decode(buffer.asReadOnlyBuffer()).toString();
        }
        catch (CharacterCodingException ignored)
        {
            // Documented contract: undecodable content yields an empty string.
            return "";
        }
    }
}

3.新建WebSocketConfig

package com.panbl.springbootwebsocketclient.config;

import com.panbl.springbootwebsocketclient.websocket.MyWebSocketClient;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;

/**
 * @author PanBangLin
 * @version 1.0
 * @date 2022/4/19 22:40
 */
@Component
@Slf4j
public class WebSocketConfig {

    @Autowired
    private Environment env;

    @Bean
    public WebSocketClient webSocketClient() {
        //String ws=env.getProperty("dev.webSocket")+ UUID.randomUUID().toString();
        String ws="ws://localhost:8080/ws/"+ UUID.randomUUID().toString();
        //String ws="ws://127.0.0.1:8181/ws/v1/cabinet/status/"+ UUID.randomUUID().toString();
        try {
            WebSocketClient webSocketClient = new MyWebSocketClient(new URI(ws));
            webSocketClient.connect();
            Timer t = new Timer();
            t.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    System.out.println("进入定时器");
                    if(webSocketClient.isClosed()){
                        log.error("断线重连");
                        webSocketClient.reconnect();
                    }
                }
            },1000,5000);
            return webSocketClient;
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        return null;
    }

}

4.演示

断线重连
服务端

总结

源码码云地址:https://gitee.com/panbanglin/spring-boot-websocket-usage.git

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐