TaskCompletionSource<TResult>を使ったコードを試してみます。
前回記事
https://maywork.net/computer/cshap-worker-and-que-basic-pattern2-channel/
ファイル名:BasicWorkerQueue.cs
using System.Threading.Channels;
namespace WorkerQueSample01;
public class BasicWorkerQueue : IAsyncDisposable
{
private readonly Channel<JobItem<string, string>> _channel = Channel.CreateUnbounded<JobItem<string, string>>();
private CancellationTokenSource? _cts;
private Task? _workerTask;
public void Start()
{
if (_workerTask is not null && !_workerTask.IsCompleted) return; // 多重実行の禁止
_cts = new CancellationTokenSource();
_workerTask = Task.Run(() => WorkerAsync(_cts.Token));
}
public void Enqueue(JobItem<string, string> item)
{
if (!_channel.Writer.TryWrite(item))
{
Console.WriteLine("[Main] 書き込みに失敗しました。チャンネルが閉じられた可能性があります。");
}
}
public void Stop()
{
_cts?.Cancel();
_channel.Writer.TryComplete();
try
{
_workerTask?.Wait();
}
catch (AggregateException ae) when (ae.InnerException is OperationCanceledException)
{
Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] キャンセルを検知しました。");
}
}
private async Task WorkerAsync(CancellationToken token)
{
try
{
await foreach (var item in _channel.Reader.ReadAllAsync(token))
{
await Task.Delay(1000, token); // 重たい処理
item.Completion.SetResult($"{item.Payload}完了");
}
}
catch (OperationCanceledException)
{
Console.WriteLine($"[Worker TID:{Environment.CurrentManagedThreadId}] キャンセルされました。");
}
}
public async ValueTask DisposeAsync()
{
if (_cts is not null)
{
_channel.Writer.TryComplete(); // チャンネルを先に閉じる
_cts.Cancel();
if (_workerTask is not null)
{
try
{
await _workerTask;
}
catch (OperationCanceledException)
{
// 通常のキャンセルなので無視
}
}
_cts.Dispose();
_cts = null;
}
}
}
ファイル名:JobItem.cs
namespace WorkerQueSample01;
public class JobItem<TIn, TOut>
{
public TIn Payload { get; }
public TaskCompletionSource<TOut> Completion { get; }
public JobItem(TIn payload)
{
Payload = payload;
Completion = new();
}
}
ファイル名:Program.cs
namespace WorkerQueSample01;
class Program
{
static async Task Main()
{
var workerQueue = new BasicWorkerQueue();
workerQueue.Start();
var job = new JobItem<string, string>("Task1");
var job2 = new JobItem<string, string>("Task2");
workerQueue.Enqueue(job);
workerQueue.Enqueue(job2);
var result1 = await job.Completion.Task;
Console.WriteLine(result1);
var result2 = await job2.Completion.Task;
Console.WriteLine(result2);
await workerQueue.DisposeAsync();
}
}
前回の記事のコードは、処理をワーカースレッドに依頼し結果を受け取りは無しのコードです。実用的にはキャッシュ機構に先読み(プリフェッチ)などには使えると思います。
今回のコードはTaskCompletionSource<TResult>をつかいワーカースレッドで処理した結果を受け取るコードになっています。
どうにか動作するようになりましたが、実際このコードをそのまま適用しても、余りメリットが無く、awat Task.Run()
で非同期処理しているのと大差がない振る舞いに成ります。
多段のパイプラインにしたり並列処理をしたりすることでこちらのコードが生きてくると考えらます。
コメント