一、Redis消息队列

由上一章内容可知,基于阻塞队列的异步秒杀还存在2个问题:

  • 内存限制问题(如果不对BlockingQueue做大小限制,则会有内存溢出问题)
  • 数据安全问题(如果服务宕机,则内存的数据将会丢失)

这一章通过Redis的消息队列进行优化

1. 消息队列

消息队列Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

Redis提供了三种不同的方式来实现消息队列:

  • list结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型
    在这里插入图片描述

在这里插入图片描述

2. 基于List结构模拟消息队列

  • Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
  • 队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。
  • 注意,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

在这里插入图片描述

基于List的消息队列有哪些优缺点?
优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失(取到消息还没处理并且出现异常,则消息会丢失)
  • 只支持单消费者(一个线程取到消息后,list中将移除该消息,其他线程则获取不到)

3. 基于PubSub的消息队列

PubSub(发布订阅) 是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

  • SUBSCRIBE channel [channel] :订阅一个或多个频道
  • PUBLISH channel msg :向一个频道发送消息
  • PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
    在这里插入图片描述
    在这里插入图片描述

基于PubSub的消息队列有哪些优缺点?
优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化(与list不同的时,list本身是数据结构可以存储数据,而PubSub只支持消息发送)
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失(消费者来不及处理的消息,会存到消费者的缓冲区,缓冲区是有大小限制的)

4. 基于Stream的消息队列 - 单消费模式

  • Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令:

在这里插入图片描述
例如:

## 创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用Redis自动生成ID
127.0.0.1:6379> XADD users * name jack age 21
"1644805700523-0"

读取消息的方式之一:XREAD
在这里插入图片描述

例如,使用XREAD读取第一个消息:

在这里插入图片描述
XREAD阻塞方式,读取最新的消息:
在这里插入图片描述

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

在这里插入图片描述
STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯(消息不丢失,读取后消息仍然存在队列中)
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 消息漏读的风险

4. 基于Stream的消息队列 - 消费者组

  • 由于单消费模式会出现消息漏读的情况,所以出现了消费者组
  • 消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
    在这里插入图片描述

创建消费者组:

# key:队列名称
# groupName:消费者组名称
# ID:起始ID标识,$代表队列中最后一个消息,0则代表队列中第一个消息
# MKSTREAM:队列不存在时自动创建队列

XGROUP CREATE  key groupName ID [MKSTREAM]

其它常见命令:

# 删除指定的消费者组
XGROUP DESTORY key groupName

# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername

# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息:

# group:消费组名称
# consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
# count:本次查询的最大数量
# BLOCK milliseconds:当没有消息时最长等待时间
# NOACK:无需手动ACK,获取到消息后自动确认,自动ACK可能会出现消息丢失,所以一般需要手动ACK
# STREAMS key:指定队列名称
# ID:获取消息的起始ID">":从下一个未消费的消息开始
	其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始,消费完并确认后,则会从pending-list中移除
	正常情况通过">"读取消息,出现异常情况后,再从pending-list中读取

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

队列中有4个消息k1、k2、k3、k4

  • 创建队列s1,消费者组g1
  • 消费者组g1中的消费者c1先读2个消息k1、k2
  • 消费者组g1中的消费者c2再读2个消息k3、k4
    在这里插入图片描述

消息确认命令:

127.0.0.1:6379> help XACK

  XACK key group ID [ID ...]
  summary: Marks a pending message as correctly processed, effectively removing it from the pending entries list of the consumer group. Return value of the command is the number of messages successfully acknowledged, that is, the IDs we were actually able to resolve in the PEL.
  since: 5.0.0
  group: stream

一次性确认消费者组g1中的消息:

127.0.0.1:6379> XACK s1 g1 1646339018049-0 1646339342815-0 1646339529899-0 1646339537593-0
(integer)4
127.0.0.1:6379>

从pending-list中读取消息的命令:

127.0.0.1:6379> help XPENDING
# [IDLE min-idle-time]表示 获取消息以后 到 确认消息之前 的这段空闲时间
# start end 表示从start 到 end 之间的id 消息
  XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
  summary: Return information and entries from a stream consumer group pending entries list, that are messages fetched but never acknowledged.
  since: 5.0.0
  group: stream

  • “- +” 表示从最小到最大id之间的消息,10表示消息数量
  • 4)(integer)1 表示pending-list中有1个消息未被确认
    在这里插入图片描述

消费者监听消息的基本思路:

在这里插入图片描述

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

二、基于Redis的Stream结构作为消息队列,实现异步秒杀下单

需求:

  • 创建一个Stream类型的消息队列,名为stream.orders
  • 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
  • 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

先提前创建stream.orders队列:

127.0.0.1:6379> XGROUP CREATE stream.orders g1 0 MKSTREAM
OK
127.0.0.1:6379> 

优化后的Lua脚本:

-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]

-- 2.数据key
-- 2.1.库存key,2个。。表示拼接字符串
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
-- tonumber是将字符串转为数字
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2.库存不足,返回1
    return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.3.存在,说明是重复下单,返回2
    return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
-- 'id', orderId,用id对应orderId,因为VoucherOrder实体类中是用id表示的orderId,这里也用id表示的orderId,解析Redis对象后可以少一次转换
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

优化后的代码:

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedissonClient redissonClient;

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    // 创建阻塞队列
    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
    // 用线程池创建独立线程
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    // 将代理对象作为全局变量,供所有线程使用
    private IVoucherOrderService proxy;

    // 项目启动后,就应该开启线程,异步从阻塞队列中获取信息
    @PostConstruct
    private void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private class VoucherOrderHandler implements Runnable{
        private final String queueName = "stream.orders";
        @Override
        public void run() {
            while (true) {
                try {
                    // 0.初始化stream
                    initStream();
                    // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            // GROUP g1 c1
                            Consumer.from("g1", "c1"),
                            // BLOCK 2000
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            // STREAMS stream.orders >
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    // 2.判断订单信息是否为空
                    if (list == null || list.isEmpty()) {
                        // 如果为null,说明没有消息,继续下一次循环
                        continue;
                    }
                    // 3.解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    // 将Redis对象转为VoucherOrder对象
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 4.读取消息成功后,创建订单
                    handleVoucherOrder(voucherOrder);
                    // 5.确认消息 XACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理订单异常", e);
                    // 出现异常后,从PendingList中读取消息
                    handlePendingList();
                }
            }
        }

        public void initStream(){
            Boolean exists = stringRedisTemplate.hasKey(queueName);
            if (BooleanUtil.isFalse(exists)) {
                log.info("stream不存在,开始创建stream");
                // 不存在,需要创建
                stringRedisTemplate.opsForStream().createGroup(queueName, ReadOffset.latest(), "g1");
                log.info("stream和group创建完毕");
                return;
            }
            // stream存在,判断group是否存在
            StreamInfo.XInfoGroups groups = stringRedisTemplate.opsForStream().groups(queueName);
            if(groups.isEmpty()){
                log.info("group不存在,开始创建group");
                // group不存在,创建group
                stringRedisTemplate.opsForStream().createGroup(queueName, ReadOffset.latest(), "g1");
                log.info("group创建完毕");
            }
        }

        private void handlePendingList() {
            while (true) {
                try {
                    // 1.获取PendingList中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1), // 不需要BLOCK
                            StreamOffset.create(queueName, ReadOffset.from("0")) // 从PendingList中0开始读取
                    );
                    // 2.判断订单信息是否为空
                    if (list == null || list.isEmpty()) {
                        // 如果为null,说明PendingList没有消息,这里是结束循环,不再继续
                        break;
                    }
                    // 3.解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    // 将Redis对象转为VoucherOrder对象
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 4.读取消息成功后,创建订单
                    handleVoucherOrder(voucherOrder);
                    // 5.确认消息 XACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理PendingList中的订单异常", e);
                }
            }
        }
    }
    


    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        // 1.获取用户,注意,这里是单独的线程,所以不能从主线程的ThreadLocal获取userId
        Long userId = voucherOrder.getUserId();
        // 2.创建锁对象
        RLock redisLock = redissonClient.getLock("lock:order:" + userId);
        // 3.尝试获取锁,这里加锁是兜底方案,可以不用再加锁,因为前面执行过lua脚本校验过一人一单
        boolean isLock = redisLock.tryLock();
        // 4.判断是否获得锁成功
        if (!isLock) {
            // 获取锁失败,直接返回失败或者重试
            log.error("不允许重复下单!");
            return;
        }
        try {
            // 注意:不能通过 IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); 获取代理对象
            // 因为AopContext.currentProxy();底层内部也有个ThreadLocal,但是此时的线程是新开启的线程,所以不能获取不到主线程中的代理对象
            // 所以需要在主线程中先获取到代理对象,保存到全局变量供所有线程使用
            proxy.createVoucherOrder(voucherOrder);
        } finally {
            // 释放锁
            redisLock.unlock();
        }
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
        Long userId = UserHolder.getUser().getId();
        long orderId = redisIdWorker.nextId("order");
        // 1.执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString(), String.valueOf(orderId)
        );
        int r = result.intValue();
        // 2.判断结果是否为0
        if (r != 0) {
            // 2.1.不为0 ,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
        // 3.获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();
        // 4.返回订单id
        return Result.ok(orderId);
    }

    @Transactional
    @Override
    public void createVoucherOrder(VoucherOrder voucherOrder) {

        Long userId = voucherOrder.getUserId();
        // 5.1.查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        // 5.2.判断是否存在
        if (count > 0) {
            // 用户已经购买过了
            log.error("用户已经购买过了");
            return;
        }

        // 6.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1") // set stock = stock - 1
                .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0
                .update();
        if (!success) {
            // 扣减失败
            log.error("库存不足");
            return;
        }
        // 7.创建订单
        save(voucherOrder);
    }

}

测试结果:
在这里插入图片描述

  • 平均耗时和吞吐量有了明显提升
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐