因为 redis 的消费队列并没有提供多消费只消费一次的功能,如果想要实现多消费(实例)保证只消费一次,则需要自己在消费端去实现。
简单封装一下 Redisson 中的 RTopic 的监听方法:

redis 5.0 之后添加了 Redis Stream 数据结构, 支持消费者分组和消息窗口持久化等功能, 若是满足版本可直接使用官方支持。
https://redis.io/docs/data-types/streams/

public class RedisMessageUnrepeatable {

    private static final String INDEX_SUFFIX = "_unrepeatable";

    private static final String RW_SUFFIX = "_rwLock";
	
    private final RedissonClient client;

	// 主题名
    private final String topic;
	
	// 序列化方式
    private final Codec codec;

    private final RTopic rTopic;

    private final RAtomicLong topicIndex;

    private final RReadWriteLock rwLock;

    public RedisMessageUnrepeatable(@NonNull RedissonClient client, @NonNull String topic, Codec codec) {
        this.client = client;
        this.topic = topic;
        this.codec = codec;
        this.rTopic = client.getTopic(topic, codec);
        this.topicIndex = client.getAtomicLong(topic + INDEX_SUFFIX);
        rwLock = client.getReadWriteLock(topic + RW_SUFFIX);
    }

    public <M> void addListener(Class<M> type, MessageListener<M> listener) {
        // 获取写锁, 保证在注册新的消费者的时候 topicIndex 不会被修改.
        // 若在注册的时候 topicIndex 在改变, 可能导致与 currentIndex 不一致,
        // 从而导致 currentIndex 永远比 topicIndex 小, 造成该消费者永远消费不到消息.
        rwLock.writeLock().lock();
        try {
            final AtomicLong currentIndex = new AtomicLong(topicIndex.get());
            rTopic.addListener(type, (charSequence, s) -> {
                // 获取读锁, 读锁是共享锁.
                rwLock.readLock().lock();
                try {
                    if (!topicIndex.compareAndSet(currentIndex.get(),
                            currentIndex.incrementAndGet())) {
                        return;
                    }
                    listener.onMessage(charSequence, s);
                }finally {
                    rwLock.readLock().unlock();
                }
            });
        }finally {
            rwLock.writeLock().unlock();
        }
    }

    public RedissonClient getClient() {
        return client;
    }

    public String getTopic() {
        return topic;
    }

    public Codec getCodec() {
        return codec;
    }
}

简单使用:

@Component
public class Test{

	@Autowired
    private RedissonClient client;
	
	@PostConstruct
	public void listener(){
		RedisMessageUnrepeatable redisMq = new RedisMessageUnrepeatable(client, "test", StringCodec.INSTANCE);
		redisMq.addListener(String.class, (t, m) -> System.out.printf("topic: %s, message: %s \n", t, m));
	}
}

基本思想:

  1. 在 redis 中为每一个消息队列维护一个 topicIndex, 表示当前消费的消息序列。
  2. 在每一个消费者中维护一个 currentIndex, 表示当前消费者已消费到的消息序列, 创建时与 topicIndex 同步。
  3. redis 会为每一个消费者都推送每一条消息, 当消费者拿到当前消息时, 会将本地的 currentIndex 和 currentIndex 加一后的结果与 redis 中的 topicIndex 进行 CAS。
    1. 如果成功的将替换 topicIndex 的值则表示能够消费当前数据,随后执行具体消费逻辑。
    2. 如果未能替换 topicIndex , 则表示消息序列为 currentIndex 的消息已被其它的消费者消费, 则跳过当前消息处理下一条消息。
    3. 无论是否消费到消息, currentIndex 都将加一。
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐