キューをBlockingCollection<T>の代わりにChannel<T>にすると非同期処理にできると教わったので試してみます。
BlockingCollection<T>版の記事


C#でワーカースレッド+キューの基本パターンを試す。
ワーカースレッドとキューの動きを確認するために、コンソールで入力した文字をコンソールに出力するだけのサンプルコードを作成しました。コンソールプロジェクトで作成していますが、多分WinFormsやWPFでも動くと思われます。ファイル名:Bas...
ファイル名:BasicWorkerQueue.cs
using System.Threading.Channels;
namespace WorkerQueSample01;
public class BasicWorkerQueue : IDisposable
{
private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();
private CancellationTokenSource? _cts;
private Task? _workerTask;
public void Start()
{
if (_workerTask is not null && !_workerTask.IsCompleted) return; // 多重実行の禁止
_cts = new CancellationTokenSource();
_workerTask = Task.Run(() => WorkerAsync(_cts.Token));
}
public void Enqueue(string item)
{
if (!_channel.Writer.TryWrite(item))
{
Console.WriteLine("[Main] 書き込みに失敗しました。チャンネルが閉じられた可能性があります。");
}
}
public void Stop()
{
_cts?.Cancel();
_channel.Writer.TryComplete();
try
{
_workerTask?.Wait();
}
catch (AggregateException ae) when (ae.InnerException is OperationCanceledException)
{
Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] キャンセルを検知しました。");
}
}
private async Task WorkerAsync(CancellationToken token)
{
try
{
await foreach (var item in _channel.Reader.ReadAllAsync(token))
{
Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] {item}");
}
}
catch (OperationCanceledException)
{
Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] キャンセルされました。");
}
}
public void Dispose()
{
Stop();
_cts?.Dispose();
}
}
Program.csはBlockingCollection<T>版と同じなので割愛
Channelにしたおかげで、foreachの処理をawaitで待つ非同期処理にすることが出来ました。
await foreach (var item in _channel.Reader.ReadAllAsync(token))
{
Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] {item}");
}
サンプルの文字を表示する処理では、ありがたみが感じられませんが、I/Oの絡むような重たい処理では効果を発揮すると思われます。
コメント