一、事情背景 在问到redis锁的时候,经常重复被提起一个问题,那么就是当在锁里执行一个操作的时候,如果redis 宕机了会导致锁一直存在的缘故,今天就来好好的实操解决掉。
二、代码理解 (1)、写一个常用的单线程锁机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 int stock = 10 ;var users = new HashSet<string >();object locker = new ();Parallel.For(1 , 50 , i => { var userId = $"user_{i} " ; var result = GrabTicket(userId); Console.WriteLine($"{userId} => {result} " ); }); string GrabTicket (string userId ){ lock (locker) { if (users.Contains(userId)) return "已抢过" ; if (stock <= 0 ) return "已售罄" ; stock--; users.Add(userId); return "抢票成功" ; } }
如图上所示 默认10个库存,使用并发模拟抢票来进行抢票,单线程使用C# 的lock锁来保持线程安全,在分布式多台服务的情况下就无法使用lock,需要通过一个第三方服务比如redis来进行加锁(比如为了负载均衡增加并发,部署多个抢票服务,就无法使用lock,需要去一个统一的服务进行加锁操作)。
(2)、进阶到redis服务来进行加锁 以下写一个我之前错误的方式,请勿模仿!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 #region redis 模拟加锁操作 int stock = 10 ;var users = new HashSet<string >();var redis = await ConnectionMultiplexer.ConnectAsync("localhost:6379" );var db = redis.GetDatabase();var tasks = Enumerable.Range(1 , 50 ).Select(async i =>{ var userId = $"user_{i} " ; var result = await GrabTicket(userId); Console.WriteLine($"{userId} => {result} " ); }); await Task.WhenAll(tasks);async Task<string > GrabTicket (string userId ){ try { if (await db.StringSetAsync( userId, "hello" , TimeSpan.FromMinutes(5 ), when : When.NotExists )) { if (users.Contains(userId)) return "已抢过" ; if (stock <= 0 ) return "已售罄" ; stock--; users.Add(userId); return "抢票成功" ; } else { throw new Exception("服务繁忙" ); } } catch (Exception ex) { throw ; } } #endregion
之前一直给我的印象是使用redis 的isexits方法判断锁是否存在 存在则是锁被占用,服务繁忙,现在理解确实是为了防止相同的人 在同一个时间节点同时请求抢票,避免了同一个人的并发问题,但是没有解决50个人的并发问题。
(3)、正常企业加锁操作 需要写一个redis IDataBase单例模式,保证使用的是同一个redis实例,再进行加锁操作
3.1 RedisLock.cs 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 using StackExchange.Redis;using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;namespace FaithBlog.Test { public class RedisLock { private readonly IDatabase _db; public RedisLock (IDatabase db ) { _db = db; } public async Task<string ?> TryAcquireAsync(string lockKey, TimeSpan expire) { var orderId = Guid.NewGuid().ToString(); bool locked = await _db.StringSetAsync( lockKey, requestId, expire, When.NotExists ); return locked ? orderId : null ; } public async Task<string ?> AcquireAsync( string lockKey, TimeSpan expire, TimeSpan waitTime, TimeSpan retryInterval) { var endTime = DateTime.UtcNow + waitTime; while (DateTime.UtcNow < endTime) { var orderId = await TryAcquireAsync(lockKey, expire); if (orderId != null ) return orderId; await Task.Delay(retryInterval); } return null ; } public async Task<bool > ReleaseAsync (string lockKey, string orderId ) { const string lua = @" if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('DEL', KEYS[1]) else return 0 end" ; var result = (int )await _db.ScriptEvaluateAsync( lua, new RedisKey[] { lockKey }, new RedisValue[] { orderId }); return result == 1 ; } } }
如上是redis的帮助类
3.2 redis分布式锁的demo实战 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 #region 完整版redis 锁 int stock = 10 ;var users = new HashSet<string >();var redis = await ConnectionMultiplexer.ConnectAsync("localhost:6379" );var db = redis.GetDatabase();var redisLock = new RedisLock(db);var tasks = Enumerable.Range(1 , 20 ).Select(async i =>{ var userId = $"worker_{i} " ; var result = await DoWork(userId); Console.WriteLine($"{userId} => {result} " ); }); await Task.WhenAll(tasks);async Task<string > DoWork (string userId ){ string lockKey = "lock:demo" ; var requestId = await redisLock.AcquireAsync( lockKey, expire: TimeSpan.FromSeconds(5 ), waitTime: TimeSpan.FromSeconds(3 ), retryInterval: TimeSpan.FromMilliseconds(100 ) ); if (requestId == null ) return "服务繁忙" ; try { Console.WriteLine($"{userId} 进入临界区" ); var result = (int )await db.ScriptEvaluateAsync( lua, new RedisKey[] { "stock:ticket" , $"user:{userId} " }, new RedisValue[] { userId } ); if (result == 1 ) { Console.WriteLine("抢票成功" ); await rabbitMqDoWork(userId); await InsertOrderAsync(); await InsertLogsAsync(); await DoSomethings(); } else if (result == 0 ) { Console.WriteLine("已售罄" ); } else if (result == -1 ) { Console.WriteLine("已抢过" ); } } finally { await redisLock.ReleaseAsync(lockKey, requestId); } } #endregion
但是一般实战中,不会使用内存的库存,正常我们理解的库存都是存储在数据库中,如果在redis锁中再去调用批量调用数据库 明显会使整个抢单变得繁琐,那么这个时候应该如何解决呢??? 我们可以将数据库的库存先缓存到redis中,直接通过lua脚本来进行扣除库存,之后将订单记录库存结果通过消息队列(rabbitMq)的方式来同步到数据库当中。 1.实现缓存数据库库存到redis
2.调整lua脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 if redis.call('EXISTS' , KEYS[2 ]) == 1 then return -1 end local stock = tonumber (redis.call('GET' , KEYS[1 ]))if stock <= 0 then return 0 end redis.call('DECR' , KEYS[1 ]) redis.call('SET' , KEYS[2 ], 1 ) return 1
3.调整安全释放方法
1 2 3 4 5 6 7 var lua = @"-- 上面的 Lua 脚本" ;var result = (int )await db.ScriptEvaluateAsync( lua, new RedisKey[] { "stock:ticket" , $"user:{userId} " }, new RedisValue[] { userId } ); return result;
(4)、总结 再之前的方法中,假设再次遇到加锁的过程当中服务器宕机的情况 1.情况 A:抢票开始前 Redis 就宕机
AcquireAsync 连接 Redis 会失败
方法直接抛异常或返回 null(锁获取失败)
结果:所有用户请求都不能进入临界区 → 抢票全部失败
用户看到的提示通常是 “服务繁忙” 影响:抢票功能无法使用,库存不会被扣 2.情况 B:获取锁成功,但执行 Lua 脚本时 Redis 宕机
你拿到锁(orderId),开始执行 Lua 脚本扣库存
Redis 这时宕机 → Lua 执行失败
结果:
用户没有扣库存 → 订单未生成
finally 释放锁访问 Redis 失败 → 锁可能无法释放
影响
后续请求可能无法获取锁
库存未扣 → 用户没抢到,但系统看起来“卡住” 3.情况 C:Lua 执行成功,但 finally 释放锁时 Redis 宕机
Lua 执行成功 → Redis 中库存已扣,用户标记已写入
finally 中 ReleaseAsync 访问 Redis 失败
结果:
锁 key 可能没删掉
其他请求尝试获取锁失败 → 临界区短时间阻塞
库存和用户标记是对的 → 业务一致性没有问题
影响:抢票速度受 Redis 锁阻塞影响 4.情况 D:Redis 恢复后
Lua 脚本、锁释放失败的请求可以重试
Redis key 过期时间(你锁是 5 秒)到了 → 自动释放锁
系统恢复正常,但短时间内部分请求可能失败
如何缓解 1.锁获取失败立即返回 / 限流
2.Redis 高可用集群
3.库存持久化
Redis 只是缓存 + 锁
核心库存和订单最终落数据库 4.超时机制
锁过期时间比业务执行时间长
防止 finally 未释放锁 → 自动过期解锁 5.重试机制
Lua 执行失败可以重试一次
分布式锁功能和lua的不同应用场景 其实上面的抢票的功能并不需要分布式锁这个功能,我们需要分清楚分布式锁和lua脚本的不同应用场景 !!! lua脚本其实就是为了保持原子性的在redis中执行lua时单线程的 !!! 分布式锁主要是避免做一件事情被其他互斥的,我进门加锁 做事情 昨晚事情出来 解锁 下一位重复 如上面的代码 分布式锁服务的就是
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 var result = (int )await db.ScriptEvaluateAsync( lua, new RedisKey[] { "stock:ticket" , $"user:{userId} " }, new RedisValue[] { userId } ); if (result == 1 ) { Console.WriteLine("抢票成功" ); await rabbitMqDoWork(userId); await InsertOrderAsync(); await InsertLogsAsync(); await DoSomethings(); } else if (result == 0 ) { Console.WriteLine("已售罄" ); } else if (result == -1 ) { Console.WriteLine("已抢过" ); }
做着一系列操作的时候排他
当前的抢票实现里,Redis 宕机只会导致 部分请求失败或延迟,不会出现库存被多扣或者用户重复抢票,但系统的可用性会受影响。只需要优化一下 Redis 高可用集群即可。