Redis的分布式鎖

Posted by Lynn on Saturday, November 26, 2022

前言

最近在公司遇到一個問題,廠商提供我們API查詢訂單,我們會在對訂單做後續的處理。 但廠商有幾個查詢的條件

  1. 每次查詢的時間範圍是有限的,最短一分鐘最多一小時
  2. 每五分鐘只能有一台Server發出請求

查詢的時間範圍有限,那我們只要記錄著最後查詢時間用排程就好了,所以這不是問題, 那問題二呢?要如何確保同時只能有一台Server做這件事呢?

我的想法有幾個

  1. 限制我們佈署的那台Server,讓他永遠只能有這一台在工作。 結論:被否決了,理由是因為這件事不該由佈署決定,而且你也有辦法保證以後不管是誰佈署都要知道他只能擁有一台。
  2. 存在SQL裡面,多一個欄位是判斷最近有沒有做事過,如果true的話就不做事。 結論:也被否決了,原因是為了這件事多開張表或是欄位很浪費。

最後的做法就是標題啦,Redis的分布式鎖,其實概念跟第二點很相似,雖然說要注意Redis資料的持久性問題,但以這個Case來說有Log就可以解決這個問題了。

這是不是最好的方法我也不知道,有聽到一個好方法是做一張萬用Table表,專門存這些比較特殊的需求的,我覺得這方法還蠻不錯的,而且用Mongo DB的特性就很適合做這件事情。

接下來進入正題,介紹一下Redis的分布式鎖

這其實就跟在多執行緒中,我們常用的lock是一樣的,只是Redis扮演著分部式快取的這個角色,那他的鎖也是,只有拿到鎖的人才能去做後續的動作。

用簡單的方式比喻就是,今天大家搶著做一件事情,如果沒有人決定順序那大家一定會亂成一團做一堆重工的事,但今天假設他們做這些事情的時候都先問一個人呢? 這樣問題是不是就解決了 :laughing:

st=>start: 開始
e=>end: 結束
c=>inputoutput: 1.取得鎖
cond=>condition: 可取得嗎?
op2=>operation: 2.做些事情
op3=>operation: 3.釋放鎖
st(right)->c->cond(right)->op2(right)->op3(right)->e
cond(true)->op2
cond(false)->c

假設今天在第二步程式錯誤了沒有釋放,那你的鎖便會一直存在,這樣會導致再也沒有人可以取得鎖(死鎖),所以釋放鎖這件事情非常重要,設定鎖的過期時間,流程控制在finally釋放都是防止死鎖的好方法。

Redis的分布式鎖有兩個指令

  1. 如果沒有的話就設定 (SET if Not eXists.)
SETNX Key Value

可以看到第一次回傳True,之後的會回傳False 2. 設定過期時間 (SET EXpire value.)

SETEX Key Seconds Value

用ttl來查看剩餘時間

再來就是用C#,因為功能很簡單所以我直接用LinqPad示範。 先建立對DB的連線

public sealed class RedisAccessor
{
	public static IDatabase Instance
	{
		get { return _connection.Value.ConnectionMultiplexer.GetDatabase(); }
	}
	public readonly ConnectionMultiplexer ConnectionMultiplexer;
	private static readonly Lazy<RedisAccessor> _connection = new Lazy<RedisAccessor>(() =>
	{
		return new RedisAccessor();
	});
	private static string _connectionString;

	private RedisAccessor()
	{
		ConnectionMultiplexer = ConnectionMultiplexer.Connect(_connectionString);
	}

	public static void Init(string connectionString)
	{
		_connectionString = connectionString;
	}
}

然後是Demo的Service

public sealed class DemoService
{
	private readonly IDatabase _redisdb;
	private const string value = "Hello World";
	public DemoService(IDatabase redisdb)
	{
		_redisdb = redisdb;
	}

	public async Task GetLockAsync(string key, string user)
	{
		if (await _redisdb.LockTakeAsync(key, value, TimeSpan.FromSeconds(10)))
		{
			Console.WriteLine($"Thread Id:{Thread.CurrentThread.ManagedThreadId}, {user} 成功取得鎖");
			try
			{
				Console.WriteLine("做些什麼......");
			}
			finally
			{
				await _redisdb.LockReleaseAsync(key, value);
				Console.WriteLine("鎖已被釋放!");
				Console.WriteLine("====================================================================");
			}
		}
		else
		{
			Console.WriteLine($"Thread Id:{Thread.CurrentThread.ManagedThreadId}, {user} 沒能取得鎖,稍後重試");
			await Task.Delay(1000);
			await GetLockAsync(key, user);
		}
	}
}

再來是執行的流程

async Task Main()
{
	RedisDbFactory.Init("127.0.0.1:6379");
	var demoService = new DemoService(RedisDbFactory.Instance);
	var users = new string[5] { "UserA", "UserB", "UserC", "UserD", "UserE" };
	var tasks = new List<Task>();
	foreach (var user in users)
	{
		tasks.Add(Task.Run(async () =>
		{
			await demoService.GetLockAsync("Hello",user);
		}));
	}
}

執行結果如下