Spring boot 集成 Redis Stream实践
本文介绍spring-boot集成redis实现stream流消息的实践
版权说明: 本文由博主keep丶原创,转载请注明出处。
原文地址: https://blog.csdn.net/qq_38688267/article/details/114831524
简介
Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。
简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
实践
我的应用场景是做异常信息拦截的持久化,因为异常拦截是在下层支撑模块中定义的,没有直接对接DB的能力,所以只能通过MQ的方式交给实际的业务模块实现。
之前我是通过RabbitMQ实现的,但是这样会导致所有上层应用都有RabbitMQ的依赖,而上层需要用到RabbitMQ的模块不多,所以选择使用Redis的Stream代替,我们直接看具体实现:
producer
说是生产者,实际上就是调用redisTemplate..opsForStream().add(Record<K, ?> record)
方法添加一条消息。
以下是实际场景的部分核心代码:
try {
RedisStreamProducer.sendExceptionLogMsg(exceptionLog);
} catch (Exception exception) {
log.error("send exception log fail~~~");
}
/**
* 发送异常日志消息
*
* @param exceptionLog 异常消息内容
*/
public static void sendExceptionLogMsg(SysExceptionLog exceptionLog) {
RedisUtil.getInstance().sendStreamSmg(getExceptionLogKey(), exceptionLog);
}
/**
* 发送流消息
*
* @param streamKey 流通道键值
* @param msgContext 消息内容
*/
public void sendStreamSmg(String streamKey, Object msgContext) {
getStringTemplate().opsForStream()
.add(Record.of(msgContext).withStreamKey(streamKey));
}
consumer
消费者的实现也比较简单,我这里就只是做一个入库的操作。
@Slf4j
@Component
public class ExceptionLogStreamConsumer implements StreamListener<String, ObjectRecord<String, SysExceptionLog>> {
@Autowired
SysExceptionLogService exceptionLogService;
@Override
public void onMessage(ObjectRecord<String, SysExceptionLog> record) {
exceptionLogService.save(record.getValue());
}
}
config(全文重点)
前面生产者和消费者都比较简单,相对比较麻烦的是配置。
这配置主要做两个事情:一个是创建消息队列;另外一个就是创建这个消息队列的监听器,并绑定上面定义好的消费者。
/**
* 异常日志流监听器
*/
@Slf4j
@Component
public class ExceptionLogStreamListener implements ApplicationRunner, DisposableBean {
@Resource
RedisConnectionFactory redisConnectionFactory;
@Resource
ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Resource
ExceptionLogStreamConsumer streamMessageListener;
@Resource
StringRedisTemplate stringRedisTemplate;
private StreamMessageListenerContainer<String, ObjectRecord<String, SysExceptionLog>> streamMessageListenerContainer;
@Override
public void run(ApplicationArguments args) throws UnknownHostException {
/**
* 这里必须先判空,重复创建组会报错,获取不存在的key的组也会报错
* 所以需要先判断是否存在key,在判断是否存在组
* 我这里只有一个组,如果需要创建多个组的话则需要改下逻辑
*/
if (stringRedisTemplate.hasKey(RedisStreamProducer.getExceptionLogKey())) {
StreamInfo.XInfoGroups groups = stringRedisTemplate.opsForStream().groups(RedisStreamProducer.getExceptionLogKey());
if (groups.isEmpty()) {
stringRedisTemplate.opsForStream().createGroup(RedisStreamProducer.getExceptionLogKey(), RedisStreamProducer.getExceptionLogGroup());
}
} else {
stringRedisTemplate.opsForStream().createGroup(RedisStreamProducer.getExceptionLogKey(), RedisStreamProducer.getExceptionLogGroup());
}
// 创建配置对象
StreamMessageListenerContainerOptions<String, ObjectRecord<String, SysExceptionLog>> streamMessageListenerContainerOptions = StreamMessageListenerContainerOptions
.builder()
// 一次性最多拉取多少条消息
.batchSize(10)
// 执行消息轮询的执行器
.executor(this.threadPoolTaskExecutor)
// 消息消费异常的handler
.errorHandler(new ErrorHandler() {
@Override
public void handleError(Throwable t) {
// throw new RuntimeException(t);
t.printStackTrace();
log.error("[MQ handler exception] " + t.getMessage());
}
})
// 超时时间,设置为0,表示不超时(超时后会抛出异常)
.pollTimeout(Duration.ZERO)
// 序列化器
.serializer(new StringRedisSerializer())
.targetType(SysExceptionLog.class)
.build();
// 根据配置对象创建监听容器对象
StreamMessageListenerContainer<String, ObjectRecord<String, SysExceptionLog>> streamMessageListenerContainer = StreamMessageListenerContainer
.create(this.redisConnectionFactory, streamMessageListenerContainerOptions);
// 使用监听容器对象开始监听消费(使用的是手动确认方式)
streamMessageListenerContainer.receive(Consumer.from(RedisStreamProducer.getExceptionLogGroup(), InetAddress.getLocalHost().getHostName()),
StreamOffset.create(RedisStreamProducer.getExceptionLogKey(), ReadOffset.lastConsumed()), this.streamMessageListener);
this.streamMessageListenerContainer = streamMessageListenerContainer;
// 启动监听
this.streamMessageListenerContainer.start();
}
@Override
public void destroy() throws Exception {
this.streamMessageListenerContainer.stop();
}
}
效果展示
关于异常捕捉功能,我们实现了可跨服务收集堆栈信息(哪个类哪个方法哪行)返回前端,并实现这些信息的持久化。
可以参考作者的另外一篇博客《全局异常捕获并返回前端报错和堆栈信息》
代码已同步更新到GITEE:https://gitee.com/zengzefeng/easy_frame
相关博客:<springcloud集成seata1.4.0,nacos1.4.0,sharding-jdbc,mybatis-plus实践>
更多推荐
所有评论(0)