解决webSocket在针对指定用户推送消息时,需要保存用户id至redis中,保存redis时会报Null pointer exception

最近在做一个项目遇到,websocket针对指定用户推送消息时,需要保存用户的socketId至redis中,存入redis中的key是一个字符串常量"socketNumber",value值是socketId的json串,每当用户建立连接之后,调用存储socketId到redis这一步就会报空指针异常。
在这里插入图片描述

定义的key是一个静态常量,没理由存不到redis中,百思不得其解。接下来先贴一段报空指针异常的websocket服务类方法,然后分析下报错原因,给出解决方法。

websocket服务类方法

导入redisClient工具类方式如下:
在这里插入图片描述
设置redis的key是静态常量
在这里插入图片描述
redis存储用户userId方法,每次断点过redisClient.set(socketNumber ,JsonUtil.toJson(socketIds));这部分总会报空指针,但是socketNumber定义的是静态常量,没有道理值不存在。
在这里插入图片描述

@ServerEndpoint(value = "/webSocket/{socketId}", encoders = {WebSocketCustomEncoding.class})
@Component
public class WebSocketServer {
    private static final AriesJcLogger logger = AriesJcLoggerFactory.getLogger(WebSocketServer.class);
    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static AtomicInteger online = new AtomicInteger();

    //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
    private static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();
    private static List<String> socketIds = new ArrayList<>();
    public static  String socketNumber ="socketNumber";
    @Autowired
    private RedisClient redisClient;

    /**
     * 发送消息方法
     *
     * @param session 客户端与socket建立的会话
     * @param message 消息
     * @throws IOException
     */
    public void sendMessage(Session session, BusinessRealTimeData message) throws IOException, EncodeException {
        if (session != null) {
//            session.getBasicRemote().sendText(message.toString());
            session.getBasicRemote().sendObject(message);
        }
    }

    /**
     * 连接建立成功调用
     *
     * @param session  客户端与socket建立的会话
     * @param socketId socketId
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "socketId") String socketId) {
        socketIds.add(socketId);
        redisClient.set(socketNumber ,JsonUtil.toJson(socketIds));
        sessionPools.put(socketId, session);
        addOnlineCount();
        logger.info("====加入新连接session:{}!当前在线人数为:{}", session.getId());
    }

    /**
     * 关闭连接时调用
     *
     * @param socketId socketId
     */
    @OnClose
    public void onClose(@PathParam(value = "socketId") String socketId) {
        redisClient.removeForSet(socketNumber, socketId);
        String plate = redisClient.get("plate_" + socketId);
        List<String> plates = JSONObject.parseArray(plate,String.class);
        redisClient.removeForSet("plate_"+socketId,plates);
        String camera = redisClient.get("camera_"+socketId);
        List<String> cameras = JSONObject.parseArray(camera,String.class);
        redisClient.removeForSet("camera_"+socketId,cameras);
        sessionPools.remove(socketId);
        subOnlineCount();
        logger.info("====关闭连接userid:{}", socketId);
    }

    /**
     * 收到客户端消息时触发(群发)
     *
     * @param message
     * @throws IOException
     */
    @OnMessage
    public void onMessage(String message) throws IOException {
        logger.debug("====来自客户端的消息session:{}---message:{}", message);
        for (Session session : sessionPools.values()) {
            try {
                BusinessRealTimeData businessRealTimeData = JSONObject.parseObject(message, BusinessRealTimeData.class);
                sendMessage(session, businessRealTimeData);
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }
        }
    }

    /**
     * 发生错误时候
     *
     * @param session
     * @param throwable
     */
    @OnError
    public void onError(Session session, Throwable throwable) {
        logger.error("websocket 发生错误", throwable);
        throwable.printStackTrace();
    }

    /**
     * 给指定用户发送消息
     *
     * @param socketId socketId
     * @param message  消息
     * @throws IOException
     */
    public void sendInfo(String socketId, BusinessRealTimeData message) {
        logger.debug("发送指定客户端的消息:{}", JsonUtil.toJson(message));
        logger.debug("发送指定客户端socketId:{}", JsonUtil.toJson(socketId));
        Session session = sessionPools.get(socketId);
        try {
            sendMessage(session, message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void sendtoAll(BusinessRealTimeData message) throws IOException {
        logger.debug("发送客户端的消息:{}", JsonUtil.toJson(message));
        for (Session session : sessionPools.values()) {
            try {
                sendMessage(session, message);
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }

问题分析

在@ServerEndpoint注解类中使用注解@Autowired注入RedisClient,注入RedisClient失败,因此报空指针异常。
为什么在websocket中不能通过@Autowire注解注入呢?
由于websocket本身是线程安全的,建立连接时候就会创建一个端点实例,一个websocket可以建立多个连接,因此会创建多个端点实例对象。@Autowired注解在Spring初始化对象实例时,会调用此构造函数,进行对象的实例化。项目初始化时,初始化一个websocket对象,此时的websocket对象没有建立连接,成功注入容器。当用户建立了连接,又会重新创建一个新的websocket,由于此前已经注入了一个没有建立连接的websocket对象,Spring默认支持单例模式,因此后边建立连接的websocket对象就不能被注入了,因此会报空指针异常。

解决方法

如何解决无法注入的问题?可以通过使用 getBean的方法主动获取实例。

  • 创建一个获取SpringUtils工具类
@Component
public final class SpringUtils implements BeanFactoryPostProcessor {

    private static ConfigurableListableBeanFactory beanFactory;

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        SpringUtils.beanFactory = beanFactory;
    }

    public static ConfigurableListableBeanFactory getBeanFactory() {
        return beanFactory;
    }

    /**
     * 获取对象
     *
     * @param name
     * @return Object 一个以所给名字注册的bean的实例
     * @throws org.springframework.beans.BeansException
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException {
        if (getBeanFactory() == null) {
            //zhengkai.blog.csdn.net
            System.out.println("本地调试Main模式,没有BeanFactory,忽略错误");
            return null;
        } else {
            T result = (T) getBeanFactory().getBean(name);
            return result;
        }
    }

    /**
     * 获取类型为requiredType的对象
     *
     * @param name
     * @return
     * @throws org.springframework.beans.BeansException
     */
    public static <T> T getBean(Class<T> name) throws BeansException {
        if (getBeanFactory() == null) {
            //zhengkai.blog.csdn.net
            System.out.println("本地调试Main模式,没有BeanFactory,忽略错误");
            return null;
        } else {
            T result = (T) getBeanFactory().getBean(name);
            return result;
        }
    }

    /**
     * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
     *
     * @param name
     * @return boolean
     */
    public static boolean containsBean(String name) {
        return getBeanFactory().containsBean(name);
    }

    /**
     * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
     *
     * @param name
     * @return boolean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
        return getBeanFactory().isSingleton(name);
    }

    /**
     * @param name
     * @return Class 注册对象的类型
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
        return getBeanFactory().getType(name);
    }

    /**
     * 如果给定的bean名字在bean定义中有别名,则返回这些别名
     *
     * @param name
     * @return
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
        return getBeanFactory().getAliases(name);
    }

}

-在websocket服务类中可以通过@Resource注解通过 private RedisClient redisClient = SpringUtils.getBean(RedisClient.class);方式导入redis工具类

@ServerEndpoint(value = "/webSocket/{socketId}", encoders = {WebSocketCustomEncoding.class})
@Component
public class WebSocketServer {
    private static final AriesJcLogger logger = AriesJcLoggerFactory.getLogger(WebSocketServer.class);
    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static AtomicInteger online = new AtomicInteger();

    //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
    private static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();
    private static List<String> socketIds = new ArrayList<>();
    public static  String socketNumber ="socketNumber";
    @Resource
    private RedisClient redisClient = SpringUtils.getBean(RedisClient.class);

    /**
     * 发送消息方法
     *
     * @param session 客户端与socket建立的会话
     * @param message 消息
     * @throws IOException
     */
    public void sendMessage(Session session, BusinessRealTimeData message) throws IOException, EncodeException {
        if (session != null) {
//            session.getBasicRemote().sendText(message.toString());
            session.getBasicRemote().sendObject(message);
        }
    }

    /**
     * 连接建立成功调用
     *
     * @param session  客户端与socket建立的会话
     * @param socketId socketId
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "socketId") String socketId) {
        socketIds.add(socketId);
        redisClient.set(socketNumber ,JsonUtil.toJson(socketIds));
        sessionPools.put(socketId, session);
        addOnlineCount();
        logger.info("====加入新连接session:{}!当前在线人数为:{}", session.getId());
    }

    /**
     * 关闭连接时调用
     *
     * @param socketId socketId
     */
    @OnClose
    public void onClose(@PathParam(value = "socketId") String socketId) {
        redisClient.removeForSet(socketNumber, socketId);
        String plate = redisClient.get("plate_" + socketId);
        List<String> plates = JSONObject.parseArray(plate,String.class);
        redisClient.removeForSet("plate_"+socketId,plates);
        String camera = redisClient.get("camera_"+socketId);
        List<String> cameras = JSONObject.parseArray(camera,String.class);
        redisClient.removeForSet("camera_"+socketId,cameras);
        sessionPools.remove(socketId);
        subOnlineCount();
        logger.info("====关闭连接userid:{}", socketId);
    }

    /**
     * 收到客户端消息时触发(群发)
     *
     * @param message
     * @throws IOException
     */
    @OnMessage
    public void onMessage(String message) throws IOException {
        logger.debug("====来自客户端的消息session:{}---message:{}", message);
        for (Session session : sessionPools.values()) {
            try {
                BusinessRealTimeData businessRealTimeData = JSONObject.parseObject(message, BusinessRealTimeData.class);
                sendMessage(session, businessRealTimeData);
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }
        }
    }

    /**
     * 发生错误时候
     *
     * @param session
     * @param throwable
     */
    @OnError
    public void onError(Session session, Throwable throwable) {
        logger.error("websocket 发生错误", throwable);
        throwable.printStackTrace();
    }

    /**
     * 给指定用户发送消息
     *
     * @param socketId socketId
     * @param message  消息
     * @throws IOException
     */
    public void sendInfo(String socketId, BusinessRealTimeData message) {
        logger.debug("发送指定客户端的消息:{}", JsonUtil.toJson(message));
        logger.debug("发送指定客户端socketId:{}", JsonUtil.toJson(socketId));
        Session session = sessionPools.get(socketId);
        try {
            sendMessage(session, message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void sendtoAll(BusinessRealTimeData message) throws IOException {
        logger.debug("发送客户端的消息:{}", JsonUtil.toJson(message));
        for (Session session : sessionPools.values()) {
            try {
                sendMessage(session, message);
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }
        }
    }

    public static void addOnlineCount() {
        online.incrementAndGet();
    }

    public static void subOnlineCount() {
        online.decrementAndGet();
    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐