版权说明: 本文由博主keep丶原创,转载请注明出处。
原文地址: https://blog.csdn.net/qq_38688267/article/details/114831524

简介

  Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

  简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

上文摘抄自<菜鸟教程>,更多信息和基础命令请直接访问访问<菜鸟教程>

实践

   我的应用场景是做异常信息拦截的持久化,因为异常拦截是在下层支撑模块中定义的,没有直接对接DB的能力,所以只能通过MQ的方式交给实际的业务模块实现。
   之前我是通过RabbitMQ实现的,但是这样会导致所有上层应用都有RabbitMQ的依赖,而上层需要用到RabbitMQ的模块不多,所以选择使用Redis的Stream代替,我们直接看具体实现:

producer

   说是生产者,实际上就是调用redisTemplate..opsForStream().add(Record<K, ?> record)方法添加一条消息。

   以下是实际场景的部分核心代码:


	try {
            RedisStreamProducer.sendExceptionLogMsg(exceptionLog);
        } catch (Exception exception) {
            log.error("send exception log fail~~~");
        }


    /**
     * 发送异常日志消息
     *
     * @param exceptionLog 异常消息内容
     */
    public static void sendExceptionLogMsg(SysExceptionLog exceptionLog) {
        RedisUtil.getInstance().sendStreamSmg(getExceptionLogKey(), exceptionLog);
    }

    /**
     * 发送流消息
     *
     * @param streamKey  流通道键值
     * @param msgContext 消息内容
     */
    public void sendStreamSmg(String streamKey, Object msgContext) {
        getStringTemplate().opsForStream()
			.add(Record.of(msgContext).withStreamKey(streamKey));
    }
consumer

  消费者的实现也比较简单,我这里就只是做一个入库的操作。

@Slf4j
@Component
public class ExceptionLogStreamConsumer implements StreamListener<String, ObjectRecord<String, SysExceptionLog>> {

    @Autowired
    SysExceptionLogService exceptionLogService;

    @Override
    public void onMessage(ObjectRecord<String, SysExceptionLog> record) {
        exceptionLogService.save(record.getValue());
    }
}

config(全文重点)

  前面生产者和消费者都比较简单,相对比较麻烦的是配置。

  这配置主要做两个事情:一个是创建消息队列;另外一个就是创建这个消息队列的监听器,并绑定上面定义好的消费者。


/**
 * 异常日志流监听器
 */
@Slf4j
@Component
public class ExceptionLogStreamListener implements ApplicationRunner, DisposableBean {

    @Resource
    RedisConnectionFactory redisConnectionFactory;

    @Resource
    ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Resource
    ExceptionLogStreamConsumer streamMessageListener;

    @Resource
    StringRedisTemplate stringRedisTemplate;

    private StreamMessageListenerContainer<String, ObjectRecord<String, SysExceptionLog>> streamMessageListenerContainer;

    @Override
    public void run(ApplicationArguments args) throws UnknownHostException {
    	/**
         * 这里必须先判空,重复创建组会报错,获取不存在的key的组也会报错
         * 所以需要先判断是否存在key,在判断是否存在组
         * 我这里只有一个组,如果需要创建多个组的话则需要改下逻辑
         */
        if (stringRedisTemplate.hasKey(RedisStreamProducer.getExceptionLogKey())) {
            StreamInfo.XInfoGroups groups = stringRedisTemplate.opsForStream().groups(RedisStreamProducer.getExceptionLogKey());
            if (groups.isEmpty()) {
                stringRedisTemplate.opsForStream().createGroup(RedisStreamProducer.getExceptionLogKey(), RedisStreamProducer.getExceptionLogGroup());
            }
        } else {
            stringRedisTemplate.opsForStream().createGroup(RedisStreamProducer.getExceptionLogKey(), RedisStreamProducer.getExceptionLogGroup());
        }


        // 创建配置对象
        StreamMessageListenerContainerOptions<String, ObjectRecord<String, SysExceptionLog>> streamMessageListenerContainerOptions = StreamMessageListenerContainerOptions
                .builder()
                // 一次性最多拉取多少条消息
                .batchSize(10)
                // 执行消息轮询的执行器
                .executor(this.threadPoolTaskExecutor)
                // 消息消费异常的handler
                .errorHandler(new ErrorHandler() {
                    @Override
                    public void handleError(Throwable t) {
                        // throw new RuntimeException(t);
                        t.printStackTrace();
                        log.error("[MQ handler exception] " + t.getMessage());
                    }
                })
                // 超时时间,设置为0,表示不超时(超时后会抛出异常)
                .pollTimeout(Duration.ZERO)
                // 序列化器
                .serializer(new StringRedisSerializer())
                .targetType(SysExceptionLog.class)
                .build();

        // 根据配置对象创建监听容器对象
        StreamMessageListenerContainer<String, ObjectRecord<String, SysExceptionLog>> streamMessageListenerContainer = StreamMessageListenerContainer
                .create(this.redisConnectionFactory, streamMessageListenerContainerOptions);

        // 使用监听容器对象开始监听消费(使用的是手动确认方式)
        streamMessageListenerContainer.receive(Consumer.from(RedisStreamProducer.getExceptionLogGroup(), InetAddress.getLocalHost().getHostName()),
                StreamOffset.create(RedisStreamProducer.getExceptionLogKey(), ReadOffset.lastConsumed()), this.streamMessageListener);

        this.streamMessageListenerContainer = streamMessageListenerContainer;
        // 启动监听
        this.streamMessageListenerContainer.start();

    }

    @Override
    public void destroy() throws Exception {
        this.streamMessageListenerContainer.stop();
    }
}

效果展示

在这里插入图片描述
在这里插入图片描述

  关于异常捕捉功能,我们实现了可跨服务收集堆栈信息(哪个类哪个方法哪行)返回前端,并实现这些信息的持久化。

  可以参考作者的另外一篇博客《全局异常捕获并返回前端报错和堆栈信息》

  代码已同步更新到GITEE:https://gitee.com/zengzefeng/easy_frame

  相关博客:<springcloud集成seata1.4.0,nacos1.4.0,sharding-jdbc,mybatis-plus实践>

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐