private RBlockingQueue<TeacherAuthProgressContext> blockingQueue;
    private RDelayedQueue<TeacherAuthProgressContext> delayedQueue;

初始化:

 blockingQueue = client.getNativeClient().getBlockingQueue(queueName);
        if (blockingQueue == null) {
            throw new RuntimeException(queueName + " initialize failed");
        }
        delayedQueue = client.getNativeClient().getDelayedQueue(blockingQueue);
        if (delayedQueue == null) {
            throw new RuntimeException(queueName + " initialize failed");
        }
        AlpsThreadPool.getInstance().submit(this::takeOutDelayedQueue);

放入取出:

  public void putInDelayedQueue(TeacherAuthProgressContext data, long delay, TimeUnit timeUnit) {
        // 超过14天的任务,直接处理为结束啦,不再往延时队列放了
        if (data.getTotalDay() > 14) {
            TeacherAuthProgress teacherAuthProgress = data.getTeacherAuthProgress();
            teacherAuthProgress.setEndAt(new Date());
            teacherAuthProgressPersistence.upsert(teacherAuthProgress);
        } else {
            delayedQueue.offer(data, delay, timeUnit);
        }
    }

    private void takeOutDelayedQueue() {
        while (true) {
            try {
                TeacherAuthProgressContext data = blockingQueue.take();
                AlpsThreadPool.getInstance().submit(() -> process(data));
            } catch (InterruptedException e) {
                logger.error("teacher_auth_progress_delayed_queue process error:", e);
            }
        }
    }

底层lua脚本采用zrangebyscore实现。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐