SpringBoot集成Redis实现消息队列的方法(stringRedisTemplate.executePipelined)的用法
引入依赖:<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>消息提供者:@Overridepublic void produce(G
·
引入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
消息提供者:
@Override
public void produce(GiveVouchersParamDTO giveVouchersParamDTO) {
// TODO:转成JSON格式
String msg = JsonUtil.encode(giveVouchersParamDTO);
// TODO: 左放入消息队列
stringRedisTemplate.opsForList().leftPush(MESSAGE_KEY, msg);
}
消息消费者:
@Override
public void blockingConsume() {
//管道返回对应值
List<Object> obj = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
@Nullable
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
//队列没有元素会阻塞操作,直到队列获取新的元素或超时,
//阻塞线程每隔20s超时执行一次。该方法解决了 CPU 空转的问题。
//MESSAGE_KEY: redisKye
return connection.bLPop(20, MESSAGE_KEY.getBytes());
}
}, new StringRedisSerializer());
if (obj == null) {
return;
}
// 对管道返回值进行对应处理
for (Object value : obj) {
if (value != null) {
if (value instanceof List) {
List<String> list = (List) value;
for (String v : list) {
if (v != null) {
if (!v.startsWith("{")) {
continue;
}
try {
/** 拿取到消息做对应的逻辑处理 */
GiveVouchersParamDTO giveVouchersParamDTO = JsonUtil.decode(v,GiveVouchersParamDTO.class);
log.info(v);
log.info(giveVouchersParamDTO );
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
}
}
}
}
启动线程去监听消息消费:
@PostConstruct:被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器调用一次,类似于Serclet的init()方法。被 @PostConstruct修饰的方法会在构造函数之后,init()方法之前运行
如果想在生成对象时完成某些初始化操作,而偏偏这些初始化操作又依赖于依赖注入,那么久无法在构造函数中实现。为此,可以使用@PostConstruct注解一个方法来完成初始化,@PostConstruct注解的方法将会在依赖注入完成后被自动调用。
/**
* Redis 消息启动监听
*/
@Component
public class RedisMqJob {
@Autowired
RedisMQService redisMQService;
@PostConstruct
public void consume() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
redisMQService.blockingConsume();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
thread.start();
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)