C#でワーカースレッド+キューの基本パターンを試す2。「Channel版」

コンピュータ

キューをBlockingCollection<T>の代わりにChannel<T>にすると非同期処理にできると教わったので試してみます。

ファイル名:BasicWorkerQueue.cs

using System.Threading.Channels;

namespace WorkerQueSample01;
public class BasicWorkerQueue : IDisposable
{
    private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();
    private CancellationTokenSource? _cts;
    private Task? _workerTask;

    public void Start()
    {
        if (_workerTask is not null && !_workerTask.IsCompleted) return; // 多重実行の禁止

        _cts = new CancellationTokenSource();
        _workerTask = Task.Run(() => WorkerAsync(_cts.Token));
    }

    public void Enqueue(string item)
    {
        if (!_channel.Writer.TryWrite(item))
        {
            Console.WriteLine("[Main] 書き込みに失敗しました。チャンネルが閉じられた可能性があります。");
        }
    }

    public void Stop()
    {
        _cts?.Cancel();
        _channel.Writer.TryComplete();

        try
        {
            _workerTask?.Wait();
        }
        catch (AggregateException ae) when (ae.InnerException is OperationCanceledException)
        {
            Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] キャンセルを検知しました。");
        }
    }

    private async Task WorkerAsync(CancellationToken token)
    {
        try
        {
            await foreach (var item in _channel.Reader.ReadAllAsync(token))
            {
                Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] {item}");
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] キャンセルされました。");
        }
    }

    public void Dispose()
    {
        Stop();
        _cts?.Dispose();
    }
}

Program.csはBlockingCollection<T>版と同じなので割愛

Channelにしたおかげで、foreachの処理をawaitで待つ非同期処理にすることが出来ました。

await foreach (var item in _channel.Reader.ReadAllAsync(token))
{
    Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] {item}");
}

サンプルの文字を表示する処理では、ありがたみが感じられませんが、I/Oの絡むような重たい処理では効果を発揮すると思われます。

追記:20250721

多段パイプラインのサンプル

ファイル名:PipelineStage.cs

// パイプラインステージ
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class PipelineStage<TIn, TOut>
{
    private readonly Func<TIn, Task<TOut>> _handler;
    private Channel<TIn> _input = Channel.CreateUnbounded<TIn>();
    private Channel<TOut> _output = Channel.CreateUnbounded<TOut>();
    private readonly CancellationToken _token;

    public ChannelWriter<TIn> Input => _input.Writer;
    public ChannelReader<TOut> Output => _output.Reader;
    public Task? WorkerTask { get; private set; }

    public PipelineStage(Func<TIn, Task<TOut>> handler, CancellationToken token = default)
    {
        _handler = handler;
        _token = token;
    }

    // 型安全なConnectFrom
    public void ConnectFrom<TPrev>(PipelineStage<TPrev, TIn> prevStage)
    {
        // 中継タスクを定義
        Task.Run(async () =>
        {
            await foreach (var item in prevStage.Output.ReadAllAsync(_token))
            {
                await _input.Writer.WriteAsync(item, _token);
            }
            _input.Writer.Complete();
        });
    }

    public async Task StartAsync()
    {
        WorkerTask = Task.Run(async () =>
        {
            try
            {
                await foreach (var item in _input.Reader.ReadAllAsync(_token))
                {
                    var result = await _handler(item);
                    await _output.Writer.WriteAsync(result, _token);
                }
            }
            catch (Exception ex)
            {
                Debug.Print($"[Error] {typeof(TIn).Name}→{typeof(TOut).Name}: {ex.Message}");
            }
            finally
            {
                _output.Writer.Complete();
            }
        }, _token);

        await Task.CompletedTask;
    }
}

ファイル名:Program.cs

using System;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        Console.WriteLine("開始");

        var cts = new CancellationTokenSource();

        var stage1 = new PipelineStage<string, int>(async s =>
        {
            await Task.Delay(1000);
            //return Task.FromResult(s.Length);
            return s.Length;
        }, cts.Token);

        var stage2 = new PipelineStage<int, string>(i =>
        {
            if (i == 5) throw new Exception("禁止値: 5");
            return Task.FromResult($"Len={i}");
        }, cts.Token);

        var stage3 = new PipelineStage<string, string>(s =>
        {
            return Task.FromResult($"Result: {s}");
        }, cts.Token);

        // 接続
        stage2.ConnectFrom(stage1);
        stage3.ConnectFrom(stage2);

        // 起動
        await stage1.StartAsync();
        await stage2.StartAsync();
        await stage3.StartAsync();

        // データ投入
        await stage1.Input.WriteAsync("abc");    // → 3 → OK
        await stage1.Input.WriteAsync("hello");  // → 5 → エラー
        await stage1.Input.WriteAsync("x");      // → 1 → OK
        stage1.Input.Complete();

        // 終了待ち
        await Task.WhenAll( stage1.WorkerTask!, stage2.WorkerTask!, stage3.WorkerTask! );

        // 出力
        Console.WriteLine("---- 最終出力 ----");
        await foreach (var result in stage3.Output.ReadAllAsync())
        {
            Console.WriteLine(result);
        }

        Console.WriteLine("終了");
    }
}

実行結果

開始
[Error] Int32→String: 禁止値: 5
---- 最終出力 ----
Result: Len=3
終了

追記:20250722
ジェネリック + 処理注入対応版

ファイル名:BasicWorkerQueue.cs

// ジェネリック + 処理注入対応版
using System.Threading.Channels;
using System.Diagnostics;

namespace WorkerQueSample04;

public class BasicWorkerQueue<T>
{
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
    private Task? _workerTask;
    private readonly Func<T, Task> _workerAction;

    public BasicWorkerQueue(Func<T, Task> workerAction)
    {
        _workerAction = workerAction ?? throw new ArgumentNullException(nameof(workerAction));
    }

    public void Start()
    {
        if (_workerTask is not null && !_workerTask.IsCompleted) return;

        _workerTask = Task.Run(WorkerAsync);
    }

    public void Enqueue(T item)
    {
        if (!_channel.Writer.TryWrite(item))
        {
            Debug.Print("[Main] 書き込み失敗(チャンネルが閉じられている可能性)");
        }
    }

    public async Task StopAsync()
    {
        _channel.Writer.Complete();
        if (_workerTask is not null)
        {
            await _workerTask;
        }
    }

    private async Task WorkerAsync()
    {
        await foreach (var item in _channel.Reader.ReadAllAsync())
        {
            await _workerAction(item);  // 非同期実行
        }

        Debug.Print($"[Worker TID:{Environment.CurrentManagedThreadId}] チャンネル終了を検知");
    }
}

ファイル名:Program.cs

// 使用例(stringを処理するワーカースレッド)
namespace WorkerQueSample04;

class Program
{
    static async Task Main()
    {
        var queue = new BasicWorkerQueue<string>(async item =>
        {
            Console.WriteLine($"[Worker] 処理開始: {item}");
            await Task.Delay(100); // 模擬的な処理時間
            Console.WriteLine($"[Worker] 処理完了: {item}");
        });

        queue.Start();

        queue.Enqueue("A");
        queue.Enqueue("B");
        queue.Enqueue("C");

        await queue.StopAsync();
    }
}

実行結果

[Worker] 処理開始: A
[Worker] 処理完了: A
[Worker] 処理開始: B
[Worker] 処理完了: B
[Worker] 処理開始: C
[Worker] 処理完了: C
[Worker TID:4] チャンネル終了を検知

サンプルではstring型にしましたが、こちらを各ステージで使う引数と戻り値をセットするユーザークラスにすることになります。
多段のパイプラインにする場合、次のステージが見えないとかコードを書くことが出来ないので、最終ステージから書くことになると思われます。
この書き方がシンプルで好みです。

追記:20250729
上記コードに並列処理が出来るように変更してみました。
public void Start(int degreeOfParallelism = 1)の引数が並列数になっています。

// ジェネリック + 処理注入 + 並列処理対応版
using System.Threading.Channels;
using System.Diagnostics;

namespace CommonLib;

public class BasicWorkerQueue<T>
{
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
    private readonly List<Task> _workerTasks = [];
    private readonly Func<T, Task> _workerAction;

    public BasicWorkerQueue(Func<T, Task> workerAction)
    {
        _workerAction = workerAction ?? throw new ArgumentNullException(nameof(workerAction));
    }

    public void Start(int degreeOfParallelism = 1)
    {
        for (int i = 0; i < degreeOfParallelism; i++)
        {
            _workerTasks.Add(Task.Run(WorkerAsync));
        }
    }

    public void Enqueue(T item)
    {
        if (!_channel.Writer.TryWrite(item))
        {
            Debug.Print("[Main] 書き込み失敗(チャンネルが閉じられている可能性)");
        }
    }

    public async Task StopAsync()
    {
        _channel.Writer.Complete();
        await Task.WhenAll(_workerTasks);
    }

    private async Task WorkerAsync()
    {
        await foreach (var item in _channel.Reader.ReadAllAsync())
        {
            await _workerAction(item);  // 非同期実行
        }

        Debug.Print($"[Worker TID:{Environment.CurrentManagedThreadId}] チャンネル終了を検知");
    }
}
/*
// 使用例(stringを処理するワーカースレッド)
namespace CommonLib;

class Program
{
    static async Task Main()
    {
        var queue = new BasicWorkerQueue<string>(async item =>
        {
            Console.WriteLine($"[Worker] 処理開始: {item}");
            await Task.Delay(100); // 模擬的な処理時間
            Console.WriteLine($"[Worker] 処理完了: {item}");
        });

        queue.Start();

        queue.Enqueue("A");
        queue.Enqueue("B");
        queue.Enqueue("C");

        await queue.StopAsync();
    }
}


シンプルなのでワーカー+チャンネルの多段パイプラインはこれを使う。
ワーカー+チャンネル、ジェネリック(T)他ステージへのキューの登録(パイプライン)は自前で行う。
Tはステージ感で利用する引数、戻り値をプロパティにしたDTOクラス

並列数をdegreeOfParallelismで指定
public void Start(int degreeOfParallelism = 1)

*/

コメント