分享一个使用 redisson 实现 redis 消费队列只消费一次的实现方法
因为 redis 的消费队列并没有提供多消费只消费一次的功能,如果想要实现多消费(实例)保证只消费一次,则需要自己在消费端去实现。简单封装一下 Redisson 中的 RTopic 的监听方法:public class RedisMessageUnrepeatable {private static final String INDEX_SUFFIX = "_unrepeatable";priva
·
因为 redis 的消费队列并没有提供多消费只消费一次的功能,如果想要实现多消费(实例)保证只消费一次,则需要自己在消费端去实现。
简单封装一下 Redisson 中的 RTopic 的监听方法:
redis 5.0 之后添加了 Redis Stream 数据结构, 支持消费者分组和消息窗口持久化等功能, 若是满足版本可直接使用官方支持。
https://redis.io/docs/data-types/streams/
public class RedisMessageUnrepeatable {
private static final String INDEX_SUFFIX = "_unrepeatable";
private static final String RW_SUFFIX = "_rwLock";
private final RedissonClient client;
// 主题名
private final String topic;
// 序列化方式
private final Codec codec;
private final RTopic rTopic;
private final RAtomicLong topicIndex;
private final RReadWriteLock rwLock;
public RedisMessageUnrepeatable(@NonNull RedissonClient client, @NonNull String topic, Codec codec) {
this.client = client;
this.topic = topic;
this.codec = codec;
this.rTopic = client.getTopic(topic, codec);
this.topicIndex = client.getAtomicLong(topic + INDEX_SUFFIX);
rwLock = client.getReadWriteLock(topic + RW_SUFFIX);
}
public <M> void addListener(Class<M> type, MessageListener<M> listener) {
// 获取写锁, 保证在注册新的消费者的时候 topicIndex 不会被修改.
// 若在注册的时候 topicIndex 在改变, 可能导致与 currentIndex 不一致,
// 从而导致 currentIndex 永远比 topicIndex 小, 造成该消费者永远消费不到消息.
rwLock.writeLock().lock();
try {
final AtomicLong currentIndex = new AtomicLong(topicIndex.get());
rTopic.addListener(type, (charSequence, s) -> {
// 获取读锁, 读锁是共享锁.
rwLock.readLock().lock();
try {
if (!topicIndex.compareAndSet(currentIndex.get(),
currentIndex.incrementAndGet())) {
return;
}
listener.onMessage(charSequence, s);
}finally {
rwLock.readLock().unlock();
}
});
}finally {
rwLock.writeLock().unlock();
}
}
public RedissonClient getClient() {
return client;
}
public String getTopic() {
return topic;
}
public Codec getCodec() {
return codec;
}
}
简单使用:
@Component
public class Test{
@Autowired
private RedissonClient client;
@PostConstruct
public void listener(){
RedisMessageUnrepeatable redisMq = new RedisMessageUnrepeatable(client, "test", StringCodec.INSTANCE);
redisMq.addListener(String.class, (t, m) -> System.out.printf("topic: %s, message: %s \n", t, m));
}
}
基本思想:
- 在 redis 中为每一个消息队列维护一个 topicIndex, 表示当前消费的消息序列。
- 在每一个消费者中维护一个 currentIndex, 表示当前消费者已消费到的消息序列, 创建时与 topicIndex 同步。
- redis 会为每一个消费者都推送每一条消息, 当消费者拿到当前消息时, 会将本地的 currentIndex 和 currentIndex 加一后的结果与 redis 中的 topicIndex 进行 CAS。
- 如果成功的将替换 topicIndex 的值则表示能够消费当前数据,随后执行具体消费逻辑。
- 如果未能替换 topicIndex , 则表示消息序列为 currentIndex 的消息已被其它的消费者消费, 则跳过当前消息处理下一条消息。
- 无论是否消费到消息, currentIndex 都将加一。
更多推荐
已为社区贡献4条内容
所有评论(0)