问题引入

Kafka 顺序消费一直是一个难以解决的问题,Kafka的消费策略是对于同Topic同Partition的消息可保证顺序消费,其余无法保证。如果一个Topic只有一个Partition,那么这个Topic对应consumer的消费必然是有序的。不同的Topic的任何情况下都无法保证consumer的消费顺序和producer的发送顺序一致。
如果不同Topic之间存在数据关联且对消费顺序有要求,该如何处理?

解决思路

现有 Topic-insert 和 Topic-update,数据唯一标识为id,对于id=1的数据而言,要保证Topic-insert消费在前,Topic-update消费在后。
两个Topic的消费为不同线程处理,所以为了保证在同一时间内的同一数据标识的消息仅有一个业务逻辑在处理,需要对业务添加锁操作。使用synchronized进行加锁的话,会影响无关联的 INSERT 和 UPDATE 的数据消费能力,如 id=1 的 INSERT 和 id=2 的 UPDATE,在synchronized的情况下,无法并发处理,这是没有必要的,我们需要的是对于id=1的insert和id=1的update在同一时间只有一个在处理,所以使用细粒度锁来完成加锁的操作。

细粒度锁实现 : 使用弱引用实现细粒度锁 (如果为分布式系统,细粒度锁需要使用分布式锁的对应实现)

在对 INSERT 和 UPDATE 加锁之后,其实还是没有解决消费顺序的问题,只是确保了同一时间只有一个业务在处理。 对于消费顺序异常的问题,也就是先消费 UPDATE 再消费 INSERT 的情况,处理方式 : 消费到 UPDATE 数据,校验库中是否存在当前数据 (也就是是否执行INSERT),如果没有,就将当前UPDATE数据存入缓存,key为数据标识id,在INSERT消费时检查是否存在id对应的 UPDATE 缓存,如果有,就证明当前数据的消费顺序异常,需执行 UPDATE 操作,再将缓存数据移除。

实现方案

消息发送 :

kafkaTemplate.send("TOPIC_INSERT", "1");
kafkaTemplate.send("TOPIC_UPDATE", "1");

监听代码示例 :

import org.springframework.stereotype.Component;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * 弱引用锁,给某个kafka的消息(这里用id来标识)上锁协调多个topic消费逻辑
 *
 * 为什么要新写一个绑定kafka消息的弱引用锁呢?因为弱引用可以让jvm在内存不够的时候自动回收这些没有绑定的无用的锁,
 * 不需要关心频繁加锁而可能导致内存溢出的问题
 *
 * 在锁使用完成之后,id=1对应的锁就可以被回收处理,且在锁的使用过程中,其余线程尝试获取id=1对应的锁时,和当前线程所持有的锁对象必须为同一个
 */
@Component
public class WeakRefHashLock {
    /** 存储 key 对应 锁 的弱引用 */
    private final ConcurrentHashMap<Object, LockWeakReference> lockMap = new ConcurrentHashMap<>();
    /** 存储已过期的 ref */
    private final ReferenceQueue<ReentrantLock> queue = new ReferenceQueue<>();

    /**
     * 获取 key 对应的 lock
     *
     * @param key
     * @return
     */
    public Lock lock(Object key) {
        if (lockMap.size() > 1000) {
            clearEmptyRef();
        }
        // 获取 key 对应的 lock 弱引用
        LockWeakReference weakReference = lockMap.get(key);
        // 获取lock
        ReentrantLock lock = (weakReference == null ? null : weakReference.get());
        // 这里使用 while 循环获取,防止在获取过程中lock被gc回收
        while (lock == null) {
            // 使用 putIfAbsent,在多线程环境下,针对同一 key ,weakReference 均指向同一弱引用对象
            // 这里使用 可重入锁
            weakReference = lockMap.putIfAbsent(key, new LockWeakReference(key, new ReentrantLock(), queue));
            // 获取弱引用指向的lock,这里如果获取到 lock 对象值,将会使 lock 对象值的弱引用提升为强引用,不会被gc回收
            lock = (weakReference == null ? null : weakReference.get());
            // 在 putIfAbsent 的执行和 weakReference.get() 执行的间隙,可能存在执行gc的过程,会导致 lock 为null,所以使用while循环获取
            if (lock != null) {
                return lock;
            }
            // 获取不到 lock,移除map中无用的ref
            clearEmptyRef();
        }
        return lock;
    }

    /**
     * 清除 map 中已被回收的 ref
     */
    void clearEmptyRef() {
        Reference<? extends ReentrantLock> ref;
        while ((ref = queue.poll()) != null) {
            LockWeakReference lockWeakReference = (LockWeakReference) ref;
            lockMap.remove(lockWeakReference.key);
        }
    }

    static class LockWeakReference extends WeakReference<ReentrantLock> {
        /** 存储 弱引用 对应的 key 值,方便 之后的 remove 操作 */
        private final Object key;
        public LockWeakReference(Object key, ReentrantLock lock, ReferenceQueue<? super ReentrantLock> q) {
            super(lock, q);
            this.key = key;
        }
    }
}


import com.renren.kafka.util.WeakRefHashLock;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
/**
 * 利用自定义类WeakRefHashLock来协调TOPIC_INSERT和TOPIC_UPDATE两个主题的消息处理顺序,保证先insert再update
 */
@Component
public class KafkaListenerDemo {
    private static final Logger log = LoggerFactory.getLogger(KafkaListenerDemo.class);
    /** 消费到的数据缓存 */
    private final Map<String, String> UPDATE_DATA_MAP = new ConcurrentHashMap<>();
    /** 数据存储 */
    private final Map<String, String> DATA_MAP = new ConcurrentHashMap<>();
    private final WeakRefHashLock weakRefHashLock;

    public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {
        this.weakRefHashLock = weakRefHashLock;
    }

    @KafkaListener(topics = "TOPIC_INSERT")
    public void insert(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException {
        // 模拟顺序异常,也就是insert后消费,这里线程sleep
        Thread.sleep(1000);

        String id = record.value();
        log.info("接收到insert :: {}", id);

        Lock lock = weakRefHashLock.lock(id);
        lock.lock();
        try {
            log.info("开始处理 {} 的insert", id);
            // 模拟 insert 业务处理
            Thread.sleep(1000);
            // 从缓存中获取 是否存在有update数据
            if (UPDATE_DATA_MAP.containsKey(id)) {
                // 缓存数据存在,执行update
                doUpdate(id);
            }
            log.info("处理 {} 的insert 结束", id);
        } finally {
            lock.unlock();
        }
        acknowledgment.acknowledge();
    }

    @KafkaListener(topics = "TOPIC_UPDATE")
    public void update(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException {
        String id = record.value();
        log.info("接收到update :: {}", id);
        Lock lock = weakRefHashLock.lock(id);
        lock.lock();
        try {
            // 测试使用,不做数据库的校验
            if (!DATA_MAP.containsKey(id)) {
                // 未找到对应数据,证明消费顺序异常,将当前数据加入缓存
                log.info("消费顺序异常,将update数据 {} 加入缓存", id);
                UPDATE_DATA_MAP.put(id, id);
            } else {
                doUpdate(id);
            }
        } finally {
            lock.unlock();
        }
        acknowledgment.acknowledge();
    }

    void doUpdate(String id) throws InterruptedException {
        // 模拟 update
        log.info("开始处理update::{}", id);
        Thread.sleep(1000);
        log.info("处理update::{} 结束", id);
    }
}

日志 (代码中已模拟必现消费顺序异常的场景) :

接收到update :: 1
消费顺序异常,将update数据 1 加入缓存
接收到insert :: 1
开始处理 1 的insert
开始处理update::1
处理update::1 结束
处理 1 的insert 结束

观察日志,此方案可正常处理不同Topic再存在数据关联的消费顺序问题。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐