WPFアプリに組み込むワーカーとキューの考え方

コンピュータ

まず、ワーカーは言語機能や特定のライブラリを指す言葉ではありません。

メインスレッド(WPFならUIスレッド)とは別のスレッドで処理を引き受ける、
プログラム上の設計(役割) を指します。

ワーカーは、ローカルで、同一プロセス内に立ち上げる小さなサーバーのようなものです。

通常、サーバーを使ったプログラミングでは、

「サーバーを利用する側」(クライアント)を書くか

「サーバーの処理ロジックを書く側」(サーバーサイド)を書くか

のどちらかになります。

しかしワーカーの場合は違います。

ワーカーは、サーバーそのものをアプリ内部に自作する ことになります。

使い所

ワーカーは、別スレッドで動作し、かつ、単発では完結しない継続的な処理を扱うための構造です。

単発で完結する処理であれば、

同一スレッドなら通常のメソッド(いわゆる関数)で十分です。

別スレッドで実行したいだけなら、非同期処理(Task や async/await)で足ります。

◯用途別分類

  • 単発・同一スレッド → 関数
  • 単発・別スレッド → 非同期処理
  • 継続的・別スレッド → ワーカー

例えば、非同期処理で実行回数を管理する場合、
カウンターは呼び出し元に置くことになり、別スレッドから更新するため排他制御が必要になります。

int _counter = 0;
object _lock = new();

async Task DoWorkAsync()
{
    await Task.Run(() =>
    {
        // 別スレッドで実行される
        lock (_lock)          // 排他制御が必要
        {
            _counter++;
        }
    });
}

一方ワーカーでは、カウンターを内部に持たせることができるため、
状態はワーカー専有となり、スレッド間の共有や排他制御を意識する必要がありません。

class CounterWorker
{
    private int _counter = 0;   // 状態はワーカー内部

    public Task<int> CountUpAsync()
    {
        // 依頼をキューに積むイメージ
        return EnqueueAsync(() =>
        {
            _counter++;         // 同一スレッド内で更新
            return _counter;
        });
    }

    private Task<T> EnqueueAsync<T>(Func<T> work)
    {
        // 実際はキュー+常駐ループ
        // イメージだけ
    }
}

最小構成

別スレッドで動かす

UIスレッドとは独立して動作させる必要があります。

そのために Task.Run() などで実行します。

常駐するループ

ワーカーは単発処理ではありません。

依頼を待ち続ける必要があるため、
内部に常駐ループを持ちます。

while (true)
{
    // 依頼を待つ
}

呼び出し元から依頼を受け取るキュー

ワーカーは直接呼び出されるのではなく、
依頼を受け取る仕組みが必要です。

そのために Channel などのキューを使います。

  1. 呼び出し元 → キューに積む
  2. ワーカー → キューから取り出す

という構造になります。

終了させるためのキャンセル機構

常駐ループは止められなければなりません。

そのため、

  1. CancellationToken
  2. Channelの完了通知

などで外部から停止できる仕組みを用意します。

実装例

ワーカー実装例(カウントアップ)

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class CounterWorker : IDisposable
{
    private readonly Channel<TaskCompletionSource<int>> _channel;
    private readonly CancellationTokenSource _cts = new();
    private readonly Task _workerTask;

    private int _counter = 0;   // 内部状態

    public CounterWorker()
    {
        _channel = Channel.CreateUnbounded<TaskCompletionSource<int>>();

        // 別スレッドで常駐開始
        _workerTask = Task.Run(WorkerLoopAsync);
    }

    // 呼び出し側API
    public async Task<int> CountUpAsync()
    {
        var tcs = new TaskCompletionSource<int>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        await _channel.Writer.WriteAsync(tcs);
        return await tcs.Task;
    }

    // ワーカーループ
    private async Task WorkerLoopAsync()
    {
        try
        {
            await foreach (var tcs in _channel.Reader.ReadAllAsync(_cts.Token))
            {
                // 疑似重処理
                await Task.Delay(500, _cts.Token);

                _counter++;          // 内部状態更新
                tcs.SetResult(_counter);
            }
        }
        catch (OperationCanceledException)
        {
            // 正常終了
        }
    }

    // 外部から停止
    public void Dispose()
    {
        _cts.Cancel();
        _channel.Writer.TryComplete();
        _workerTask.Wait();
        _cts.Dispose();
    }
}

使い方イメージ

var worker = new CounterWorker();

int a = await worker.CountUpAsync(); // 1
int b = await worker.CountUpAsync(); // 2
int c = await worker.CountUpAsync(); // 3

worker.Dispose(); // 停止

TaskCompletionSource<T>

Task を作り、その Task を外部から完了させることができるクラス。

呼び出し側:

var result = await worker.CountUpAsync();

内部では:

var tcs = new TaskCompletionSource<int>();
_channel.Writer.WriteAsync(tcs);
return tcs.Task;   // ここでTaskを返す

そしてワーカー側で:

_counter++;
tcs.SetResult(_counter);   // ← ここでTaskを完了させる

Channel<T>

Channelはスレッド間で安全にデータを受け渡すためのキューになります。


var channel = Channel.CreateUnbounded<int>();

書き込み側(Producer)

await channel.Writer.WriteAsync(1);

データをキューに入れます。


読み取り側(Consumer)

var value = await channel.Reader.ReadAsync();

キューから取り出します。


具体的には、次のような流れになります。

呼び出し元は、ワーカーに「処理の依頼」を出します。

ワーカーはその依頼を受け取り、
まずキュー(Channel)に積みます。

常駐しているワーカーループは、
キューから依頼を1つずつ取り出し、順番に実行します。

CancellationTokenSource

CancellationTokenSourceは処理を外部から止めるためのスイッチです。

var cts = new CancellationTokenSource();
var token = cts.Token;
  • CancellationTokenSource → キャンセルを発行する側
  • CancellationToken → キャンセルを受け取る側

止める側(外部):

cts.Cancel();

受け取る側(ワーカー):

while (!token.IsCancellationRequested)
{
    // 処理
}

又は

await Task.Delay(1000, token);

実装例:

private async Task WorkerLoopAsync(CancellationToken token)
{
    try
    {
        while (true)
        {
            token.ThrowIfCancellationRequested();

            var item = await channel.Reader.ReadAsync(token);

            // 処理
        }
    }
    catch (OperationCanceledException)
    {
        // 正常なキャンセル終了
    }
}

外部から cts.Cancel(); が呼ばれると、
ワーカーループ内の token.ThrowIfCancellationRequested() が
OperationCanceledException を送出します。

この例外を try-catch で捕まえることで、
ワーカーループを安全に終了させることができます。

キャンセルトークン付き Async メソッドとは

async メソッドは、外部からキャンセルできるように
CancellationToken を引数として受け取ることができます。

async Task DoWorkAsync(CancellationToken token)
{
    await Task.Delay(1000, token);
}

このtokenは、

「止めてほしい」という外部からの通知

を受け取るためのものです。

キャンセルトークン付きAsyncメソッドの基本形:

async Task DoWorkAsync(CancellationToken token)
{
    token.ThrowIfCancellationRequested(); // キャンセル要求チェック

    await Task.Delay(500, token); // キャンセルトークン付きのメソッドを呼び出す例

    token.ThrowIfCancellationRequested(); // キャンセル要求チェック

    // 処理
}

処理を行う場合は
token.ThrowIfCancellationRequested();
を実行して、キャンセル要求が来ていないかを確認します。

このチェックの粒度が細かいほど、
キャンセルはスムーズに動作します。

逆に、チェックの間隔が大きいと、
キャンセル要求が出されてもすぐには停止せず、
処理が続いてしまいます。

await できるメソッドには token を渡す

await Task.Delay(500, token);

今回のサンプルでは、ワーカーループを終了させるためのキャンセル機構となります。
個別のタスクをキャンセルする機構を組むことも出来ますが、今回の記事では扱いません。

さいごに

ワーカーは便利な設計手法ですが、
無理に使う必要はありません。

単発で完結する処理であれば、

同一スレッドなら通常のメソッドで十分です。

別スレッドで実行したいだけなら、非同期処理で足ります。

ワーカーが必要になるのは、

別スレッドで継続的に処理を行い

内部に状態を持ち

順番制御を行いたい場合

に限られます。

コメント