まず、ワーカーは言語機能や特定のライブラリを指す言葉ではありません。
メインスレッド(WPFならUIスレッド)とは別のスレッドで処理を引き受ける、
プログラム上の設計(役割) を指します。
ワーカーは、ローカルで、同一プロセス内に立ち上げる小さなサーバーのようなものです。
通常、サーバーを使ったプログラミングでは、
「サーバーを利用する側」(クライアント)を書くか
「サーバーの処理ロジックを書く側」(サーバーサイド)を書くか
のどちらかになります。
しかしワーカーの場合は違います。
ワーカーは、サーバーそのものをアプリ内部に自作する ことになります。
使い所
ワーカーは、別スレッドで動作し、かつ、単発では完結しない継続的な処理を扱うための構造です。
単発で完結する処理であれば、
同一スレッドなら通常のメソッド(いわゆる関数)で十分です。
別スレッドで実行したいだけなら、非同期処理(Task や async/await)で足ります。
◯用途別分類
- 単発・同一スレッド → 関数
- 単発・別スレッド → 非同期処理
- 継続的・別スレッド → ワーカー
例えば、非同期処理で実行回数を管理する場合、
カウンターは呼び出し元に置くことになり、別スレッドから更新するため排他制御が必要になります。
int _counter = 0;
object _lock = new();
async Task DoWorkAsync()
{
await Task.Run(() =>
{
// 別スレッドで実行される
lock (_lock) // 排他制御が必要
{
_counter++;
}
});
}
一方ワーカーでは、カウンターを内部に持たせることができるため、
状態はワーカー専有となり、スレッド間の共有や排他制御を意識する必要がありません。
class CounterWorker
{
private int _counter = 0; // 状態はワーカー内部
public Task<int> CountUpAsync()
{
// 依頼をキューに積むイメージ
return EnqueueAsync(() =>
{
_counter++; // 同一スレッド内で更新
return _counter;
});
}
private Task<T> EnqueueAsync<T>(Func<T> work)
{
// 実際はキュー+常駐ループ
// イメージだけ
}
}
最小構成
別スレッドで動かす
UIスレッドとは独立して動作させる必要があります。
そのために Task.Run() などで実行します。
常駐するループ
ワーカーは単発処理ではありません。
依頼を待ち続ける必要があるため、
内部に常駐ループを持ちます。
while (true)
{
// 依頼を待つ
}
呼び出し元から依頼を受け取るキュー
ワーカーは直接呼び出されるのではなく、
依頼を受け取る仕組みが必要です。
そのために Channel などのキューを使います。
- 呼び出し元 → キューに積む
- ワーカー → キューから取り出す
という構造になります。
終了させるためのキャンセル機構
常駐ループは止められなければなりません。
そのため、
- CancellationToken
- Channelの完了通知
などで外部から停止できる仕組みを用意します。
実装例
ワーカー実装例(カウントアップ)
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
public sealed class CounterWorker : IDisposable
{
private readonly Channel<TaskCompletionSource<int>> _channel;
private readonly CancellationTokenSource _cts = new();
private readonly Task _workerTask;
private int _counter = 0; // 内部状態
public CounterWorker()
{
_channel = Channel.CreateUnbounded<TaskCompletionSource<int>>();
// 別スレッドで常駐開始
_workerTask = Task.Run(WorkerLoopAsync);
}
// 呼び出し側API
public async Task<int> CountUpAsync()
{
var tcs = new TaskCompletionSource<int>(
TaskCreationOptions.RunContinuationsAsynchronously);
await _channel.Writer.WriteAsync(tcs);
return await tcs.Task;
}
// ワーカーループ
private async Task WorkerLoopAsync()
{
try
{
await foreach (var tcs in _channel.Reader.ReadAllAsync(_cts.Token))
{
// 疑似重処理
await Task.Delay(500, _cts.Token);
_counter++; // 内部状態更新
tcs.SetResult(_counter);
}
}
catch (OperationCanceledException)
{
// 正常終了
}
}
// 外部から停止
public void Dispose()
{
_cts.Cancel();
_channel.Writer.TryComplete();
_workerTask.Wait();
_cts.Dispose();
}
}
使い方イメージ
var worker = new CounterWorker();
int a = await worker.CountUpAsync(); // 1
int b = await worker.CountUpAsync(); // 2
int c = await worker.CountUpAsync(); // 3
worker.Dispose(); // 停止
TaskCompletionSource<T>
Task を作り、その Task を外部から完了させることができるクラス。
呼び出し側:
var result = await worker.CountUpAsync();
内部では:
var tcs = new TaskCompletionSource<int>();
_channel.Writer.WriteAsync(tcs);
return tcs.Task; // ここでTaskを返す
そしてワーカー側で:
_counter++;
tcs.SetResult(_counter); // ← ここでTaskを完了させる
Channel<T>
Channelはスレッド間で安全にデータを受け渡すためのキューになります。
var channel = Channel.CreateUnbounded<int>();
書き込み側(Producer)
await channel.Writer.WriteAsync(1);
データをキューに入れます。
読み取り側(Consumer)
var value = await channel.Reader.ReadAsync();
キューから取り出します。
具体的には、次のような流れになります。
呼び出し元は、ワーカーに「処理の依頼」を出します。
ワーカーはその依頼を受け取り、
まずキュー(Channel)に積みます。
常駐しているワーカーループは、
キューから依頼を1つずつ取り出し、順番に実行します。
CancellationTokenSource
CancellationTokenSourceは処理を外部から止めるためのスイッチです。
var cts = new CancellationTokenSource();
var token = cts.Token;
- CancellationTokenSource → キャンセルを発行する側
- CancellationToken → キャンセルを受け取る側
止める側(外部):
cts.Cancel();
受け取る側(ワーカー):
while (!token.IsCancellationRequested)
{
// 処理
}
又は
await Task.Delay(1000, token);
実装例:
private async Task WorkerLoopAsync(CancellationToken token)
{
try
{
while (true)
{
token.ThrowIfCancellationRequested();
var item = await channel.Reader.ReadAsync(token);
// 処理
}
}
catch (OperationCanceledException)
{
// 正常なキャンセル終了
}
}
外部から cts.Cancel(); が呼ばれると、
ワーカーループ内の token.ThrowIfCancellationRequested() が
OperationCanceledException を送出します。
この例外を try-catch で捕まえることで、
ワーカーループを安全に終了させることができます。
キャンセルトークン付き Async メソッドとは
async メソッドは、外部からキャンセルできるように
CancellationToken を引数として受け取ることができます。
async Task DoWorkAsync(CancellationToken token)
{
await Task.Delay(1000, token);
}
このtokenは、
「止めてほしい」という外部からの通知
を受け取るためのものです。
キャンセルトークン付きAsyncメソッドの基本形:
async Task DoWorkAsync(CancellationToken token)
{
token.ThrowIfCancellationRequested(); // キャンセル要求チェック
await Task.Delay(500, token); // キャンセルトークン付きのメソッドを呼び出す例
token.ThrowIfCancellationRequested(); // キャンセル要求チェック
// 処理
}
処理を行う場合は
token.ThrowIfCancellationRequested();
を実行して、キャンセル要求が来ていないかを確認します。
このチェックの粒度が細かいほど、
キャンセルはスムーズに動作します。
逆に、チェックの間隔が大きいと、
キャンセル要求が出されてもすぐには停止せず、
処理が続いてしまいます。
await できるメソッドには token を渡す
await Task.Delay(500, token);
今回のサンプルでは、ワーカーループを終了させるためのキャンセル機構となります。
個別のタスクをキャンセルする機構を組むことも出来ますが、今回の記事では扱いません。
さいごに
ワーカーは便利な設計手法ですが、
無理に使う必要はありません。
単発で完結する処理であれば、
同一スレッドなら通常のメソッドで十分です。
別スレッドで実行したいだけなら、非同期処理で足ります。
ワーカーが必要になるのは、
別スレッドで継続的に処理を行い
内部に状態を持ち
順番制御を行いたい場合
に限られます。

コメント