WebSocket和kafka实现数据实时推送到前端

一. 需求背景

需要将kafka中的数据实时推送到前端展示。最开始想到的是前端轮询接口数据,但是无法保证轮询的频率和消费的频率完全一致,或造成数据缺失等问题。最终确定用利用WebSocket实现数据的实时推送。

二. websocket简介

WebSocket用于在Web浏览器和服务器之间进行任意的双向数据传输的一种技术。WebSocket协议基于TCP协议实现,包含初始的握手过程,以及后续的多次数据帧双向传输过程。其目的是在WebSocket应用和WebSocket服务器进行频繁双向通信时,可以使服务器避免打开多个HTTP连接进行工作来节约资源,提高了工作效率和资源利用率。

1.引言

互联网发展的早期,网站上只是一些静态展示页面。用户请求(Request)网站页面,网站回复(Response)页面内容给用户浏览器。因为需求简单,所以也没有很复杂的协议过程。这种形式的Request/Response交互流程如下图所示:

img

​ 随着互联网技术的发展,带宽逐步提高,用户数也越来越庞大。对互联网的呈现内容提出了要求,随之出现了动态页面技术,对同一个页面,页面的某些部分对不同的访问用户,呈现的内容不同。相关的实现技术有CGI、ASP、PHP、JSP等。由于访问量的增加,WEB服务器同时处理的用户数也达到了万(10K)以上级别,这就是C10K问题:“The C10K problem”[1]。为了缓解服务器压力,每次Request/Response后连接(TCP连接)继续保持,以及对同一个TCP连接,多次复用Request/Response的方法(也称为Pipeline)也提了出来。这就是HTTP/1.1协议中长连接的主要内容。

伴随移动互联网的发展,大量移动终端和其上的APP应用接入网络,HTML5技术也提了出来,以便支持WEB上的音视频播放、实时游戏、实时聊天等。催生了这样一个**需求**,当服务器有更新时,需要立即将数据发送给客户端,这就是基于服务器端的**推送**技术。

WEBSOCKET之前的解决方法大概这么几种:

  • 1.轮询:客户端设置一个时间间隔,时间到以后,向服务器发送request询问有无新数据,服务器立即返回response,如果有更新则携带更新的数据。
  • 2.长连接(long poll): 和轮询相似,但是为阻塞模式的轮询,客户端请求新的数据request, 服务器会阻塞请求,直到有新数据后才返回response给客户端;然后客户端再重复此过程。这两种方式的特点,不断的建立HTTP连接,然后发送请求request,之后服务器等待处理。服务端体现的是一种被动性,同时这种处理方式,非常耗费网络带宽和服务器资源。

​ 服务器向客户端推送更新时,因为被动性,对低延迟的应用体验不好;因为request/response的交互方式,对网络带宽和服务器带来了额外的负担(例如多次请求的HTTP头部, TCP连接复用会导致的Head-of-Line Blocking线头阻塞[2]等)。如果在单一的TCP连接中,使用双向通信(全双工通信)就能很好的解决此问题。这就是WebSocket技术的缘由。

2. WebSocket技术及协议

WebSocket技术的优点有:

  • 1.通过第一次HTTP Request建立了连接之后,后续的数据交换都不用再重新发送HTTP Request,节省了带宽资源;
  • 2.WebSocket的连接是双向通信的连接,在同一个TCP连接上,既可以发送,也可以接收;
  • 3.具有多路复用的功能(multiplexing),也即几个不同的URI可以复用同一个WebSocket连接。这些特点非常类似TCP连接,但是因为它借用了HTTP协议的一些概念,所以被称为了WebSocket。
2.1 WebSocket API

WebSocket API[3], 也称为WebSocket接口(Interface),定义了Web应用和服务器进行双向通信的公共接口。 如下图所示:

img

接口的内容可以分为三类:状态变量、网络功能和消息处理等。

  • 1.构造函数WebSocket(url, protocols):构造WebSocket对象,以及建立和服务器连接; protocols可选字段,代表选择的子协议
  • 2.状态变量readyState: 代表当前连接的状态,短整型数据,取值为CONNECTING(值为0), OPEN(值为1), CLOSING(值为2), CLOSED(值为3)
  • 3.方法变量close(code, reason): 关闭此WebSocket连接。
  • 4.状态变量bufferedAmount: send函数调用后,被缓存并且未发送到网络上的数据长度
  • 5.方法变量send(data): 将数据data通过此WebSocket发送到对端
  • 6.回调函数onopen/onmessage/onerror/onclose: 当相应的事件发生时会触发此回调函数
2.1.1 示例

客户端使用例子(JavaScript):

var websocket = new WebSocket("ws://www.host.com/path"); 
websocket.onopen = function(evt) { onOpen(evt) }; 
websocket.onclose = function(evt) { onClose(evt) }; 
websocket.onmessage = function(evt) { onMessage(evt) }; 
websocket.onerror = function(evt) { onError(evt) }; }  
function onMessage(evt) { alert( evt.data); }
function onError(evt) { alert( evt.data); }  
websocket.send("client to server");

2.2 WebSocket协议
WebSocket看成是一种类似TCP/IP的socket技术;此socket在Web应用中实现,并获得了和TCP/IP通信一样灵活方便的全双向通信功能。

WebSocket协议由RFC 6455定义。协议分为两个部分: 握手阶段和数据通信阶段。

WebSocket为应用层协议,其定义在TCP/IP协议栈之上。WebSocket连接服务器的URI以"ws"或者"wss"开头。ws开头的默认TCP端口为80,wss开头的默认端口为443。

2.2.1 握手阶段

客户端和服务器建立TCP连接之后,客户端发送握手请求,随后服务器发送握手响应即完成握手阶段。如下图所示:

img

  • 客户端握手请求类似如下:

img

  • 服务器的握手响应类似如下:

img

需要关闭连接时,任意一方直接发送类型为关闭帧(Close frame)的控制帧数据给对方即可。

2.2.2 数据通信

WebSocket的数据在发送时,被组织为依次序的一串数据帧(data frame),然后进行传送。

传送的帧类型分为两类:数据帧(data frame)和控制帧(Control frame)。数据帧可以携带文本数据或者二进制数据;控制帧包含关闭帧(Close frame)和Ping/Pong帧。

帧的格式如下所示:
img

其中最重要的字段为opcode(4bit)和MASK(1bit):

  • MASK值,从客户端进行发送的帧必须置此位为1,从服务器发送的帧必须置为0。如果任何一方收到的帧不符合此要求,则发送关闭帧(Close frame)关闭连接。
  • opcode的值: 0x1代表此帧为文本数据帧, 0x2代表此帧为二进制数据帧, 0x8为控制帧中的连接关闭帧(close frame), 0x9为控制帧中的Ping帧, 0xA(十进制的10)为控制帧中的Pong帧。
  • Ping/Pong帧: Ping帧和Pong帧用于连接的保活(keepalive)或者诊断对端是否在线。这两种帧的发送和接收不对WEB应用公开接口,由实现WebSocket协议的底层应用(例如浏览器)来实现它。
2.2.3 连接关闭

任何一端发送关闭帧给对方,即可关闭连接。关闭连接时通常都带有关闭连接的状态码(status code)。常见状态码的含义如下:

  • 1000 连接正常关闭
  • 1001 端点离线,例如服务器down,或者浏览器已经离开此页面
  • 1002 端点因为协议错误而中断连接
  • 1003 端点因为受到不能接受的数据类型而中断连接
  • 1004 保留
  • 1005 保留, 用于提示应用未收到连接关闭的状态码
  • 1006 端点异常关闭
    1007 端点收到的数据帧类型不一致而导致连接关闭
  • 1008 数据违例而关闭连接
  • 1009 收到的消息数据太大而关闭连接
  • 1010 客户端因为服务器未协商扩展而关闭
  • 1011 服务器因为遭遇异常而关闭连接
  • 1015 TLS握手失败关闭连接

三. 服务端实现

1. pom文件

这里需要引用三个依赖。第一个为WebSocket需要的依赖,另外两个为kafka的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>person</groupId>
    <artifactId>wbSocketkafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- webSocket所需依赖 -->
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>7.0</version>
        </dependency>
        <!-- kafka 所需依赖 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>
</project>

2. webSocket服务端实现

//此处定义接口的uri
@ServerEndpoint("/wbSocket")
public class WebSocket {
    private Session session;
    public static CopyOnWriteArraySet<WebSocket> wbSockets = new CopyOnWriteArraySet<WebSocket>(); //此处定义静态变量,以在其他方法中获取到所有连接

    /**
     * 建立连接。
     * 建立连接时入参为session
     */
    @OnOpen
    public void onOpen(Session session){
        this.session = session;
        wbSockets.add(this); //将此对象存入集合中以在之后广播用,如果要实现一对一订阅,则类型对应为Map。由于这里广播就可以了随意用Set
        System.out.println("New session insert,sessionId is "+ session.getId());
    }
    /**
     * 关闭连接
     */
    @OnClose
    public void onClose(){
        wbSockets.remove(this);//将socket对象从集合中移除,以便广播时不发送次连接。如果不移除会报错(需要测试)
        System.out.println("A session insert,sessionId is "+ session.getId());
    }
    /**
     * 接收前端传过来的数据。
     * 虽然在实现推送逻辑中并不需要接收前端数据,但是作为一个webSocket的教程或叫备忘,还是将接收数据的逻辑加上了。
     */
    @OnMessage
    public void onMessage(String message ,Session session){
        System.out.println(message + "from " + session.getId());
    }

    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }
}

3. kafka消费者实现

public class ConsumerKafka extends Thread {

    private KafkaConsumer<String,String> consumer;
    private String topic = "kafkaTopic";

    public ConsumerKafka(){

    }

    @Override
    public void run(){
        //加载kafka消费者参数
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "ytna");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "15000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //创建消费者对象
        consumer = new KafkaConsumer<String,String>(props);
        consumer.subscribe(Arrays.asList(this.topic));
        //死循环,持续消费kafka
        while (true){
            try {
               //消费数据,并设置超时时间
                ConsumerRecords<String, String> records = consumer.poll(100);
                //Consumer message
                for (ConsumerRecord<String, String> record : records) {
                    //Send message to every client
                    for (WebSocket webSocket :wbSockets){
                        webSocket.sendMessage(record.value());
                    }
                }
            }catch (IOException e){
                System.out.println(e.getMessage());
                continue;
            }
        }
    }

    public void close() {
        try {
            consumer.close();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    //供测试用,若通过tomcat启动需通过其他方法启动线程
    public static void main(String[] args){
        ConsumerKafka consumerKafka = new ConsumerKafka();
        consumerKafka.start();
    }
}

P.S. 需要注意的是WebSocket对tomcat版本是有要求的,笔者使用的是7.0.7.8。

四. 前端简单实现

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket client</title>
    <script type="text/javascript">
        var socket;
        if (typeof (WebSocket) == "undefined"){
            alert("This explorer don't support WebSocket")
        }

        function connect() {
            //Connect WebSocket server
            socket =new WebSocket("ws://127.0.0.1:8080/wbSocket");
            //open
            socket.onopen = function () {
                alert("WebSocket is open");
            }
            //Get message
            socket.onmessage = function (msg) {
                alert("Message is " + msg);
            }
            //close
            socket.onclose = function () {
                alert("WebSocket is closed");
            }
            //error
            socket.onerror = function (e) {
                alert("Error is " + e);
            }
        }

        function close() {
            socket.close();
        }

        function sendMsg() {
            socket.send("This is a client message ");
        }
    </script>
</head>
<body>
    <button onclick="connect()">connect</button>
    <button onclick="close()">close</button>
    <button onclick="sendMsg()">sendMsg</button>
</body>
</html>
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐