Redis——基于 Redisson 的延时队列
private RBlockingQueue<TeacherAuthProgressContext> blockingQueue;private RDelayedQueue<TeacherAuthProgressContext> delayedQueue;初始化:blockingQueue = client.getNativeClient().getBlockingQueu
·
/** Queue the consumer loop blocks on via take(); also the queue the delayed queue is created over. */
private RBlockingQueue<TeacherAuthProgressContext> blockingQueue;
/**
 * Delayed queue created over blockingQueue; putInDelayedQueue offers items here with a delay.
 * NOTE(review): presumably Redisson moves each item into blockingQueue once its delay has
 * elapsed — confirm against the RDelayedQueue documentation.
 */
private RDelayedQueue<TeacherAuthProgressContext> delayedQueue;
初始化:
// Look up the blocking queue named queueName through the native client.
blockingQueue = client.getNativeClient().getBlockingQueue(queueName);
// Fail fast when no queue handle was returned.
if (blockingQueue == null) {
throw new RuntimeException(queueName + " initialize failed");
}
// Create the delayed queue over the blocking queue obtained above.
delayedQueue = client.getNativeClient().getDelayedQueue(blockingQueue);
// NOTE(review): this failure uses the same message as the blocking-queue check above,
// so the exception text alone does not tell which of the two lookups failed.
if (delayedQueue == null) {
throw new RuntimeException(queueName + " initialize failed");
}
// Start the consumer loop (takeOutDelayedQueue) on AlpsThreadPool only after both queues exist.
AlpsThreadPool.getInstance().submit(this::takeOutDelayedQueue);
放入取出:
/**
 * Schedules {@code data} on the delayed queue, or closes it out when it has run too long.
 *
 * <p>When {@code data.getTotalDay()} is greater than 14 the task is not re-queued: its
 * {@code TeacherAuthProgress} gets the current time as end time and is upserted through
 * {@code teacherAuthProgressPersistence}. Otherwise the item is offered to the delayed
 * queue with the given delay.
 *
 * @param data     the context to schedule or finish
 * @param delay    delay amount passed to the delayed queue
 * @param timeUnit unit of {@code delay}
 */
public void putInDelayedQueue(TeacherAuthProgressContext data, long delay, TimeUnit timeUnit) {
    // Tasks older than 14 days are finished right here and never go back on the queue.
    if (data.getTotalDay() > 14) {
        TeacherAuthProgress expiredProgress = data.getTeacherAuthProgress();
        expiredProgress.setEndAt(new Date());
        teacherAuthProgressPersistence.upsert(expiredProgress);
        return;
    }

    delayedQueue.offer(data, delay, timeUnit);
}
/**
 * Consumer loop: blocks on {@code blockingQueue.take()} and hands every item it receives
 * to {@code process} on {@code AlpsThreadPool}.
 *
 * <p>The loop runs until the consuming thread is interrupted. On interruption the
 * interrupt status is restored and the method returns, so the owning pool can shut the
 * worker down instead of having the signal swallowed by an endless loop.
 */
private void takeOutDelayedQueue() {
    while (!Thread.currentThread().isInterrupted()) {
        try {
            TeacherAuthProgressContext data = blockingQueue.take();
            // Processing is submitted to the pool so this thread can return to take().
            AlpsThreadPool.getInstance().submit(() -> process(data));
        } catch (InterruptedException e) {
            logger.error("teacher_auth_progress_delayed_queue process error:", e);
            // Catching InterruptedException clears the flag; re-assert it and stop consuming.
            Thread.currentThread().interrupt();
            return;
        }
    }
}
底层通过 Lua 脚本调用 Redis 的 ZRANGEBYSCORE 命令实现:延时元素以到期时间戳作为 score 存入有序集合,脚本按 score 取出已到期的元素,再转移到目标阻塞队列。
更多推荐
已为社区贡献1条内容
所有评论(0)