一、Redis分布式锁的实现原理

通过setnx设置分布式锁,拿到这个锁的进程可以执行业务代码,没有拿到只能进行等待,进程执行完业务代码后需要通过del key 释放锁,让其他进程重新获取,这样就实现了在多进程并发的情况下始终只有一个进程在执行业务代码【在生产环境中通常需要对多进程同时写数据库的代码块加锁】

二、获得锁

  1. 通过调用redis底层命令 setnx来实现加锁(key,value),python 语言redis包封装了SETNX命令
  2. setnx(key,value) 方法在key不存在的情况下,将key设置为value 返回True,若key存在则直接返回False
  3. acquire_timeout 是客户端获取锁的结束时间,超过了acquire_timeout还没有获取到就会放弃获取
  4. time_out 是key的过期时间,主要是为了防止进程崩溃导致其他线程无法获取锁
import time
import uuid
from multiprocessing import Process
import redis

redis_client = redis.Redis(host='127.0.0.1', port=6379)

# 加锁的过程
def acquire_lock(lock_name, args, acquite_timeout=30, time_out=120):
    identifier = str(uuid.uuid4())
    # 客户端获取锁的结束时间
    end = time.time() + acquite_timeout
    lock_names = "lock_name:" + lock_name
    print(f"进程 {str(args)} end_time:{end}")
    while time.time() < end:
        # setnx(key,value) 只有key不存在情况下,将key的值设置为value 返回True,若key存在则不做任何动作,返回False
        if redis_client.setnx(lock_names, identifier):
            # 设置键的过期时间,过期自动剔除,释放锁
            print('获得锁:进程' + str(args))
            print(f'分布式锁value:{identifier}')
            redis_client.expire(lock_name, time_out)
            return identifier
        # 当锁未被设置过期时间时,重新设置其过期时间
        elif redis_client.ttl(lock_name) == -1:
            redis_client.expire(lock_name, time_out)
        time.sleep(0.001)
    return False

三、释放锁

  1. 释放锁实际上就是将redis中的数据删除,这里可以使用redis提供的事务流水线去执行
  2. pipe.get(lock_names) 类型为bytes类型,需要使用decode 解码成string类型才可以和 indentifire比较
  3. 客户端在使用 MULTI 开启了一个事务之后,因为断线而没有成功执行 EXEC ,那么事务中的所有命令都不会被执行,Redis事务可参考官方文档:MULTI — Redis 命令参考
# 锁的释放
def release_lock(lock_name, identifire):
    lock_names = "lock_name:" + lock_name
    pipe = redis_client.pipeline(True)
    while True:
        try:
            # 通过watch命令监视某个键,当该键未被其他客户端修改值时,事务成功执行。当事务运行过程中,发现该值被其他客户端更新了值,任务失败
            pipe.watch(lock_names)
            print(pipe.get((lock_names)))
            if pipe.get(lock_names).decode() == identifire:  # 检查客户端是否仍然持有该锁
                # multi命令用于开启一个事务,它总是返回ok
                # multi执行之后, 客户端可以继续向服务器发送任意多条命令, 这些命令不会立即被执行, 而是被放到一个队列中, 当 EXEC 命令被调用时, 所有队列中的命令才会被执行
                pipe.multi()
                # 删除键,释放锁
                pipe.delete(lock_names)
                # execute命令负责触发并执行事务中的所有命令
                pipe.execute()
                return True
            pipe.unwatch()
            break
        except redis.exceptions.WatchError:
            # # 释放锁期间,有其他客户端改变了键值对,锁释放失败,进行循环
            pass
    return False

四、验证Redis分布式锁

  1. 通过调用acquire_lock方法获取锁,如果锁存在则执行业务代码,执行完之后释放锁
  2. 使用多进程并发执行exec_test 函数,传入相同 key,然后同时启动9个进程,每一次只能有一个进程拿到锁,其他锁处于等待状态,通过acquire_timeout 控制等待时长 【本文通过设置30s的等待时长刚好够9个进程都执行完3s的业务代码】
# 模拟加锁解锁的过程
def exec_test(lockname, args):
    identifire = acquire_lock(lockname, args)
    print(f'identifire :{identifire}')
    # 如果获取到锁,则进行业务逻辑处理
    if identifire:
        # sleep 3s 模拟业务逻辑处理,处理完之后进行锁释放,让其他进程获取锁
        time.sleep(3)
        res = release_lock(lockname, identifire)
        print(f'释放状态: {res}')
    else:
        print('获取redis分布式锁失败,其他进程正在使用')


if __name__ == '__main__':
    for i in range(0, 9):
        Process(target=exec_test, args=('test', i)).start()

输出结果

 

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐