一、BLPOP和LPUSH做简单的消息队列
原理(BLPOP会产生阻塞)

127.0.0.1:6379> BLPOP k1 100
1) "k1"
2) "55"
(11.32s)

127.0.0.1:6379> lpush k1 55
(integer) 1

二、Redis做延迟消息队列
用ZADD,ZADD中有score是一个数字,可设置消息的延迟时间为score

ZADD key [NX|XX] [GT|LT] [CH] [INCR] score member [score member ...]

  1. 定义消息类
/**
 * 延迟消息
 */
public class DelayMsg {

    private String id;
    private Object data;

    public DelayMsg() {
    }

    public DelayMsg(String id, Object data) {
        this.id = id;
        this.data = data;
    }

    @Override
    public String toString() {
        return "DelayMsg{" +
                "id='" + id + '\'' +
                ", data=" + data +
                '}';
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }
}

  1. 消息传递需要序列化,添加JSON依赖
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.0</version>
        </dependency>
  1. 延迟消息队列代码
/**
 * 延迟消息队列
 */
public class DelayMsgQueue {
    //Redis客户端工具jedis
    private Jedis jedis;
    //队列的名字
    private String queue;

    public DelayMsgQueue(Jedis jedis, String queue) {
        this.jedis=jedis;
        this.queue=queue;
    }

    /**
     * 发消息
     * @param data
     */
    public void push(Object data){
        //构造消息对象
        DelayMsg delayMsg = new DelayMsg(UUID.randomUUID().toString(), data);
        try {
            String member = new ObjectMapper().writeValueAsString(delayMsg);
            jedis.zadd(queue, ((double) (System.currentTimeMillis() + 5000)), member);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 消费消息
     */
    public void pop() {
        while (!Thread.interrupted()) {
            //按照分数去查询
            Set<String> zrange = jedis.zrangeByScore(queue, 0, ((double) System.currentTimeMillis()), 0, 1);

            if (zrange.isEmpty()) {
                //集合是空的
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
                //5秒之后开启下一轮循环
                continue;
            }
            //查询获取到的消息
            String next = zrange.iterator().next();
            if (jedis.zrem(queue, next) > 0) {
                //从消息队列中移除该消息
                //在这里做业务逻辑的处理
                //...
                System.out.println("receive:" + next + ">>>" + new Date());
            }
        }
    }
}

  1. 测试代码
/**
 * 延迟消息队列测试
 */
public class DelayMsgDemo {

    public static void main(String[] args) {
        new Redis().excute(jedis -> {
            DelayMsgQueue my_delay_queue = new DelayMsgQueue(jedis, "my_delay_queue");
            //生产消息
            Thread producer=new Thread(){
                @Override
                public void run() {
                    my_delay_queue.push("hello"+new Date());
                }
            };
            //消费消息
            Thread consumer=new Thread(){
                @Override
                public void run() {
                    my_delay_queue.pop();
                }
            };
            producer.start();
            consumer.start();
        });
        new Scanner(System.in).next();
    }
}

  1. 运行结果
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐