C#でワーカースレッド+キューの基本パターンを試す2。「Channel版」

コンピュータ

キューをBlockingCollection<T>の代わりにChannel<T>にすると非同期処理にできると教わったので試してみます。

ファイル名:BasicWorkerQueue.cs

using System.Threading.Channels;

namespace WorkerQueSample01;
public class BasicWorkerQueue : IDisposable
{
    private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();
    private CancellationTokenSource? _cts;
    private Task? _workerTask;

    public void Start()
    {
        if (_workerTask is not null && !_workerTask.IsCompleted) return; // 多重実行の禁止

        _cts = new CancellationTokenSource();
        _workerTask = Task.Run(() => WorkerAsync(_cts.Token));
    }

    public void Enqueue(string item)
    {
        if (!_channel.Writer.TryWrite(item))
        {
            Console.WriteLine("[Main] 書き込みに失敗しました。チャンネルが閉じられた可能性があります。");
        }
    }

    public void Stop()
    {
        _cts?.Cancel();
        _channel.Writer.TryComplete();

        try
        {
            _workerTask?.Wait();
        }
        catch (AggregateException ae) when (ae.InnerException is OperationCanceledException)
        {
            Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] キャンセルを検知しました。");
        }
    }

    private async Task WorkerAsync(CancellationToken token)
    {
        try
        {
            await foreach (var item in _channel.Reader.ReadAllAsync(token))
            {
                Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] {item}");
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] キャンセルされました。");
        }
    }

    public void Dispose()
    {
        Stop();
        _cts?.Dispose();
    }
}

Program.csはBlockingCollection<T>版と同じなので割愛

Channelにしたおかげで、foreachの処理をawaitで待つ非同期処理にすることが出来ました。

await foreach (var item in _channel.Reader.ReadAllAsync(token))
{
    Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] {item}");
}

サンプルの文字を表示する処理では、ありがたみが感じられませんが、I/Oの絡むような重たい処理では効果を発揮すると思われます。

コメント