前言
最近在公司遇到一個問題,廠商提供我們API查詢訂單,我們會在對訂單做後續的處理。 但廠商有幾個查詢的條件
- 每次查詢的時間範圍是有限的,最短一分鐘最多一小時
- 每五分鐘只能有一台Server發出請求
查詢的時間範圍有限,那我們只要記錄著最後查詢時間用排程就好了,所以這不是問題, 那問題二呢?要如何確保同時只能有一台Server做這件事呢?
我的想法有幾個
- 限制我們佈署的那台Server,讓他永遠只能有這一台在工作。 結論:被否決了,理由是因為這件事不該由佈署決定,而且你也有辦法保證以後不管是誰佈署都要知道他只能擁有一台。
- 存在SQL裡面,多一個欄位是判斷最近有沒有做事過,如果true的話就不做事。 結論:也被否決了,原因是為了這件事多開張表或是欄位很浪費。
最後的做法就是標題啦,Redis的分布式鎖,其實概念跟第二點很相似,雖然說要注意Redis資料的持久性問題,但以這個Case來說有Log就可以解決這個問題了。
這是不是最好的方法我也不知道,有聽到一個好方法是做一張萬用Table表,專門存這些比較特殊的需求的,我覺得這方法還蠻不錯的,而且用Mongo DB的特性就很適合做這件事情。
接下來進入正題,介紹一下Redis的分布式鎖
這其實就跟在多執行緒中,我們常用的lock是一樣的,只是Redis扮演著分部式快取的這個角色,那他的鎖也是,只有拿到鎖的人才能去做後續的動作。
用簡單的方式比喻就是,今天大家搶著做一件事情,如果沒有人決定順序那大家一定會亂成一團做一堆重工的事,但今天假設他們做這些事情的時候都先問一個人呢? 這樣問題是不是就解決了 :laughing:
st=>start: 開始
e=>end: 結束
c=>inputoutput: 1.取得鎖
cond=>condition: 可取得嗎?
op2=>operation: 2.做些事情
op3=>operation: 3.釋放鎖
st(right)->c->cond(right)->op2(right)->op3(right)->e
cond(true)->op2
cond(false)->c
假設今天在第二步程式錯誤了沒有釋放,那你的鎖便會一直存在,這樣會導致再也沒有人可以取得鎖(死鎖),所以釋放鎖這件事情非常重要,設定鎖的過期時間,流程控制在finally釋放都是防止死鎖的好方法。
Redis的分布式鎖有兩個指令
- 如果沒有的話就設定 (SET if Not eXists.)
SETNX Key Value
可以看到第一次回傳True,之後的會回傳False
2. 設定過期時間 (SET EXpire value.)
SETEX Key Seconds Value
用ttl來查看剩餘時間
再來就是用C#,因為功能很簡單所以我直接用LinqPad示範。 先建立對DB的連線
public sealed class RedisAccessor
{
public static IDatabase Instance
{
get { return _connection.Value.ConnectionMultiplexer.GetDatabase(); }
}
public readonly ConnectionMultiplexer ConnectionMultiplexer;
private static readonly Lazy<RedisAccessor> _connection = new Lazy<RedisAccessor>(() =>
{
return new RedisAccessor();
});
private static string _connectionString;
private RedisAccessor()
{
ConnectionMultiplexer = ConnectionMultiplexer.Connect(_connectionString);
}
public static void Init(string connectionString)
{
_connectionString = connectionString;
}
}
然後是Demo的Service
public sealed class DemoService
{
private readonly IDatabase _redisdb;
private const string value = "Hello World";
public DemoService(IDatabase redisdb)
{
_redisdb = redisdb;
}
public async Task GetLockAsync(string key, string user)
{
if (await _redisdb.LockTakeAsync(key, value, TimeSpan.FromSeconds(10)))
{
Console.WriteLine($"Thread Id:{Thread.CurrentThread.ManagedThreadId}, {user} 成功取得鎖");
try
{
Console.WriteLine("做些什麼......");
}
finally
{
await _redisdb.LockReleaseAsync(key, value);
Console.WriteLine("鎖已被釋放!");
Console.WriteLine("====================================================================");
}
}
else
{
Console.WriteLine($"Thread Id:{Thread.CurrentThread.ManagedThreadId}, {user} 沒能取得鎖,稍後重試");
await Task.Delay(1000);
await GetLockAsync(key, user);
}
}
}
再來是執行的流程
async Task Main()
{
RedisDbFactory.Init("127.0.0.1:6379");
var demoService = new DemoService(RedisDbFactory.Instance);
var users = new string[5] { "UserA", "UserB", "UserC", "UserD", "UserE" };
var tasks = new List<Task>();
foreach (var user in users)
{
tasks.Add(Task.Run(async () =>
{
await demoService.GetLockAsync("Hello",user);
}));
}
}
執行結果如下