简单redis锁实现(思考问题)

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Demonstrates a minimal Redis lock built on "set if absent" with a 30 second expiry.
     *
     * <p>A random client id is stored as the lock value so that, on release, the
     * lock key is only deleted when it still carries this caller's id.
     *
     * @return {@code "error"} when the lock could not be obtained, {@code "ok"} otherwise
     */
    public String testRedisLock() {
        String lockKey = "lock:product:1";
        String clientId = UUID.randomUUID().toString();
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
        // setIfAbsent returns a nullable Boolean (documented as null in pipeline/transaction);
        // compare against TRUE so that null is handled as "not acquired" instead of an NPE.
        if (!Boolean.TRUE.equals(result)) {
            return "error";
        }

        try {
            //TODO
        } finally {
            // NOTE(review): the ownership check (get) and the delete are two separate commands,
            // so the key can expire and be re-acquired by another client in between.
            // A single Lua script doing compare-and-delete would make the release atomic.
            if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
                stringRedisTemplate.delete(lockKey);
            }
        }
        return "ok";
    }

大型网站高并发

/**
 * Product service that serves reads from a JVM-level map and Redis before
 * falling back to the database.
 *
 * <p>{@link #get(Long)} rebuilds a missing Redis entry under two distributed locks:
 * a per-product "hot cache create" lock so that only one caller rebuilds at a time,
 * and the read side of a per-product read/write lock whose write side is taken by
 * {@link #update(Product)}. A missing database row is cached as {@link #EMPTY_CACHE}
 * with a short timeout.
 */
@Service
public class ProductService {

    @Autowired
    private ProductDao productDao;

    @Autowired
    private RedisUtil redisUtil;

    @Autowired
    private Redisson redisson;

    /** Base cache timeout in seconds (24 hours); a random offset is added per entry. */
    public static final Integer PRODUCT_CACHE_TIMEOUT = 60 * 60 * 24;
    /** Placeholder value cached when the product does not exist in the database. */
    public static final String EMPTY_CACHE = "{}";
    public static final String LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX = "lock:product:hot_cache_create:";
    public static final String LOCK_PRODUCT_UPDATE_PREFIX = "lock:product:update:";
    /**
     * JVM-level cache keyed by the Redis cache key. It is only read in this class;
     * a concurrent map is used because it is a shared static that may be updated
     * from other threads while requests are reading it.
     */
    public static Map<String, Product> productMap = new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Persists a new product and writes it to the Redis cache.
     *
     * @param product the product to create
     * @return the product as returned by the DAO
     */
    @Transactional
    public Product create(Product product) {
        Product productResult = productDao.create(product);
        // Use the same randomized timeout as update()/get() so that created entries also expire.
        redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
                genProductCacheTimeout(), TimeUnit.SECONDS);
        return productResult;
    }

    /**
     * Updates a product and refreshes its Redis cache entry while holding the
     * per-product distributed write lock.
     *
     * @param product the product to update
     * @return the product as returned by the DAO
     */
    @Transactional
    public Product update(Product product) {
        Product productResult = null;
        RReadWriteLock productUpdateLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
        RLock writeLock = productUpdateLock.writeLock();
        // Distributed write lock: keeps the DB write and the cache write together so that a
        // concurrent cache rebuild in get() (which takes the read lock) cannot interleave.
        // NOTE(review): the lock is released inside this @Transactional method; confirm whether
        // the transaction commits only after return, which would leave a window after unlock.
        writeLock.lock();
        try {
            productResult = productDao.update(product);
            redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
                    genProductCacheTimeout(), TimeUnit.SECONDS);
        } finally {
            writeLock.unlock();
        }
        return productResult;
    }

    /**
     * Looks up a product, consulting the caches first and rebuilding the Redis
     * entry from the database when it is missing.
     *
     * @param productId the product id
     * @return the product; an empty {@code Product} instance when the empty placeholder
     *         is cached; {@code null} when this call found no row in the database
     */
    public Product get(Long productId) {
        Product product = null;
        String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;

        // Fast path: try the caches without any locking.
        product = getProductFromCache(productCacheKey);
        if (product != null) {
            return product;
        }

        // Distributed lock so that only one caller rebuilds a hot entry at a time.
        // The lock is acquired exactly once here and released exactly once in the finally
        // block below. (Acquiring it a second time, e.g. via tryLock, would raise the
        // reentrant hold count and the single unlock would then leave the lock held.)
        // A timed tryLock could be used instead of lock() as an optimization, but with
        // caution: waiters that time out would all rebuild the cache concurrently.
        RLock hotCreateCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX + productId);
        hotCreateCacheLock.lock();
        try {
            // Re-check: another caller may have rebuilt the entry while we were waiting.
            product = getProductFromCache(productCacheKey);
            if (product != null) {
                return product;
            }

            RReadWriteLock productUpdateLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
            RLock rLock = productUpdateLock.readLock();
            // Distributed read lock: pairs with the write lock in update() to avoid the
            // DB-read / cache-write here interleaving with an update of the same product.
            rLock.lock();
            try {
                product = productDao.get(productId);
                if (product != null) {
                    redisUtil.set(productCacheKey, JSON.toJSONString(product),
                            genProductCacheTimeout(), TimeUnit.SECONDS);
                } else {
                    // Cache an empty placeholder so repeated lookups of a missing id do not hit the DB.
                    redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
                }
            } finally {
                rLock.unlock();
            }
        } finally {
            hotCreateCacheLock.unlock();
        }

        return product;
    }

    /**
     * @return the base timeout plus a random 0-4 hours, in seconds, so that entries
     *         written at the same time do not all expire together
     */
    private Integer genProductCacheTimeout() {
        return PRODUCT_CACHE_TIMEOUT + new Random().nextInt(5) * 60 * 60;
    }

    /** @return a timeout between 60 and 89 seconds for the empty placeholder */
    private Integer genEmptyCacheTimeout() {
        return 60 + new Random().nextInt(30);
    }

    /**
     * Reads a product from the JVM-level map, then from Redis.
     *
     * <p>A Redis hit extends the entry's timeout. When the cached value is the empty
     * placeholder, a new empty {@code Product} is returned.
     *
     * @param productCacheKey the cache key
     * @return the cached product, or {@code null} when neither cache has a value
     */
    private Product getProductFromCache(String productCacheKey) {
        Product product = null;
        // Multi-level lookup: the JVM-level cache may be maintained by a separate hot-cache
        // system that pushes changes to each web application.
        product = productMap.get(productCacheKey);
        if (product != null) {
            return product;
        }
        String productStr = redisUtil.get(productCacheKey);
        if (!StringUtils.isEmpty(productStr)) {
            if (EMPTY_CACHE.equals(productStr)) {
                redisUtil.expire(productCacheKey, genEmptyCacheTimeout(), TimeUnit.SECONDS);
                return new Product();
            }
            product = JSON.parseObject(productStr, Product.class);
            // Extend the timeout on read so frequently read entries stay cached.
            redisUtil.expire(productCacheKey, genProductCacheTimeout(), TimeUnit.SECONDS);
        }
        return product;
    }

}

常规Redisson实现

Redisson github: https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

    @Autowired
    private Redisson redisson;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Decrements the "stock" counter in Redis by one while holding a Redisson lock.
     *
     * <p>The outcome is printed to standard output; the method returns {@code "ok"}
     * whether or not stock was available.
     *
     * @return always {@code "ok"}
     */
    public String testRedissonLock() {
        String lockKey = "lock:product:1";
        // Obtain the lock object
        RLock redissonLock = redisson.getLock(lockKey);
        // Acquire the distributed lock
        redissonLock.lock();
        try {
            String stockValue = stringRedisTemplate.opsForValue().get("stock");
            // A missing "stock" key yields null; treat it as zero stock instead of
            // letting Integer.parseInt throw NumberFormatException.
            int stock = stockValue == null ? 0 : Integer.parseInt(stockValue);
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + "");
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        } finally {
            // Release the lock
            redissonLock.unlock();
        }
        return "ok";
    }

Redisson 加锁原理

在这里插入图片描述

Redisson 源码分析

Redisson 加锁

redissonLock.lock();

redissonLock.lock();
redissonLock.lockInterruptibly();
redissonLock.lockInterruptibly(-1L, (TimeUnit)null);
其他线程加锁失败后的处理:while 循环重试 + Semaphore 信号量阻塞等待
    /**
     * Acquires the lock, waiting and retrying until an attempt reports success.
     *
     * <p>Each attempt goes through {@code tryAcquire}; a {@code null} result ends the
     * method, any other result is used as the wait before the next attempt.
     *
     * @param leaseTime lease time passed through to {@code tryAcquire}
     * @param unit      unit of {@code leaseTime}
     * @throws InterruptedException declared by the method; the latch waits below are
     *                              the visible blocking points
     */
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        // First attempt: a null result skips the whole block below and the method returns.
        Long ttl = this.tryAcquire(leaseTime, unit, threadId);
        if (ttl != null) {
            RFuture<RedissonLockEntry> future = this.subscribe(threadId); // subscribe
            this.commandExecutor.syncSubscription(future);

            try {
                // Retry loop: leaves only via the return below (or an exception).
                while(true) {
                    ttl = this.tryAcquire(leaseTime, unit, threadId);
                    if (ttl == null) {
                        return;
                    }

                    if (ttl >= 0L) {
                        // Wait on the entry's latch for at most ttl milliseconds, then retry.
                        this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        // Negative ttl: wait on the latch without a timeout.
                        this.getEntry(threadId).getLatch().acquire();
                    }
                }
            } finally {
                // Always drop the subscription, whether the loop returned or threw.
                this.unsubscribe(future, threadId);
            }
        }
    }
消息订阅 唤醒信号量

RFuture<RedissonLockEntry> future = this.subscribe(threadId);

 /**
  * Subscribes through {@code PUBSUB} using this lock's entry name and channel name.
  *
  * @param threadId not used by this method body
  * @return the future produced by {@code PUBSUB.subscribe}
  */
 protected RFuture<RedissonLockEntry> subscribe(long threadId) {
     String entryName = getEntryName();
     String channelName = getChannelName();
     return PUBSUB.subscribe(entryName, channelName, commandExecutor.getConnectionManager().getSubscribeService());
 }
// Abridged excerpt: the "...." lines mark code omitted by the article.
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
	....
    // Called with a received message; on the unlock message it releases the entry's latch,
    // which is the latch waiting lockers block on in lockInterruptibly.
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(unlockMessage)) {
            value.getLatch().release();
            ....
	....
}
Lua 加锁 & 锁续命
    /**
     * Runs a single asynchronous lock attempt.
     *
     * <p>With an explicit lease time (anything other than {@code -1}) the attempt is
     * made with that lease and nothing else happens. With {@code -1} the configured
     * lock watchdog timeout is used as the lease, and a successful attempt schedules
     * expiration renewal for the calling thread.
     *
     * @param leaseTime lease time, or {@code -1} to use the watchdog timeout
     * @param unit      unit of {@code leaseTime}
     * @param threadId  id of the thread the lock is taken for
     * @return future holding the boolean result of the attempt
     */
    private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
        // Caller supplied a lease: no renewal is scheduled.
        if (leaseTime != -1L) {
            return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }

        long watchdogTimeout = this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        RFuture<Boolean> acquireFuture =
                this.tryLockInnerAsync(watchdogTimeout, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        acquireFuture.addListener(new FutureListener<Boolean>() {
            public void operationComplete(Future<Boolean> future) throws Exception {
                // Start renewing the expiration only when the attempt completed and reported true.
                if (future.isSuccess() && (Boolean) future.getNow()) {
                    RedissonLock.this.scheduleExpirationRenewal(threadId);
                }
            }
        });
        return acquireFuture;
    }
Lua 原子操作

/**
 * Attempts to take the lock with a single Lua script, so the check and the write
 * happen in one script evaluation.
 *
 * <p>Script inputs: KEYS[1] is the lock name, ARGV[1] the lease time in milliseconds,
 * ARGV[2] the lock name for the given thread id (used as a hash field).
 * The script returns nil when the lock was taken or re-entered, otherwise the
 * remaining time-to-live of the lock key in milliseconds.
 *
 * @param leaseTime lease time; also stored in {@code internalLockLeaseTime} as milliseconds
 * @param unit      unit of {@code leaseTime}
 * @param threadId  id of the thread the lock is taken for
 * @param command   Redis command descriptor passed to the executor
 */
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
   this.internalLockLeaseTime = unit.toMillis(leaseTime);
   return this.commandExecutor.evalWriteAsync(this.getName(), 
   	LongCodec.INSTANCE, 
   	command, 
   	// Branch 1: key does not exist -> create the hash with count 1 and set the expiry.
   	("if (redis.call('exists', KEYS[1]) == 0) then " +
         "redis.call('hset', KEYS[1], ARGV[2], 1); " +
         "redis.call('pexpire', KEYS[1], ARGV[1]); " +
         "return nil; " +
     "end; " +
     // Branch 2: this thread's field already exists -> increment the count (re-entry) and reset the expiry.
     "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
         "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
         "redis.call('pexpire', KEYS[1], ARGV[1]); " +
         "return nil; " +
     "end; " +
     // Otherwise: the key exists without this thread's field -> report the remaining ttl.
     "return redis.call('pttl', KEYS[1]);"), 
   	Collections.singletonList(this.getName()), 
   	new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)}
  );
}
锁续命
    /**
     * Schedules a one-shot timer task that extends the lock's expiry, and re-schedules
     * itself for as long as the extension script reports that the thread's field still exists.
     *
     * <p>The task is delayed by one third of {@code internalLockLeaseTime} (milliseconds).
     * Nothing is scheduled when {@code expirationRenewalMap} already has an entry for
     * this lock's entry name.
     *
     * @param threadId id of the thread whose lock field is checked by the script
     */
    private void scheduleExpirationRenewal(final long threadId) {
        if (!expirationRenewalMap.containsKey(this.getEntryName())) {
            Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                public void run(Timeout timeout) throws Exception {
                    // Script: if the thread's field exists in the lock hash, reset the key's
                    // expiry to ARGV[1] and return 1; otherwise return 0.
                    RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
                    future.addListener(new FutureListener<Boolean>() {
                        public void operationComplete(Future<Boolean> future) throws Exception {
                            // Clear the map entry first so that the recursive call below can register a new task.
                            RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
                            if (!future.isSuccess()) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
                            } else {
                                // Script returned true: the field still exists, keep renewing.
                                if ((Boolean)future.getNow()) {
                                    RedissonLock.this.scheduleExpirationRenewal(threadId);
                                }

                            }
                        }
                    });
                }
            }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
            // If an entry was registered in the meantime, cancel the task created here.
            if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {
                task.cancel();
            }

        }
    }

Redisson 解锁 发布消息(唤醒信号量线程)

redissonLock.unlock();

    /**
     * Releases one hold of the lock with a single Lua script.
     *
     * <p>Script inputs: KEYS[1] is the lock name, KEYS[2] the channel name;
     * ARGV[1] is the unlock message, ARGV[2] the lease time in milliseconds,
     * ARGV[3] the lock name for the given thread id (the hash field).
     * Script results: 1 when the lock key is gone after the call (the unlock message is
     * published), 0 when the thread still holds the lock after decrementing, nil when
     * the key exists but has no field for this thread.
     *
     * @param threadId id of the thread releasing the lock
     */
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return this.commandExecutor.evalWriteAsync(this.getName(), 	
        					LongCodec.INSTANCE, 
        					RedisCommands.EVAL_BOOLEAN, 
        					// Key already absent: just publish the unlock message.
        					("if (redis.call('exists', KEYS[1]) == 0) then " +
		                       "redis.call('publish', KEYS[2], ARGV[1]); " +
		                       "return 1; " +
		                   "end;" +
		                   // Key exists but this thread has no field in it: nothing to release.
		                   "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
		                       "return nil;" +
		                   "end; " +
		                   // Decrement the hold count for this thread.
		                   "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
		                   "if (counter > 0) then " +
		                       // Still held (re-entrant): only reset the expiry.
		                       "redis.call('pexpire', KEYS[1], ARGV[2]); " +
		                       "return 0; " +
		                   "else " +
		                       // Last hold released: delete the key and publish the unlock message.
		                       "redis.call('del', KEYS[1]); " +
		                       "redis.call('publish', KEYS[2], ARGV[1]); " +
		                       "return 1; " +
		                   "end; " +
		                   // Not reached: both branches above return.
		                   "return nil;"), 
        					Arrays.asList(this.getName(), this.getChannelName()), 
        					new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
    }

Redisson 读写锁 (mode)

writelock
    /**
     * Write-lock variant: attempts to take the lock with a single Lua script that
     * tracks the lock mode in the hash field {@code 'mode'}.
     *
     * <p>Script inputs: KEYS[1] is the lock name, ARGV[1] the lease time in milliseconds,
     * ARGV[2] the lock name for the given thread id (the hash field).
     * The script returns nil when the lock was taken or re-entered, otherwise the
     * remaining time-to-live of the key in milliseconds.
     *
     * @param leaseTime lease time; also stored in {@code internalLockLeaseTime} as milliseconds
     * @param unit      unit of {@code leaseTime}
     * @param threadId  id of the thread the lock is taken for
     * @param command   Redis command descriptor passed to the executor
     */
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        this.internalLockLeaseTime = unit.toMillis(leaseTime);
        return this.commandExecutor.evalWriteAsync(this.getName(), 
        		LongCodec.INSTANCE, 
        		command, 
        		("local mode = redis.call('hget', KEYS[1], 'mode'); " +
                  // No mode field: nobody holds the lock -> set mode 'write', count 1, and the expiry.
                  "if (mode == false) then " +
                      "redis.call('hset', KEYS[1], 'mode', 'write'); " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  // Mode is 'write' and this thread's field exists: re-entry, increment the
                  // count and extend the expiry by ARGV[1] on top of the current ttl.
                  "if (mode == 'write') then " +
                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "local currentExpire = redis.call('pttl', KEYS[1]); " +
                          "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                  "end;" +
                  // Any other case (mode 'read', or 'write' without this thread's field): report the remaining ttl.
                  "return redis.call('pttl', KEYS[1]);"), 
        		Arrays.asList(this.getName()), 
        		new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
    }
readlock
    /**
     * Read-lock variant: attempts to take the lock with a single Lua script that
     * tracks the lock mode in the hash field {@code 'mode'}.
     *
     * <p>Script inputs: KEYS[1] is the lock name, KEYS[2] the read/write timeout name
     * prefix for the thread; ARGV[1] is the lease time in milliseconds, ARGV[2] the lock
     * name for the thread (the hash field), ARGV[3] the write lock name for the thread.
     * Each hold also gets its own key {@code KEYS[2]:<n>} with the same expiry.
     * The script returns nil when the lock was taken, otherwise the remaining
     * time-to-live of the key in milliseconds.
     *
     * @param leaseTime lease time; also stored in {@code internalLockLeaseTime} as milliseconds
     * @param unit      unit of {@code leaseTime}
     * @param threadId  id of the thread the lock is taken for
     * @param command   Redis command descriptor passed to the executor
     */
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        this.internalLockLeaseTime = unit.toMillis(leaseTime);
        return this.commandExecutor.evalWriteAsync(this.getName(), 
        LongCodec.INSTANCE, 
        command, 
        ("local mode = redis.call('hget', KEYS[1], 'mode'); " +
         // No mode field: nobody holds the lock -> set mode 'read', count 1, the per-hold
         // timeout key ':1', and the expiries.
         "if (mode == false) then " +
             "redis.call('hset', KEYS[1], 'mode', 'read'); " +
             "redis.call('hset', KEYS[1], ARGV[2], 1); " +
             "redis.call('set', KEYS[2] .. ':1', 1); " +
             "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
             "redis.call('pexpire', KEYS[1], ARGV[1]); " +
             "return nil; " +
         "end; " +
         // Mode is 'read' (shared with other readers), or mode is 'write' and the hash has
         // the field ARGV[3] (this thread's write lock name): take another read hold.
         "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
             "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
             "local key = KEYS[2] .. ':' .. ind;" +
             "redis.call('set', key, 1); " +
             "redis.call('pexpire', key, ARGV[1]); " +
             "redis.call('pexpire', KEYS[1], ARGV[1]); " +
             "return nil; " +
         "end;" +
         // Otherwise (mode 'write' without this thread's write field): report the remaining ttl.
         "return redis.call('pttl', KEYS[1]);"), 
        Arrays.asList(this.getName(), this.getReadWriteTimeoutNamePrefix(threadId)), 
        new Object[]{this.internalLockLeaseTime, this.getLockName(threadId), this.getWriteLockName(threadId)});
    }

Lua 脚本格式 在Redis2.6推出

EVAL script numkeys key.. arg...

127.0.0.1:6379> EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 arg1 arg2
1) "key1"
2) "key2"
3) "arg1"
4) "arg2"

Redisson 定时任务为什么不用Timer

  1. 时间计算不准确问题
    由于 Timer 是以绝对时间计算定时任务的,会受到系统时间的影响。假设在任务运行期间更改了系统时间,就会导致时间计算不准确,任务无法按照预定的时间运行。
  2. 仅仅能单任务运行
    简单讲就是,一次只能运行一个任务:假设前一个任务没有运行完毕,后一个任务无法并行运行,只能等待前一个任务运行完毕
    才能运行。也有可能出现这种结果:前一个任务运行的时间太长,后几个任务运行的时间短,可能在一个时间段内连续运行了多个任务,任务又
    没有按照我们期望的时间运行。
  3. 非检查异常导致异步任务终止
    Timer 在运行过程中遇到非检查异常时,会导致本次任务失败,而且接下来的任务也无法被运行,Timer 将会终止运行。这不是
    我们要的结果,我们需要一套恢复机制。

Redis 主从架构锁失效问题(比较难解决)

线程1 在主节点加锁成功,锁还未来得及同步到从节点,主节点就挂了;从节点被提升为新的主节点后,线程2 可以对同一个 key 再次加锁成功(线程1 的锁实际上已失效)

从CAP角度分析Redis与Zookeeper分布式锁区别

CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)这三个要素最多只能同时满足两个,不可能三者兼顾

Redis AP 可用性、分区容错性 主节点加锁

Zookeeper CP 一致性、分区容错性 主节点加锁并同步到从节点一半以上(ZAB机制)

Redlock 分布式锁原理与存在的问题

奇数个Redis独立节点 超过半数Redis节点加锁成功才算加锁成功

在这里插入图片描述

问题

  1. 需要在超过二分之一的节点上加锁成功才算加锁成功,存在性能问题
  2. 高可用主从 (主从同步异常引起)
  3. 持久化问题(例如 1s 持久化一次):节点在这 1s 内挂了,尚未持久化的锁数据丢失,节点重启之后其他线程可以再次加锁成功
  4. 多节点性能问题
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐