遇到”后台推送“之类的需求,自然是躲不开websocket了。这一次遇到的需求有点特殊,客户端的ip是固定,需要根据客户端的ip来分辨具体是哪个客户端。

不过,为了方便以后使用,我也列一下另一种获取连接用户身份的方式——url参数。

有一点要格外注意,如果你的项目加了拦截器,或者用了security之类的安全框架,需要对websocket的路径放行,否则会一直报连接失败。

目录

依赖

配置文件

主类

方案一:URL鉴权

方案二:ip鉴权

1、定义一个拦截器

2、定义 WebSocketConfigurator

3、定义主体类

4、拦截器扫描


依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>2.4.5</version>
        </dependency>

配置文件

@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

主类

主类管理websocket的连接,下面给出两种方案:

方案一:URL鉴权

下面的userNo在实际项目中可以换成token,在onOpen中做鉴权操作。

@Slf4j
@Component
@ServerEndpoint(value = "/websocket/{userNo}")
public class WebSocketServer {

    private static int onlineCount = 0;

    private static ConcurrentHashMap<String, WebSocketServer> serverMap = new ConcurrentHashMap<>();

    private Session session;

    private String userNo;

    @OnOpen
    public void onOpen(Session session, @PathParam("userNo") String userNo) {
        this.session = session;
        this.userNo = userNo;
        if(serverMap.containsKey(userNo)) {
            serverMap.remove(userNo);
            serverMap.put(userNo, this);
        }else {
            serverMap.put(userNo, this);
            addOnlineCount();
            log.info(userNo + ",已上线!");
        }
    }

    /**
     * 服务器接收客户端发来的消息
     * @param message 消息
     * @param session 会话session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("服务器收到了用户" + userNo + "发来的消息:" + message);
    }

    /**
     * 服务器主动发送消息
     * @param message 消息
     */
    public void sendMessage(String message){
        try {
            this.session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            log.error(e.getMessage());
        }
    }

    /**
     * 获取在线人数
     * @return 在线人数
     */
    public static int getOnlineCount() {
        return onlineCount;
    }

    @OnClose
    public void onClose() {
        if(serverMap.containsKey(userNo)) {
            serverMap.remove(userNo);
            subOnlineCount();
            log.info(userNo + ",已下线!");
        }
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        log.error("用户" + userNo + "发生了错误,具体如下:" + throwable.getMessage());
    }

    private static synchronized void subOnlineCount() {
        onlineCount--;
    }

    public static synchronized void addOnlineCount() {
        onlineCount++;
    }

    public static WebSocketServer get(String userNo) {
        return serverMap.get(userNo);
    }

    public static ConcurrentHashMap<String, WebSocketServer> getMap() {
        return serverMap;
    }

    public static boolean isOnline(String userNo) {
        return serverMap.containsKey(userNo);
    }
}

方案二:ip鉴权

适用于内网环境,且主机数固定、ip固定的场合。

WebSocket只是一种协议,实现这种协议的方式有很多种。

我们用的这个starter是无法直接获取客户端ip的,网上也有人很多人用的是netty-websocket-xx 包,这包提供了api用于获取客户端的ip。

换包太麻烦了,即使是在不换包的前提下,我们也可以解决这个问题。

1、定义一个拦截器

此拦截器用于获取ip,并放入session中

@javax.servlet.annotation.WebFilter(filterName = "sessionFilter",urlPatterns = "/*")
@Order(1)
public class WebFilter implements Filter {
    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest req= (HttpServletRequest) servletRequest;
        req.getSession().setAttribute("ip",req.getRemoteHost());
        filterChain.doFilter(servletRequest,servletResponse);
    }
}

2、定义 WebSocketConfigurator

用于将客户端的ip传递给websocket中的session,相当于是一个中介

public class WebSocketConfigurator extends ServerEndpointConfig.Configurator {

    public static final String IP_ADDR = "IP.ADDR";

    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {

        Map<String, Object> attributes = sec.getUserProperties();
        HttpSession session = (HttpSession) request.getHttpSession();
        if (session != null) {
            attributes.put(IP_ADDR, session.getAttribute("ip"));
            Enumeration<String> names = session.getAttributeNames();
            while (names.hasMoreElements()) {
                String name = names.nextElement();
                attributes.put(name, session.getAttribute(name));
            }
        }
    }
}

3、定义主体类

主体类用于管理websocket连接,并配置configurator 

@Slf4j
@Component
@ServerEndpoint(value = "/websocket",configurator = WebSocketConfigurator.class)
public class WebSocketServer {

    private static int onlineCount = 0;

    private static ConcurrentHashMap<String, WebSocketServer> serverMap = new ConcurrentHashMap<>();

    private Session session;
    
    private String ipAddr;

    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        Map<String, Object> userProperties = session.getUserProperties();
        this.ipAddr = (String) userProperties.get(WebSocketConfigurator.IP_ADDR);
        if(serverMap.containsKey(this.ipAddr)) {
            serverMap.remove(this.ipAddr);
            serverMap.put(this.ipAddr, this);
        }else {
            serverMap.put(this.ipAddr, this);
            addOnlineCount();
            log.info(this.ipAddr + ",已上线!");
        }
    }

    /**
     * 服务器接收客户端发来的消息
     * @param message 消息
     * @param session 会话session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("服务器收到了用户" + ipAddr + "发来的消息:" + message);
        
        //方便前端测试
        sendMessage("服务器收到了用户" + ipAddr + "发来的消息:" + message);
    }

    /**
     * 给ip地址为ipAddr的客户端发送消息
     * @param ipAddr ip地址
     * @param message 消息
     */
    public static void sendMessage(String ipAddr, String message) {
        if(serverMap.containsKey(ipAddr)) {
            WebSocketServer webSocketServer = serverMap.get(ipAddr);
            webSocketServer.sendMessage(message);
        }else {
            log.error("发送失败,客户端未连接: " + ipAddr);
        }
    }
    
    /**
     * 服务器主动发送消息
     * @param message 消息
     */
    public void sendMessage(String message){
        try {
            this.session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            log.error(e.getMessage());
        }
    }

    /**
     * 获取在线人数
     * @return 在线人数
     */
    public static int getOnlineCount() {
        return onlineCount;
    }

    @OnClose
    public void onClose() {
        if(serverMap.containsKey(ipAddr)) {
            serverMap.remove(ipAddr);
            subOnlineCount();
            log.info(ipAddr + ",已下线!");
        }
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        log.error("用户" + ipAddr + "发生了错误,具体如下:" + throwable.getMessage());
    }

    private static synchronized void subOnlineCount() {
        onlineCount--;
    }

    public static synchronized void addOnlineCount() {
        onlineCount++;
    }

    public static WebSocketServer get(String ipAddr) {
        return serverMap.get(ipAddr);
    }

    public static ConcurrentHashMap<String, WebSocketServer> getMap() {
        return serverMap;
    }

    public static boolean isOnline(String ipAddr) {
        return serverMap.containsKey(ipAddr);
    }

}

4、拦截器扫描

如果只有上面的代码,会出现tomcat session获取不到的情况,因为拦截器没有生效。

在SpringBoot启动类上加一个注解@ServletComponentScan:

@EnableScheduling
@SpringBootApplication
@ServletComponentScan("拦截器所在的包名")
public class WmsServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(WmsServerApplication.class, args);
    }

}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐