キューをBlockingCollection<T>の代わりにChannel<T>にすると非同期処理にできると教わったので試してみます。
BlockingCollection<T>版の記事


C#でワーカースレッド+キューの基本パターンを試す。
ワーカースレッドとキューの動きを確認するために、コンソールで入力した文字をコンソールに出力するだけのサンプルコードを作成しました。コンソールプロジェクトで作成していますが、多分WinFormsやWPFでも動くと思われます。ファイル名:Bas...
ファイル名:BasicWorkerQueue.cs
using System.Threading.Channels;
namespace WorkerQueSample01;
public class BasicWorkerQueue : IDisposable
{
private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();
private CancellationTokenSource? _cts;
private Task? _workerTask;
public void Start()
{
if (_workerTask is not null && !_workerTask.IsCompleted) return; // 多重実行の禁止
_cts = new CancellationTokenSource();
_workerTask = Task.Run(() => WorkerAsync(_cts.Token));
}
public void Enqueue(string item)
{
if (!_channel.Writer.TryWrite(item))
{
Console.WriteLine("[Main] 書き込みに失敗しました。チャンネルが閉じられた可能性があります。");
}
}
public void Stop()
{
_cts?.Cancel();
_channel.Writer.TryComplete();
try
{
_workerTask?.Wait();
}
catch (AggregateException ae) when (ae.InnerException is OperationCanceledException)
{
Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] キャンセルを検知しました。");
}
}
private async Task WorkerAsync(CancellationToken token)
{
try
{
await foreach (var item in _channel.Reader.ReadAllAsync(token))
{
Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] {item}");
}
}
catch (OperationCanceledException)
{
Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] キャンセルされました。");
}
}
public void Dispose()
{
Stop();
_cts?.Dispose();
}
}
Program.csはBlockingCollection<T>版と同じなので割愛
Channelにしたおかげで、foreachの処理をawaitで待つ非同期処理にすることが出来ました。
await foreach (var item in _channel.Reader.ReadAllAsync(token))
{
Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] {item}");
}
サンプルの文字を表示する処理では、ありがたみが感じられませんが、I/Oの絡むような重たい処理では効果を発揮すると思われます。
追記:20250721
多段パイプラインのサンプル
ファイル名:PipelineStage.cs
// パイプラインステージ
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
public class PipelineStage<TIn, TOut>
{
private readonly Func<TIn, Task<TOut>> _handler;
private Channel<TIn> _input = Channel.CreateUnbounded<TIn>();
private Channel<TOut> _output = Channel.CreateUnbounded<TOut>();
private readonly CancellationToken _token;
public ChannelWriter<TIn> Input => _input.Writer;
public ChannelReader<TOut> Output => _output.Reader;
public Task? WorkerTask { get; private set; }
public PipelineStage(Func<TIn, Task<TOut>> handler, CancellationToken token = default)
{
_handler = handler;
_token = token;
}
// 型安全なConnectFrom
public void ConnectFrom<TPrev>(PipelineStage<TPrev, TIn> prevStage)
{
// 中継タスクを定義
Task.Run(async () =>
{
await foreach (var item in prevStage.Output.ReadAllAsync(_token))
{
await _input.Writer.WriteAsync(item, _token);
}
_input.Writer.Complete();
});
}
public async Task StartAsync()
{
WorkerTask = Task.Run(async () =>
{
try
{
await foreach (var item in _input.Reader.ReadAllAsync(_token))
{
var result = await _handler(item);
await _output.Writer.WriteAsync(result, _token);
}
}
catch (Exception ex)
{
Debug.Print($"[Error] {typeof(TIn).Name}→{typeof(TOut).Name}: {ex.Message}");
}
finally
{
_output.Writer.Complete();
}
}, _token);
await Task.CompletedTask;
}
}
ファイル名:Program.cs
using System;
using System.Linq;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
Console.WriteLine("開始");
var cts = new CancellationTokenSource();
var stage1 = new PipelineStage<string, int>(async s =>
{
await Task.Delay(1000);
//return Task.FromResult(s.Length);
return s.Length;
}, cts.Token);
var stage2 = new PipelineStage<int, string>(i =>
{
if (i == 5) throw new Exception("禁止値: 5");
return Task.FromResult($"Len={i}");
}, cts.Token);
var stage3 = new PipelineStage<string, string>(s =>
{
return Task.FromResult($"Result: {s}");
}, cts.Token);
// 接続
stage2.ConnectFrom(stage1);
stage3.ConnectFrom(stage2);
// 起動
await stage1.StartAsync();
await stage2.StartAsync();
await stage3.StartAsync();
// データ投入
await stage1.Input.WriteAsync("abc"); // → 3 → OK
await stage1.Input.WriteAsync("hello"); // → 5 → エラー
await stage1.Input.WriteAsync("x"); // → 1 → OK
stage1.Input.Complete();
// 終了待ち
await Task.WhenAll( stage1.WorkerTask!, stage2.WorkerTask!, stage3.WorkerTask! );
// 出力
Console.WriteLine("---- 最終出力 ----");
await foreach (var result in stage3.Output.ReadAllAsync())
{
Console.WriteLine(result);
}
Console.WriteLine("終了");
}
}
実行結果
開始
[Error] Int32→String: 禁止値: 5
---- 最終出力 ----
Result: Len=3
終了
追記:20250722
ジェネリック + 処理注入対応版
ファイル名:BasicWorkerQueue.cs
// ジェネリック + 処理注入対応版
using System.Threading.Channels;
using System.Diagnostics;
namespace WorkerQueSample04;
public class BasicWorkerQueue<T>
{
private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
private Task? _workerTask;
private readonly Func<T, Task> _workerAction;
public BasicWorkerQueue(Func<T, Task> workerAction)
{
_workerAction = workerAction ?? throw new ArgumentNullException(nameof(workerAction));
}
public void Start()
{
if (_workerTask is not null && !_workerTask.IsCompleted) return;
_workerTask = Task.Run(WorkerAsync);
}
public void Enqueue(T item)
{
if (!_channel.Writer.TryWrite(item))
{
Debug.Print("[Main] 書き込み失敗(チャンネルが閉じられている可能性)");
}
}
public async Task StopAsync()
{
_channel.Writer.Complete();
if (_workerTask is not null)
{
await _workerTask;
}
}
private async Task WorkerAsync()
{
await foreach (var item in _channel.Reader.ReadAllAsync())
{
await _workerAction(item); // 非同期実行
}
Debug.Print($"[Worker TID:{Environment.CurrentManagedThreadId}] チャンネル終了を検知");
}
}
ファイル名:Program.cs
// 使用例(stringを処理するワーカースレッド)
namespace WorkerQueSample04;
class Program
{
static async Task Main()
{
var queue = new BasicWorkerQueue<string>(async item =>
{
Console.WriteLine($"[Worker] 処理開始: {item}");
await Task.Delay(100); // 模擬的な処理時間
Console.WriteLine($"[Worker] 処理完了: {item}");
});
queue.Start();
queue.Enqueue("A");
queue.Enqueue("B");
queue.Enqueue("C");
await queue.StopAsync();
}
}
実行結果
[Worker] 処理開始: A
[Worker] 処理完了: A
[Worker] 処理開始: B
[Worker] 処理完了: B
[Worker] 処理開始: C
[Worker] 処理完了: C
[Worker TID:4] チャンネル終了を検知
サンプルではstring型にしましたが、こちらを各ステージで使う引数と戻り値をセットするユーザークラスにすることになります。
多段のパイプラインにする場合、次のステージが見えないとかコードを書くことが出来ないので、最終ステージから書くことになると思われます。
この書き方がシンプルで好みです。
追記:20250729
上記コードに並列処理が出来るように変更してみました。
public void Start(int degreeOfParallelism = 1)の引数が並列数になっています。
// ジェネリック + 処理注入 + 並列処理対応版
using System.Threading.Channels;
using System.Diagnostics;
namespace CommonLib;
public class BasicWorkerQueue<T>
{
private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
private readonly List<Task> _workerTasks = [];
private readonly Func<T, Task> _workerAction;
public BasicWorkerQueue(Func<T, Task> workerAction)
{
_workerAction = workerAction ?? throw new ArgumentNullException(nameof(workerAction));
}
public void Start(int degreeOfParallelism = 1)
{
for (int i = 0; i < degreeOfParallelism; i++)
{
_workerTasks.Add(Task.Run(WorkerAsync));
}
}
public void Enqueue(T item)
{
if (!_channel.Writer.TryWrite(item))
{
Debug.Print("[Main] 書き込み失敗(チャンネルが閉じられている可能性)");
}
}
public async Task StopAsync()
{
_channel.Writer.Complete();
await Task.WhenAll(_workerTasks);
}
private async Task WorkerAsync()
{
await foreach (var item in _channel.Reader.ReadAllAsync())
{
await _workerAction(item); // 非同期実行
}
Debug.Print($"[Worker TID:{Environment.CurrentManagedThreadId}] チャンネル終了を検知");
}
}
/*
// 使用例(stringを処理するワーカースレッド)
namespace CommonLib;
class Program
{
static async Task Main()
{
var queue = new BasicWorkerQueue<string>(async item =>
{
Console.WriteLine($"[Worker] 処理開始: {item}");
await Task.Delay(100); // 模擬的な処理時間
Console.WriteLine($"[Worker] 処理完了: {item}");
});
queue.Start();
queue.Enqueue("A");
queue.Enqueue("B");
queue.Enqueue("C");
await queue.StopAsync();
}
}
シンプルなのでワーカー+チャンネルの多段パイプラインはこれを使う。
ワーカー+チャンネル、ジェネリック(T)他ステージへのキューの登録(パイプライン)は自前で行う。
Tはステージ感で利用する引数、戻り値をプロパティにしたDTOクラス
並列数をdegreeOfParallelismで指定
public void Start(int degreeOfParallelism = 1)
*/
コメント