我提倡的是用白话文,和经验去传承分享程序,让程序更加开源话,共享化,让后来者,初学者,遇到此类困难者等,少走弯路,提高效率,不喜勿喷。有更好的建议,可以留言探讨。。。。

首先说下此案例当时做的时候有点复杂,最后还是克服完成。拿出来广大网友分享,但愿能帮助你在java程序的道路上越走越远。。。
背景:
客户通过网页,在通过websocket协议,和Java后端创建连接,在通过Java后端和语音交互接口创建连接,接收到的返回数据,返回给前端在页面展示出来。Java就是做了一个中间件的作用。即是服务端又是客户端。

  1. 说完背景,直接上干货
  • maven依赖
  <dependency>
            <groupId>org.java-websocket</groupId>
            <artifactId>Java-WebSocket</artifactId>
            <version>1.4.0</version>
        </dependency>
  <!-- webSocket 开始-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <!-- webSocket 结束-->
  • 代码

  • Java后端接口与前端交互(服务端)

package com.datago.robot.common.utils;

import com.datago.robot.entity.DatagoApiWithBLOBs;
import com.datago.robot.service.DatagoApiService;
import org.java_websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 *
 */
@ServerEndpoint("/serverVue/{apiCode}")
@Component
public class SocketServerUtils {

    private Logger log = LoggerFactory.getLogger(SocketServerUtils.class);
    private Session session;
    private WebSocketClient client;
    private String apiCode;
    /**
     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
     */
    private static int onlineCount = 0;
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     * 在外部可以获取此连接的所有websocket对象,并能对其触发消息发送功能,我们的定时发送核心功能的实现在与此变量
     */
    private static CopyOnWriteArraySet<SocketServerUtils> webSocketMap = new CopyOnWriteArraySet<SocketServerUtils>();

    private static DatagoApiService datagoApiService;

    @Autowired
    public void setDatagoApiService(DatagoApiService datagoApiService) {
        SocketServerUtils.datagoApiService = datagoApiService;
    }

    /**
     * 建立连接
     *
     * @param apiCode
     * @param session
     */
    @OnOpen
    public void onOpen(@PathParam(value = "apiCode") String apiCode, Session session) {
        this.session = session;
        addOnlineCount();
        webSocketMap.add(this);
        //连接调用第三方服务
        client = SocketResultUtils.getClient("ws://192.168.0.109:8013/websocket");
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的音频数据
     */
    @OnMessage
    public void onMessage(@PathParam(value = "apiCode") String apiCode, byte[] message) {
        log.info("音频数据报文:" + message);
        try {
            //在线
            client.send(message);
            ExceptionLog.isSuccess(apiCode, null,null);
        } catch (Exception e) {
            //离线
            isconnct(apiCode);
            client.send(message);
            ExceptionLog.isSuccess(apiCode, null,null);
        }
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的字符数据
     */
    @OnMessage
    public void onMessage(@PathParam(value = "apiCode") String apiCode, String message) {
        log.info("字符数据报文:" + message);
        try {
            //在线
            client.send(message);
            ExceptionLog.isSuccess(apiCode, message,null);
        } catch (Exception e) {
            //离线
            isconnct(apiCode);
            client.send(message);
            ExceptionLog.isSuccess(apiCode, message,null);
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(@PathParam(value = "apiCode") String apiCode) {
        //请求第三方关闭连接
        webSocketMap.remove(this);
        subOnlineCount();
        log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error, @PathParam(value = "apiCode") String apiCode) {
        log.error("错误原因:" + error.getMessage());
    }


    /**
     * 新加
     * 判断连接(在线,离线)
     *
     * @param apiCode
     */
    private void isconnct(String apiCode) {
        DatagoApiWithBLOBs datagoApi = datagoApiService.selectByApiCode(apiCode);
        if (Utils.isNotEmpty(datagoApi)) {
            //判断在线(true),离线(false)
           //  在线
            String apiUrl = datagoApi.getApiUrl();
            if (apiUrl.startsWith("ws")) {
                client = SocketResultUtils.getClient(apiUrl);
                if (Utils.isEmpty(client)) {
                    //离线
                    String apiOffUrl = datagoApi.getApiOffUrl();
                    if (apiOffUrl.startsWith("ws")) {
                        client = SocketResultUtils.getClient(apiOffUrl);
                        if (Utils.isEmpty(client)) {
                            ExceptionLog.isConnect(apiCode, null);
                        }
                    } else {
                        ExceptionLog.isUrl(apiCode, null);
                    }
                }
            } else {
                ExceptionLog.isUrl(apiCode, null);
            }
        }
    }

    /**
     * 实现服务器主动推送
     *
     * @param message
     */
    public void sendMessage(String message) {
        try {
            this.session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        SocketServerUtils.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        SocketServerUtils.onlineCount--;
    }

    public static CopyOnWriteArraySet<SocketServerUtils> getWebSocketSet() {
        return webSocketMap;
    }

    public static void setWebSocketSet(CopyOnWriteArraySet<SocketServerUtils> webSocketSet) {
        SocketServerUtils.webSocketMap = webSocketSet;
    }

}

  • Java与第三方服务交互代码(客户端),接收第三方服务的返回数据
package com.datago.robot.common.utils;

import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.net.*;

import java.util.concurrent.CopyOnWriteArraySet;

@Slf4j
@Component
public class SocketResultUtils {

    public static Logger log = LoggerFactory.getLogger(SocketResultUtils.class);


    /**
     * 获取客户端连接实例
     *
     * @param uri
     * @return
     */
    public static WebSocketClient getClient(String uri) {

        try {

            //创建客户端连接对象
            WebSocketClient client = new WebSocketClient(new URI(uri), new Draft_6455()) {

                /**
                 * 建立连接调用
                 * @param serverHandshake
                 */
                @Override
                public void onOpen(ServerHandshake serverHandshake) {
                    log.info("建立连接");
                }

                /**
                 * 收到服务端消息调用
                 * @param s
                 */
                @Override
                public void onMessage(String s) {
                    log.info("处理结束返回消息:" + s);
                    //接收第三方websocket服务发送数据
                    CopyOnWriteArraySet<SocketServerUtils> webSocketSet =
                            SocketServerUtils.getWebSocketSet();
                    if (Utils.isNotEmpty(webSocketSet)) {
                        int i = 0;
                        synchronized (webSocketSet) {
                            webSocketSet.forEach(c -> {
                                //返回前端数据
                                log.info("返回结果数据, data = {}", s);
                                for (int j = 0; j < 10; j++) {
                                    c.sendMessage(s);
                                }
                            });
                            log.info("收到来自服务端的消息:::" + s);
                        }
                    }
                }

                /**
                 * 断开连接调用
                 * @param i
                 * @param s
                 * @param b
                 */
                @Override
                public void onClose(int i, String s, boolean b) {
                    log.info("关闭连接:::" + "i = " + i + ":::s = " + s + ":::b = " + b);
                }

                /**
                 * 连接报错调用
                 * @param e
                 */
                @Override
                public void onError(Exception e) {
                    log.error("报错了:::" + e.getMessage());
                }
            };
            //请求与服务端建立连接
            client.connect();
            //判断连接状态,0为请求中  1为已建立  其它值都是建立失败
            while (client.getReadyState().ordinal() == 0) {
                try {
                    Thread.sleep(200);
                } catch (Exception e) {
                    log.warn("延迟操作出现问题,但并不影响功能");
                }
                log.info("连接中。。。");
                break;
            }
            //连接状态不再是0请求中,判断建立结果是不是1已建立
            if (client.getReadyState().ordinal() == 1) {
                return client;
            }
        } catch (URISyntaxException e) {

            log.error(e.getMessage());
        }
        return null;
    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐