前言

        首先说一下业务,我们的webapi需要从redis订阅消息,并把订阅到的消息推送给web前端。要求不同的用户建立不同的websocket连接,并把websocket要把消息分发给不同的用户。

Redis的消息订阅与发布并不复杂,这里不再赘述。主要讲解如何通过webSocke将消息推送给前端。

我们使用的是 .Net Core 3.1 + 原生 webSocket ,前端使用的是VueJS。

在websocket中订阅redis消息,当收到订阅消息时,处理并发送给前端。

实践

1、配置websocket中间键

        首先,安装 System.Net.WebSockets包,可在Nuget中安装,也可以在程序包管理控制台安装。

然后,在 Startup 类的 Configure 方法中添加 WebSocket 中间件: app.UseWebSockets();

可以配置以下属性:

KeepAliveInterval - 向客户端发送“ping”帧的频率,以确保代理保持连接处于打开状态。 ReceiveBufferSize - 用于接收数据的缓冲区的大小。 高级用户可能需要对其进行更改,以便根据数据大小调整性能。

app.UseWebSockets(new WebSocketOptions()
            {
                KeepAliveInterval = TimeSpan.FromSeconds(120000),
                ReceiveBufferSize = 4 * 1024
                
            });

2、新建一个WebSocketsHelper类,代码如下: 

public class WebSocketsHelper
    {
        /// <summary>
        /// 日志接口
        /// </summary>
        private ILogger<WebSocketsHelper> _logger;

        /// <summary>
        /// 下一级管道
        /// </summary>
        private RequestDelegate _next;

        /// <summary>
        /// 缓冲区大小
        /// </summary>
        private const int bufferSize = 1024 * 4;

        /// <summary>
        /// URL地址后缀
        /// </summary>
        private const string routePostfix = "/ws";

        /// <summary>
        /// Socket列表
        /// </summary>
        /// <typeparam name="string">typeof(string),用户名</typeparam>
        /// <typeparam name="WebSocket">typeof(WebSocket),WebSocket</typeparam>
        /// <returns></returns>
        private static ConcurrentDictionary<string, WebSocket> _socketsList
            = new ConcurrentDictionary<string, WebSocket>();

        //private static ConcurrentDictionary<string, IList<MReadViewModel>> _meterList = new ConcurrentDictionary<string, IList<MReadViewModel>>();

        private const string Channel_1 = "TestChannel1";
        private const string Channel_2 = "TestChannel2";
        private readonly RedisHelper _helper;
        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="next">下一级管道</param>
        /// <param name="logger">日志接口</param>
        public WebSocketsHelper(RequestDelegate next, ILogger<WebSocketsHelper> logger, RedisHelper helper)
        {
            _next = next;
            _logger = logger;
            _helper = helper;
        }

        public async Task Invoke(HttpContext context)
        {
            //判断当前请求是否为WebSocket
            if (!IsWebSocket(context))
            {
                await _next.Invoke(context);
                return;
            }

            //将请求转换为 WebSocket 连接
            var webSocket = await context.WebSockets.AcceptWebSocketAsync();

            while (webSocket.State == WebSocketState.Open)
            {
                //接收消息
                var entity = await Receiveentity<SocketMessageEntity>(webSocket);
                HandRecMsg(entity, webSocket);
                //订阅redis消息
                await _helper.UnsubscribeAsync(Channel_1 );
                await _helper.SubscribeAsync(Channel_1 , async (channel, msg) => await HandleChannelMsg(webSocket, channel, msg));
                //await _helper.UnsubscribeAsync(Channel_2);
                //await _helper.SubscribeAsync(Channel_2, async (channel, msg) => await HandleChannelMsg(webSocket, channel, msg));
            }
            await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Close", default(CancellationToken));


        }

        /// <summary>
        ///  处理低订阅到的信息,并发送给用户
        /// </summary>
        /// <param name="webSocket"></param>
        /// <param name="channelName"></param>
        /// <param name="msg"></param>
        /// <returns></returns>
        private async Task HandleChannelMsg(WebSocket webSocket, string channelName, string msg)
        {
            _logger.LogError("收到订阅结果:" + msg);

            MeterReadResult res = JsonConvert.DeserializeObject<MeterReadResult>(msg);
            //消息类型
            MReadViewModel mod = new MReadViewModel()
            {
                MeterAddr = res.Meter,
                readTime = DateTime.Now
            };
            mod.Flag = res.Status;
            SocketMessageEntity entity = new SocketMessageEntity()
            {
                Receiver = res.UserId,
                Sender = "Amr-Api",
                Message = mod

            };
            await HandleSend(webSocket, entity);
        }

        /// <summary>
        /// 处理Socket收到的信息
        /// </summary>
        /// <param name="msg"></param>
        /// <param name="webSocket"></param>
        private void HandRecMsg(SocketMessageEntity msg, WebSocket webSocket)
        {
            _logger.LogError("收到socket消息:" + JsonConvert.SerializeObject(msg));
            if (msg != null && msg.Receiver != "" && msg.Receiver != null)
            {
                AddUser(msg.Receiver, webSocket);
            }
            else
            {
                //_helper.Unsubscribe(Channel_2);
                _helper.Unsubscribe(Channel_1);
                //webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Close", default(CancellationToken));
            }

        }

        private bool RemoveUser(string UserId, WebSocket webSocket)
        {
            WebSocket socket1 = null;
            _socketsList.TryRemove(UserId, out webSocket);
            webSocket = null;
            return true;
        }

        /// <summary>
        /// 添加用户
        /// </summary>
        /// <param name="UserId"></param>
        /// <param name="webSocket"></param>
        /// <returns></returns>
        private bool AddUser(string UserId, WebSocket webSocket)
        {
            var res = false;
            //不存在
            if (!_socketsList.TryGetValue(UserId, out WebSocket socket))
            {
                res = _socketsList.TryAdd(UserId, webSocket);
            }
            else
            {
                //webSocket = null;
                WebSocket socket1 = null;
                _socketsList.TryRemove(UserId, out socket1);
                res = _socketsList.TryAdd(UserId, webSocket);
            }
            return res;
        }


        /// <summary>
        /// 处理WebSocket聊天
        /// </summary>
        /// <param name="webSocket">WebSocket</param>
        /// <param name="entity">Entity of Message</param>
        /// <returns></returns>
        private async Task HandleSend(WebSocket webSocket, SocketMessageEntity entity)
        {
            _logger.LogError(entity.Sender + ";" + entity.Receiver + ";" + entity.Message);
            await SendOne(entity.Sender, entity.Receiver, entity.Message);
        }

        /// <summary>
        /// 给指定用户发送消息
        /// </summary>
        /// <param name="sender">发送者</param>
        /// <param name="receiver">接收者</param>
        /// <param name="message">消息内容</param>
        /// <returns></returns>
        private async Task SendOne(string sender, string receiver, MReadViewModel message, MessageType type = MessageType.Chat)
        {
            if (sender == receiver) return;
            if (message == null) return;
            if (!ValidateUser(receiver)) return;
            var socket = _socketsList[receiver];
            var chatEntity = new SocketMessageEntity() { Receiver = receiver, Sender = sender, Message = message };

            await SendMessage(socket, chatEntity);
        }

        /// <summary>
        /// 当前请求是否为WebSocket
        /// </summary>
        /// <param name="context">Http上下文</param>
        /// <returns></returns>
        private bool IsWebSocket(HttpContext context)
        {
            return context.WebSockets.IsWebSocketRequest &&
                context.Request.Path == routePostfix;
        }

        /// <summary>
        /// 验证用户是否存在
        /// </summary>
        /// <param name="userName">用户名</param>
        /// <returns></returns>
        private bool ValidateUser(string userName)
        {
            return _socketsList.ContainsKey(userName);
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <typeparam name="TEntity"></typeparam>
        /// <param name="webSocket"></param>
        /// <param name="entity"></param>
        /// <returns></returns>
        private async Task SendMessage<TEntity>(WebSocket webSocket, TEntity entity)
        {
            try
            {
                if (webSocket.State == WebSocketState.Open)
                {
                    var Json = JsonConvert.SerializeObject(entity);
                    _logger.LogError("Json数据:" + Json);

                    var bytes = Encoding.UTF8.GetBytes(Json);

                    await webSocket.SendAsync(
                        new ArraySegment<byte>(bytes),
                        WebSocketMessageType.Text,
                        true,
                        CancellationToken.None
                    );
                }

            }
            catch
            {
                await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Close", default(CancellationToken));
            }

        }


        /// <summary>
        /// 接收消息
        /// </summary>
        /// <param name="webSocket">WebSocket</param>
        /// <typeparam name="TEntity">typeof(TEntity)</typeparam>
        /// <returns></returns>
        private async Task<TEntity> Receiveentity<TEntity>(WebSocket webSocket)
        {
            try
            {
                if (webSocket.State == WebSocketState.Open)
                {
                    var buffer = new ArraySegment<byte>(new byte[bufferSize]);
                    var result = await webSocket.ReceiveAsync(buffer, CancellationToken.None);
                    while (!result.EndOfMessage)
                    {
                        result = await webSocket.ReceiveAsync(buffer, default(CancellationToken));
                    }
                    var json = Encoding.UTF8.GetString(buffer.Array);
                    json = json.Replace("\0", "").Trim();
                    return JsonConvert.DeserializeObject<TEntity>(json, new JsonSerializerSettings()
                    {
                        DateTimeZoneHandling = DateTimeZoneHandling.Local
                    });
                }
                else
                {
                    return default;
                }

            }
            catch
            {
                await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Close", default(CancellationToken));
                return default;
            }
        }


    }

注意:

1、在添加user时,一定要判断当前用户的socket链接是否存在,如果已存在,需要移除老的连接,并重新添加新连接,否则会导致用户收不到消息

2、用户建立多个socket连接时,在Invoke方法中会执行多次,所以防止订阅多次,要先取消订阅,然后再进行订阅。其次,在收到断开连接的消息时,最好也进行取消订阅。

3、新建MiddlewareExtenstion 类,注册中间件,代码如下:

public static class MiddlewareExtenstion
    {
        /// <summary>
        /// 添加应用 中间件注册
        /// </summary>
        /// <param name="app"></param>
        public static void UseWebSocketHelper(this IApplicationBuilder app)
        {
            app.UseMiddleware<WebSocketsHelper>();
        }
    }

4、最后,在Startup 中的 Configure 方法中,添加自定义的 socket中间件:

             //添加socket 
            app.UseWebSocketHelper();

使用

前端使用  ws://ip:端口/ws,来建立WebSocket连接,需要注意的是当后台服务使用域名时,要换成wss://ip:端口/ws 来访问。

例如:ws://127.0.0.1:80/ws  改为  wss://csdn.test.com:80/ws

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐