socket.io

官网地址

服务端

  • Java: https://github.com/mrniko/netty-socketio
  • Java: https://github.com/trinopoty/socket.io-server-java

netty-socketio

demo

Class - Web client page

com.corundumstudio.socketio.demo.ChatLauncher - /client/index.html

com.corundumstudio.socketio.demo.EventChatLauncher - /client/event-index.html

com.corundumstudio.socketio.demo.SslChatLauncher - /client/ssl-event-index.html

com.corundumstudio.socketio.demo.NamespaceChatLauncher - /client/namespace-index.html

com.corundumstudio.socketio.demo.AckChatLauncher - /client/ack-index.html

com.corundumstudio.socketio.demo.BinaryEventLauncher - /client/binary-event-index.html

实现目标

目标:
1、通信时权限控制
2、springboot集成
3、以starter形式提供
4、demo(广播、群发、指定用户、全双工通信、自动重连)

借鉴博客:
https://my.oschina.net/u/4115730/blog/4274331

https://juejin.cn/post/6844903946184556557

https://github.com/hiwepy/socketio-spring-boot-starter

集群部署socket

官网介绍
支持 socket.io 2.x 客户端
https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.3.0/socket.io.js

创建多个服务器

创建多个 Socket.IO 服务器时,有两件事要做:

  • 您需要启用粘性会话(请参阅此处以获取完整说明)
  • 您需要用 Redis 适配器(或其他兼容的适配器)替换默认的内存适配器

注意:我们也可以创建多个侦听不同端口(或使用多个主机)的进程,并在它们前面添加一个反向代理。文档中介绍了为 NginX 或 HAProxy 等常见反向代理解决方案启用粘性会话。

服务端转发消息

还有最后一处需要修改:我们需要确保消息确实能到达收件人,即使该收件人并未连接在同一台 Socket.IO 服务器上:

在这里插入图片描述

这是 Redis 适配器的职责,它依赖 Redis 发布/订阅机制在 Socket.IO 服务器之间广播消息并最终到达所有客户端。

代码实现

scs-core-socketio-starter

引入pom

        <!-- netty-socketio: the Socket.IO server library used by the starter; version comes from the netty-socketio.version property -->
        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>${netty-socketio.version}</version>
        </dependency>
      
        <!-- Redisson: provides the RedissonClient used to relay messages between server nodes; version comes from the redisson.version property -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>${redisson.version}</version>
        </dependency>
目录结构

在这里插入图片描述

socketio配置类
/**
 * Configuration properties for the Socket.IO server.
 *
 * <p>Values are bound from the {@code spring.socketio.server} prefix. The class
 * extends {@code Configuration}, so it can be handed directly to the server
 * constructor; only the two extra flags below are declared here.
 */
@ConfigurationProperties(SocketioServerProperties.PREFIX)
public class SocketioServerProperties extends Configuration {

	/** Property prefix under which every setting of this class is bound. */
	public static final String PREFIX = "spring.socketio.server";

	/**
	 * Enable Socketio Server.
	 */
	private boolean enabled = false;

	/**
	 * If set to true, then useLinuxNativeEpoll property is passed to SocketIO server as is.
	 * If set to false and useLinuxNativeEpoll set to true,
	 * then additional check is performed if epoll library is available on classpath.
	 */
	private boolean failIfNativeEpollLibNotPresent = false;

	// ---- enabled -----------------------------------------------------------

	/** @return whether the Socket.IO server should be started */
	public boolean isEnabled() {
		return this.enabled;
	}

	/** @param enabled whether the Socket.IO server should be started */
	public void setEnabled(boolean enabled) {
		this.enabled = enabled;
	}

	// ---- failIfNativeEpollLibNotPresent --------------------------------------

	/** @return {@code true} to skip the epoll availability check and pass the setting through unchanged */
	public boolean isFailIfNativeEpollLibNotPresent() {
		return this.failIfNativeEpollLibNotPresent;
	}

	/** @param failIfNativeEpollLibNotPresent {@code true} to skip the epoll availability check */
	public void setFailIfNativeEpollLibNotPresent(boolean failIfNativeEpollLibNotPresent) {
		this.failIfNativeEpollLibNotPresent = failIfNativeEpollLibNotPresent;
	}
}

SocketIOServer 自动配置类
@Configuration
@ConditionalOnProperty(prefix = SocketioServerProperties.PREFIX, value = "enabled", havingValue = "true")
@EnableConfigurationProperties({ SocketioServerProperties.class })
public class SocketioServerAutoConfiguration implements DisposableBean {

	protected static Logger LOG = LoggerFactory.getLogger(SocketioServerAutoConfiguration.class);

	@Autowired
	private SocketioServerProperties config;
	
	@Bean
	@ConditionalOnMissingBean
	public AuthorizationListener socketAuthzListener() {
		return new SuccessAuthorizationListener();
	}
	
	@Bean
	@ConditionalOnMissingBean
	public ExceptionListener exceptionListener() {
		return  new DefaultExceptionListener();
	}
	@Bean
	@ConditionalOnMissingBean
	public StoreFactory clientStoreFactory() {
		return new MemoryStoreFactory();
	}

	@Bean(destroyMethod = "stop")
	public SocketIOServer socketIOServer(AuthorizationListener socketAuthzListener,
										 ExceptionListener exceptionListener, StoreFactory clientStoreFactory) {

		// 身份验证
		config.setAuthorizationListener(socketAuthzListener);
		config.setExceptionListener(exceptionListener);
		config.setStoreFactory(clientStoreFactory);

		if (config.isUseLinuxNativeEpoll()
				&& !config.isFailIfNativeEpollLibNotPresent()
				&& !Epoll.isAvailable()) {
			LOG.warn("Epoll library not available, disabling native epoll");
			config.setUseLinuxNativeEpoll(false);
		}

		final SocketIOServer server = new SocketIOServer(config);
		
		/**
		 * 应用退出时,要调用shutdown来清理资源,关闭网络连接,注销自己
		 * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
		 */
		Runtime.getRuntime().addShutdownHook(new SocketioServerShutdownHook(server));
		
		server.start();
		
		return server;
	}

	@Bean
	public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
		return new SpringAnnotationScanner(socketServer);
	}
	
	@Autowired
	protected SocketIOServer socketIOServer;
	
	@Override
	public void destroy() throws Exception {
		if (socketIOServer != null) {
			socketIOServer.stop();
		}
	}

}

AbstractSocketEventHandler 抽象基类
/**
 * Base class for Socket.IO event handlers.
 *
 * <p>Provides default connect/disconnect callbacks that only log, plus access to the
 * {@code SocketIOServer}. Subclasses override the callbacks to add their own bookkeeping.
 */
public abstract class AbstractSocketEventHandler {

	private static final Logger LOG = LoggerFactory.getLogger(AbstractSocketEventHandler.class);
	private SocketIOServer socketIOServer;

	/** Creates a handler without a server; supply one via {@link #setSocketIOServer}. */
	public AbstractSocketEventHandler() {
	}

	/**
	 * @param socketIOServer server this handler operates on
	 */
	public AbstractSocketEventHandler(SocketIOServer socketIOServer) {
		this.socketIOServer = socketIOServer;
	}

	/**
	 * Called when a client connects. Logs the session id and handshake details at
	 * debug level and sends a {@code "welcome"} event back to the client.
	 *
	 * @param client the client that connected
	 */
	@OnConnect
	public void onConnect(SocketIOClient client) {
		LOG.debug("Connect OK.");
		// SLF4J substitutes "{}" placeholders; printf-style "%s" is printed literally
		// and the argument is dropped.
		LOG.debug("Session ID  : {}", client.getSessionId());
		LOG.debug("HttpHeaders : {}", client.getHandshakeData().getHttpHeaders());
		LOG.debug("UrlParams   : {}", client.getHandshakeData().getUrlParams());

		client.sendEvent("welcome", "ok");
	}

	/**
	 * Called when a client disconnects. Logs the session id at debug level.
	 *
	 * @param client the client that disconnected
	 */
	@OnDisconnect
	public void onDisconnect(SocketIOClient client) {
		LOG.debug("Disconnect OK.");
		LOG.debug("Session ID  : {}", client.getSessionId());
	}

	/**
	 * Returns all clients of the namespace named {@code group}.
	 *
	 * @param group namespace name
	 * @return the clients of that namespace
	 */
	public Collection<SocketIOClient> getClients(String group) {
		// NOTE(review): presumably getNamespace returns null for an unknown name,
		// which would raise a NullPointerException here; confirm against the library.
		return getSocketIOServer().getNamespace(group).getAllClients();
	}

	/** @return the server this handler operates on; may be {@code null} if never set */
	public SocketIOServer getSocketIOServer() {
		return socketIOServer;
	}

	/** @param socketIOServer the server this handler operates on */
	public void setSocketIOServer(SocketIOServer socketIOServer) {
		this.socketIOServer = socketIOServer;
	}

}
SocketioServerShutdownHook关闭socketio的钩子
/**
 * JVM shutdown hook that stops the given {@code SocketIOServer}.
 *
 * <p>Stopping is best-effort: a failure never propagates out of the hook, but it is
 * reported on standard error instead of being silently discarded.
 */
public class SocketioServerShutdownHook extends Thread {

	private final SocketIOServer server;

	/**
	 * @param server the server to stop when the JVM shuts down
	 */
	public SocketioServerShutdownHook(SocketIOServer server) {
		this.server = server;
	}

	@Override
	public void run() {
		try {
			server.stop();
		} catch (Exception e) {
			// Logging frameworks may already be shut down while hooks run,
			// so report directly on stderr and let the JVM continue exiting.
			System.err.println("Failed to stop SocketIOServer during JVM shutdown: " + e);
		}
	}
}
scs-core-socketio-starter-demo

引入pom

  <!-- The scs-core-socketio-starter module built in the previous section -->
  <dependency>
            <groupId>com.liuhm</groupId>
            <artifactId>scs-core-socketio-starter</artifactId>
            <version>1.0</version>
        </dependency>
目录结构

在这里插入图片描述

yml配置
################################################################################################################
### SocketIO configuration:
################################################################################################################
spring:
  socketio:
    # Redisson (Redis) settings
    redis:
      redisson:
        enabled: true
        server: single
        single:
          address: redis://127.0.0.1:6379
          client-name: redis
          connection-minimum-idle-size: 5
          connection-pool-size: 50
    # Server configuration
    server:
      enabled: true
      ## Server context path; must match the path load-balanced by Nginx (/socket.io)
      context: /socket.io
      ## For local testing the host can be localhost or the local IP; on a Linux server use the server IP. Remove hostname when load-balancing through the gateway
      hostname: 127.0.0.1
      ## Netty listen port
      port: 10065
      ## Ping interval in milliseconds (default 25 s): how often the client sends a heartbeat message to the server
      ping-interval: 25000
      ## Ping timeout in milliseconds (default 60 s): a timeout event is raised when no heartbeat arrives within this interval
      ping-timeout: 60000
      ## Maximum payload length per frame, to guard against attacks that send oversized data
      max-frame-payload-length: 1048576
      ## Maximum HTTP content length
      max-http-content-length: 1048576
      ## Socket connection sizing (a boss thread group of 1 is enough when listening on a single port)
      boss-threads: 1
      worker-threads: 100
      # NOTE(review): ":*:" is an unusual value for an origin setting - confirm it is intended
      origin: ":*:"
ClientUserCache 缓存用户和客户端的信息
/**
 * In-memory registry of the connected clients of each user, keyed by user id and then
 * by session id.
 *
 * <p>Thread-safety: the outer map is a {@link ConcurrentHashMap} and every update replaces
 * the per-user map with a fresh copy inside an atomic {@code compute} call. A map that has
 * been handed out by {@link #getUserClient(String)} is therefore never modified by this
 * class afterwards, so callers can iterate it while other threads connect or disconnect.
 */
@Component
public class ClientUserCache {

    // Local cache: userId -> (sessionId -> client). Values are treated as immutable snapshots.
    private static final Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap = new ConcurrentHashMap<>();


    /**
     * Stores a client in the local cache.
     *
     * @param userId user id
     * @param sessionId session id of the page
     * @param socketIOClient connection that belongs to the page
     */
    public void saveClient(String userId, UUID sessionId, SocketIOClient socketIOClient){
        // compute() is atomic on ConcurrentHashMap, which removes the original
        // get / create / put race between two sessions of the same user.
        concurrentHashMap.compute(userId, (key, current) -> {
            HashMap<UUID, SocketIOClient> updated = current == null ? new HashMap<>() : new HashMap<>(current);
            updated.put(sessionId, socketIOClient);
            return updated;
        });
    }

    /**
     * Returns all connections of a user.
     *
     * @param userId user id
     * @return snapshot of the user's sessions, or {@code null} when the user has none
     */
    public HashMap<UUID, SocketIOClient> getUserClient(String userId){
        if (userId == null) {
            // ConcurrentHashMap rejects null keys; an unknown user is reported as null.
            return null;
        }
        return concurrentHashMap.get(userId);
    }

    /**
     * Removes one session of a user. Does nothing when the user or session is unknown;
     * the user's entry is dropped once its last session is removed.
     *
     * @param userId user id
     * @param sessionId session id of the page
     */
    public void deleteSessionClient(String userId,UUID sessionId){
        if (userId == null) {
            return;
        }
        concurrentHashMap.computeIfPresent(userId, (key, current) -> {
            HashMap<UUID, SocketIOClient> updated = new HashMap<>(current);
            updated.remove(sessionId);
            // Returning null removes the mapping, so empty maps do not accumulate.
            return updated.isEmpty() ? null : updated;
        });
    }
}
SocketioConfig
/**
 * Demo configuration that registers the application's own listener beans:
 * {@code SocketAuthorizationListener} for the handshake and
 * {@code SocketExceptionListener} for exception handling.
 */
@Configuration
public class SocketioConfig {

	/** @return the listener that decides whether a handshake is authorized */
	@Bean
	public AuthorizationListener socketAuthzListener() {
		AuthorizationListener listener = new SocketAuthorizationListener();
		return listener;
	}

	/** @return the listener that receives exceptions raised while handling events */
	@Bean
	public ExceptionListener exceptionListener() {
		ExceptionListener listener = new SocketExceptionListener();
		return listener;
	}

}

SocketIOController
/**
 * REST entry point used by the demo to push a message to Socket.IO clients.
 */
@RestController
public class SocketIOController {

    @Autowired
    SocketIOService socketIOService;

    /**
     * Forwards the request to {@code SocketIOService.sendMessage}.
     *
     * @param topic      event name to emit
     * @param userId     optional target user id
     * @param msgContent message body
     * @return a fixed confirmation text
     */
    @GetMapping("/sendMessage")
    public String sendMessage(
            @RequestParam String topic,
            @RequestParam(required = false) String userId,
            @RequestParam String msgContent) {
        this.socketIOService.sendMessage(topic, userId, msgContent);
        return "消息发送成功";
    }
}

SocketEventHandler

@OnEvent 可以添加多个,接收客户端的消息

/**
 * Demo event handler: records each connecting client in {@code ClientUserCache} under
 * the {@code userId} URL parameter and removes it again on disconnect.
 */
@Component
@Slf4j
public class SocketEventHandler extends AbstractSocketEventHandler {

   @Autowired
   private SocketIOServer socketIOServer;

   @Autowired
   private ClientUserCache clientUserCache;

   /** Returns the injected server instead of the one held by the base class. */
   @Override
   public SocketIOServer getSocketIOServer() {
      return socketIOServer;
   }

   /**
    * Called when a client connects. When the handshake carries a {@code userId} URL
    * parameter, the client is stored in the local cache under that user and its session
    * id, so that later messages can be routed to it.
    *
    * @param client the client that connected
    */
   @OnConnect
   @Override
   public void onConnect(SocketIOClient client) {
      String userId = client.getHandshakeData().getSingleUrlParam("userId");
      // Parameterized logging, consistent with onEvent below; the rendered text is unchanged.
      log.info("**********客户端:{}你成功的连接上了服务器哦**********", userId);
      UUID sessionId = client.getSessionId();
      if (userId != null) {
         clientUserCache.saveClient(userId,sessionId,client);
      }

   }

   /**
    * Receives {@code push_data_event} messages from clients and logs the payload.
    *
    * @param client  the sending client
    * @param request acknowledgement handle for the event (unused)
    * @param data    the event payload
    */
   @OnEvent(value = "push_data_event")
   public void onEvent(SocketIOClient client, AckRequest request, Object data) {
      log.info("收到消息{}", data);
   }

   /**
    * Called when a client disconnects. Removes the client's session from the local
    * cache when the handshake carried a {@code userId} URL parameter.
    *
    * @param client the client that disconnected
    */
   @OnDisconnect
   @Override
   public void onDisconnect(SocketIOClient client) {
      String userId = client.getHandshakeData().getSingleUrlParam("userId");
      log.info("**********客户端:{}已断开连接**********", userId);
      if (userId != null) {
         clientUserCache.deleteSessionClient(userId,client.getSessionId());
         // NOTE(review): the client is already disconnecting at this point; confirm
         // this explicit disconnect() is still needed.
         client.disconnect();
      }
   }
}
SocketAuthorizationListener 权限控制
/**
 * Demo authorization listener: a handshake is accepted only when the first
 * {@code token} URL parameter equals the expected token.
 */
public class SocketAuthorizationListener implements AuthorizationListener {

    /** URL parameter that carries the token. */
    private static final String TOKEN_PARAM = "token";

    /** Hard-coded demo token; replace with a real credential check before production use. */
    private static final String EXPECTED_TOKEN = "123456";

    /**
     * @param handshakeData handshake data of the connecting client
     * @return {@code true} when the first {@code token} URL parameter matches the expected token
     */
    @Override
    public boolean isAuthorized(HandshakeData handshakeData) {
        List<String> tokens = handshakeData.getUrlParams().get(TOKEN_PARAM);
        // Constant-first equals keeps the check safe if the first list element is null.
        return tokens != null && !tokens.isEmpty() && EXPECTED_TOKEN.equals(tokens.get(0));
    }
}
SocketIOServiceImpl 实现
/**
 * Sends Socket.IO events to one user or to everybody.
 *
 * <p>Messages for a single user are published on a Redisson topic when the Redis relay is
 * usable, and every node that has a {@code RedissonClient} delivers them to the matching
 * clients it holds in its local {@code ClientUserCache}. Without the relay the message is
 * delivered from the local cache only.
 */
@Service
@Slf4j
public class SocketIOServiceImpl implements SocketIOService {

    @Autowired
    private SocketIOServer socketIOServer;
    @Autowired
    private ClientUserCache clientUserCache;
    @Autowired(required = false)
    RedissonClient redissonClient;
    @Autowired(required = false)
    SocketioRedissonProperties socketioRedissonProperties;
    @Autowired
    SocketioServerProperties socketioServerProperties;

    /**
     * Prefix for broadcast (mass) events.
     */
    private static final String MASS_PREFIX = "/mass";
    /**
     * Redisson topic names.
     */
    private static final String TOPIC_SOCKETIO_SINGLE = "socketio:single";
    private static final String TOPIC_SOCKETIO_TOALL = "socketio:toAll";

    /**
     * Sends an event to every client of {@code userId} held in the local cache.
     *
     * @param topic      event name
     * @param userId     target user id
     * @param msgContent message body
     */
    @Override
    public void pushMessageToUser(String topic,String userId, String msgContent) {
        HashMap<UUID, SocketIOClient> userClient = clientUserCache.getUserClient(userId);
        if(userClient == null){
            log.debug("没有在线的用户");
            return;
        }
        // Push the message to each of the user's clients.
        userClient.forEach((uuid, socketIOClient) -> socketIOClient.sendEvent(topic,msgContent));
    }

    /**
     * Broadcasts an event through the server's broadcast operations.
     *
     * @param topic      event name; when blank, {@code /mass/toAll} is used
     * @param msgContent message body
     */
    @Override
    public void sendToAll(String topic,String msgContent) {
        String event = StringUtils.isBlank(topic) ? MASS_PREFIX+"/toAll" : topic;
        socketIOServer.getBroadcastOperations().sendEvent(event, msgContent);
    }

    /**
     * Sends a message to one user when {@code userId} is not blank, otherwise broadcasts it.
     *
     * @param topic      event name
     * @param userId     target user id, may be blank
     * @param msgContent message body
     */
    @Override
    public void sendMessage(String topic, String userId, String msgContent) {

        SocketIOMessageDTO socketIOMessageDTO = new SocketIOMessageDTO(topic, userId, msgContent);

        if(StringUtils.isBlank(socketIOMessageDTO.getUserId())){
            sendToAll(socketIOMessageDTO.getTopic(),socketIOMessageDTO.getMsgContent());
            return;
        }

        if(isRedisRelayUsable()){
            // Publish so that whichever node holds the user's connection delivers the message.
            RTopic rTopic = redissonClient.getTopic(TOPIC_SOCKETIO_SINGLE);
            rTopic.publish(socketIOMessageDTO);
        }else {
            pushMessageToUser(socketIOMessageDTO.getTopic(),socketIOMessageDTO.getUserId(),socketIOMessageDTO.getMsgContent());
        }
    }

    /**
     * The relay needs both the enabled flag and an actual client: both beans are optional,
     * so the properties may be present and enabled while no RedissonClient exists.
     */
    private boolean isRedisRelayUsable() {
        return redissonClient != null
                && socketioRedissonProperties != null
                && socketioRedissonProperties.isEnabled();
    }

    /**
     * Subscribes to the single-user topic when a {@code RedissonClient} is available.
     * Each received message is prefixed with this node's port and delivered to the
     * matching local clients.
     */
    @PostConstruct
    public void init() {
        if(redissonClient == null){
            return;
        }
        RTopic topic = redissonClient.getTopic(TOPIC_SOCKETIO_SINGLE);
        topic.addListener(SocketIOMessageDTO.class, new MessageListener<SocketIOMessageDTO>() {
            @Override
            public void onMessage(CharSequence channel, SocketIOMessageDTO socketIOMessageDTO) {
                // The port prefix shows which node delivered the message.
                socketIOMessageDTO.setMsgContent(socketioServerProperties.getPort()+" : "+socketIOMessageDTO.getMsgContent());
                if(StringUtils.isNotBlank(socketIOMessageDTO.getUserId())){
                    pushMessageToUser(socketIOMessageDTO.getTopic(),socketIOMessageDTO.getUserId(),socketIOMessageDTO.getMsgContent());
                    log.info("{} {} {}",socketIOMessageDTO.getTopic(),socketIOMessageDTO.getUserId(),socketIOMessageDTO.getMsgContent());
                }
            }
        });
    }
}    
nginx 实现负载
# Load-balances Socket.IO traffic on port 3000 across the two backend nodes.
http {
    upstream nodes {
        # enable sticky session with either "hash" (uses the complete IP address)
        #hash $remote_addr consistent;
        # or "ip_hash" (uses the first three octets of the client IPv4 address, or the entire IPv6 address)
        # ip_hash;
        # or "sticky" (needs commercial subscription)
        # sticky cookie srv_id expires=1h domain=.example.com path=/;

        server 127.0.0.1:10066;
        server 127.0.0.1:10065;
    }

    server {
        listen       3000;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;

        location / {
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header Host $host;

            proxy_pass http://nodes;

            # enable WebSockets
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
    }
}
spring-gateway网关实现负载

借鉴地址

去掉socketio 配置的hostname

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QIBlzvnv-1623978061230)(imgs/41d06d5af09779cd0e69be543b7e7f7f.png)]

gateway 2.0.4

SocketioLoadBalancerClientFilter
/**
 * Load-balancer filter that redirects Socket.IO requests for the {@code socketio}
 * service to the instance's Socket.IO port instead of its registered port.
 */
public class SocketioLoadBalancerClientFilter extends LoadBalancerClientFilter {

    public SocketioLoadBalancerClientFilter(LoadBalancerClient loadBalancer) {
        super(loadBalancer);
    }

    /**
     * Forwards to the socketio cluster. The port is read from the instance metadata key
     * {@code socketio-port}; when it is missing, empty or not a number, 10003 is used.
     * Requests for other services or other paths keep the instance chosen by the parent.
     */
    protected ServiceInstance choose(ServerWebExchange exchange) {
        ServiceInstance chosen = super.choose(exchange);
        if (chosen == null) {
            return chosen;
        }
        if (!"socketio".equals(chosen.getServiceId())) {
            return chosen;
        }
        if (!exchange.getRequest().getPath().toString().contains("/socket.io/")) {
            return chosen;
        }

        int socketioPort = resolveSocketioPort(chosen.getMetadata().get("socketio-port"));
        // Same service, host and scheme as the chosen instance, but on the Socket.IO port.
        return new DefaultServiceInstance(chosen.getServiceId(),
                chosen.getHost(), socketioPort, chosen.isSecure());
    }

    /** Parses the metadata value, falling back to 10003 for null, empty or non-numeric input. */
    private static int resolveSocketioPort(String rawPort) {
        if (rawPort == null || "".equals(rawPort)) {
            return 10003;
        }
        try {
            return Integer.parseInt(rawPort);
        } catch (NumberFormatException e) {
            return 10003;
        }
    }
}
SocketioLoadBalancerClientConfig
/**
 * Registers {@code SocketioLoadBalancerClientFilter} unless a bean of that type
 * is already defined.
 */
@Configuration
public class SocketioLoadBalancerClientConfig {

    /**
     * @param loadBalancer load-balancer client handed to the filter
     * @return the Socket.IO aware load-balancer filter
     */
    @Bean
    @ConditionalOnMissingBean(SocketioLoadBalancerClientFilter.class)
    public SocketioLoadBalancerClientFilter socketioLoadBalancerClientFilter(LoadBalancerClient loadBalancer) {
        SocketioLoadBalancerClientFilter filter = new SocketioLoadBalancerClientFilter(loadBalancer);
        return filter;
    }
}

yml

  # Gateway route that forwards /api/ms/** to the load-balanced "socketio" service
  routes:
        # Unique route id. Example mapping: http://localhost:9888/sample/mysql > http://localhost:9000/mysql
        - id: socketio
          # Target service URI
          uri: lb://socketio
          # Routing predicates
          predicates:
            - Path=/api/ms/**
            - RequestBody=true
          # Filters
          filters:
            # Number of leading path segments to strip before calling the target service, e.g. http://localhost:9888/sample/mysql > http://localhost:9000/mysql
            - StripPrefix=2
            - PreserveHostHeader
            - name: Hystrix
              args:
                name: time-consuming
                fallbackUri: forward:/fallback
前端实现
<!DOCTYPE html>
<html>
<head>
   <meta charset="utf-8" />
   <title>Demo Chat</title>
   <link href="bootstrap.css" rel="stylesheet">
   <style>
      body {
         padding:20px;
      }
      #console {
         height: 400px;
         overflow: auto;
      }
      .username-msg {color:orange;}
      .connect-msg {color:green;}
      .disconnect-msg {color:red;}
      .send-msg {color:#888}
   </style>
   <script src="js/socket.io/socket.io.js"></script>
   <script src="js/moment.min.js"></script>
   <script src="js/jquery-1.7.2.min.js"></script>
   <script>
      var userName = 'user' + Math.floor((Math.random()*1000)+1);

      // Connect through the nginx load balancer. userId and token are read by the
      // server from the handshake URL parameters.
      // The socket.io 2.x client only knows the transports 'websocket' and 'polling';
      // 'xhr-polling' / 'jsonp-polling' are names from the 0.9 client.
      var socket = io.connect('http://127.0.0.1:3000?userId=2&token=123456', {
         transports: ['websocket', 'polling']
      });

      socket.on('connect', function() {
         output('<span class="connect-msg">Client has connected to the server!</span>');
      });

      // Payloads received from the server are untrusted: render them as text, not HTML.
      socket.on('push_data_event', function(data) {
         output(toDisplayText(data));
      });

      socket.on('/mass/toAll', function(data) {
         output(toDisplayText(data));
      });

      socket.on('disconnect', function() {
         output('<span class="disconnect-msg">The client has disconnected!</span>');
      });

      function sendDisconnect() {
         socket.disconnect();
      }

      // Sends the content of the input box as a push_data_event and clears the box.
      function sendMessage() {
         var message = $('#msg').val();
         $('#msg').val('');

         var jsonObject = {userName: userName,
                           message: message};
         socket.emit('push_data_event', jsonObject);
      }

      // Converts a received payload to HTML-escaped text. Non-string payloads are
      // serialized as JSON so they do not show up as "[object Object]".
      function toDisplayText(data) {
         var text = (typeof data === 'string') ? data : JSON.stringify(data);
         if (text === undefined || text === null) {
            text = '';
         }
         return $('<div>').text(text).html();
      }

      // Prepends a timestamped line to the console. `message` is inserted as HTML,
      // so callers must pass trusted markup or text escaped with toDisplayText().
      function output(message) {
         var currentTime = "<span class='time'>" +  moment().format('HH:mm:ss.SSS') + "</span>";
         var element = $("<div>" + currentTime + " " + message + "</div>");
         $('#console').prepend(element);
      }

      // Pressing Enter anywhere on the page triggers the Send button.
      $(document).keydown(function(e){
         if(e.keyCode == 13) {
            $('#send').click();
         }
      });
   </script>
</head>
<body>
   <h1>Netty-socketio Demo Chat</h1>
   <br/>
   <div id="console" class="well">
   </div>
   <form class="well form-inline" onsubmit="return false;">
      <input id="msg" class="input-xlarge" type="text" placeholder="Type something..."/>
      <button type="button" onClick="sendMessage()" class="btn" id="send">Send</button>
      <button type="button" onClick="sendDisconnect()" class="btn">Disconnect</button>
   </form>
</body>
</html>
操作

开启nginx

后端启动两个服务

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RAmxCm1r-1623912028990)(imgs/image-20210617143923371.png)]
在这里插入图片描述
在这里插入图片描述

客户端访问 3000端口
在这里插入图片描述

客户端接收单个消息

打开页面

然后请求

http://127.0.0.1:8080/sendMessage?topic=push_data_event&msgContent=你好&userId=2

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DYMdeOqp-1623902589366)(imgs/image-20210617120030730.png)]

广播
http://127.0.0.1:8999/sendMessage?topic=/mass/toAll&msgContent=你好,群发

在这里插入图片描述

发送消息

在这里插入图片描述
在这里插入图片描述

代码下载

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐