业务背景:
后台定时任务刷新Redis的数据到数据库中,有多台机器开启了此定时同步的任务,但是需要其中一台工作,其他的作为备用,提高可用性。使用Redis分布式锁进行限制,拿到锁的机器去执行具体业务,拿不到锁的继续轮询。

分布式锁原理
分布式锁:当多个进程不在同一个系统中,多个进程共同竞争同一个资源,用分布式锁控制多个进程对资源的互斥访问。采用Redis服务器存储锁信息(即SET一个Key表示已加锁),可以实现多进程的并发读锁的状态,如果没有锁,则只允许一个进程加锁。

Redis分布式锁实现的关键点:

问题问题描述解决方案
互斥性保证只有一个client可以获取资源加锁
原子性如果锁不存在则执行加锁操作,必须是原子性操作原子性命令或者执行Lua脚本
避免死锁当拿到锁的Client因宕机或网络原因断线后,如果锁不能释放就会产生死锁为锁加超时时间
锁超时时间设定锁超时时间到了,业务没执行完问题心跳线程,不断更新锁超时时间
锁的所属权解铃还需系铃人,加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了Client 与锁进行一一对应,使用UUID作为锁的值
自动重连网络故障导致Client连接Redis 失败的情况,网络恢复后可以自动重连轮询

实现方案
方案一:采用Redis的原子性命令“SET key value EX expire-time NX”可以实现分布式锁的基本功能,其中的NX(Not Exist)即判断是否已存在锁,如果不存在key则可进行操作,SET key value 等同于加锁,EX expire-time即设置超时时间,可以避免死锁,但是超时时间的设置需要根据具体业务设置一个合理的经验值,避免锁超时时间到了,业务没执行完的问题。

方案二:采用Lua脚本实现,Redis会将整个脚本作为一个整体执行,因此Lua脚本可以实现原子性操作。相较于方案一,此处增加了心跳线程,不断更新锁超时时间,解决锁超时时间设置不合理的问题;生成UUID(或者是随机数字符串)作为锁的值,用于保证锁与Client的一一对应;采用轮询来实现断线自动重连。

实现方案1:SET EX NX
加锁流程图:
在这里插入图片描述
定义锁的变量名为lock,那么对应Redis命令:
判断是否加锁的命令:GET lock
加锁的命令:SET lock
设置超时时间的命令:EXPIRE expire-time
三条命令分开执行是不具有原子性的,比如可能会出现一个进程执行GET lock得到的结果为nil即尚未加锁,在其执行SET lock前另一个进程也执行了SET lock,导致两个进程都认为是可以加锁的,失去互斥性。
因此判断尚未加锁、加锁、设置超时时间必须原子操作,使用Redis的命令“SET key value EX expire-time NX”可以实现该原子操作。

package main

import (
 "fmt"
 "time"
 "github.com/gomodule/redigo/redis"
)

func main() {
    rds, err := redis.Dial("tcp", "127.0.0.1:6379")
    if err != nil {
        fmt.Println("Connect to redis error", err)
        return
    }   
    defer rds.Close()
    
    for true {
    	//检查是否有所与加锁必须是原子性操作
        result, err := rds.Do("SET", "lock", 1, "EX", 5, "NX")	
        if err != nil {
            fmt.Println("redis set error.", err)
            return
        }
        result, err = redis.String(result, err)
        // 加锁失败,继续轮询
		if result != "OK" {
            fmt.Println("SET lock failed.")
            time.Sleep(5 * time.Second)
            continue
        }
		// 加锁成功
        fmt.Println("work begining...")
        // 此处处理业务
        fmt.Println("work end");
        // 业务处理结束后释放锁
        result, err := rds.Do("del", "lock")
        break;
    }
}

此方法弊端是对超时时间的设置有要求,需要根据具体业务设置一个合理的经验值,避免锁超时时间到了,业务没执行完的问题。

实现方案2:Lua脚本
在这里插入图片描述

package main

import (
 "fmt"
 "time"
 "github.com/satori/go.uuid"
 "github.com/gomodule/redigo/redis"
)

var uuidClient uuid.UUID

const (
    SCRIPT_LOCK = ` 
    local res=redis.call('GET', KEYS[1])
    if res then
        return 0
    else
        redis.call('SET',KEYS[1],ARGV[1]);
        redis.call('EXPIRE',KEYS[1],ARGV[2])
        return 1
    end 
    `   
    SCRIPT_EXPIRE = ` 
    local res=redis.call('GET', KEYS[1])
    if not res then
        return -1
    end 
    if res==ARGV[1] then
        redis.call('EXPIRE', KEYS[1], ARGV[2])
        return 1
    else
        return 0
    end 
    `   
    SCRIPT_DEL = ` 
    local res=redis.call('GET', KEYS[1])
    if not res then 
        return -1
    end 
    if res==ARGV[1] then
        redis.call('DEL', KEYS[1])
    else
        return 0
    end 
    `   
)

func ResetExpire() {
    fmt.Println("Reset expire begin...")
    rds, err := redis.Dial("tcp", "127.0.0.1:6379")
    if err != nil {
        fmt.Println("Connect to redis server error.", err)
        return
    }

    for true {
        luaExpire := redis.NewScript(1, SCRIPT_EXPIRE)
        result, err := redis.Int(luaExpire.Do(rds, "lock", uuidClient.String(), 5))
        if err != nil {
            fmt.Println("luaExpire exec error", err)
            break
        }
        if result != 1 {
            fmt.Println("Reset expire failed.")
            break
        } else {
            fmt.Println("Reset expire succeed.")
        }
        time.Sleep(3 * time.Second)
    }
    fmt.Println("Reset expire end.")
}

func main() {
    for true {
        rds, err := redis.Dial("tcp", "127.0.0.1:6379")
        if err != nil {
            fmt.Println("Connect to redis server error.", err)
            time.Sleep(5 * time.Second)
            continue
        }
        defer rds.Close()
	
		// 生成UUID标识锁与Client的对应关系
        uuidClient,err = uuid.NewV4()	//也可以生成随机数字符串来代替
        if err != nil {
            fmt.Println("New uuid error.", err)
            return
        }

        luaLock := redis.NewScript(1, SCRIPT_LOCK)
        luaDel:= redis.NewScript(1, SCRIPT_DEL)
        for true {
            result, err := redis.Int(luaLock.Do(rds, "lock", uuidClient.String(), 5))
            if err != nil {
                fmt.Println("luaLock exec error.", err)
                break
            }

            if result == 0 {
                fmt.Println("Set lock failed.")
                time.Sleep(5 * time.Second)
                continue
            }
            fmt.Println("Set lock succeed.")

            go ResetExpire()
            // 加锁成功
        	fmt.Println("work begining...")
	        // 此处处理业务
	        fmt.Println("work end");
	        
	        // 业务处理结束后释放锁
	        result, err = redis.Int(luaDel.Do(rds, "lock", uuidClient.String()))
	        return
        }
    }
}

Redis采用Lua脚本可以执行更多的个性化的原子操作,在我项目中就采用这种容错性更高的方式。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐