Redis深入理解六 :Redis 分布式锁 以及 Redisson 锁续命分析、读写锁
简单redis锁实现(思考问题)@Autowiredprivate StringRedisTemplate stringRedisTemplate;public String testRedisLock() {String lockKey = "lock:product:1";String clientId = UUID.randomUUID().toString();Boolean result
·
简单redis锁实现(思考问题)
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
 * Demonstrates a hand-rolled Redis lock built on SET-if-absent with a TTL.
 *
 * <p>The lock value is a random client id so that only the owner deletes the key
 * in the {@code finally} block.
 *
 * @return {@code "error"} when the lock could not be acquired, {@code "ok"} otherwise
 */
public String testRedisLock() {
    String lockKey = "lock:product:1";
    String clientId = UUID.randomUUID().toString();
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
    // setIfAbsent returns a boxed Boolean that may be null; compare null-safely
    // instead of auto-unboxing, and treat "unknown" as "not acquired".
    if (!Boolean.TRUE.equals(result)) {
        return "error";
    }
    try {
        //TODO
    } finally {
        // NOTE(review): the ownership check and the delete are two separate commands,
        // so the key can expire and be re-acquired by another client in between.
        // The fixed 30s TTL is also never extended while the business code runs.
        if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
            stringRedisTemplate.delete(lockKey);
        }
    }
    return "ok";
}
大型网站高并发
/**
 * Product service that fronts the product DAO with a Redis cache.
 *
 * <p>Reads go through {@link #get(Long)}, which checks the cache first and rebuilds it
 * under a per-product lock on a miss. Writes go through {@link #create(Product)} and
 * {@link #update(Product)}, which refresh the cache entry after touching the DAO.
 */
@Service
public class ProductService {

    @Autowired
    private ProductDao productDao;

    @Autowired
    private RedisUtil redisUtil;

    @Autowired
    private Redisson redisson;

    /** Base TTL of a cached product, in seconds (24 hours). */
    public static final Integer PRODUCT_CACHE_TIMEOUT = 60 * 60 * 24;

    /** Marker value cached for products that do not exist in the database. */
    public static final String EMPTY_CACHE = "{}";

    /** Key prefix of the lock that serialises cache rebuilds for one product. */
    public static final String LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX = "lock:product:hot_cache_create:";

    /** Key prefix of the read/write lock that orders DB reads against updates. */
    public static final String LOCK_PRODUCT_UPDATE_PREFIX = "lock:product:update:";

    /** In-process first-level cache, consulted before Redis; keyed by the Redis cache key. */
    public static Map<String, Product> productMap = new HashMap<>();

    /**
     * Persists a new product and caches it.
     *
     * @param product the product to create
     * @return the product as returned by the DAO
     */
    @Transactional
    public Product create(Product product) {
        Product productResult = productDao.create(product);
        // Cache with the same randomised TTL that update() and get() use, so entries
        // written here also expire instead of staying in Redis indefinitely.
        redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
                genProductCacheTimeout(), TimeUnit.SECONDS);
        return productResult;
    }

    /**
     * Updates a product and refreshes its cache entry while holding the product's write lock.
     *
     * @param product the product to update
     * @return the product as returned by the DAO
     */
    @Transactional
    public Product update(Product product) {
        Product productResult = null;
        RReadWriteLock productUpdateLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
        RLock writeLock = productUpdateLock.writeLock();
        // Distributed write lock: keeps the DB write and the cache write together so a
        // concurrent rebuild cannot interleave and leave stale data in the cache.
        writeLock.lock();
        try {
            productResult = productDao.update(product);
            redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
                    genProductCacheTimeout(), TimeUnit.SECONDS);
        } finally {
            writeLock.unlock();
        }
        return productResult;
    }

    /**
     * Looks up a product, preferring the cache and rebuilding it on a miss.
     *
     * @param productId id of the product to load
     * @return the product; an empty {@code Product} instance when the empty-cache marker is
     *         hit; {@code null} when the DAO returns nothing on a rebuild
     */
    public Product get(Long productId) {
        String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;

        // Fast path: serve from cache.
        Product product = getProductFromCache(productCacheKey);
        if (product != null) {
            return product;
        }

        // Distributed lock so that only one caller rebuilds a hot key at a time.
        RLock hotCreateCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX + productId);
        // Acquire exactly once. The lock is reentrant, so a second acquisition by the same
        // thread (the former tryLock(1s) call) only bumped the hold count; with a single
        // unlock() below the lock was then never fully released.
        hotCreateCacheLock.lock();
        try {
            // Re-check: another caller may have rebuilt the entry while we waited.
            product = getProductFromCache(productCacheKey);
            if (product != null) {
                return product;
            }

            RReadWriteLock productUpdateLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
            RLock rLock = productUpdateLock.readLock();
            // Distributed read lock: excludes a concurrent update() while we read the DB
            // and write the cache, but lets other readers proceed.
            rLock.lock();
            try {
                product = productDao.get(productId);
                if (product != null) {
                    redisUtil.set(productCacheKey, JSON.toJSONString(product),
                            genProductCacheTimeout(), TimeUnit.SECONDS);
                } else {
                    // Cache an empty marker so repeated lookups of a missing id do not
                    // hit the database every time (cache penetration).
                    redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
                }
            } finally {
                rLock.unlock();
            }
        } finally {
            hotCreateCacheLock.unlock();
        }
        return product;
    }

    /**
     * Returns the product cache TTL in seconds: the base timeout plus a random 0-4 hours,
     * so entries written together do not all expire at the same moment.
     */
    private Integer genProductCacheTimeout() {
        return PRODUCT_CACHE_TIMEOUT + new Random().nextInt(5) * 60 * 60;
    }

    /** Returns the TTL in seconds for the empty-cache marker: 60 plus a random 0-29. */
    private Integer genEmptyCacheTimeout() {
        return 60 + new Random().nextInt(30);
    }

    /**
     * Reads a product from the in-process map first, then from Redis.
     *
     * <p>A Redis hit extends the entry's TTL. Hitting the empty-cache marker extends the
     * marker's TTL and yields an empty {@code Product} instance.
     *
     * @param productCacheKey full cache key of the product
     * @return the cached product, an empty {@code Product} for the empty marker,
     *         or {@code null} on a miss
     */
    private Product getProductFromCache(String productCacheKey) {
        Product product = null;
        // Multi-level lookup: the JVM-level map could be maintained by a dedicated hot-key
        // system that pushes changes to each web application.
        product = productMap.get(productCacheKey);
        if (product != null) {
            return product;
        }
        String productStr = redisUtil.get(productCacheKey);
        if (!StringUtils.isEmpty(productStr)) {
            if (EMPTY_CACHE.equals(productStr)) {
                redisUtil.expire(productCacheKey, genEmptyCacheTimeout(), TimeUnit.SECONDS);
                return new Product();
            }
            product = JSON.parseObject(productStr, Product.class);
            // Extend the TTL on read so frequently read entries stay cached.
            redisUtil.expire(productCacheKey, genProductCacheTimeout(), TimeUnit.SECONDS);
        }
        return product;
    }
}
常规Redisson实现
Redisson github: https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95
@Autowired
private Redisson redisson;
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
 * Decrements the "stock" counter in Redis by one while holding a Redisson lock.
 *
 * @return always {@code "ok"}
 */
public String testRedissonLock() {
    final String lockKey = "lock:product:1";
    // Obtain the lock handle and block until the distributed lock is held.
    final RLock lock = redisson.getLock(lockKey);
    lock.lock();
    try {
        String cachedStock = stringRedisTemplate.opsForValue().get("stock");
        int stock = Integer.parseInt(cachedStock);
        if (stock <= 0) {
            System.out.println("扣减失败,库存不足");
        } else {
            int remaining = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(remaining));
            System.out.println("扣减成功,剩余库存:" + remaining);
        }
    } finally {
        // Always release the lock, even when the stock update throws.
        lock.unlock();
    }
    return "ok";
}
Redisson 加锁原理
Redisson 源码分析
Redisson 加锁
redissonLock.lock();
redissonLock.lock();
redissonLock.lockInterruptibly();
redissonLock.lockInterruptibly(-1L, (TimeUnit)null);
其他线程加锁处理:while 循环 + Semaphore 信号量
/**
 * Blocks until the lock is obtained, retrying whenever the wait on the entry's latch ends.
 *
 * <p>{@code tryAcquire} yielding {@code null} ends the method; any other value is used as
 * the wait time in milliseconds before the next attempt.
 *
 * @param leaseTime lease time forwarded to {@code tryAcquire}
 * @param unit      unit of {@code leaseTime}
 * @throws InterruptedException if the thread is interrupted while waiting
 */
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    final long callerId = Thread.currentThread().getId();
    Long remaining = this.tryAcquire(leaseTime, unit, callerId);
    if (remaining == null) {
        return;
    }

    // First attempt did not succeed: subscribe, then wait and retry.
    RFuture<RedissonLockEntry> subscription = this.subscribe(callerId);
    this.commandExecutor.syncSubscription(subscription);
    try {
        while ((remaining = this.tryAcquire(leaseTime, unit, callerId)) != null) {
            if (remaining < 0L) {
                // Negative value: wait on the latch without a time limit.
                this.getEntry(callerId).getLatch().acquire();
            } else {
                // Wait on the latch for at most the reported number of milliseconds.
                this.getEntry(callerId).getLatch().tryAcquire(remaining, TimeUnit.MILLISECONDS);
            }
        }
    } finally {
        this.unsubscribe(subscription, callerId);
    }
}
消息订阅 唤醒信号量
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
/**
 * Subscribes through PUBSUB using this lock's entry name and channel name, with the
 * subscribe service taken from the connection manager.
 *
 * @param threadId id of the calling thread (not used by the visible body)
 * @return the future returned by {@code PUBSUB.subscribe}
 */
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return PUBSUB.subscribe(this.getEntryName(), this.getChannelName(), this.commandExecutor.getConnectionManager().getSubscribeService());
}
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
....
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(unlockMessage)) {
value.getLatch().release();
....
....
}
Lua 加锁 & 锁续命
/**
 * Makes a single asynchronous attempt to take the lock.
 *
 * <p>With an explicit lease time the attempt is passed straight through. With
 * {@code leaseTime == -1} the configured lock-watchdog timeout is used as the lease, and
 * expiration renewal is scheduled once the attempt completes successfully with a true result.
 *
 * @param leaseTime lease time, or {@code -1} to use the watchdog timeout
 * @param unit      unit of {@code leaseTime}
 * @param threadId  id of the acquiring thread
 * @return future holding the result of the attempt
 */
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    }

    long watchdogTimeout = this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    RFuture<Boolean> acquireFuture =
            this.tryLockInnerAsync(watchdogTimeout, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    acquireFuture.addListener(new FutureListener<Boolean>() {
        public void operationComplete(Future<Boolean> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            Boolean acquired = (Boolean) future.getNow();
            if (acquired) {
                // Start the periodic renewal of the lock's expiration.
                RedissonLock.this.scheduleExpirationRenewal(threadId);
            }
        }
    });
    return acquireFuture;
}
Lua 原子操作
/**
 * Runs the lock-acquisition Lua script and records the lease time in
 * {@code internalLockLeaseTime} (milliseconds).
 *
 * <p>Script arguments: KEYS[1] is {@code getName()}, ARGV[1] is the lease time in
 * milliseconds, ARGV[2] is {@code getLockName(threadId)}.
 *
 * <p>Script result: nil when the hash was created or this holder's counter was
 * incremented; otherwise the key's remaining TTL from {@code pttl}.
 *
 * @param leaseTime lease time for the lock key
 * @param unit      unit of {@code leaseTime}
 * @param threadId  id of the acquiring thread
 * @param command   command passed to {@code evalWriteAsync} for the script evaluation
 * @return future with the script result
 */
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(this.getName(),
LongCodec.INSTANCE,
command,
// Key absent: create the hash with this holder's field set to 1 and set the expiry.
("if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// Field for this holder already present: increment its counter and reset the expiry.
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// Otherwise report the remaining TTL of the key.
"return redis.call('pttl', KEYS[1]);"),
Collections.singletonList(this.getName()),
new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)}
);
}
锁续命
/**
 * Schedules a one-shot timer task that extends the lock's expiry, and re-schedules itself
 * for as long as the renewal script reports success.
 *
 * <p>Nothing is scheduled when {@code expirationRenewalMap} already has an entry for this
 * lock's entry name. The task fires after one third of {@code internalLockLeaseTime}.
 *
 * @param threadId id of the thread whose hash field is checked by the renewal script
 */
private void scheduleExpirationRenewal(final long threadId) {
if (!expirationRenewalMap.containsKey(this.getEntryName())) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
// Renewal script: if the hash still has this holder's field (ARGV[2]), reset the key's
// expiry to ARGV[1] and return 1; otherwise return 0.
RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
// Remove the map entry first so the recursive call below can schedule a new task.
RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
if (!future.isSuccess()) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
} else {
// Script returned true: schedule the next renewal.
if ((Boolean)future.getNow()) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
// If another task was registered for this entry name in the meantime, cancel this one.
if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {
task.cancel();
}
}
}
Redisson 解锁:发布消息(唤醒信号量线程)
redissonLock.unlock();
/**
 * Runs the unlock Lua script for the given thread.
 *
 * <p>Script arguments: KEYS[1] is {@code getName()}, KEYS[2] is {@code getChannelName()},
 * ARGV[1] is {@code LockPubSub.unlockMessage}, ARGV[2] is {@code internalLockLeaseTime},
 * ARGV[3] is {@code getLockName(threadId)}.
 *
 * <p>Script result: 1 when the key is absent or was deleted (an unlock message is published
 * in both cases), 0 when the holder's counter is still positive after the decrement, nil
 * when the hash has no field for this holder.
 *
 * @param threadId id of the thread releasing the lock
 * @return future with the script result
 */
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(),
LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
// Key already gone: just publish the unlock message.
("if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// No field for this holder in the hash: nothing to release.
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// Decrement the holder's counter; keep the key (with a fresh expiry) while it is positive.
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// Counter reached zero: delete the key and publish the unlock message.
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;"),
Arrays.asList(this.getName(), this.getChannelName()),
new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
}
Redisson 读写锁 (mode)
writelock
/**
 * Write-lock variant: runs the acquisition Lua script, which tracks the lock mode in the
 * hash field {@code 'mode'}, and records the lease time in {@code internalLockLeaseTime}.
 *
 * <p>Script arguments: KEYS[1] is {@code getName()}, ARGV[1] is the lease time in
 * milliseconds, ARGV[2] is {@code getLockName(threadId)}.
 *
 * <p>Script result: nil when the mode was set to {@code 'write'} or this holder's counter
 * was incremented; otherwise the key's remaining TTL from {@code pttl}.
 *
 * @param leaseTime lease time for the lock key
 * @param unit      unit of {@code leaseTime}
 * @param threadId  id of the acquiring thread
 * @param command   command passed to {@code evalWriteAsync} for the script evaluation
 * @return future with the script result
 */
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(this.getName(),
LongCodec.INSTANCE,
command,
// No 'mode' field yet: set mode to 'write', set this holder's field to 1, set the expiry.
("local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'write'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// Mode is 'write' and this holder's field exists: increment it and extend the expiry
// by ARGV[1] on top of the current TTL.
"if (mode == 'write') then " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local currentExpire = redis.call('pttl', KEYS[1]); " +
"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
"return nil; " +
"end; " +
"end;" +
// Any other state: report the remaining TTL of the key.
"return redis.call('pttl', KEYS[1]);"),
Arrays.asList(this.getName()),
new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
readlock
/**
 * Read-lock variant: runs the acquisition Lua script, which tracks the lock mode in the
 * hash field {@code 'mode'}, and records the lease time in {@code internalLockLeaseTime}.
 *
 * <p>Script arguments: KEYS[1] is {@code getName()}, KEYS[2] is
 * {@code getReadWriteTimeoutNamePrefix(threadId)}, ARGV[1] is the lease time in
 * milliseconds, ARGV[2] is {@code getLockName(threadId)}, ARGV[3] is
 * {@code getWriteLockName(threadId)}.
 *
 * <p>Script result: nil when the read hold was recorded; otherwise the key's remaining TTL
 * from {@code pttl}.
 *
 * @param leaseTime lease time for the lock key
 * @param unit      unit of {@code leaseTime}
 * @param threadId  id of the acquiring thread
 * @param command   command passed to {@code evalWriteAsync} for the script evaluation
 * @return future with the script result
 */
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(this.getName(),
LongCodec.INSTANCE,
command,
// No 'mode' field yet: set mode to 'read', set this holder's field to 1, create the
// companion key KEYS[2]..':1' with its own expiry, and set the expiry of the hash.
("local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('set', KEYS[2] .. ':1', 1); " +
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// Mode is 'read', or mode is 'write' and the hash has the field named by ARGV[3]:
// increment this holder's counter and create a companion key for the new counter value.
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local key = KEYS[2] .. ':' .. ind;" +
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end;" +
// Any other state: report the remaining TTL of the key.
"return redis.call('pttl', KEYS[1]);"),
Arrays.asList(this.getName(), this.getReadWriteTimeoutNamePrefix(threadId)),
new Object[]{this.internalLockLeaseTime, this.getLockName(threadId), this.getWriteLockName(threadId)});
}
Lua 脚本格式 在Redis2.6推出
EVAL script numkeys key.. arg...
127.0.0.1:6379> EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 arg1 arg2
1) "key1"
2) "key2"
3) "arg1"
4) "arg2"
Redisson 定时任务为什么不用Timer
- 时间计算不准确问题
由于Timer是以绝对时间计算定时任务的,会受到系统时间的影响。假设在任务运行期间更改了系统时间,就会导致时间计算不准确,任务没有按照预定的时间运行。
- 仅能单任务运行
简单讲就是一次只能运行一个任务:假设前一个任务没有运行完毕,后一个任务无法并行运行,只能等待前一个任务运行完毕才能运行。也有可能出现这种结果:前一个任务运行的时间太长,后几个任务时间短,于是在一个时间段内连续运行了多个任务,任务又没有按照我们预定的时间运行。
- 非检查异常导致异步任务终止
Timer在运行的过程中遇到非检查异常的时候,会导致本次任务失败,而且接下来的任务也无法被运行,Timer将会终止运行,这不是我们要的结果。我们需要一套恢复机制。
Redis 主从架构锁失效问题(比较难解决)
线程1 主节点加锁, 从节点未来得及同步,主节点挂了, 线程2 来了(线程1锁失效)
从CAP角度分析Redis与Zookeeper分布式锁区别
CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)
Redis AP 可用性、分区容错性 主节点加锁
Zookeeper CP 一致性、分区容错性 主节点加锁并同步到从节点一半以上(ZAB机制)
Redlock 分布式锁原理与存在的问题
奇数个Redis独立节点 超过半数Redis节点加锁成功才算加锁成功
问题
- 需要超过二分之一的节点都加锁成功才算成功,加锁耗时增加(性能问题)
- 高可用主从 (主从同步异常引起)
- 持久化(1s一次持久化):节点在这1s内挂了,锁数据还没有落盘,重启之后锁丢失
- 多节点性能问题
更多推荐
已为社区贡献4条内容
所有评论(0)