前言:最近项目上需要用到这个技术,但是真正集成到SpringCloud项目运行时,遇到各种问题。查了很多博客也没有一篇相对完整的,大多数是demo代码。下面将完整地分享从 Client-->Nginx-->gateway-->server 到返回的整个功能实现。

一、基本概念

1.websocket基础概念

WebSocket是一种通信协议,可在单个TCP连接上进行全双工通信。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就可以建立持久性的连接,并进行双向数据传输。

简单易懂:WebSocket可以实现客户端与服务端的双向通讯,最大也是最明显的区别就是可以做到服务端主动将消息推送给客户端

 

2.websocket的特点

  • 握手阶段采用 HTTP 协议。
  • 数据格式轻量,性能开销小。客户端与服务端进行数据交换时,服务端到客户端的数据包头只有2到10字节,客户端到服务端需要加上另外4字节的掩码。HTTP每次都需要携带完整头部。
  • 更好的二进制支持,可以发送文本,和二进制数据
  • 没有同源限制,客户端可以与任意服务器通信
  • 协议标识符是ws(如果加密,则是wss),请求的地址就是后端支持websocket的API。

 

3.什么场景下使用

在项目没有使用websocket时,如果客户端(前端)想要实时获取后端的数据变化,需要定一个定时器,一直轮询地调用后端接口。这样开销太大,也不是真的实时,而且是很被动的。

  • 定时任务时间间隔,多久调用一次。太长达不到效果,太短又请求太频繁
  • 假设并发很高的话,这对服务端也是个考验

WebSocket一次握手,持久连接,以及主动推送的特点可以解决上边的问题,又不至于损耗性能。

真实使用场景:日志刷新、监控调度平台、以及一些也和业务相关的需要服务端主动发消息的场景

 

二、版本信息和配置

SpringCloud: Hoxton.SR6

Gateway: 2.2.3.RELEASE

<dependencyManagement>
    <dependencies>
	    <dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-dependencies</artifactId>
			<version>Hoxton.SR6</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
    </dependencies>
</dependencyManagement>


<dependencies>
    <dependency>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-starter-gateway</artifactId>
	</dependency>
</dependencies>

 

SpringBoot: 2.3.0.RELEASE

Spring-boot-starter-websocket: 2.3.0.RELEASE

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.0.RELEASE</version>
</parent>

<dependencies>
    <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
</dependencies>

Nginx: 1.19.0

 

三、功能实现

1.编写Websocket服务端

有一个Sprinboot服务:端口8086,在项目里引入websocket依赖

package com.yonjar.demo.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author luoyj
 * @date 2021/5/17.
 * @description @ServerEndpoint(value = "/test/websocket") 这个地址要和前端调用保持一致
 */
@Slf4j
@ServerEndpoint(value = "/test/websocket")
@Component
public class WebSocketServer {

    /** 记录当前在线连接数 */
    private static final AtomicInteger onlineCount = new AtomicInteger(0);

    /** 存放所有在线的客户端 */
    private static final Map<String, Session> clients = new ConcurrentHashMap<>();

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        onlineCount.incrementAndGet(); // 在线数加1
        clients.put(session.getId(), session);
        log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session) {
        onlineCount.decrementAndGet(); // 在线数减1
        clients.remove(session.getId());
        log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
    }

    /**
     * 收到客户端消息后调用的方法
     * @param message
     * 客户端发送过来的消息
     * 当业务改动数据时,可以主动发消息(不需要客户端主动请求)
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
        this.sendMessage(message);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
    }

    /**
     * 群发消息
     * @param message
     * 消息内容
     */
    public void sendMessage(String message) {
        for (Map.Entry<String, Session> sessionEntry : clients.entrySet()) {
            Session toSession = sessionEntry.getValue();
            /* 排除掉自己
            if (!fromSession.getId().equals(toSession.getId())) {
                log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
                toSession.getAsyncRemote().sendText(message);
            }*/
            toSession.getAsyncRemote().sendText(message);
        }
    }
}

2.添加配置类

package com.yonjar.demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author luoyj
 * @date 2021/5/17.
 * @description 因为使用的是原生API,不需要另外实现接口或集成类
 */
@Configuration
public class WebSocketConfig {

    /**
     * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3.编写客户端index.html(前端请求)

demo测试时,可以放在Springboot项目的resources/static/index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>My WebSocket</title>
</head>

<body>
    <input id="text" type="text" />
    <button onclick="send()">Send</button>
    <button onclick="closeWebSocket()">Close</button>
    <div id="message"></div>
</body>

<script type="text/javascript">
    var websocket = null;

    //判断当前浏览器是否支持WebSocket, 主要此处要更换为自己的地址
    if ('WebSocket' in window) {
        websocket = new WebSocket("ws://localhost:8086/test/websocket");
    } else {
        alert('Not support websocket')
    }

    //连接发生错误的回调方法
    websocket.onerror = function() {
        setMessageInnerHTML("error");
    };

    //连接成功建立的回调方法
    websocket.onopen = function(event) {
        setMessageInnerHTML("open");
    }

    //接收到消息的回调方法
    websocket.onmessage = function(event) {
        setMessageInnerHTML(event.data);
    }

    //连接关闭的回调方法
    websocket.onclose = function() {
        setMessageInnerHTML("close");
    }

    //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
    window.onbeforeunload = function() {
        websocket.close();
    }

    //将消息显示在网页上
    function setMessageInnerHTML(innerHTML) {
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }

    //关闭连接
    function closeWebSocket() {
        websocket.close();
    }

    //发送消息
    function send() {
        var message = document.getElementById('text').value;
        websocket.send(message);
    }
</script>
</html>

4.在线请求测试

  1. 启动server项目,浏览器访问:http://localhost:8086/index.html
     
  2. 也可以使用在线连接工具进行测试

    http://www.jsons.cn/websocket/
  3. 以上就是demo的测试使用。但是真正集成到企业项目时,就会遇到各种问题
     

四、集成到真实项目

首先考虑Nginx转发websocket是否支持,其次是gateway进行路由转发ws请求到具体的服务,然后是请求到服务连接成功进行业务处理,最后还要考虑鉴权以及并发问题。

1.Nginx配置:升级,让它支持websocket转发

注意配置正确的位置(看图)

map $http_upgrade $connection_upgrade {
     default upgrade;
     ''      close;
}


#升级http1.1到 websocket协议
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection  $connection_upgrade;


2.gateway配置:配置路由,支持转发ws请求

server:
  port: 8080

spring:
  application:
    name: app-gateway
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true
      enabled: true
      routes:
        #表示websocket的转发
        - id: app-metadata-websocket
          uri: lb:ws://app-metadata
          predicates: Path=/api/web/**
          filters: StripPrefix=2
        #正常接口转发
        - id: app-metadata
          uri: lb://app-metadata
          predicates: Path=/api/**
          filters: StripPrefix=1

3.websocket基于协议头传递token

建议前端使用原生websocket API请求

  • 使用封装过的api,例如 SocketJS 会有跨域,后端也需另外配置。
  • websocket连接成功后,如果没有进行通信,过一段时间后会断开连接。所以还需要前端隔5或10秒发送一个心跳请求后台。
  • 页面初始化没有成功后,还需要处理重试连接。

websocket请求头中可以包含Sec-WebSocket-Protocol这个属性,该属性是一个自定义的子协议。它从客户端发送到服务器并返回从服务器到客户端确认子协议。我们可以利用这个属性添加token。

var token='jlllwei68jj776'

var  ws = new WebSocket("ws://" + url+ "/webSocketServer",[token]);

4.WebsocketServer服务

01.编写过滤器获取token

拿到token可以解析判断,set 到response里面,否则Gateway源码WebSocketClientHandshaker会报异常,因为response没拿到子协议

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.servlet.*;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

/**
 * @Author luoyj
 * @Date 2021/6/7.
 */
@Slf4j
@Order(1)
@Component
@WebFilter(filterName = "WebSocketFilter", urlPatterns = "/websocket/app/edit")
public class WebSocketFilter implements Filter {

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletResponse response = (HttpServletResponse) servletResponse;
        String token = ((HttpServletRequest) servletRequest).getHeader("Sec-WebSocket-Protocol");
        log.info("【WebSocketFilter】response.setHeader = key:{},value:{}","Sec-WebSocket-Protocol",token);
        /*if (StringUtils.isNotBlank(token)) {
            response.setHeader("Sec-WebSocket-Protocol",token);
            filterChain.doFilter(servletRequest, servletResponse);
        }else {
            throw new BizException(ErrorCode.TEAMWORK_WS_NOT_TOKEN,"websocket请求没有携带token,无法请求!");
        }*/
        if (StringUtils.isNotBlank(token)) response.setHeader("Sec-WebSocket-Protocol",token);
        filterChain.doFilter(servletRequest, servletResponse);

    }
}

02.编写前后端请求数据结构实体Message,Encoder编码器,Decoder解码器。这样发送和接收信息可以更好的处理 。

import com.authine.mvp.app.metadata.domain.enums.TeamworkEditEvent;
import com.authine.mvp.app.metadata.domain.enums.TeamworkEditType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @Author luoyj
 * @Date 2021/5/19.
 * 业务数据结构,根据实际业务场景定义
 * 空参构造、有参构造不可少
 */
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TeamworkEditMessage {

    
    private String appCode;
    private TeamworkEditType teamworkEditType;
    private String code;

    /**
     * 必传
     * HEART_BEAT(0,"心跳"),
     * COMPETING_LOCK(1,"抢锁"),
     * CLEARING_LOCK(2,"释放锁"),
     * REFRESH_EXPIRE_TIME(3,"刷新锁的有效时间"),
     * LOCK_STATUS(4,"查看锁状态"),
     * SAVE(5,"保存数据,群发通知"),
     * EDIT(6,"编辑操作,群发通知"),
     * DELETE(7,"删除操作,群发通知"),
     * 例子参数传:COMPETING_LOCK
     */
    private TeamworkEditEvent teamworkEditEvent;

    private Boolean haveLock;
    private int expireTime;
    private String editUserId;
    private String editUserName;
    private String remark;

    /**
     * 标识当前code是锁定 还是未锁定
     */
    private Boolean codeLock;

}
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;

import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;

/**
 * @Author luoyj
 * @Date 2021/5/18.
 */
@Slf4j
public class TeamworkEditEncoder implements Encoder.Text<TeamworkEditMessage> {
    @Override
    public String encode(TeamworkEditMessage teamworkEditMessage) throws EncodeException {
        try {
            return JSON.toJSONString(teamworkEditMessage);
        } catch (Exception e) {
            e.printStackTrace();
            log.info("服务端数据转换json结构失败!");
            return "";
        }
    }

    @Override
    public void init(EndpointConfig endpointConfig) {

    }

    @Override
    public void destroy() {

    }
}
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import javax.websocket.EndpointConfig;

/**
 * @Author luoyj
 * @Date 2021/5/19.
 */
@Slf4j
public class TeamworkEditDecoder implements Decoder.Text<TeamworkEditMessage> {

    @Override
    public TeamworkEditMessage decode(String jsonMessage) throws DecodeException {
        return JSONObject.parseObject(jsonMessage, TeamworkEditMessage.class);
    }

    @Override
    public boolean willDecode(String jsonMessage) {
        try {
            JSONObject.parseObject(jsonMessage, TeamworkEditMessage.class);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            log.info("客户端发送消息到服务端,数据解析失败!");
            return false;
        }
    }

    @Override
    public void init(EndpointConfig endpointConfig) {

    }

    @Override
    public void destroy() {

    }
}
import com.authine.mvp.app.metadata.domain.enums.TeamworkEditEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author luoyj
 * @Date 2021/5/18.
 *
 * protocol
 */
@Slf4j
@ServerEndpoint(value = "/websocket/app/edit", encoders = TeamworkEditEncoder.class, decoders = TeamworkEditDecoder.class, subprotocols = {"sec-webSocket-protocol"})
@Component
public class WebSocketServer {

    /** 记录当前在线连接数 */
    private static final AtomicInteger onlineCount = new AtomicInteger(0);

    /** 存放所有在线的客户端 */
//    private static final Map<String, Session> clients = new ConcurrentHashMap<>();
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();
    private Session session;

    @OnOpen
    public void onOpen(Session session) throws IOException, EncodeException {
        onlineCount.incrementAndGet();
//        clients.put(session.getId(),session);
        this.session  = session;
        webSocketSet.add(this);
        log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
    }
    
    @OnClose
    public void onClose(Session session) {
        onlineCount.decrementAndGet();
        webSocketSet.remove(this);
//        clients.remove(session.getId());
        log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
    }

    @OnMessage
    public void onMessage(TeamworkEditMessage message, Session session) throws IOException, EncodeException {
        //TODO 参数校验
        log.info("服务端收到客户端【{}】的消息:{}", session.getId(), message.toString());
        TeamworkEditEvent teamworkEditEvent = message.getTeamworkEditEvent();
        if (teamworkEditEvent == null) {
            log.info("teamworkEditEvent 为空!");
            return;
        }
        log.info("处理客户端的请求:{}",teamworkEditEvent.getName());
        this.handleEvent(message, session);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
    }

    /**
     * 群发消息
     * @param message
     * 消息内容
     */
    public void sendMessage(TeamworkEditMessage message) throws IOException, EncodeException {
        for (WebSocketServer webSocketServer : webSocketSet) {
            Session toSession = webSocketServer.session;
            synchronized (toSession){
                toSession.getBasicRemote().sendObject(message);
            }
        }
        /*for (Map.Entry<String, Session> sessionEntry : clients.entrySet()) {
            Session toSession = sessionEntry.getValue();
            synchronized(toSession){
                toSession.getBasicRemote().sendObject(message);
            }
        }*/
        log.info("服务端给客户端群发发送消息{}",message.toString());

    }

    /**
     * 对特定客户端发送消息
     * @param message
     * @param toSession
     */
    public void sendMessageToOne(TeamworkEditMessage message, Session toSession) throws IOException, EncodeException {
        log.info("服务端给指定客户端【{}】 发送消息{}",toSession.getId(),message.toString());
        synchronized(toSession){
            toSession.getBasicRemote().sendObject(message);
        }
    }

    /**
     * 处理客户请求
     * @param message
     */
    private void handleEvent(TeamworkEditMessage message, Session session) throws IOException, EncodeException {
        TeamworkEditEvent teamworkEditEvent = message.getTeamworkEditEvent();
        String editUserId = message.getEditUserId();
        String editUserName = message.getEditUserName();
        log.info("【websocket】LoginId:{},LoginName:{}",editUserId,editUserName);
        if (teamworkEditEvent.getIndex() != 0){
            if (editUserId == null || editUserName == null) {
                message.setRemark("editUserId 和 editUserName 不能为空");
                this.sendMessageToOne(message,session);
                return;
            }
        }else {
            editUserId = "001";
            editUserName = "heartbeat";
        }
        
        switch (teamworkEditEvent.getIndex()){
            case 0:
                log.info("WebSocket 心跳检测");
                this.sendMessageToOne(message,session);
                break;
                
            default:
                log.info("TeamworkEditEvent事件类型:{} 不存在!",teamworkEditEvent.getName());
        }

    }
    

}

WebSocketConfig配置类同上 

 03.分析调用链路

首先启动 nginx:80,启动gateway:8080,启动app-metadata服务(websocketServer)

请求的调用链路:由于网关做了请求前缀限制,必须已 /api 开头,所以在配置nginx升级时,是在 location /api/ { } 进行配置。然后在gateway配置路由时
          uri: lb:ws://app-metadata
          predicates: Path=/api/web/**
          filters: StripPrefix=2

app-metadata是websocketServer服务,lb表示负载均衡,ws表示websocket请求。StripPrefix=2表示请求到app-metadata服务时,过滤掉/api/web/前缀。

实际前端请求地址为(80可省略):ws://localhost/api/web/websocket/app/edit

 

五、注意事项

1.检查网关请求是否有前缀配置

spring: 
  webflux: 
    base-path: /api

2.检查网关是否做了黑白名单控制

3.在WebsocketServer类里,无法直接注入Bean。可通过编写ApplicationContextUtil来获取Ioc容器里的Bean

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @author luoyj
 * @date 2021/3/17.
 * @description
 */
@Component
public class ApplicationContextUtil implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ApplicationContextUtil.applicationContext = applicationContext;
    }

    public static Object getBean(String name) throws BeansException {
        return applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> clazz) throws BeansException {
        return applicationContext.getBean(clazz);
    }

    public static ApplicationContext getApplicationContext(){
        return applicationContext;
    }

}

 

六、参考链接

https://blog.csdn.net/qq_34168515/article/details/108009811
https://www.jianshu.com/p/cfe3dbda9023
https://blog.csdn.net/zlxls/article/details/78504591/
https://www.cnblogs.com/kiwifly/p/11729304.html
https://www.cnblogs.com/zhongjidoushi/p/13367144.html
https://www.cnblogs.com/zhangXingSheng/p/11969633.html
https://www.cnblogs.com/xuwenjin/p/12664650.html

 

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐