直入主题

以IDatabase扩展方法的形式实现分布式锁方法,将代码拷到任意静态类里即可使用。

/// <summary>
/// 使用Redis分布式锁执行某些操作
/// </summary>
/// <param name="lockName">锁名</param>
/// <param name="act">操作</param>
/// <param name="expiry">锁过期时间,若超出时间自动解锁 单位:sec</param>
/// <param name="retry">获取锁的重复次数</param>
/// <param name="tryDelay">获取锁的重试间隔  单位:ms</param>
public static void LockAction(this IDatabase db, string lockName, Action act, int expiry = 10, int retry = 3, int tryDelay = 200)
{
    if (act.Method.IsDefined(typeof(AsyncStateMachineAttribute), false))
    {
        throw new ArgumentException("使用异步Action请调用LockActionAsync");
    }

    TimeSpan exp = TimeSpan.FromSeconds(expiry);
    string token = Guid.NewGuid().ToString("N");
    try
    {
        bool ok = false;
        // 延迟重试
        for (int test = 0; test < retry; test++)
        {
            if (db.LockTake(lockName, token, exp))
            {
                ok = true;
                break;
            }
            else
            {
                Task.Delay(tryDelay).Wait();
            }
        }
        if (!ok)
        {
            throw new InvalidOperationException($"获取锁[{lockName}]失败");
        }
        act();
    }
    finally
    {
        db.LockRelease(lockName, token);
    }
}

/// <summary>
/// 使用Redis分布式锁执行某些异步操作
/// </summary>
/// <param name="lockName">锁名</param>
/// <param name="act">操作</param>
/// <param name="expiry">锁过期时间,若超出时间自动解锁 单位:sec</param>
/// <param name="retry">获取锁的重复次数</param>
/// <param name="tryDelay">获取锁的重试间隔  单位:ms</param>
public static async Task LockActionAsync(this IDatabase db, string lockName, Func<Task> act, int expiry = 10, int retry = 3, int tryDelay = 200)
{
    TimeSpan exp = TimeSpan.FromSeconds(expiry);
    string token = Guid.NewGuid().ToString("N");
    try
    {
        bool ok = false;
        // 延迟重试
        for (int test = 0; test < retry; test++)
        {
            if (await db.LockTakeAsync(lockName, token, exp))
            {
                ok = true;
                break;
            }
            else
            {
                await Task.Delay(tryDelay);
            }
        }
        if (!ok)
        {
            throw new InvalidOperationException($"获取锁[{lockName}]失败");
        }
        await act();
    }
    finally
    {
        await db.LockReleaseAsync(lockName, token);
    }
}

使用方法

private async Task RedisLockTestAsync()
{
    string connStr = "Redis连接字符串";
    var conn = await ConnectionMultiplexer.ConnectAsync(connStr);
    IDatabase db = conn.GetDatabase();
    // 带异步操作的用 LockActionAsync
    await db.LockActionAsync("MyLockName", async () =>
    {
        // 执行异步方法...
        await Task.Delay(1000);
        Console.WriteLine("Done");
    });
}

private void RedisLockTest()
{
    string connStr = "Redis连接字符串";
    var conn = ConnectionMultiplexer.Connect(connStr);
    IDatabase db = conn.GetDatabase();
    // 同步操作的用 LockAction
    db.LockAction("MyLockName", () =>
    {
        // 同步方法...
        Task.Delay(1000).Wait();
        Console.WriteLine("Done");
    });
}

一些思考

分布式锁使用的场景

最近在Redis的使用中遇到了需要在List中查找某个值,若不存在则向List中新增,即需要维护一个没有重复项的List的需求。虽然Redis本身的所有操作都是互斥的,但是Redis本身并没有提供无重复项的List类型,也没有相关指令能对List进行去重,故需要拉整个List下来进行对比再写入,在这里就会存在一个并发的问题,若在分布式部署且高并发的情景里就有可能使List出现重复项。所以需要对这个操作加上分布式锁,类似的场景还有很多这里不一一列举。

Redis里分布式锁的实现

所谓分布式锁实际上就是一个Redis里的一个键值对,锁名为Key,占用者(Token)为value。当A需要占用时则将value修改为A的Token并返回True,当B再想占用时,发现这个Key(锁名)的value已经有值且不是自己的Token,此时就返回False。这部分的逻辑StackExchange.Redis都已经帮我们做了。我们只需要调用它的LockTake,LockRelease方法即可。

因为获取锁的时候锁可能已经被占用,所以需要有一个重试机制,有重试就应该有重试间隔,这就是为什么LockAction方法里会有retry和tryDelay参数。

为什么异步和同步的方法要分开

刚开始是想做成像 Task.Run(()=>{}) 那样既可以传异步Action也可以传同步Action,但是后面发现.net更新之后异步Action没办法像以前那样用BeginInvoke方法等待执行完成,本人技术有限,实在找不到办法兼容同步和异步Action,这个是客观的原因。

另外,主观上我觉得兼容同步和异步就是一件不值得提倡的事情,若该扩展方法设计为同步方法,那在同步方法里执行异步操作再强行wait它是一件很瓜皮的事,而且还可能会出现莫名其妙的bug;若该扩展方法设计为异步方法但是如果传进来是同步Action的话,那么一个异步操作都没有,虽然没问题但很别扭,所以这里最终还是把同步操作和异步操作分开写成了两个方法。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐