1.生成RedissonConfig 配置

package com.hh.ota.config;

import com.hh.ota.dto.OtaTimeOut;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Spring configuration that creates a single-server {@link RedissonClient} from the
 * {@code spring.redis.*} properties and exposes the blocking queue / delayed queue pair
 * used for OTA upgrade timeouts.
 *
 * <p>Both queue beans are backed by the same Redis key ({@link #TIMEOUT_QUEUE_NAME}):
 * the delayed queue is created on top of the blocking queue of that name.
 */
@Configuration
@Slf4j
public class RedissonConfig {

    /** Redis key shared by the blocking queue and the delayed queue built on top of it. */
    private static final String TIMEOUT_QUEUE_NAME = "ota:timeout:upgrade";

    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private Integer port;
    @Value("${spring.redis.password:}")
    private String password;
    @Value("${spring.redis.database:0}")
    private int database;

    @Value("${spring.redis.jedis.pool.max-active:1000}")
    private int maxActive;
    @Value("${spring.redis.jedis.pool.min-idle:10}")
    private int minIdle;

    /**
     * Builds the Redisson client for a single Redis server.
     *
     * @return the configured client
     */
    @Bean
    public RedissonClient redissonClient() {
        // The original author notes that Redisson 3.5 requires the "redis://" scheme prefix
        // on the address (3.2 accepted a bare host:port).
        String address = "redis://" + host + ":" + port;

        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer().setAddress(address);
        if (StringUtils.isNotBlank(password)) {
            serverConfig.setPassword(password);
        }
        serverConfig.setDatabase(database);
        serverConfig.setConnectionMinimumIdleSize(minIdle);
        // The pool is sized to a tenth of the Jedis max-active setting (1000 / 10 = 100 by
        // default). Integer division yields 0 for max-active < 10, so clamp the result to at
        // least one connection and never below the configured minimum idle size.
        serverConfig.setConnectionPoolSize(Math.max(1, Math.max(minIdle, maxActive / 10)));

        // Tip from the original author: the effective settings can be inspected by printing
        // redisson.getConfig().toJSON().
        return Redisson.create(config);
    }

    /**
     * Delayed queue whose expired elements are moved into the blocking queue stored under
     * {@link #TIMEOUT_QUEUE_NAME}.
     *
     * @return the delayed queue, or {@code null} if it could not be obtained (best effort;
     *         the failure is logged)
     */
    @Bean
    @Qualifier("timeoutQueue")
    public RDelayedQueue<OtaTimeOut> timeoutQueue() {
        RDelayedQueue<OtaTimeOut> delayedQueue = null;
        try {
            RedissonClient client = redissonClient();
            RBlockingQueue<OtaTimeOut> destination = client.getBlockingQueue(TIMEOUT_QUEUE_NAME);
            delayedQueue = client.getDelayedQueue(destination);
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is not lost.
            log.warn("获取延迟队列失败,原因:{}", e.getMessage(), e);
        }
        return delayedQueue;
    }

    /**
     * Blocking queue stored under {@link #TIMEOUT_QUEUE_NAME}.
     *
     * @return the blocking queue, or {@code null} if it could not be obtained (best effort;
     *         the failure is logged)
     */
    @Bean
    @Qualifier("blockingQueue")
    public RBlockingQueue<OtaTimeOut> blockingQueue() {
        RBlockingQueue<OtaTimeOut> queue = null;
        try {
            queue = redissonClient().getBlockingQueue(TIMEOUT_QUEUE_NAME);
        } catch (Exception e) {
            // The original logged the delayed-queue message here (copy-paste slip).
            log.warn("获取阻塞队列失败,原因:{}", e.getMessage(), e);
        }
        return queue;
    }

}

2.实现TimeoutHandler过期队列

package com.hh.ota.handler;

import com.alibaba.fastjson.JSON;
import com.hh.ota.dto.TimeOut;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Timeout queue handler.
 *
 * <p>Elements are scheduled through the injected {@link RDelayedQueue} via
 * {@link #offer(Supplier, Long)} and consumed from the injected {@link RBlockingQueue} by a
 * dedicated worker thread, which hands each one to {@link #handleTimeoutCmd(TimeOut)}.
 * The worker is started after dependency injection and stopped when the application
 * context is closed.
 */
@Component
@Slf4j
public class TimeoutHandler implements ApplicationListener<ContextClosedEvent> {

    /** Worker that blocks on the queue; named so it is identifiable in thread dumps. */
    private final Thread handlerThread = new Thread(new TimeoutTask(), "ota-timeout-handler");
    @Resource
    private RBlockingQueue<TimeOut> blockingQueue;
    @Resource
    private RDelayedQueue<TimeOut> timeoutQueue;

    /** {@code true} while the worker loop should keep running. */
    private volatile boolean lifeFlg = false;

    /** Starts the worker thread once the queues have been injected. */
    @PostConstruct
    public void init() {
        lifeFlg = true;
        handlerThread.start();
        log.info("超时队列处理器启动成功。");
    }

    /**
     * Business logic to run for an element taken from the blocking queue.
     *
     * <p>NOTE(review): this method is invoked directly from the inner worker class, i.e. not
     * through a Spring proxy, so {@code @Async} is not expected to take effect here and the
     * call runs on the worker thread — confirm whether asynchronous dispatch is required.
     *
     * @param timeOut the element taken from the blocking queue
     */
    @Async
    protected void handleTimeoutCmd(TimeOut timeOut) {
        // Handle the timed-out element here.
    }

    /**
     * Stops the worker when the application context closes.
     *
     * @param event the context-closed event
     */
    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        lifeFlg = false;
        // The worker is normally parked inside blockingQueue.take(); interrupt it so it
        // observes the cleared flag instead of waiting for the next element.
        handlerThread.interrupt();
    }

    /** Worker loop: takes elements from the blocking queue until the handler is stopped. */
    class TimeoutTask implements Runnable {

        @Override
        public void run() {
            while (lifeFlg) {
                try {
                    TimeOut timeoutCmd = blockingQueue.take();
                    log.info("超时队列执行入参:{}", JSON.toJSONString(timeoutCmd));
                    handleTimeoutCmd(timeoutCmd);
                    log.info("超时队列执行结束");
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and leave the loop; an interrupt is the
                    // shutdown signal, not a processing error.
                    Thread.currentThread().interrupt();
                    log.info("超时队列处理器线程被中断,停止处理。");
                    break;
                } catch (Exception e) {
                    // Keep the loop alive on processing errors, but retain the stack trace.
                    log.warn("超时队列处理器发生错误:{}", e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Adds an element to the delayed queue.
     *
     * @param supplier produces the element to enqueue; invoked exactly once
     * @param timeOut  delay in seconds
     */
    public void offer(Supplier<TimeOut> supplier, Long timeOut) {
        // Evaluate the supplier once so the logged object is the one that is enqueued.
        TimeOut cmd = supplier.get();
        log.info("添加超时队列元素入参:{}", JSON.toJSONString(cmd));
        timeoutQueue.offer(cmd, timeOut, TimeUnit.SECONDS);
    }

    /**
     * Removes an element from the delayed queue.
     *
     * @param supplier produces the element to remove; invoked exactly once
     */
    public void remove(Supplier<TimeOut> supplier) {
        TimeOut cmd = supplier.get();
        log.info("超时队列元素删除入参:{}", JSON.toJSONString(cmd));
        boolean result = timeoutQueue.remove(cmd);
        log.info("超时队列元素删除结果:{}", result);
    }

}

3. 具体使用方法(调用示例)

// Add an element to the timeout queue; the second argument is the delay passed to offer().
timeoutHandler.offer(() -> new OtaTimeOut(), Long.valueOf(outTime));
// Remove an element from the timeout queue.
timeoutHandler.remove(() -> new OtaSendTimeOut());
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐