常见的做法是Spring Boot应用做服务端,前端页面做客户端,通过WebSocket通信进行数据传输交互。但其实Spring Boot服务也能做客户端,去连接别的WebSocket服务提供者。
刚好最近在项目中就使用到了,需求背景大概就是我们作为一个Java端应用需要和一个C语言应用进行通信。在项目需求及环境等多方面的考量之下,最后放弃了使用HTTP协议和C程序进行通信,转而使用WebSocket;然后在C侧开发人员的要求下,由他们做服务端,我们做客户端。

引入pom依赖

<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.5.2</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>

配置类

ws连接配置

# Custom namespace holding the WebSocket client settings of this application.
cancan:
  websocket:
    client:
      # List of remote WebSocket servers; one list item per server.
      config:
        # Endpoint of the server. The placeholder refers to the wsName of this
        # same (first) list item, so the name becomes the last path segment.
        - wsUrl: ws://127.0.0.1:8080/websocket/${cancan.websocket.client.config[0].wsName}
          # Name used to tell the different servers apart.
          wsName: ws-01
          # Whether a periodic heartbeat message is sent to the server.
          enableHeartbeat: true
          # Heartbeat interval in milliseconds.
          heartbeatInterval: 20000
          # Whether the client reconnects after a failure.
          enableReconnection: true
#        - wsUrl: ws://localhost:8083
#          wsName: ws-02
#          enableHeartbeat: true
#          heartbeatInterval: 20000
#          enableReconnection: true
# Port of this application's own embedded web server (Spring Boot server.port).
server:
  port: 8099

WebsocketClientConfiguration读取配置文件配置

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * Binds the {@code cancan.websocket.client} section of the application
 * configuration to a list of WebSocket server definitions.
 *
 * <p>The optional settings of each server ({@code enableHeartbeat},
 * {@code heartbeatInterval}, {@code enableReconnection}) carry real default
 * values, so a configuration entry that omits them no longer yields
 * {@code null} and a {@link NullPointerException} when the value is unboxed.
 *
 * @author: tanghaizhi
 * @CreateTime: 2022/10/13 14:11
 */
@Configuration
@ConfigurationProperties(prefix = "cancan.websocket.client")
public class WebsocketClientConfiguration {

    /**
     * Configured servers, one element per {@code config[n]} entry.
     * Stays {@code null} until {@link #setConfig(List)} is called.
     */
    private List<ServerProperties> config;

    /**
     * Settings of a single remote WebSocket server.
     */
    public static class ServerProperties {
        /**
         * WebSocket server address, e.g. {@code ws://ip:port}.
         */
        private String wsUrl;
        /**
         * WebSocket server name, used to tell the different servers apart.
         */
        private String wsName;
        /**
         * Whether heartbeat monitoring is enabled. Enabled by default.
         */
        private Boolean enableHeartbeat = Boolean.TRUE;
        /**
         * Heartbeat interval in milliseconds. Defaults to 20000.
         */
        private Integer heartbeatInterval = 20000;
        /**
         * Whether reconnection is enabled. Enabled by default.
         */
        private Boolean enableReconnection = Boolean.TRUE;

        /** @return the WebSocket server address */
        public String getWsUrl() {
            return wsUrl;
        }

        /** @param wsUrl the WebSocket server address */
        public void setWsUrl(String wsUrl) {
            this.wsUrl = wsUrl;
        }

        /** @return whether heartbeat monitoring is enabled */
        public Boolean getEnableHeartbeat() {
            return enableHeartbeat;
        }

        /** @param enableHeartbeat whether heartbeat monitoring is enabled */
        public void setEnableHeartbeat(Boolean enableHeartbeat) {
            this.enableHeartbeat = enableHeartbeat;
        }

        /** @return the heartbeat interval in milliseconds */
        public Integer getHeartbeatInterval() {
            return heartbeatInterval;
        }

        /** @param heartbeatInterval the heartbeat interval in milliseconds */
        public void setHeartbeatInterval(Integer heartbeatInterval) {
            this.heartbeatInterval = heartbeatInterval;
        }

        /** @return whether reconnection is enabled */
        public Boolean getEnableReconnection() {
            return enableReconnection;
        }

        /** @param enableReconnection whether reconnection is enabled */
        public void setEnableReconnection(Boolean enableReconnection) {
            this.enableReconnection = enableReconnection;
        }

        /** @return the name identifying this server */
        public String getWsName() {
            return wsName;
        }

        /** @param wsName the name identifying this server */
        public void setWsName(String wsName) {
            this.wsName = wsName;
        }
    }

    /** @return the configured servers, or {@code null} if none were set */
    public List<ServerProperties> getConfig() {
        return config;
    }

    /** @param config the configured servers */
    public void setConfig(List<ServerProperties> config) {
        this.config = config;
    }
}

WebsocketMultipleBeanConfig连接服务端并保存到bean中

import com.example.demo.service.WebsocketRunClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author: tanghaizhi
 * @CreateTime: 2022/10/13 14:14
 * @Description: Websocket多客户端配置
 */
@Slf4j
@Configuration
public class WebsocketMultipleBeanConfig {

    @Bean
    public Map<String, WebsocketRunClient> websocketRunClientMap(WebsocketClientConfiguration websocketClientConfiguration){

        Map<String, WebsocketRunClient> retMap = new HashMap<>(5);

        List<WebsocketClientConfiguration.ServerProperties> config = websocketClientConfiguration.getConfig();

        for (WebsocketClientConfiguration.ServerProperties serverProperties : config) {

            String wsUrl = serverProperties.getWsUrl();
            String wsName = serverProperties.getWsName();
            Boolean enableReconnection = serverProperties.getEnableReconnection();
            Boolean enableHeartbeat = serverProperties.getEnableHeartbeat();
            Integer heartbeatInterval = serverProperties.getHeartbeatInterval();

            try {
                WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(wsUrl),wsName);
                websocketRunClient.connect();
                websocketRunClient.setConnectionLostTimeout(0);

                new Thread(()->{
                    while (true){
                        try {
                            Thread.sleep(heartbeatInterval);
                            if(enableHeartbeat){
                                websocketRunClient.send("[websocket "+wsName+"] 心跳检测");
                                log.info("[websocket {}] 心跳检测",wsName);
                            }
                        } catch (Exception e) {
                            log.error("[websocket {}] 发生异常{}",wsName,e.getMessage());
                            try {
                                if(enableReconnection){
                                    log.info("[websocket {}] 重新连接",wsName);
                                    websocketRunClient.reconnect();
                                    websocketRunClient.setConnectionLostTimeout(0);
                                }
                            }catch (Exception ex){
                                log.error("[websocket {}] 重连异常,{}",wsName,ex.getMessage());
                            }
                        }
                    }
                }).start();

                retMap.put(wsName,websocketRunClient);
            } catch (URISyntaxException ex) {
                log.error("[websocket {}] 连接异常,{}",wsName,ex.getMessage());
            }
        }
        return retMap;

    }

}

客户端

import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

import java.net.URI;
import java.nio.ByteBuffer;

/**
 * @author: tanghaizhi
 * @CreateTime: 2022/10/13 14:13
 * @Description:
 */
@Slf4j
public class WebsocketRunClient extends WebSocketClient {
    /**
     * websocket连接名称
     */
    private String wsName;


    public WebsocketRunClient(URI serverUri,String wsName) {
        super(serverUri);
        this.wsName = wsName;
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        log.info("[websocket {}] Websocket客户端连接成功",wsName);
    }

    @Override
    public void onMessage(String msg) {
        log.info("[websocket {}] 收到String消息:{}",wsName,msg);
    }

    @Override
    public void onMessage(ByteBuffer bytes) {
        log.info("[websocket {}] 收到ByteBuffer消息:{}",wsName);
    }


    @Override
    public void onClose(int code, String reason, boolean remote) {
        log.info("[websocket {}] Websocket客户端关闭",wsName);
        System.out.println("Connection closed by " + (remote ? "remote peer" : "us") + " Code: " + code + " Reason: " + reason);
    }

    @Override
    public void onError(Exception e) {
        log.info("[websocket {}] Websocket客户端出现异常, 异常原因为:{}",wsName,e.getMessage());
    }


}

发送消息

/**
 * Demo REST endpoints that send a text message over, or close, the
 * WebSocket connection named by the first configuration entry.
 *
 * @author: tanghaizhi
 * @CreateTime: 2022/10/13 11:55
 */
@RestController
public class ctrl {

    /** Name of the first configured WebSocket server; key into the client map. */
    @Value("${cancan.websocket.client.config[0].wsName}")
    private String ws1Name;


    /** All WebSocket clients, keyed by server name. */
    @Autowired
    private Map<String, WebsocketRunClient> websocketRunClientMap;

    /**
     * Sends {@code text} to the first configured server.
     *
     * @param text the message to send
     * @return a success message, or a failure message when the text is missing,
     *         the connection is unknown or the connection is not open
     */
    @RequestMapping("/send")
    public String send(String text){
        if (text == null) {
            return "发送失败:text不能为空";
        }
        WebsocketRunClient websocketRunClient = websocketRunClientMap.get(ws1Name);
        if (websocketRunClient == null) {
            return "发送失败:连接[" + ws1Name + "]不存在";
        }
        if (!websocketRunClient.isOpen()) {
            // send() throws on a connection that is not open; report it instead of failing with HTTP 500.
            return "发送失败:连接[" + ws1Name + "]未建立";
        }
        websocketRunClient.send(text);
        return "发送成功";
    }

    /**
     * Closes the connection to the first configured server.
     *
     * @return a success message, or a failure message when the connection is unknown
     */
    @RequestMapping("/close")
    public String close(){
        // Use the configured name rather than a hard-coded "ws-01" so that
        // /send and /close always address the same connection.
        WebsocketRunClient websocketRunClient = websocketRunClientMap.get(ws1Name);
        if (websocketRunClient == null) {
            return "关闭失败:连接[" + ws1Name + "]不存在";
        }
        websocketRunClient.close();
        return "关闭成功";
    }

}

改造

上面是一个客户端的例子,但是这次的需求有些特殊:ws的连接信息需要从数据库表中读取,而且用户可以在系统页面上随时对这个表的数据进行增删改查操作。这个时候,上面那种读取配置文件、在项目启动时就发起ws连接并保存连接的方法就行不通了。
根据需求改造之后的设计是这样的:ws连接依旧保存在config注册bean的map中,但是注册bean那里我们只是将这个map初始化,并不初始化ws连接。ws连接使用定时任务初始化:定时读取表中的数据,如果表中的ws连接在map中没有,则初始化后放入map;如果存在表中没有而map中有的ws连接,则关闭该连接并将其从map中移除。

WebsocketMultipleBeanConfig配置类

/**
 * Registers the shared, initially empty map that holds the WebSocket
 * clients. The connections themselves are created and removed elsewhere.
 *
 * @author: tanghaizhi
 * @CreateTime: 2022/10/18 9:20
 */
@Configuration
@Slf4j
public class WebsocketMultipleBeanConfig {


    /**
     * Creates the client registry, keyed by connection name.
     *
     * <p>The map is filled and pruned by a periodic task while other threads
     * (for example request handlers) read it, so a thread-safe map is used
     * instead of a plain {@code HashMap}. Note that it accepts neither
     * {@code null} keys nor {@code null} values.
     *
     * @return an empty, thread-safe map
     */
    @Bean
    public Map<String, WebsocketRunClient> websocketRunClientMap(){
        // Fully qualified because this file has no import for the type.
        return new java.util.concurrent.ConcurrentHashMap<>(8);
    }


}

ws初始化定时任务

/**
 * @author: tanghaizhi
 * @CreateTime: 2022/10/18 11:26
 * @Description: 读取配置,保持ws连接
 */
@Component
@Slf4j
public class KeepConnectTask {

    @Value("${signaling.tracking.webSocket.uri:/websocket/}")
    private String uri;

    @Autowired
    private ServerConfigService serverConfigService;


    @Autowired
    private Map<String, WebsocketRunClient> websocketRunClientMap;

//    @Scheduled(cron = "0/20 * * * * ? ")
    public void execute() throws Exception {
        InetAddress adds = InetAddress.getLocalHost();
        String hostIp = adds.getHostAddress();
        //查询服务器配置
        //这里就是查询表的方法 测试方便先用固定数据代替了
//        List<ServerConfigModel> serverConfigs = serverConfigService.selectAll();
        List<ServerConfigModel> serverConfigs = new ArrayList<>();
        ServerConfigModel serverConfigModel = new ServerConfigModel();
        serverConfigModel.setServerIp("127.0.0.1");
        serverConfigModel.setServerPort("8080");
        serverConfigs.add(serverConfigModel);
        Map<String,String> configs = new HashMap<>();
        for(ServerConfigModel serverConfig:serverConfigs){
            String key = "ws://" + serverConfig.getServerIp() + ":" + serverConfig.getServerPort();
            String wsName = "ws-" + serverConfig.getServerIp() + ":" + serverConfig.getServerPort();
            String value = key + uri + wsName;
            configs.put(wsName,value);
        }
        //删除、关闭已不在配置中的连接
        //注意使用keySet边遍历边删除会报错,建议使用迭代器遍历删除
        Iterator<Map.Entry<String, WebsocketRunClient>> it = websocketRunClientMap.entrySet().iterator();
        while(it.hasNext()){
            Map.Entry<String, WebsocketRunClient> enter = it.next();
            if(!configs.containsKey(enter.getKey())){
                enter.getValue().close();
                it.remove();
            }
        }
        //已有连接心跳发送
        for(String key:websocketRunClientMap.keySet()){
            WebsocketRunClient client = websocketRunClientMap.get(key);
            try{
                client.send("websocket[" + hostIp + "]心跳检测");
                log.info("websocket[" + hostIp + "]心跳检测");
            } catch (Exception e){
                log.error("websocket[{}] 发生异常{}",hostIp,e.getLocalizedMessage());
                e.printStackTrace();
                try {
                    client.reconnect();
                    client.setConnectionLostTimeout(0);
                } catch (Exception ex){
                    log.error("websocket[{}] 重连异常,{}",hostIp,ex.getMessage());
                }
            }
        }
        //新配置增加连接
        for(String key:configs.keySet()){
            if(!websocketRunClientMap.containsKey(key)){
                String url = configs.get(key);
                try{
                    WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(url),key);
                    websocketRunClient.connect();
                    websocketRunClient.setConnectionLostTimeout(0);
                    websocketRunClientMap.put(key,websocketRunClient);
                } catch (Exception e){
                    log.error("websocket[{}] 新增连接异常,{}",key,e.getMessage());
                }
            }
        }
    }

}

发送消息

发送消息仍然和之前一样先注入

@Autowired
private Map<String, WebsocketRunClient> websocketRunClientMap;

然后map.get拿到对应想发送消息的WebsocketRunClient,调用其中的send方法即可

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐