代码如下。现需要批量处理业务,为了防止同一条数据被多次处理,先把处理标记缓存到 Redis 中。又因为可以同时操作多条数据,故考虑使用 Redis 事务:当其中某一条已被其他人处理时,之前设置的标记也全部回滚。但实际运行中,在事务(MULTI)内调用 redisOperations.opsForValue().setIfAbsent() 并不会立即返回 true 或 false,而是返回 null,需要等到 exec 时才能拿到各条命令的返回值。因此以下做法无效,需要换一种实现方式。

  // Attempted approach: mark every report code inside one Redis transaction and
  // discard the whole transaction if any code turns out to be marked already.
  // NOTE(review): as observed by the author, this does NOT work — inside MULTI the
  // per-command results are only available once exec() runs, so the check below
  // cannot decide anything while the commands are still being queued.
  RedisTemplate redisTemplate = RedisUtils.getRedisTemplate();
        Object result = redisTemplate.execute(new SessionCallback() {
            @Override
            public Object execute(RedisOperations redisOperations) throws DataAccessException {
                // Start the transaction.
                redisOperations.multi();
                // Collects the report codes that are believed to be taken by someone else.
                List<String> existReportCode = new ArrayList<>();
                for (String reportCode: reportCodes) {
                    // Record the code in the batch set.
                    redisOperations.opsForSet().add(reportBatchSet,reportCode);
                    // setIfAbsent returns null here (observed by the author), so this
                    // condition cannot tell whether the marker key already existed.
                    if(!redisOperations.opsForValue().setIfAbsent(reportBatchWorkFlow+"-"+reportCode,"1")){
                        existReportCode.add(reportCode);
                    }
                }

                if(CollectionUtils.isEmpty(existReportCode)){
                    // No conflict detected: commit the queued commands.
                    redisOperations.exec();
                    return null;
                }else{
                    // Conflict detected: drop the queued commands and report the conflicting codes.
                    redisOperations.discard();
                    return StringUtils.join(existReportCode);
                }

            }
        });

后来还是放弃了事务,改用悲观锁(分布式锁)来实现:


        // Redis names used below: the lock name, the shared set of report codes that are
        // currently being processed, and a set keyed by the value of ShiroUtils.getUserId().
        String reportBatchWorkFlow = "reportBatchWorkFlow";
        String pendingReportCode = "pendingReportCode";
        String reportBatchSet = reportBatchWorkFlow + "_set-" + ShiroUtils.getUserId();

        // Validation runs under the lock; bail out right away if the lock was not obtained.
        boolean lockAcquired = RedisLockUtils.tryLock(reportBatchWorkFlow);
        if (!lockAcquired) {
            return new ResponseData(4,"获取锁超时,请稍后再试");
        }
        try {
            // 1) Every report must still be in the state carried by the request entity.
            List<Report> candidates = reportDao.findByReportCodeInAndCustomerId(Arrays.asList(reportCodes));
            for (Report candidate : candidates) {
                boolean sameState = candidate.getState().equals(entity.getState());
                if (sameState) {
                    continue;
                }
                String currentNode = ReportStatus.getName(Integer.parseInt(candidate.getState()));
                return new ResponseData(2,"报告["+candidate.getReportCode()+"]已经流转到["+currentNode+"]节点");
            }

            // 2) The batch set for this user id must not already contain entries.
            if (RedisUtils.hasKey(reportBatchSet)) {
                if (CollectionUtils.isNotEmpty(RedisUtils.getSets(reportBatchSet))) {
                    return new ResponseData(1,"当前有任务正在处理中");
                }
            }

            // 3) None of the requested codes may already be in the pending set.
            Set alreadyPending = RedisUtils.setIntersect(pendingReportCode,reportCodes);
            if (CollectionUtils.isNotEmpty(alreadyPending)) {
                String joinedCodes = String.join(",",alreadyPending);
                return new ResponseData(3,"报告["+joinedCodes+"]正在处理中,请稍后再试");
            }

            // All checks passed: register the codes in both sets before releasing the lock.
            RedisUtils.setsAdd(reportBatchSet,reportCodes);
            RedisUtils.setsAdd(pendingReportCode,reportCodes);
        } finally {
            // Always give the lock back, including on the early returns above.
            RedisLockUtils.releaseLock(reportBatchWorkFlow);
        }

        // Actual business processing goes here.

下面是自己实现的 Redis 锁工具类:


/**
 * Minimal Redis-backed lock helper built on {@code SETNX} (via {@code setIfAbsent}).
 *
 * <p>The class is a Spring component only so that the connection factory can be injected;
 * all lock operations are exposed as static methods that share one {@link RedisTemplate}
 * created in {@link #init()}.
 *
 * <p>NOTE(review): the lock key is written without an expiry and {@link #releaseLock(String)}
 * deletes the key without checking who holds it. A holder that dies before releasing leaves
 * the key in place until {@link #init()} runs again — confirm this is acceptable for the
 * deployment.
 */
@Component
public class RedisLockUtils {

    /** Shared template using String serialization; assigned once in {@link #init()}. */
    private static RedisTemplate<String, String> redisTemplate;

    /** Prefix prepended to every lock key. */
    public static final String LOCKPREFIX = "LOCK_";
    /** Default number of retries after the first failed attempt. */
    public static final int DEFAULT_RETRY_COUNT = 10;
    /** Default pause between attempts, in milliseconds. */
    public static final long DEFAULT_RETRY_TIME = 100;

    @Autowired
    private JedisConnectionFactory jedisConnectionFactory;

    /**
     * Builds the shared template and removes every existing key that starts with
     * {@link #LOCKPREFIX}.
     *
     * <p>NOTE(review): if several application instances share the same Redis, this
     * startup cleanup would also remove locks currently held by the other instances —
     * verify against the deployment topology.
     */
    @PostConstruct
    public void init() {
        // The field is static, so it is assigned through the class rather than through "this".
        RedisTemplate<String, String> template = new RedisTemplate<>();
        template.setConnectionFactory(jedisConnectionFactory);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        template.setDefaultSerializer(stringRedisSerializer);
        template.afterPropertiesSet();
        RedisLockUtils.redisTemplate = template;

        // Clear leftover lock keys from a previous run.
        template.delete(template.keys(LOCKPREFIX + "*"));
    }


    /**
     * Tries to acquire the lock using {@link #DEFAULT_RETRY_COUNT} and
     * {@link #DEFAULT_RETRY_TIME}.
     *
     * @param key business identifier of the lock (without prefix)
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     * @throws InterruptedException if the thread is interrupted while waiting to retry
     */
    public static Boolean tryLock(String key) throws InterruptedException {
        return tryLock(key, DEFAULT_RETRY_COUNT, DEFAULT_RETRY_TIME);
    }

    /**
     * Tries to acquire the lock, retrying on failure.
     *
     * <p>One attempt is made immediately and up to {@code count} further attempts follow,
     * each preceded by a pause of {@code retryTime} milliseconds. The method returns as
     * soon as the lock is acquired and does not pause after a successful attempt or after
     * the last failed one. A negative {@code count} returns {@code false} without touching
     * Redis.
     *
     * @param key       business identifier of the lock (without prefix)
     * @param count     number of retries after the first attempt
     * @param retryTime pause between attempts, in milliseconds
     * @return {@code true} if the lock was acquired, {@code false} once attempts are exhausted
     * @throws InterruptedException if the thread is interrupted while waiting to retry
     */
    public static Boolean tryLock(String key, int count, long retryTime) throws InterruptedException {
        String lockKey = LOCKPREFIX + key;
        String holder = Thread.currentThread().getName();

        for (int remaining = count; remaining >= 0; remaining--) {
            Boolean acquired = redisTemplate.opsForValue().setIfAbsent(lockKey, holder);
            // setIfAbsent may yield null (e.g. inside a pipeline/transaction); treat that as "not acquired".
            if (Boolean.TRUE.equals(acquired)) {
                return Boolean.TRUE;
            }
            // Only wait when another attempt will actually follow.
            if (remaining > 0) {
                Thread.sleep(retryTime);
            }
        }
        return Boolean.FALSE;
    }


    /**
     * Releases the lock by deleting its key.
     *
     * <p>The key is deleted unconditionally; the stored holder name is not compared.
     *
     * @param key business identifier of the lock (without prefix)
     */
    public static void releaseLock(String key) {
        redisTemplate.delete(LOCKPREFIX + key);
    }

}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐