以往我們比較熟悉的是Queue和ConcurrentQueue,偶爾也會看到BlockingCollection,Queue的主要概念就是先進先出的原則,舊有的Queue基本的方法就是Enqueue
、Dequeue
、Peek
,但不是Thread safe的,而ConcurrentQueue雖然是Thread safe,但卻不能限制Queue的長度大小,而最新的BlockingCollection雖然可以限制長度了,但相比之下少了Async的支援和一些靈活性,以下是整理比較。
Queue 基本概念
簡單的介紹常用Function
- Enqueue 入隊
- Dequeue 出隊
- Peek 查看隊首的元素,但不移除他
為什麼會需要Peek? 假設在某些情況下從Queue拿取的元素不能成功做完(例如中間發生了Exception)等等的狀況,但此時你又不想要直接把那筆元素塞回Queue,因為這會導致順序問題,這種時候Peek就非常好用,可以讓你順利把事做完之後再讓那筆元素出隊。
Queue
Queue不是Thread Safe,如果要多執行緒讀取,會需要使用Lock來保護Queue。 也就是說在中間的While迴圈TryDequeue讀取完後,TryPeek那段是不會執行的,而且假設。
void Main()
{
var queue = new Queue<int>();
queue.Enqueue(1);
queue.Enqueue(2);
queue.Enqueue(3);
while (queue.TryDequeue(out var front))
{
Console.WriteLine(front);
}
//out : 1, 2, 3
}
ConcurrentQueue
ConcurrentQueue 是一種Thread safe的集合,它可以在多個Thread中被操作而不會導致數據不一致。主要的方法與 Queue 也基本上一致
void Main()
{
var concurrentQueue = new ConcurrentQueue<int>();
concurrentQueue.Enqueue(1);
concurrentQueue.Enqueue(2);
concurrentQueue.Enqueue(3);
while (concurrentQueue.TryDequeue(out var item))
{
Console.WriteLine(item);
}
}
//out : 1, 2, 3
BlockingCollection
BlockingCollection 是一種特殊的集合,功能上也是最強大的,也可以定義Queue中有多少個元素,而且BlockingCollection有一個特性叫做阻塞,相比Queue和ConcurrentQueue,再執行TryDequeue之後,假設Queue內沒有任何元素會立刻返回,而BlockingCollection會卡在條Thread直到有新的元素被新增。
void Main()
{
var blockingCollection = new BlockingCollection<int>(3);
blockingCollection.Add(1);
blockingCollection.Add(2);
blockingCollection.Add(3);
while (blockingCollection.TryTake(out var item))
{
Console.WriteLine(item);
}
}
懶人包:
- 單一情況用Queue
- 多Thread 用ConcurrentQueue
- 多Thread 又要限制元素或有可能Cancel用BlockingCollection
Channels
廢話了一堆,終於到了主角Channels,這是 .NET Core3 以後的產物,同時也支持 .NET Standard,他引入了Producer/Consumer的概念,但其實他並沒有實作IProducerConsumerCollection
這介面,而是根據ConcurrentQueue
做封裝,而且獨特的是Channels可以支持多個Producer和多個Consumer,非常厲害XD
預設Channel有分兩種,Bounded和UnBounded,主要差別在有沒有限制Channels的長度,不限制長度的話沒什麼好考慮的,你只要不要來不及Consumer就沒事,但假如說Produce大於Consumer的速度,那就會造成Back Pressure,這其實是一個專業術語,意思是當任何資源受到壓力,必須要讓服務中的其他人知道這件事,給他一點時間來處理,在Channels這邊有四種模式,可以依情況選擇。
public enum BoundedChannelFullMode
{
Wait, //default
DropNewest, //移除最新的
DropOldest, //移除最舊的
DropWrite //給你寫但我直丟
}
接著是一個Sample code
async Task Main()
{
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.DropOldest,
AllowSynchronousContinuations = true
});
var producers = new Task[3];
for (int p = 0; p < producers.Length; p++)
{
var producerId = p;
producers[p] = Task.Run(async () =>
{
for (int i = 0; i < 5; i++)
{
await channel.Writer.WriteAsync($"[{i}] by Producer-{producerId}");
}
});
}
var consumers = new Task[3];
for (int c = 0; c < consumers.Length; c++)
{
var consumerId = c;
consumers[c] = Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Read {item} by ConsumerId -{consumerId}");
}
});
}
await Task.WhenAll(producers);
await Task.WhenAll(consumers);
channel.Writer.Complete();
}
output:
Read [1] by Producer-0 by ConsumerId -2
Read [2] by Producer-0 by ConsumerId -0
Read [4] by Producer-0 by ConsumerId -0
Read [0] by Producer-1 by ConsumerId -0
Read [1] by Producer-1 by ConsumerId -0
Read [0] by Producer-0 by ConsumerId -1
Read [3] by Producer-1 by ConsumerId -1
Read [3] by Producer-0 by ConsumerId -2
Read [0] by Producer-2 by ConsumerId -2
Read [2] by Producer-1 by ConsumerId -0
Read [4] by Producer-1 by ConsumerId -1
Read [3] by Producer-2 by ConsumerId -1
Read [1] by Producer-2 by ConsumerId -2
Read [2] by Producer-2 by ConsumerId -0
Read [4] by Producer-2 by ConsumerId -1
Benchmark
不免俗地還是要來個壓測,但這是完全沒意義的壓測,因為在單一Thread來說,Queue一定是最快的,因為資料結構是最單純,也都不需要考慮其他狀況。
void Main()
{
BenchmarkRunner.Run<BenchmarkTests>();
}
public class BenchmarkTests
{
private Queue<int> _queue = new Queue<int>();
private ConcurrentQueue<int> _concurrentQueue = new ConcurrentQueue<int>();
private BlockingCollection<int> _blockingCollection = new BlockingCollection<int>();
private Channel<int> _channel;
[Params(10000, 100000, 1000000)]
public int Count { get; set; }
[Benchmark]
public void QueueTest()
{
_queue = new Queue<int>(Count);
for (int i = 0; i < Count; i++)
{
_queue.Enqueue(i);
}
while (_queue.Count > 0)
{
_queue.Dequeue();
}
}
[Benchmark]
public void ConcurrentQueueTest()
{
_concurrentQueue = new ConcurrentQueue<int>();
for (int i = 0; i < Count; i++)
{
_concurrentQueue.Enqueue(i);
}
while (_concurrentQueue.TryDequeue(out _))
{
}
}
[Benchmark]
public void BlockingCollectionTest()
{
_blockingCollection = new BlockingCollection<int>(Count);
for (int i = 0; i < Count; i++)
{
_blockingCollection.Add(i);
}
while (_blockingCollection.Count > 0)
{
_blockingCollection.Take();
}
}
[Benchmark]
public void ChannelsTestSync()
{
_channel = Channel.CreateBounded<int>(new BoundedChannelOptions(Count)
{
SingleReader = true,
SingleWriter = true
});
for (var i = 0; i < Count; i++)
{
_channel.Writer.TryWrite(i);
}
while (_channel.Reader.TryRead(out _))
{
}
_channel.Writer.Complete();
}
}
// * Summary *
BenchmarkDotNet=v0.13.5, OS=Windows 11 (10.0.22621.1848/22H2/2022Update/SunValley2)
AMD Ryzen 7 3800X, 1 CPU, 16 logical and 8 physical cores
.NET SDK=8.0.100-preview.5.23303.2
[Host] : .NET 6.0.18 (6.0.1823.26907), X64 RyuJIT AVX2
| Method | Count | Mean | Error | StdDev |
|----------------------- |-------- |-------------:|-------------:|-------------:|
| QueueTest | 10000 | 62.65 μs | 1.240 μs | 1.428 μs |
| ConcurrentQueueTest | 10000 | 125.27 μs | 1.445 μs | 1.352 μs |
| BlockingCollectionTest | 10000 | 930.24 μs | 2.925 μs | 2.736 μs |
| ChannelsTestSync | 10000 | 298.28 μs | 2.162 μs | 1.805 μs |
| QueueTest | 100000 | 719.52 μs | 5.511 μs | 5.155 μs |
| ConcurrentQueueTest | 100000 | 1,587.05 μs | 22.041 μs | 20.618 μs |
| BlockingCollectionTest | 100000 | 9,614.03 μs | 34.986 μs | 31.014 μs |
| ChannelsTestSync | 100000 | 3,354.74 μs | 49.013 μs | 45.847 μs |
| QueueTest | 1000000 | 6,955.82 μs | 36.579 μs | 34.216 μs |
| ConcurrentQueueTest | 1000000 | 13,214.31 μs | 134.758 μs | 126.053 μs |
| BlockingCollectionTest | 1000000 | 94,453.50 μs | 1,484.605 μs | 1,388.701 μs |
| ChannelsTestSync | 1000000 | 34,101.46 μs | 422.561 μs | 395.264 μs |
直接從表上可以看到,BlockingCollection是比較消耗資源的,在Channels之前如果需要Queue的上限和Cancel的功能只能選擇BlockingCollection,但現在多了Channels可以選擇。
Reference:
- https://learn.microsoft.com/en-us/dotnet/core/extensions/channels?WT.mc_id=DT-MVP-4015686
- https://jeremybytes.blogspot.com/2021/02/whats-difference-between-channel-and.html#:~:text=A%20functional%20difference%20between%20Channel,is%20no%20%22Peek%22%20functionality.
- https://medium.com/@niteshsinghal85/using-channels-for-asynchronous-queuing-in-c-ed96c51d4576