理解redis分布式锁

faith team

一、事情背景

在问到redis锁的时候,经常重复被提起一个问题,那么就是当在锁里执行一个操作的时候,如果redis 宕机了会导致锁一直存在的缘故,今天就来好好的实操解决掉。

二、代码理解

(1)、写一个常用的单线程锁机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int stock = 10;
var users = new HashSet<string>();
object locker = new();

// 模拟 50 人并发抢票
Parallel.For(1, 50, i =>
{
var userId = $"user_{i}";
var result = GrabTicket(userId);
Console.WriteLine($"{userId} => {result}");
});
string GrabTicket(string userId)
{
lock (locker)
{
// 是否抢过
if (users.Contains(userId))
return "已抢过";

// 是否有库存
if (stock <= 0)
return "已售罄";

stock--;
users.Add(userId);

return "抢票成功";
}
}

如图上所示 默认10个库存,使用并发模拟抢票来进行抢票,单线程使用C# 的lock锁来保持线程安全,在分布式多台服务的情况下就无法使用lock,需要通过一个第三方服务比如redis来进行加锁(比如为了负载均衡增加并发,部署多个抢票服务,就无法使用lock,需要去一个统一的服务进行加锁操作)。

(2)、进阶到redis服务来进行加锁

以下写一个我之前错误的方式,请勿模仿!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#region redis 模拟加锁操作
int stock = 10;
var users = new HashSet<string>();
var redis = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
var db = redis.GetDatabase();
//// 模拟 50 人并发抢票
var tasks = Enumerable.Range(1, 50).Select(async i =>
{
var userId = $"user_{i}";
var result = await GrabTicket(userId);
Console.WriteLine($"{userId} => {result}");
});

await Task.WhenAll(tasks);

async Task<string> GrabTicket(string userId)
{
try
{
if (await db.StringSetAsync(
userId,
"hello",
TimeSpan.FromMinutes(5),
when: When.NotExists // 关键点
))
{
// 是否抢过
if (users.Contains(userId))
return "已抢过";

// 是否有库存
if (stock <= 0)
return "已售罄";

stock--;
users.Add(userId);

return "抢票成功";
}
else
{
//锁存在则返回
throw new Exception("服务繁忙");
}
}
catch (Exception ex)
{
throw;
}
}
#endregion

之前一直给我的印象是使用redis 的isexits方法判断锁是否存在 存在则是锁被占用,服务繁忙,现在理解确实是为了防止相同的人 在同一个时间节点同时请求抢票,避免了同一个人的并发问题,但是没有解决50个人的并发问题。

(3)、正常企业加锁操作

需要写一个redis IDataBase单例模式,保证使用的是同一个redis实例,再进行加锁操作

3.1 RedisLock.cs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace FaithBlog.Test
{
public class RedisLock
{
private readonly IDatabase _db;

public RedisLock(IDatabase db)
{
_db = db;
}

/// <summary>
/// 尝试获取锁(立即返回)
/// </summary>
public async Task<string?> TryAcquireAsync(string lockKey, TimeSpan expire)
{
//redis value 需要时不同的 这里其实也可以通过不同的订单id来区分开 需要在释放锁的时候对比value对不对 避免误删其他人的锁
var orderId = Guid.NewGuid().ToString();

bool locked = await _db.StringSetAsync(
lockKey,
requestId,
expire,
When.NotExists
);
//StringSetAsync返回的bool类型时区分是否上锁成功的 如果上锁成功 返回订单id false的话是已经加锁了(锁被占用了)返回null
return locked ? orderId : null;
}

/// <summary>
/// 等待获取锁(带超时)
/// </summary>
public async Task<string?> AcquireAsync(
string lockKey,
TimeSpan expire,
TimeSpan waitTime,
TimeSpan retryInterval)
{
//避免锁超时 比如我设置超市10s 就在当前时间后加上10秒
var endTime = DateTime.UtcNow + waitTime;
// 在这10s内 假设没有取到锁进行重试操作
while (DateTime.UtcNow < endTime)
{
var orderId = await TryAcquireAsync(lockKey, expire);
if (orderId != null)
return orderId;

await Task.Delay(retryInterval);
}

return null;
}

/// <summary>
/// 安全释放锁(Lua 保证原子性)
/// </summary>
public async Task<bool> ReleaseAsync(string lockKey, string orderId)
{
//首先为什么要使用lua脚本呢?
//其实就是为了保证整个操作的原子性
//用户是否抢过,库存是否足够 ,扣除库存+标记用户必须再一个原子性下调用
//下面模拟使用调用api的方式
//if (!await db.KeyExistsAsync(userKey))
//{
// var stock = await db.StringGetAsync(stockKey);
// if (stock > 0)
// {
// await db.StringDecrementAsync(stockKey);
// await db.StringSetAsync(userKey, 1);
// }
//}
//看似操作时一起的 逻辑没有问题,但是在高并发下依然会发生超卖 因为redis每条命令是独立执行的,中间会被插队
//前面加锁是保持互斥性,lua是保持原子性核心是lua
//lua 脚本
//拆解lua脚本
//redis.call('GET', KEYS[1]) 获取锁的值
//ARGV[1] 之前的orderId
//在释放锁的时候需要对比这个人获得的锁的值 是否是自己的 如果是自己的则删除 否则不操作 避免删除其他人的锁
const string lua = @"
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end";

var result = (int)await _db.ScriptEvaluateAsync(
lua,
new RedisKey[] { lockKey },
new RedisValue[] { orderId });

return result == 1;
}
}
}

如上是redis的帮助类

3.2 redis分布式锁的demo实战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#region 完整版redis 锁
int stock = 10;
var users = new HashSet<string>();
// Redis 单例连接(非常重要)
var redis = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
var db = redis.GetDatabase();
var redisLock = new RedisLock(db);
// 模拟 20 个并发任务争抢锁
var tasks = Enumerable.Range(1, 20).Select(async i =>
{
var userId = $"worker_{i}";
var result = await DoWork(userId);
Console.WriteLine($"{userId} => {result}");
});

await Task.WhenAll(tasks);

async Task<string> DoWork(string userId)
{
string lockKey = "lock:demo";

// ① 获取锁(最多等待3秒)
var requestId = await redisLock.AcquireAsync(
lockKey,
expire: TimeSpan.FromSeconds(5),
waitTime: TimeSpan.FromSeconds(3),
retryInterval: TimeSpan.FromMilliseconds(100)
);
//经过了重试还没有获取到锁则没有抢到
if (requestId == null)
return "服务繁忙";

try
{
// 临界区(只有一个线程能进)
Console.WriteLine($"{userId} 进入临界区");
//demo 展示
// 是否抢过
//if (users.Contains(userId))
// return "已抢过";

// 是否有库存
//if (stock <= 0)
// return "已售罄";

//stock--;
//users.Add(userId);

//return "抢票成功";

// 执行 Lua 脚本,原子扣库存
var result = (int)await db.ScriptEvaluateAsync(
lua,
new RedisKey[] { "stock:ticket", $"user:{userId}" },
new RedisValue[] { userId }
);
// 这里根据 result 做后续业务处理
if (result == 1)
{
Console.WriteLine("抢票成功");
// 发 MQ / 异步下单 / 记录日志
await rabbitMqDoWork(userId);
await InsertOrderAsync();
await InsertLogsAsync();
await DoSomethings();
}
else if (result == 0)
{
Console.WriteLine("已售罄");
}
else if (result == -1)
{
Console.WriteLine("已抢过");
}

}
finally
{
// 释放锁
await redisLock.ReleaseAsync(lockKey, requestId);
}
}
#endregion

但是一般实战中,不会使用内存的库存,正常我们理解的库存都是存储在数据库中,如果在redis锁中再去调用批量调用数据库 明显会使整个抢单变得繁琐,那么这个时候应该如何解决呢???
我们可以将数据库的库存先缓存到redis中,直接通过lua脚本来进行扣除库存,之后将订单记录库存结果通过消息队列(rabbitMq)的方式来同步到数据库当中。
1.实现缓存数据库库存到redis

1
2
//Key: stock:ticket:123
//Value: 100 (剩余库存)

2.调整lua脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
-- KEYS[1] = stock key
-- KEYS[2] = user key
-- ARGV[1] = userId

if redis.call('EXISTS', KEYS[2]) == 1 then
return -1 -- 用户已抢过
end

local stock = tonumber(redis.call('GET', KEYS[1]))
if stock <= 0 then
return 0 -- 已售罄
end

-- 扣库存
redis.call('DECR', KEYS[1])
-- 标记用户已抢
redis.call('SET', KEYS[2], 1)

return 1 -- 抢票成功

3.调整安全释放方法

1
2
3
4
5
6
7
var lua = @"-- 上面的 Lua 脚本";
var result = (int)await db.ScriptEvaluateAsync(
lua,
new RedisKey[] { "stock:ticket", $"user:{userId}" },
new RedisValue[] { userId }
);
return result;

(4)、总结

再之前的方法中,假设再次遇到加锁的过程当中服务器宕机的情况

1.情况 A:抢票开始前 Redis 就宕机

  • AcquireAsync 连接 Redis 会失败
  • 方法直接抛异常或返回 null(锁获取失败)
  • 结果:所有用户请求都不能进入临界区 → 抢票全部失败
  • 用户看到的提示通常是 “服务繁忙”
    影响:抢票功能无法使用,库存不会被扣
    2.情况 B:获取锁成功,但执行 Lua 脚本时 Redis 宕机
  • 你拿到锁(orderId),开始执行 Lua 脚本扣库存
  • Redis 这时宕机 → Lua 执行失败
  • 结果:
    • 用户没有扣库存 → 订单未生成
    • finally 释放锁访问 Redis 失败 → 锁可能无法释放
  • 影响
    • 后续请求可能无法获取锁
    • 库存未扣 → 用户没抢到,但系统看起来“卡住”
      3.情况 C:Lua 执行成功,但 finally 释放锁时 Redis 宕机
  • Lua 执行成功 → Redis 中库存已扣,用户标记已写入
  • finally 中 ReleaseAsync 访问 Redis 失败
  • 结果:
    • 锁 key 可能没删掉
    • 其他请求尝试获取锁失败 → 临界区短时间阻塞
  • 库存和用户标记是对的 → 业务一致性没有问题
  • 影响:抢票速度受 Redis 锁阻塞影响
    4.情况 D:Redis 恢复后
  • Lua 脚本、锁释放失败的请求可以重试
  • Redis key 过期时间(你锁是 5 秒)到了 → 自动释放锁
  • 系统恢复正常,但短时间内部分请求可能失败

如何缓解

1.锁获取失败立即返回 / 限流

  • 避免用户一直等待

2.Redis 高可用集群

  • 哨兵/Cluster 避免单点宕机

3.库存持久化

  • Redis 只是缓存 + 锁
  • 核心库存和订单最终落数据库
    4.超时机制
  • 锁过期时间比业务执行时间长
  • 防止 finally 未释放锁 → 自动过期解锁
    5.重试机制
  • Lua 执行失败可以重试一次

分布式锁功能和lua的不同应用场景

其实上面的抢票的功能并不需要分布式锁这个功能,我们需要分清楚分布式锁和lua脚本的不同应用场景
!!! lua脚本其实就是为了保持原子性的在redis中执行lua时单线程的
!!! 分布式锁主要是避免做一件事情被其他互斥的,我进门加锁 做事情 昨晚事情出来 解锁 下一位重复
如上面的代码 分布式锁服务的就是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
var result = (int)await db.ScriptEvaluateAsync(
lua,
new RedisKey[] { "stock:ticket", $"user:{userId}" },
new RedisValue[] { userId }
);
// 这里根据 result 做后续业务处理
if (result == 1)
{
Console.WriteLine("抢票成功");
// 发 MQ / 异步下单 / 记录日志
await rabbitMqDoWork(userId);
await InsertOrderAsync();
await InsertLogsAsync();
await DoSomethings();
}
else if (result == 0)
{
Console.WriteLine("已售罄");
}
else if (result == -1)
{
Console.WriteLine("已抢过");
}

做着一系列操作的时候排他

当前的抢票实现里,Redis 宕机只会导致 部分请求失败或延迟,不会出现库存被多扣或者用户重复抢票,但系统的可用性会受影响。只需要优化一下 Redis 高可用集群即可。

  • Title: 理解redis分布式锁
  • Author: faith team
  • Created at: 2026-02-27 11:21:05
  • Updated at: 2026-03-02 00:50:34
  • Link: https://redefine.ohevan.com/2026/02/27/20260227Redis锁/
  • License: This work is licensed under CC BY-NC-SA 4.0.
 Comments