@[TOC](java redis redisson 分布式锁 阻塞队列(BlockingQueue) 线程池 的使用)

一、场景

		用户付款成功->上传到第三方订单系统

	    订单付款成功 -> 添加到阻塞队列 -> 触发上传接口 -> 获取到锁(未获取到锁返回)-> 循环取出队列里的数据 ->
	    线程池 -> 多个线程处理 -> 处理完队列里面所有数据释放锁 -> 上传完成

二、代码实现

redisson工具类
		 //redisson配置此处省略
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.16.7</version>
        </dependency>
}

import org.redisson.api.RBlockingQueue;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @description:
 * @author: alan
 * @time: 2021/12/29 17:32
 */
@Component
public class RedissonUtils<E> {

    private RedissonClient redisson;

    @Autowired
    public RedissonUtils(RedissonClient redisson) {
        this.redisson = redisson;
    }

    /**
     * 阻塞队列添加值的方法
     *
     * @param e         添加的值
     * @param queueName 队列名
     * @return
     * @throws Throwable
     */
    public Boolean setBlockingQueue(E e, String queueName) throws Throwable {
        Boolean isBoolean = false;
        RBlockingQueue<E> queue = redisson.getBlockingQueue(queueName);
        //offer 表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程);      
        isBoolean = queue.offer(e);
        return isBoolean;
    }

    /**
     * 阻塞队列获取首位的对象方法
     *
     * @param queueName
     * @return
     * @throws Throwable
     */
    public E getBlockingQueue(String queueName) throws Throwable {
        RBlockingQueue<E> queue = redisson.getBlockingQueue(queueName);
        //取走BlockingQueue里排在首位的对象,取不到时返回null;
        E value = queue.poll();
        return value;
    }

    /**
     *  获取自动延期锁,需要手工释放锁
     * @param lockName
     * @param time
     * @return
     * @throws Throwable
     */
    public RLock getTryLock(String lockName, int time) throws Throwable {
        RLock lock = redisson.getLock(lockName);    // 拿锁失败时会不停的重试
        // 尝试拿锁time s后停止重试,返回false 具有Watch Dog 自动延期机制 默认续30s
        boolean isBoolean = lock.tryLock(time, TimeUnit.SECONDS);
        if (!isBoolean){
            return  null;
        }
        return lock;
    }


}

}

线程池配置类


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @description: 自定义线程池
 * @author: alan
 * @time: 2021/12/30 14:52
 */
@Configuration
@EnableAsync//开启异步
public class ThreadPoolConfig {
    //在@SpringBootApplication启动类 添加注解@EnableAsync
    //异步方法使用注解@Async("@Bean的名称") ,返回值为void或者Future
    //切记一点 ,异步方法和调用方法一定要写在不同的类中,如果写在一个类中,是没有效果的

    //    在@Async标注的方法,同时也使用@Transactional进行标注;在其调用数据库操作之时,将无法产生事务管理的控制,原因就在于其是基于异步处理的操作。
    //    那该如何给这些操作添加事务管理呢?
    //    可以将需要事务管理操作的方法放置到异步方法内部,在内部被调用的方法上添加@Transactional
    //    示例:
    //    方法A, 使用了@Async/@Transactional来标注,但是无法产生事务控制的目的。
    //    方法B, 使用了@Async来标注,B中调用了C、D,C/D分别使用@Transactional做了标注,则可实现事务控制的目的

    /**
     *  商品订单上传线程池
     * @return
     */
    @Bean("pushGoodsOrderExecutor")
    public ThreadPoolTaskExecutor pushGoodsOrderExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(2);
        // 设置最大线程数
        executor.setMaxPoolSize(3);
        // 设置队列容量
        executor.setQueueCapacity(10);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(60);
        // 设置默认线程名称
        executor.setThreadNamePrefix("pushGoodsOrderExecutor-");
        // 设置拒绝策略 ThreadPoolExecutor.AbortPolicy策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

基类


import java.util.List;

/**
 * @description:
 * @author: alan
 * @time: 2021/12/30 11:03
 */
public abstract class AbstractJstService<T> {

    /**
     * 上传订单
     */
    public abstract void pushOrder();


}

派生类

import org.redisson.api.RLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.concurrent.RejectedExecutionException;

/**
 * @description:
 * @author: alan
 * @time: 2021/12/30 11:04
 */
@Service
public class JstOrderServiceImpl extends AbstractJstService {

    private final Logger log = LoggerFactory.getLogger(JstOrderServiceImpl.class);

    private RedissonUtils redissonUtils;

    private AsyncJstOrderService asyncJstOrderService;

    @Autowired
    public JstOrderServiceImpl(RedissonUtils redissonUtils, AsyncJstOrderService asyncJstOrderService) {
        this.redissonUtils = redissonUtils;
        this.asyncJstOrderService = asyncJstOrderService;
    }

    @Override
    public void pushOrder() {
        RLock rLock = null;
        Boolean isWhile = true;
        try {
            rLock = redissonUtils.getTryLock(BlockingQueueEnum.商品订单上传队列锁.getName(), 10);
            if (rLock != null) {
                log.info("获取锁成功");
                while(isWhile){
                    String id = (String) redissonUtils.getBlockingQueue(BlockingQueueEnum.商品订单上传队列.getName());
                    System.out.println("id:"+id);
                    if (!StringUtils.isEmpty(id)) {
//                        Thread.sleep(500);//当前线程暂停0.5秒钟,再进行处理
                        //队列里面获取到数据了,开始执行业务逻辑
                        try {
                            asyncJstOrderService.pushOrder(Long.valueOf(id));
                        }catch (RejectedExecutionException rejectedExecutionException){
                            //线程池已满,触发RejectedExecutionException 返回
                            redissonUtils.setBlockingQueue(id,BlockingQueueEnum.商品订单上传队列.getName());//把没有处理id重新放回队列
                            log.info("线程池队列已满");
                            Thread.sleep(1000);//当前线程暂停1秒钟,再进行处理
                        }

                    }else {
                        //队列里面获取不到数据了,取消while循环
                        isWhile = false;
                    }
                }
            }else {
                log.info("获取锁失败");
            }
        } catch (Throwable t) {
            t.printStackTrace();
            if (rLock != null && rLock.isLocked() && rLock.isHeldByCurrentThread()) {
                rLock.unlock();
            }
        }finally {
            //   lock.isLocked():判断要解锁的key是否已被锁定。
            //   lock.isHeldByCurrentThread():判断要解锁的key是否被当前线程持有。
            if (rLock != null && rLock.isLocked() && rLock.isHeldByCurrentThread()) {
                rLock.unlock();
            }
        }
    }



}


异步方法类

package com.gemo.bear.service.order.jst.util;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * @description: 商品订单异步处理类
 * @author: alan
 * @time: 2021/12/30 15:56
 */
@Component
public class AsyncJstOrderService {

    @Async("pushGoodsOrderExecutor")
    public void pushOrder(Long id){
        System.out.println("商品订单异步线程ID:" + Thread.currentThread().getId() + "线程名字:" +Thread.currentThread().getName()+"执行异步任务:"+id);
    }
}

.枚举类

package com.gemo.bear.bean.order.enums;

public enum BlockingQueueEnum {
    商品订单上传队列("goods_order_push"),
    商品订单上传队列锁("goods_order_push_luck"),
    商品订单上传Error队列("goods_order_push_error");

    private String name;

    BlockingQueueEnum(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}


Controller类

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

@Api(tags = "订单相关")
@RestController
@RequestMapping("/order")
public class OrderController {
    @Resource(type = JstOrderServiceImpl.class)
    private AbstractJstService abstractJstService;
    
    @ApiOperation("测试添加队列数据")
    @PostMapping("test-setBlockingQueue")
    public ResponseBuild setBlockingQueue(@RequestBody Order order) throws Throwable {
        try {
            for (int i = 0; i < 15; i++) {
                boolean b = redissonUtils.setBlockingQueue(String.valueOf(i), BlockingQueueEnum.商品订单上传队列.getName());
                if (b) {
                    System.out.println("加入队列成功");
                }
            }
        } catch (Exception ex) {
            System.out.println(ex.getMessage());
        } finally {

        }
        return ResponseBuild.success();
    }

    @ApiOperation("测试上传")
    @PostMapping("test-pullOrder")
    public ResponseBuild testPullOrder() throws Throwable {
        try {
            abstractJstService.pullOrder();
        } catch (Exception ex) {
            System.out.println(ex.getMessage());
        } finally {

        }
        return ResponseBuild.success();
    }

}

线程运行情况

在这里插入图片描述

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐