Go微服务: redis分布式锁保证数据原子操作的一致性
随着云计算和大数据技术的飞速发展,分布式系统已经成为现代IT架构的重要组成部分在分布式系统中,数据的一致性是一个至关重要的挑战,特别是在并发访问和修改共享资源的场景下分布式锁是一种跨进程、跨机器节点的互斥锁,用于控制多个节点对共享资源的访问。其核心目标是确保在分布式系统中,同一时刻只有一个节点能够访问特定的共享资源,从而实现数据的一致性。分布式锁的实现方式多种多样,包括基于数据库、缓存(如Redi
·
概述
- 随着云计算和大数据技术的飞速发展,分布式系统已经成为现代IT架构的重要组成部分
- 在分布式系统中,数据的一致性是一个至关重要的挑战,特别是在并发访问和修改共享资源的场景下
- 分布式锁是一种跨进程、跨机器节点的互斥锁,用于控制多个节点对共享资源的访问
- 其核心目标是确保在分布式系统中,同一时刻只有一个节点能够访问特定的共享资源,从而实现数据的一致性
- 分布式锁的实现方式多种多样,包括基于数据库、缓存(如Redis)、分布式协调服务(如Zookeeper)等
场景示例
- 在多携程或者多线程的情况下,这个数据竞争是避免不了的, 参考下图
- 一开始我们有一个Redis图标代表着之前用到的分布式的锁
- 两边有两个grorouting,在实际工作当中有很多grorouting
- 让左边的 grorouting 进行 getKey 和 setKey,就是说对这个 redis 进行读写
- 让右边的 grorouting 也进行读写,这里面就会产生一个问题
- 多个gorouting在同时写的时候会不会产生数据一致性的问题
- 这个场景就是我们经常说的中断,举个例子
- 写代码,听歌上网写文档,感觉这个计算机同时在做几件事情一样
- 但是要在计算机看来,它在不停的切换,只是说计算机执行的足够快
- 人类是无法感觉到它在极短的时间内进行时间的这个切换
- 这里有两个命令,一个是上面的 getKey和下面这一行的 setKey
- 在高并发的时候就会导致切换出现数据竞争, 也就是数据不一致的这种情况发生
- REDIS 里提供了一个命令就是这个
Setnx
,全称就是Set if Not Exists
- 中文意思就是说设置它,如果它不存在,如果设置成功,就是1,设置失败就返回0
- 那就是说不存在这个key的时候,我就可以设置成功,如果存在了,那就设置是失败的
- 那这就保证了第一个gorouting是可以设置成功的,在这个后面的gorouting,它就是设置不成功的
源码示例
- 分布式锁 redSync 是如何结合这个setNX解决这个数据竞争的问题的
- 我们来看下之前的业务代码
pool := goredis.NewPool(client) rs := redsync.New(pool) mutexname := "my-global-mutex" mutex := rs.NewMutex(mutexname, redsync.WithExpiry(30*time.Second)) fmt.Println("Lock()....") if err := mutex.Lock(); err != nil { panic(err) }
- 上面这里
mutex.Lock()
进入源码// Lock locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again. func (m *Mutex) Lock() error { return m.LockContext(context.Background()) }
- 这里看到就一个
LockContext
方法,再次进入,其实这里有2个重载函数// LockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again. func (m *Mutex) LockContext(ctx context.Context) error { return m.lockContext(ctx, m.tries) } // lockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again. func (m *Mutex) lockContext(ctx context.Context, tries int) error { // 如果 ctx 不存在则新建 if ctx == nil { ctx = context.Background() } // 获取 value value, err := m.genValueFunc() if err != nil { return err } var timer *time.Timer // 循环 tries 次 for i := 0; i < tries; i++ { if i != 0 { if timer == nil { timer = time.NewTimer(m.delayFunc(i)) } else { timer.Reset(m.delayFunc(i)) } select { case <-ctx.Done(): timer.Stop() // Exit early if the context is done. return ErrFailed case <-timer.C: // Fall-through when the delay timer completes. } } start := time.Now() n, err := func() (int, error) { ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor))) defer cancel() return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) { // 在分配锁的时候会加 nx, 进入这里 return m.acquire(ctx, pool, value) }) }() now := time.Now() until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor))) if n >= m.quorum && now.Before(until) { m.value = value m.until = until return nil } _, _ = func() (int, error) { ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor))) defer cancel() return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) { return m.release(ctx, pool, value) }) }() if i == tries-1 && err != nil { return err } } return ErrFailed }
- 再次进入
m.acquire
func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) { conn, err := pool.Get(ctx) if err != nil { return false, err } defer conn.Close() // 这里 SetNX 就是 redSync 封装起来的 nx 特性 reply, err := conn.SetNX(m.name, value, m.expiry) if err != nil { return false, err } return reply, nil }
- 进入
conn.SetNX
,可见 key, value 和 过期时间 expiry (默认 8s)// Conn is a single Redis connection. type Conn interface { Get(name string) (string, error) Set(name string, value string) (bool, error) SetNX(name string, value string, expiry time.Duration) (bool, error) // 注意这里 Eval(script *Script, keysAndArgs ...interface{}) (interface{}, error) PTTL(name string) (time.Duration, error) Close() error }
- 不传过期时间,它会有死锁的风险
- 因为一旦处于某种异常状况,那你这个锁就永远不会释放了,就会出现死锁
- 过期时间比如说五秒之后自动释放,那我无论是宕机还是崩溃等其他问题,它都会五秒之后,释放出资源
- 释放之后其他 gorouting 就可以继续抢锁和业务逻辑操作
关于redis锁过期问题
- 在上述业务代码中
mutex := rs.NewMutex(mutexname, redsync.WithExpiry(30*time.Second))
- 这里有对过期时间做初始化的代码,其实在new的时候,有一个默认值,进入
NewMutex
// NewMutex returns a new distributed mutex with given name. func (r *Redsync) NewMutex(name string, options ...Option) *Mutex { m := &Mutex{ name: name, expiry: 8 * time.Second, tries: 32, delayFunc: func(tries int) time.Duration { return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond }, genValueFunc: genValue, driftFactor: 0.01, timeoutFactor: 0.05, quorum: len(r.pools)/2 + 1, pools: r.pools, } for _, o := range options { o.Apply(m) } if m.shuffle { randomPools(m.pools) } return m }
- 这里可见,默认是8s的过期时间,如果业务特别复杂,这个时间可以调整,因为如果8s的时间hold不住业务,其他gorouting就可能抢到锁干一些事情导致数据不一致
- redSync 中有一种机制是对锁进行续命,在 mutux.go 中有个 touch 方法
// 里面不是go脚本,是 Lua 脚本 var touchScript = redis.NewScript(1, ` // 核对你是否是key和值的拥有者,不能让别人随便设置 // 只有自己才能对自己进行续期 if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("PEXPIRE", KEYS[1], ARGV[2]) else return 0 end `) func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) { conn, err := pool.Get(ctx) if err != nil { return false, err } defer conn.Close() touchScript := touchScript if m.setNXOnExtend { touchScript = touchWithSetNXScript } status, err := conn.Eval(touchScript, m.name, value, expiry) if err != nil { return false, err } return status != int64(0), nil }
- 也就是说默认8s后,如果不够,通过 touch 方法续期,如果一直续期则会导致锁会一直被占用,结果带来的是不公平和可能得死锁问题需要谨慎
- 一般在8s内已经足够业务使用了,如果不够,可以考虑使用,但一定要慎用
分布式锁如何防止被篡改和删除
- redSync 已经对防篡改和删除做了一些防范处理
- 我们可以从 Unlock 的源码中获得一些答案
// Unlock unlocks m and returns the status of unlock. func (m *Mutex) Unlock() (bool, error) { return m.UnlockContext(context.Background()) } // UnlockContext unlocks m and returns the status of unlock. func (m *Mutex) UnlockContext(ctx context.Context) (bool, error) { n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) { return m.release(ctx, pool, m.value) }) if n < m.quorum { return false, err } return true, nil }
- 进入
release
函数var deleteScript = redis.NewScript(1, ` local val = redis.call("GET", KEYS[1]) if val == ARGV[1] then return redis.call("DEL", KEYS[1]) elseif val == false then return -1 else return 0 end `) func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) { conn, err := pool.Get(ctx) if err != nil { return false, err } defer conn.Close() status, err := conn.Eval(deleteScript, m.name, value) if err != nil { return false, err } if status == int64(-1) { return false, ErrLockAlreadyExpired } return status != int64(0), nil }
- 这里的核心是
conn.Eval(deleteScript, m.name, value)
直接把值删除,在删除的逻辑又是一段 Lua - Lua可以保证原子性,性能更高,现在再看这个 value 是怎么来的,回到
NewMutex
中,value 就在这里拿到的,在其源码内部有一个genValueFunc: genValue
的配置项func genValue() (string, error) { b := make([]byte, 16) _, err := rand.Read(b) if err != nil { return "", err } return base64.StdEncoding.EncodeToString(b), nil }
- 这里有一个处理 base64 的程序,其实在内部每次调用一次
LockContext
都会执行这个函数生成一次这个 base64并存放到m.value
上,这个base64 就达到了防篡改的目的 - 回到 Unlock 删除时的
deleteScript
中可以看到,删除时也要保证是自己的,这样就保证了数据的安全性 - 这样就不会随随便便被别人给干掉
总结
- 为了保持分布式锁的数据原子操作的一致性,我们可以得出以下结论
1 )互斥性
- 分布式锁的最基本特性是互斥性,即同一时刻只有一个节点能够持有锁,访问共享资源。这一特性确保了并发操作下的数据一致性
- 当多个节点尝试同时访问共享资源时,只有获得锁的节点能够执行操作,其他节点则被阻塞或等待,直到锁被释放
2 )锁的获取与释放
- 在分布式系统中,锁的获取和释放过程需要特别小心
- 首先,锁的获取必须是一个原子操作,以确保在多个节点同时尝试获取锁时,只有一个节点能够成功
- 其次,锁的释放也需要在操作完成后立即进行,以避免死锁和资源泄露
- 此外,为了应对可能的异常情况(如节点宕机、网络故障等),还需要实现锁的自动释放机制,如设置超时时间
3 )超时机制
- 超时机制是分布式锁中非常重要的一个特性。当节点在持有锁的过程中出现异常或处理时间过长时,可能导致其他节点无法获取锁,形成死锁
- 为了避免这种情况的发生,分布式锁通常会设置超时时间
- 当节点持有锁的时间超过设定的超时时间时,锁会自动释放,其他节点可以重新尝试获取锁
4 )锁的续期
- 在某些场景下,节点可能需要长时间持有锁以完成某些复杂的操作
- 为了避免因超时导致锁被释放而中断操作,分布式锁通常支持锁的续期功能
- 节点可以在持有锁的过程中定期发送续期请求,以延长锁的持有时间
- 这样可以在保证数据一致性的同时,提高系统的可用性和性能。
5 )分布式锁的实现方式
- 分布式锁的实现方式多种多样,包括基于数据库的分布式锁、基于缓存的分布式锁(如Redis)以及基于分布式协调服务的分布式锁(如Zookeeper)
- 这些实现方式各有优缺点,需要根据具体的业务场景和需求来选择合适的实现方式
6 )所以
- 分布式锁作为保证分布式系统中数据一致性的一种重要机制,具有广泛的应用场景。通过实现互斥性、锁的获取与释放、超时机制、锁的续期等特性,分布式锁能够有效地保证数据原子操作的一致性
- 在实际应用中,我们需要根据具体的业务场景和需求来选择合适的分布式锁实现方式,并合理配置相关参数,以确保系统的稳定性和性能
更多推荐
已为社区贡献5条内容
所有评论(0)