.NET Queue的進化之路 Channels

Posted by Lynn on Sunday, July 9, 2023

以往我們比較熟悉的是Queue和ConcurrentQueue,偶爾也會看到BlockingCollection,Queue的主要概念就是先進先出的原則,舊有的Queue基本的方法就是EnqueueDequeuePeek,但不是Thread safe的,而ConcurrentQueue雖然是Thread safe,但卻不能限制Queue的長度大小,而最新的BlockingCollection雖然可以限制長度了,但相比之下少了Async的支援和一些靈活性,以下是整理比較。

Queue 基本概念

簡單的介紹常用Function

  • Enqueue 入隊
  • Dequeue 出隊
  • Peek 查看隊首的元素,但不移除他

為什麼會需要Peek? 假設在某些情況下從Queue拿取的元素不能成功做完(例如中間發生了Exception)等等的狀況,但此時你又不想要直接把那筆元素塞回Queue,因為這會導致順序問題,這種時候Peek就非常好用,可以讓你順利把事做完之後再讓那筆元素出隊。

Queue

Queue不是Thread Safe,如果要多執行緒讀取,會需要使用Lock來保護Queue。 也就是說在中間的While迴圈TryDequeue讀取完後,TryPeek那段是不會執行的,而且假設。

void Main()
{
	var queue = new Queue<int>();

	queue.Enqueue(1);
	queue.Enqueue(2);
	queue.Enqueue(3);

	while (queue.TryDequeue(out var front))
	{
		Console.WriteLine(front);
	}
//out : 1, 2, 3
}

ConcurrentQueue

ConcurrentQueue 是一種Thread safe的集合,它可以在多個Thread中被操作而不會導致數據不一致。主要的方法與 Queue 也基本上一致

void Main()
{
    var concurrentQueue = new ConcurrentQueue<int>();

    concurrentQueue.Enqueue(1);
    concurrentQueue.Enqueue(2);
    concurrentQueue.Enqueue(3);

    while (concurrentQueue.TryDequeue(out var item))
    {
        Console.WriteLine(item);
    }
}
//out : 1, 2, 3

BlockingCollection

BlockingCollection 是一種特殊的集合,功能上也是最強大的,也可以定義Queue中有多少個元素,而且BlockingCollection有一個特性叫做阻塞,相比Queue和ConcurrentQueue,再執行TryDequeue之後,假設Queue內沒有任何元素會立刻返回,而BlockingCollection會卡在條Thread直到有新的元素被新增。

void Main()
{
    var blockingCollection = new BlockingCollection<int>(3);

    blockingCollection.Add(1);
    blockingCollection.Add(2);
    blockingCollection.Add(3);

    while (blockingCollection.TryTake(out var item))
    {
        Console.WriteLine(item);
    }
}

懶人包:

  • 單一情況用Queue
  • 多Thread 用ConcurrentQueue
  • 多Thread 又要限制元素或有可能Cancel用BlockingCollection

Channels

廢話了一堆,終於到了主角Channels,這是 .NET Core3 以後的產物,同時也支持 .NET Standard,他引入了Producer/Consumer的概念,但其實他並沒有實作IProducerConsumerCollection這介面,而是根據ConcurrentQueue做封裝,而且獨特的是Channels可以支持多個Producer和多個Consumer,非常厲害XD

預設Channel有分兩種,Bounded和UnBounded,主要差別在有沒有限制Channels的長度,不限制長度的話沒什麼好考慮的,你只要不要來不及Consumer就沒事,但假如說Produce大於Consumer的速度,那就會造成Back Pressure,這其實是一個專業術語,意思是當任何資源受到壓力,必須要讓服務中的其他人知道這件事,給他一點時間來處理,在Channels這邊有四種模式,可以依情況選擇。

public enum BoundedChannelFullMode
{ 
    Wait,  //default
    DropNewest, //移除最新的
    DropOldest, //移除最舊的
    DropWrite //給你寫但我直丟
}

接著是一個Sample code

async Task Main()
{
	var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
	{
		FullMode = BoundedChannelFullMode.DropOldest,
		AllowSynchronousContinuations = true
	});

	var producers = new Task[3];
	for (int p = 0; p < producers.Length; p++)
	{
		var producerId = p;
		producers[p] = Task.Run(async () =>
		{
			for (int i = 0; i < 5; i++)
			{
				await channel.Writer.WriteAsync($"[{i}] by Producer-{producerId}");
			}
		});
	}

	var consumers = new Task[3];
	for (int c = 0; c < consumers.Length; c++)
	{
		var consumerId = c;
		consumers[c] = Task.Run(async () =>
		{
			await foreach (var item in channel.Reader.ReadAllAsync())
			{
				Console.WriteLine($"Read {item} by ConsumerId -{consumerId}");
			}
		});
	}
	
	await Task.WhenAll(producers);
	await Task.WhenAll(consumers);		
	channel.Writer.Complete();
}

output:
Read [1] by Producer-0 by ConsumerId -2
Read [2] by Producer-0 by ConsumerId -0
Read [4] by Producer-0 by ConsumerId -0
Read [0] by Producer-1 by ConsumerId -0
Read [1] by Producer-1 by ConsumerId -0
Read [0] by Producer-0 by ConsumerId -1
Read [3] by Producer-1 by ConsumerId -1
Read [3] by Producer-0 by ConsumerId -2
Read [0] by Producer-2 by ConsumerId -2
Read [2] by Producer-1 by ConsumerId -0
Read [4] by Producer-1 by ConsumerId -1
Read [3] by Producer-2 by ConsumerId -1
Read [1] by Producer-2 by ConsumerId -2
Read [2] by Producer-2 by ConsumerId -0
Read [4] by Producer-2 by ConsumerId -1

Benchmark

不免俗地還是要來個壓測,但這是完全沒意義的壓測,因為在單一Thread來說,Queue一定是最快的,因為資料結構是最單純,也都不需要考慮其他狀況。

void Main()
{
	BenchmarkRunner.Run<BenchmarkTests>();
}

public class BenchmarkTests
{
	private Queue<int> _queue = new Queue<int>();
	private ConcurrentQueue<int> _concurrentQueue = new ConcurrentQueue<int>();
	private BlockingCollection<int> _blockingCollection = new BlockingCollection<int>();
	private Channel<int> _channel;

	[Params(10000, 100000, 1000000)]
	public int Count { get; set; }

	[Benchmark]
	public void QueueTest()
	{
		_queue = new Queue<int>(Count);
		for (int i = 0; i < Count; i++)
		{
			_queue.Enqueue(i);
		}
		while (_queue.Count > 0)
		{
			_queue.Dequeue();
		}
	}

	[Benchmark]
	public void ConcurrentQueueTest()
	{
		_concurrentQueue = new ConcurrentQueue<int>();
		for (int i = 0; i < Count; i++)
		{
			_concurrentQueue.Enqueue(i);
		}
		while (_concurrentQueue.TryDequeue(out _))
		{
		}
	}

	[Benchmark]
	public void BlockingCollectionTest()
	{
		_blockingCollection = new BlockingCollection<int>(Count);
		for (int i = 0; i < Count; i++)
		{
			_blockingCollection.Add(i);
		}
		while (_blockingCollection.Count > 0)
		{
			_blockingCollection.Take();
		}
	}

	[Benchmark]
	public void ChannelsTestSync()
	{
		_channel = Channel.CreateBounded<int>(new BoundedChannelOptions(Count)
		{
			SingleReader = true,
			SingleWriter = true
		});

		for (var i = 0; i < Count; i++)
		{
			_channel.Writer.TryWrite(i);
		}

		while (_channel.Reader.TryRead(out _))
		{
		}

		_channel.Writer.Complete();
	}

}

// * Summary *

BenchmarkDotNet=v0.13.5, OS=Windows 11 (10.0.22621.1848/22H2/2022Update/SunValley2)
AMD Ryzen 7 3800X, 1 CPU, 16 logical and 8 physical cores
.NET SDK=8.0.100-preview.5.23303.2
  [Host] : .NET 6.0.18 (6.0.1823.26907), X64 RyuJIT AVX2


|                 Method |   Count |         Mean |        Error |       StdDev |
|----------------------- |-------- |-------------:|-------------:|-------------:|
|              QueueTest |   10000 |     62.65 μs |     1.240 μs |     1.428 μs |
|    ConcurrentQueueTest |   10000 |    125.27 μs |     1.445 μs |     1.352 μs |
| BlockingCollectionTest |   10000 |    930.24 μs |     2.925 μs |     2.736 μs |
|       ChannelsTestSync |   10000 |    298.28 μs |     2.162 μs |     1.805 μs |
|              QueueTest |  100000 |    719.52 μs |     5.511 μs |     5.155 μs |
|    ConcurrentQueueTest |  100000 |  1,587.05 μs |    22.041 μs |    20.618 μs |
| BlockingCollectionTest |  100000 |  9,614.03 μs |    34.986 μs |    31.014 μs |
|       ChannelsTestSync |  100000 |  3,354.74 μs |    49.013 μs |    45.847 μs |
|              QueueTest | 1000000 |  6,955.82 μs |    36.579 μs |    34.216 μs |
|    ConcurrentQueueTest | 1000000 | 13,214.31 μs |   134.758 μs |   126.053 μs |
| BlockingCollectionTest | 1000000 | 94,453.50 μs | 1,484.605 μs | 1,388.701 μs |
|       ChannelsTestSync | 1000000 | 34,101.46 μs |   422.561 μs |   395.264 μs |

直接從表上可以看到,BlockingCollection是比較消耗資源的,在Channels之前如果需要Queue的上限和Cancel的功能只能選擇BlockingCollection,但現在多了Channels可以選擇。

Reference: