redis事务中setIfAbsent无法及时返回true或false,返回null
代码如下,现需要批量处理业务,为了防止一条数据被多次处理,故缓存到redis中。又因可以同时操作多条数据,故考虑用redis事务,当其中一个已被其他人处理中时,之前设置的也全部回滚。但实际过程中 redisOperations.opsForValue().setIfAbsent();并非返回true或false。需要等到exec时才能获取返回值。故一下做法就无效了,需要换一种实现方式。RedisT
·
代码如下,现需要批量处理业务,为了防止一条数据被多次处理,故缓存到redis中。又因可以同时操作多条数据,故考虑用redis事务,当其中一个已被其他人处理中时,之前设置的也全部回滚。但实际过程中 redisOperations.opsForValue().setIfAbsent();并非返回true或false。需要等到exec时才能获取返回值。故以下做法就无效了,需要换一种实现方式。
RedisTemplate redisTemplate = RedisUtils.getRedisTemplate();
Object result = redisTemplate.execute(new SessionCallback() {
@Override
public Object execute(RedisOperations redisOperations) throws DataAccessException {
redisOperations.multi();
List<String> existReportCode = new ArrayList<>();
for (String reportCode: reportCodes) {
redisOperations.opsForSet().add(reportBatchSet,reportCode);
//下面setIfAbsent返回null
if(!redisOperations.opsForValue().setIfAbsent(reportBatchWorkFlow+"-"+reportCode,"1")){
existReportCode.add(reportCode);
}
}
if(CollectionUtils.isEmpty(existReportCode)){
redisOperations.exec();
return null;
}else{
redisOperations.discard();
return StringUtils.join(existReportCode);
}
}
});
后面还是取消了事务,采用悲观锁实现。
String reportBatchWorkFlow = "reportBatchWorkFlow";
String pendingReportCode = "pendingReportCode";
String reportBatchSet = reportBatchWorkFlow + "_set-" +ShiroUtils.getUserId();
//校验
if(RedisLockUtils.tryLock(reportBatchWorkFlow)){
try{
//报告状态校验
List<Report> reportList = reportDao.findByReportCodeInAndCustomerId(Arrays.asList(reportCodes));
for (Report report: reportList) {
if(!report.getState().equals(entity.getState())){
return new ResponseData(2,"报告["+report.getReportCode()+"]已经流转到["+ReportStatus.getName(Integer.parseInt(report.getState()))+"]节点");
}
}
//当前任务校验
if(RedisUtils.hasKey(reportBatchSet) && CollectionUtils.isNotEmpty(RedisUtils.getSets(reportBatchSet))){
return new ResponseData(1,"当前有任务正在处理中");
}
//处理中任务校验
Set existReportCodes = RedisUtils.setIntersect(pendingReportCode,reportCodes);
if(CollectionUtils.isNotEmpty(existReportCodes)){
return new ResponseData(3,"报告["+String.join(",",existReportCodes)+"]正在处理中,请稍后再试");
}
RedisUtils.setsAdd(reportBatchSet,reportCodes);
RedisUtils.setsAdd(pendingReportCode,reportCodes);
}finally {
RedisLockUtils.releaseLock(reportBatchWorkFlow);
}
}else{
return new ResponseData(4,"获取锁超时,请稍后再试");
}
//实际业务处理
自己写的redis锁
@Component
public class RedisLockUtils {
private static RedisTemplate redisTemplate;
public static final String LOCKPREFIX = "LOCK_";
public static final int DEFAULT_RETRY_COUNT = 10;
public static final long DEFAULT_RETRY_TIME = 100;
@Autowired
private JedisConnectionFactory jedisConnectionFactory;
@PostConstruct
public void init() {
this.redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(jedisConnectionFactory);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
redisTemplate.setDefaultSerializer(stringRedisSerializer);
redisTemplate.afterPropertiesSet();
redisTemplate.delete(redisTemplate.keys(LOCKPREFIX+"*"));
}
/**
* 获取锁
* @param key 业务代码
* @return
* @throws InterruptedException
*/
public static Boolean tryLock(String key) throws InterruptedException {
return tryLock(key,DEFAULT_RETRY_COUNT,DEFAULT_RETRY_TIME);
}
/**
* 获取锁
* @param key 业务代码
* @param count 重试次数
* @return
* @throws InterruptedException
*/
public static Boolean tryLock(String key,int count,long retryTime) throws InterruptedException {
Boolean lock = Boolean.FALSE;
do{
if(count < 0){
return false;
}
lock = redisTemplate.opsForValue().setIfAbsent(LOCKPREFIX+key, Thread.currentThread().getName());
Thread.sleep(retryTime);
count--;
}while (!lock);
return lock;
}
public static void releaseLock(String key){
redisTemplate.delete(LOCKPREFIX+key);
}
}
更多推荐
已为社区贡献2条内容
所有评论(0)