引入依赖:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

消息提供者:

@Override
    public void produce(GiveVouchersParamDTO giveVouchersParamDTO) {
        // TODO:转成JSON格式
        String msg = JsonUtil.encode(giveVouchersParamDTO);
        // TODO: 左放入消息队列
        stringRedisTemplate.opsForList().leftPush(MESSAGE_KEY, msg);
    }

消息消费者:

@Override
    public void blockingConsume() {

//管道返回对应值
        List<Object> obj = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
            @Nullable
            @Override
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                //队列没有元素会阻塞操作,直到队列获取新的元素或超时,
                //阻塞线程每隔20s超时执行一次。该方法解决了 CPU 空转的问题。
                //MESSAGE_KEY: redisKye
                return connection.bLPop(20, MESSAGE_KEY.getBytes());
            }
        }, new StringRedisSerializer());

        if (obj == null) {
            return;
        }
        // 对管道返回值进行对应处理
        for (Object value : obj) {
            if (value != null) {
                if (value instanceof List) {
                    List<String> list = (List) value;
                    for (String v : list) {
                        if (v != null) {
                            if (!v.startsWith("{")) {
                                continue;
                            }
                            try {
                                /** 拿取到消息做对应的逻辑处理 */
                                GiveVouchersParamDTO giveVouchersParamDTO =   JsonUtil.decode(v,GiveVouchersParamDTO.class);
                                log.info(v);
                                log.info(giveVouchersParamDTO );
                            } catch (Exception e) {
                                log.error(e.getMessage(), e);
                            } 
                        }
                    }
                }
            }
        }
    }

启动线程去监听消息消费:
@PostConstruct:被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器调用一次,类似于Serclet的init()方法。被 @PostConstruct修饰的方法会在构造函数之后,init()方法之前运行
如果想在生成对象时完成某些初始化操作,而偏偏这些初始化操作又依赖于依赖注入,那么久无法在构造函数中实现。为此,可以使用@PostConstruct注解一个方法来完成初始化,@PostConstruct注解的方法将会在依赖注入完成后被自动调用。

/**
 * Redis 消息启动监听
 */
@Component
public class RedisMqJob {

    @Autowired
    RedisMQService redisMQService;

    @PostConstruct
    public void consume() {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        redisMQService.blockingConsume();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        thread.start();
    }
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐