基于Redisson实现延时队列 redis 主备
基于 RBlockingQueue 与 RDelayedQueue 实现延迟队列:元素先写入 RDelayedQueue,延迟时间到期后由 Redisson 转移到目标 RBlockingQueue,消费者再从 RBlockingQueue 中阻塞取出。
·
1.生成RedissonConfig 配置
package com.hh.ota.config;
import com.hh.ota.dto.OtaTimeOut;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that creates the Redisson client (single-server mode) and
 * exposes the blocking queue plus the delayed queue layered on top of it.
 *
 * <p>Both queue beans are bound to the same Redis key, so elements offered to the
 * delayed queue are read from the blocking queue.
 */
@Configuration
@Slf4j
public class RedissonConfig {

    /** Redis key shared by the blocking queue and the delayed queue that feeds it. */
    private static final String TIMEOUT_QUEUE_NAME = "ota:timeout:upgrade";

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private Integer port;

    /** Optional; an empty/blank value means "no password". */
    @Value("${spring.redis.password:}")
    private String password;

    @Value("${spring.redis.database:0}")
    private int database;

    @Value("${spring.redis.jedis.pool.max-active:1000}")
    private int maxActive;

    @Value("${spring.redis.jedis.pool.min-idle:10}")
    private int minIdle;

    /**
     * Builds the Redisson client from the {@code spring.redis.*} properties.
     *
     * @return a client connected to the single configured Redis node
     */
    @Bean
    public RedissonClient redissonClient() {
        // Per the original author's note: Redisson 3.5 needs the "redis://" scheme
        // prefix on the address, otherwise it fails; 3.2 accepted a bare host:port.
        String node = "redis://" + host + ":" + port;

        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer().setAddress(node);
        if (StringUtils.isNotBlank(password)) {
            serverConfig.setPassword(password);
        }
        serverConfig.setDatabase(database);

        // The pool is sized to one tenth of the Jedis max-active setting
        // (default 1000 / 10 = 100). Integer division yields 0 for max-active < 10
        // and can fall below min-idle, so clamp to at least min-idle and at least 1.
        int poolSize = Math.max(1, Math.max(minIdle, maxActive / 10));
        serverConfig.setConnectionMinimumIdleSize(minIdle);
        serverConfig.setConnectionPoolSize(poolSize);

        // Printing redisson.getConfig().toJSON() is a quick way to verify the settings.
        return Redisson.create(config);
    }

    /**
     * Delayed queue whose expired elements are moved into the blocking queue
     * stored under the same Redis key.
     *
     * @return the delayed queue; never {@code null}
     * @throws IllegalStateException if the queue cannot be obtained, so that startup
     *         fails instead of a {@code null} bean being registered
     */
    @Bean
    @Qualifier("timeoutQueue")
    public RDelayedQueue<OtaTimeOut> timeoutQueue() {
        try {
            RedissonClient client = redissonClient();
            RBlockingQueue<OtaTimeOut> destination = client.getBlockingQueue(TIMEOUT_QUEUE_NAME);
            return client.getDelayedQueue(destination);
        } catch (RuntimeException e) {
            throw new IllegalStateException(
                    "Failed to obtain delayed queue '" + TIMEOUT_QUEUE_NAME + "'", e);
        }
    }

    /**
     * Blocking queue that consumers read expired elements from.
     *
     * @return the blocking queue; never {@code null}
     * @throws IllegalStateException if the queue cannot be obtained, so that startup
     *         fails instead of a {@code null} bean being registered
     */
    @Bean
    @Qualifier("blockingQueue")
    public RBlockingQueue<OtaTimeOut> blockingQueue() {
        try {
            return redissonClient().getBlockingQueue(TIMEOUT_QUEUE_NAME);
        } catch (RuntimeException e) {
            throw new IllegalStateException(
                    "Failed to obtain blocking queue '" + TIMEOUT_QUEUE_NAME + "'", e);
        }
    }
}
2.实现TimeoutHandler过期队列
package com.hh.ota.handler;
import com.alibaba.fastjson.JSON;
import com.hh.ota.dto.TimeOut;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
 * Timeout (expiry) queue handler.
 *
 * <p>Producers call {@link #offer} / {@link #remove} to schedule or cancel an element
 * on the delayed queue. A single background thread blocks on the blocking queue and
 * passes each element it receives to {@link #handleTimeoutCmd}.
 */
@Component
@Slf4j
public class TimeoutHandler implements ApplicationListener<ContextClosedEvent> {

    /** Worker thread that drains the blocking queue; started in {@link #init()}. */
    private final Thread handlerThread = new Thread(new TimeoutTask(), "ota-timeout-handler");

    @Resource
    private RBlockingQueue<TimeOut> blockingQueue;

    @Resource
    private RDelayedQueue<TimeOut> timeoutQueue;

    /** Loop flag for the worker; volatile because it is written and read on different threads. */
    private volatile boolean lifeFlg = false;

    /** Starts the worker thread once dependencies have been injected. */
    @PostConstruct
    public void init() {
        lifeFlg = true;
        handlerThread.start();
        log.info("超时队列处理器启动成功。");
    }

    /**
     * Hook invoked for every element taken from the blocking queue.
     *
     * <p>NOTE(review): the only call visible here is a direct call from
     * {@link TimeoutTask#run()} on this same instance. Spring applies {@code @Async}
     * through a proxy, so this self-invocation most likely runs synchronously on the
     * worker thread — confirm whether asynchronous execution is actually required.
     *
     * @param timeOut the expired element
     */
    @Async
    protected void handleTimeoutCmd(TimeOut timeOut) {
        // Process the expired element here.
    }

    /**
     * Stops the worker when the application context closes.
     *
     * <p>Clearing the flag alone is not enough: the worker is normally parked inside
     * {@code blockingQueue.take()}, so it is interrupted to wake it up.
     */
    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        lifeFlg = false;
        handlerThread.interrupt();
    }

    /** Worker loop: take one element, handle it, repeat until stopped. */
    class TimeoutTask implements Runnable {
        @Override
        public void run() {
            while (lifeFlg) {
                try {
                    TimeOut timeoutCmd = blockingQueue.take();
                    log.info("超时队列执行入参:{}", JSON.toJSONString(timeoutCmd));
                    handleTimeoutCmd(timeoutCmd);
                    log.info("超时队列执行结束");
                } catch (InterruptedException e) {
                    // Restore the flag for any code further up the stack and stop;
                    // looping again would make take() fail immediately and spin.
                    Thread.currentThread().interrupt();
                    log.info("超时队列处理器已停止。");
                    return;
                } catch (Exception e) {
                    if (!lifeFlg) {
                        // Errors raised while shutting down are expected; just exit.
                        return;
                    }
                    // Keep the worker alive, but log the full stack trace.
                    log.warn("超时队列处理器发生错误:{}", e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Schedules an element on the delayed queue.
     *
     * @param supplier produces the element; invoked exactly once
     * @param timeOut  delay in seconds before the element is handed to the consumer
     */
    public void offer(Supplier<TimeOut> supplier, Long timeOut) {
        // Evaluate the supplier once so the logged element is the enqueued element.
        TimeOut cmd = supplier.get();
        log.info("添加超时队列元素入参:{}", JSON.toJSONString(cmd));
        timeoutQueue.offer(cmd, timeOut, TimeUnit.SECONDS);
    }

    /**
     * Removes a pending element from the delayed queue.
     *
     * @param supplier produces the element to remove; invoked exactly once
     */
    public void remove(Supplier<TimeOut> supplier) {
        TimeOut cmd = supplier.get();
        log.info("超时队列元素删除入参:{}", JSON.toJSONString(cmd));
        boolean result = timeoutQueue.remove(cmd);
        log.info("超时队列元素删除结果:{}", result);
    }
}
3.具体使用方法(业务代码中调用 TimeoutHandler)
// Add an element to the timeout queue; outTime is the delay passed to
// TimeoutHandler.offer (which schedules it in seconds).
timeoutHandler.offer(OtaTimeOut::new, Long.valueOf(outTime));

// Remove a pending element from the timeout queue.
// NOTE(review): this builds an OtaSendTimeOut while the offer above builds an
// OtaTimeOut — presumably removal only matches an element equal to the one that
// was offered; confirm the intended type and populate the same fields.
timeoutHandler.remove(OtaSendTimeOut::new);
更多推荐
已为社区贡献1条内容
所有评论(0)