Redis做消息队列(延迟消息队列ZADD)
一、BLPOP和LPUSH做简单的消息队列原理(BLPOP会产生阻塞)127.0.0.1:6379> BLPOP k1 1001) "k1"2) "55"(11.32s)127.0.0.1:6379> lpush k1 55(integer) 1二、Redis做延迟消息队列用ZADD,ZADD中有score是一个数字,可设置消息的延迟时间为scoreZADD key [NX|XX] [
·
一、BLPOP和LPUSH做简单的消息队列
原理(BLPOP会产生阻塞)
127.0.0.1:6379> BLPOP k1 100
1) "k1"
2) "55"
(11.32s)
127.0.0.1:6379> lpush k1 55
(integer) 1
二、Redis做延迟消息队列
用ZADD,ZADD中有score是一个数字,可设置消息的延迟时间为score
ZADD key [NX|XX] [GT|LT] [CH] [INCR] score member [score member ...]
- 定义消息类
/**
* 延迟消息
*/
public class DelayMsg {
private String id;
private Object data;
public DelayMsg() {
}
public DelayMsg(String id, Object data) {
this.id = id;
this.data = data;
}
@Override
public String toString() {
return "DelayMsg{" +
"id='" + id + '\'' +
", data=" + data +
'}';
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
}
- 消息传递需要序列化,添加JSON依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.0</version>
</dependency>
- 延迟消息队列代码
/**
* 延迟消息队列
*/
public class DelayMsgQueue {
//Redis客户端工具jedis
private Jedis jedis;
//队列的名字
private String queue;
public DelayMsgQueue(Jedis jedis, String queue) {
this.jedis=jedis;
this.queue=queue;
}
/**
* 发消息
* @param data
*/
public void push(Object data){
//构造消息对象
DelayMsg delayMsg = new DelayMsg(UUID.randomUUID().toString(), data);
try {
String member = new ObjectMapper().writeValueAsString(delayMsg);
jedis.zadd(queue, ((double) (System.currentTimeMillis() + 5000)), member);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
/**
* 消费消息
*/
public void pop() {
while (!Thread.interrupted()) {
//按照分数去查询
Set<String> zrange = jedis.zrangeByScore(queue, 0, ((double) System.currentTimeMillis()), 0, 1);
if (zrange.isEmpty()) {
//集合是空的
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
//5秒之后开启下一轮循环
continue;
}
//查询获取到的消息
String next = zrange.iterator().next();
if (jedis.zrem(queue, next) > 0) {
//从消息队列中移除该消息
//在这里做业务逻辑的处理
//...
System.out.println("receive:" + next + ">>>" + new Date());
}
}
}
}
- 测试代码
/**
* 延迟消息队列测试
*/
public class DelayMsgDemo {
public static void main(String[] args) {
new Redis().excute(jedis -> {
DelayMsgQueue my_delay_queue = new DelayMsgQueue(jedis, "my_delay_queue");
//生产消息
Thread producer=new Thread(){
@Override
public void run() {
my_delay_queue.push("hello"+new Date());
}
};
//消费消息
Thread consumer=new Thread(){
@Override
public void run() {
my_delay_queue.pop();
}
};
producer.start();
consumer.start();
});
new Scanner(System.in).next();
}
}
- 运行结果
更多推荐
已为社区贡献2条内容
所有评论(0)