背景

项目中的流程监控,有几种节点,需要监控每一个节点是否超时。按传统的做法,肯定是通过定时任务,去扫描然后判断,但是定时任务有缺点:1,数据量大会慢;2,时间不好控制,太短,怕一次处理不完,太长状态就会有延迟。所以就想到用延迟队列的方式去实现。

一,redis的过期key监控

1,开启过期key监听

在redis的配置里把这个注释去掉

notify-keyspace-events Ex

然后重启redis

2,使用redis过期监听实现延迟队列

继承KeyExpirationEventMessageListener类,实现父类的方法,就可以监听key过期时间了。当有key过期,就会执行这里。这里就把需要的key过滤出来,然后发送给kafka队列。

@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener  {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    /**
     * 针对 redis 数据失效事件,进行数据处理
     * @param message
     * @param pattern
     */
    @Override
    public void onMessage(Message message, byte[] pattern){
        if(message == null || StringUtils.isEmpty(message.toString())){
            return;
        }
        String content = message.toString();
        //key的格式为   flag:时效类型:运单号  示例如下
        try {
            if(content.startsWith(AbnConstant.EMS)){
                kafkaProducerService.sendMessageSync(TopicConstant.EMS_WAYBILL_ABN_QUEUE,content);
            }else if(content.startsWith(AbnConstant.YUNDA)){
                kafkaProducerService.sendMessageSync(TopicConstant.YUNDA_WAYBILL_ABN_QUEUE,content);
            }
        } catch (Exception e) {
            log.error("监控过期key,发送kafka异常,",e);
        }
    }
}

可以看的出来,这种方式其实是很简单的,但是有几个问题需要注意,一是,这个尽量单机运行,因为多台机器都会执行,浪费cpu,增加数据库负担。二是,机器频繁部署的时候,如果有时间间隔,会出现数据的漏处理。三是,能不用就别用,有坑。

二,redis的zset实现延迟队列

1,生产者实现

可以看到生产者很简单,其实就是利用zset的特性,给一个zset添加元素而已,而时间就是它的score。

public void produce(Integer taskId, long exeTime) {
    System.out.println("加入任务, taskId: " + taskId + ", exeTime: " + exeTime + ", 当前时间:" + LocalDateTime.now());
    RedisOps.getJedis().zadd(RedisOps.key, exeTime, String.valueOf(taskId));
}

2,消费者实现

消费者的代码也不难,就是把已经过期的zset中的元素给删除掉,然后处理数据。

public void consumer() {
    Executors.newSingleThreadExecutor().submit(new Runnable() {
        @Override
        public void run() {
            while (true) {
                Set<String> taskIdSet = RedisOps.getJedis().zrangeByScore(RedisOps.key, 0, System.currentTimeMillis(), 0, 1);
                if (taskIdSet == null || taskIdSet.isEmpty()) {
                    System.out.println("没有任务");
 
                } else {
                    taskIdSet.forEach(id -> {
                        long result = RedisOps.getJedis().zrem(RedisOps.key, id);
                        if (result == 1L) {
                            System.out.println("从延时队列中获取到任务,taskId:" + id + " , 当前时间:" + LocalDateTime.now());
                        }
                    });
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    });
}

可以看到这种方式其实是比上个方式要好的。因为,他的那两个缺点都被克服掉了。多台机器也没事儿,也不用再担心部署时间间隔长的问题。

总结

两个方式都是不错的,都能解决问题。但是呢,两个方式都很坑。要我选,我肯定用rocketmq去做延时队列。哈哈,主要碰到问题,多思考,多总结。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐