redis发布订阅广播模式的使用&结合jeecg的Redis网关路由刷新机制


本质和传统的消息发布和订阅机制是差不多的,但是相较于其他几款MQ产品,Redis的使用更加便捷,也更加轻量化,不需要搭建一套繁重的MQ框架。
但是也它致命的缺点,redis的消息不会被持久化,服务器出现问题,消息会丢失,导致数据问题。对于数据一致性要求比较高的场景不适合使用,需要慎重选择。


导致消息丢失的情况:


一般获取消息的客户端(订阅者)会通过while循环不断的向redis服务器请求发布者获取消息,假如发布者在订阅者退出订阅状态时发布了消息,则该消息会丢失。

关于这个订阅者退出状态,值得探讨,这里做一个分析。使用终端模拟时,假如订阅者断开连接后,又重新连接,在这个断开的时间段内,如果存在数据发布,则重新连接的客户端是无法获取到消息的,因为redis服务器认为它是一个纯新的客户端。因此在程序里,必须小心使用redis的订阅连接,当订阅的连接没有主动释放时,也没有执行退出订阅时,数据会源源不断的写入内存,直到所有订阅者取走消息。

优缺点

优点:便捷,轻量化
缺点:

  • 不稳定,消息易丢失,导致数据一致性问题,性能不高

  • redis客户端在订阅消息时,要求订阅在发布之前,否则无法订阅到客户端订阅前,已经发布的消息。

  • redis的消息发布与订阅,无法实现高并发和大数据量。前者受限于redis本身的并发量限制和内存大小;后者是因为redis发布消息时,会先将数据推送到每个客户端的连接缓冲区,如果单个消息过大会撑爆缓冲区,导致redis错误,就算redis没有撑爆缓冲区,如果消费者(订阅方)没有及时取走消息,也会因为数据积累而撑爆内存。

  • 总结,快捷方便,消息不会持久化,网络不稳定,断开重连,无法接受到已发布的消息,链接没有释放,也没有退出订阅,消息会一直在内存中积压,撑爆内存。

简单使用

命令:


psubscribe   订阅一个或者多个channel,使用正则方式匹配多个
publish    发布消息到指定channel
pubsub    查看订阅与发布系统状态
pubsub channels   pattern   列出当前活跃的channel
pubsub  numsub  channel-1  channel-n   获取指定频道的订阅者数量
pubsub  numpat  获取订阅模式的数量
subscribe      订阅一个或者多个channel消息,使用一个或者多个准确的channel名实现订阅
unsubscribe   客户端退订channel

springboot中使用

@Autowired
private RedisTemplate<String, Object> redisTemplate;

BaseMap params = new BaseMap();
params.put("handlerName", "loderRouderHandler");
//刷新网关
redisTemplate.convertAndSend(GlobalConstants.REDIS_TOPIC_NAME, params);

jeecg代码分析:

注册消息监听器适配器


//RedisConfig中配置MessageListenerAdapter适配器,自定义消息处理方法
@Bean
MessageListenerAdapter commonListenerAdapter(RedisReceiver redisReceiver) {
    MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(redisReceiver, "onMessage");
    messageListenerAdapter.setSerializer(this.jacksonSerializer());
    return messageListenerAdapter;
}

自定义消息接收器,

###在消息接收器中获取 JeecgRedisListener->onMessage()来获取消息,
###其他模块只需要实现JeecgRedisListener即可
###Object handlerName = params.get(“handlerName”),是消息内容传递过来的,可以动态指定是哪个消息处理类,比如路由处理类就是:GlobalConstants.LODER_ROUDER_HANDLER


@Component
public class RedisReceiver {
    public void onMessage(BaseMap params) {
        //获取redis消息处理类名称
        Object handlerName = params.get("handlerName");
        //通过SpringContextHolder拿到JeecgRedisListener
        JeecgRedisListener messageListener = (JeecgRedisListener)SpringContextHolder.getHandler(handlerName.toString(), JeecgRedisListener.class);
        if (ObjectUtil.isNotEmpty(messageListener)) {
            messageListener.onMessage(params);
        }
    }

    public RedisReceiver() {
    }

    public boolean equals(final Object o) {
        if (o == this) {
            return true;
        } else if (!(o instanceof RedisReceiver)) {
            return false;
        } else {
            RedisReceiver other = (RedisReceiver)o;
            return other.canEqual(this);
        }
    }

    protected boolean canEqual(final Object other) {
        return other instanceof RedisReceiver;
    }

    public int hashCode() {
        int result = true;
        return 1;
    }

    public String toString() {
        return "RedisReceiver()";
    }
}

定义路由监听器,(路由处理器),通过监听redis来实现

@Slf4j
@Component(GlobalConstants.LODER_ROUDER_HANDLER)
public class LoderRouderHandler implements JeecgRedisListener {

    @Resource
    private DynamicRouteLoader dynamicRouteLoader;


    @Override
    public void onMessage(BaseMap message) {
        dynamicRouteLoader.refresh(message);
    }

}

最终通过下面的方法获取路由配置

/**
 * 从redis中读取路由配置
 *
 * @return
 */
private void loadRoutesByRedis(BaseMap baseMap) {
    List<MyRouteDefinition> routes = Lists.newArrayList();
    configService = createConfigService();
    if (configService == null) {
        log.warn("initConfigService fail");
    }
    Object configInfo = redisUtil.get(CacheConstant.GATEWAY_ROUTES);
    if (ObjectUtil.isNotEmpty(configInfo)) {
        log.info("获取网关当前配置:\r\n{}", configInfo);
        JSONArray array = JSON.parseArray(configInfo.toString());
        try {
            routes = getRoutesByJson(array);
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }else{
        log.warn("ERROR: 从Redis获取网关配置为空,请确认system服务是否启动成功!");
    }
    
    for (MyRouteDefinition definition : routes) {
        log.info("update route : {}", definition.toString());
        Integer status=definition.getStatus();
        if(status.equals(0)){
            dynamicRouteService.delete(definition.getId());
        }else{
            dynamicRouteService.add(definition);
        }
    }
    if(ObjectUtils.isNotEmpty(baseMap)){
        String delRouterId = baseMap.get("delRouterId");
        if (ObjectUtils.isNotEmpty(delRouterId)) {
            dynamicRouteService.delete(delRouterId);
        }
    }
    this.publisher.publishEvent(new RefreshRoutesEvent(this));
}

参考链接:
https://www.cnblogs.com/lovelsl/articles/15272190.html

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐