简介

本文介绍使用Springboot简单实现Websocket的完整流程。由于是入门教程,所以以一个简单的案例呈现,包括了服务端和客户端的实现,这里服务端使用Springboot搭建,而客户端使用Java来实现。

什么是Websocket

在开始之前,先简单介绍一下Websocket的相关知识。Websocket是应用层协议,传输层使用了TCP协议来实现。Websocket与HTTP协议的不同之处在于,Websocket是全双工的,支持服务器与客户端的双向通信,而HTTP协议只能由客户端向服务端发起请求来获取资源。因此,Websocket的一个常见的应用场景是服务端向多个客户端推送消息。

环境配置

笔者的开发环境配置为:

  • AdoptOpenJDK 11
  • IntelliJ IDEA 2020.1.2 x64

服务端实现

本章节介绍服务端的实现。
1、引入Jar包
首先,在IDEA中创建一个Maven项目,在pom.xml文件中引入所需要的Jar包,如下所示:

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
</dependencies>

一个是springboot的starter,另一个是springboot内部集成的websocket。
2、Springboot集成的Websocket
使用Springboot集成的websocket需要使用到以下几个类/接口:

  1. WebSocketHandler:WebSocket消息以及生命周期事件的处理器。
  2. WebSocketConfigurer:对WebSocket进行配置,包括配置拦截器、配置接口地址。
  3. HttpSessionHandshakeInterceptor:拦截器,可以对Websocket通信过程中的请求进行拦截处理。

除了上述三个官方提供的类/接口之外,我们还需要实现一个WebSocket的包装类,用于为每一个WebSocket实例添加额外的信息,比如客户端的ID。在Spring内,WebSocket实例以WebSocketSession形式存在,每个session都代表了一个服务端与客户端的会话。
2.1、创建WebSocket包装类

import org.springframework.web.socket.WebSocketSession;

public class WebSocketBean {

    private WebSocketSession webSocketSession;
    private int clientId;

    public int getClientId() {
        return clientId;
    }

    public void setClientId(int clientId) {
        this.clientId = clientId;
    }
    
    public WebSocketSession getWebSocketSession() {
        return webSocketSession;
    }

    public void setWebSocketSession(WebSocketSession webSocketSession) {
        this.webSocketSession = webSocketSession;
    }
}

这里的包装类很简单,仅添加了一个客户端ID的属性,这里仅作为简单的示例。

2.2、实现WebSocketHandler接口
WebSocketHandler接口用于处理WebSocket的消息,Spring提供了一个抽象类AbstractWebSocketHandler实现了WebSocketHandler接口,因此我们可以直接继承抽象类,重写需要实现的方法即可。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MyWebsocketHandler extends AbstractWebSocketHandler {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private static final Map<String, WebSocketBean> webSocketBeanMap;
    private static final AtomicInteger clientIdMaker;   //仅用用于标识客户端编号

    static {
        webSocketBeanMap = new ConcurrentHashMap<>();
        clientIdMaker = new AtomicInteger(0);
    }

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    	//当WebSocket连接正式建立后,将该Session加入到Map中进行管理
        WebSocketBean webSocketBean = new WebSocketBean();
        webSocketBean.setWebSocketSession(session);
        webSocketBean.setClientId(clientIdMaker.getAndIncrement());
        webSocketBeanMap.put(session.getId(), webSocketBean);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
    	//当连接关闭后,从Map中移除session实例
        webSocketBeanMap.remove(session.getId());
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
    	//传输过程中出现了错误
        if (session.isOpen()){
            session.close();
        }
        webSocketBeanMap.remove(session.getId());
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    	//处理接收到的消息
        logger.info("Received message from client[ID:" + webSocketBeanMap.get(session.getId()).getClientId() +
                "]; Content is [" + message.getPayload() + "].");
        TextMessage textMessage = new TextMessage("Server has received your message.");
        session.sendMessage(textMessage);
    }
}

实现的逻辑很简单,在每个方法都有了注释。值得注意的是,这里在handleTextMessage处理接收到的消息,表示处理接收到的字符串消息。除此之外,AbstractWebSocketHandler还提供了handleBinaryMessage以及handlePongMessage,前者表示处理二进制消息,而后者表示处理心跳数据包的信息。

2.3、实现WebSocket拦截器
WebSocket拦截器可以在Websocket连接建立之前的权限校验等功能,Spring提供了HttpSessionHandshakeInterceptor这个接口作为拦截器。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

import java.util.Map;

public class MyWebSocketInterceptor extends HttpSessionHandshakeInterceptor {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        logger.info("[MyWebSocketInterceptor#BeforeHandshake] Request from " + request.getRemoteAddress().getHostString());
        if (request instanceof ServletServerHttpRequest){
            ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request;
            String token = serverHttpRequest.getServletRequest().getHeader("token");
            //这里做一个简单的鉴权,只有符合条件的鉴权才能握手成功
            if ("token-123456".equals(token)){
                return super.beforeHandshake(request, response, wsHandler, attributes);
            }else {
                return false;
            }
        }
        return super.beforeHandshake(request, response, wsHandler, attributes);
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
        logger.info("[MyWebSocketInterceptor#afterHandshake] Request from " + request.getRemoteAddress().getHostString());
    }
}

从代码可以看出,笔者在握手之前对请求进行了一个简单的校验,符合条件的请求才会进行下一步的握手。

2.4、对WebSocket进行配置
该步骤实现Springboot对WebSocket的支持,包括了配置接口地址、配置拦截器等。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
@EnableWebSocket
public class WebSocketConfiguration implements WebSocketConfigurer {

    @Bean
    public MyWebSocketInterceptor webSocketInterceptor(){return new MyWebSocketInterceptor();}

    @Bean
    public ServerEndpointExporter serverEndpointExporter(){return new ServerEndpointExporter();}

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
        webSocketHandlerRegistry.addHandler(new MyWebsocketHandler(), "/websocket").addInterceptors(webSocketInterceptor());
    }
}

从上面的代码看出,笔者将websocket的接口地址放在了“/websocket”上,当客户端访问“/websocket”时,就会尝试与服务端进行握手连接。

2.5、配置Application主类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;

@SpringBootApplication
@ServletComponentScan
public class MainApplication {

    public static void main(String[] args) {
        SpringApplication.run(MainApplication.class);
    }
}

客户端实现

本小节介绍WebSocket的客户端实现,这里使用Java所提供的WebSocket库来实现。在IDEA中新建一个工程:WebsocketDemoClient,用来保存我们的客户端代码。在pol.xml文件中引入:

<dependencies>
	<dependency>
		<groupId>org.java-websocket</groupId>
		<artifactId>Java-WebSocket</artifactId>
		<version>1.5.1</version>
	</dependency>
	<dependency>
    	<groupId>org.slf4j</groupId>
    	<artifactId>slf4j-simple</artifactId>
    	<version>1.7.25</version>
    	<scope>compile</scope>
    </dependency>
</dependencies>

1、创建Websocket client
WebsocketClient提供了连接到服务端的WebSocket的一切必要准备,我们只需要继承该类,重写所需要的方法来实现我们的业务逻辑,就能方便地使用WebSocket了。

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
public class MyWebSocketClient extends WebSocketClient {
    
    private Logger logger = LoggerFactory.getLogger(getClass());
    
    public MyWebSocketClient(URI serverUri) {
        super(serverUri);
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        logger.info("[MyWebSocketClient#onOpen]The WebSocket connection is open.");
    }

    @Override
    public void onMessage(String s) {
        logger.info("[MyWebSocketClient#onMessage]The client has received the message from server." +
                "The Content is [" + s + "]");
    }

    @Override
    public void onClose(int i, String s, boolean b) {
        logger.info("[MyWebSocketClient#onClose]The WebSocket connection is close.");
    }

    @Override
    public void onError(Exception e) {
        logger.info("[MyWebSocketClient#onError]The WebSocket connection is error.");
    }
}

从上述代码可以看出,实现逻辑很简单,这里主要是打印出各个方法执行时的情况。

2、使用WebSocketClient
下面创建Main函数来使用我们的WebSocketClient客户端,同时创建一个定时任务,使得客户端可以连续不间断地给服务端发送消息,代码如下所示:

import java.net.URI;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {

    private static final AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) {
        URI uri = URI.create("ws://127.0.0.1:8080/websocket");  //注意协议号为ws
        MyWebSocketClient client = new MyWebSocketClient(uri);
        client.addHeader("token", "token-123456");              //这里为header添加了token,实现简单的校验
        try {
            client.connectBlocking();   //在连接成功之前会一直阻塞

            Timer timer = new Timer();
            MyTimerTask timerTask = new MyTimerTask(client);
            timer.schedule(timerTask, 1000, 2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class MyTimerTask extends TimerTask{

        private final MyWebSocketClient client;

        public MyTimerTask(MyWebSocketClient client){
            this.client = client;
        }

        @Override
        public void run() {
            client.send("Test message from client, the number is " + count.getAndIncrement());
        }
    }
}

运行结果

服务端和客户端的代码编写完毕后,我们来看一下结果。先运行服务端代码,将服务端应用程序部署于8080端口中,然后运行客户端代码,结果分别如下图所示:
服务端运行结果
客户端运行结果
OK,运行结果正确,客户端与服务端建立了WebSocket链路,并实时发送消息到服务端中。本文所介绍的简单的例子到此为止,感谢你的阅读~

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐