WPFでChannelを使ったワーカーのデモプログラムをFuncを使いライブラリ化

コンピュータ

Channelを使ったワーカーの処理を確認しましたが、アプリに組み込む場合ライブラリ化出来るとコードが減って幸せになります。

今回は、Funcとジェネリック(T型)を使いワーカーの処理部分を再利用できるライブラリにしてみました。

ソースコード

ファイル名:WpfChannelWorkerFuncDemo.csproj

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>WinExe</OutputType>
    <TargetFramework>net10.0-windows</TargetFramework>
    <Nullable>enable</Nullable>
    <ImplicitUsings>enable</ImplicitUsings>
    <UseWPF>true</UseWPF>
  </PropertyGroup>

</Project>

ファイル名:App.xaml.cs

using System.Windows;


using MwChannelWorker;

namespace WpfChannelWorkerFuncDemo;

public partial class App : Application
{
    /// <summary>
    /// アプリ全体で共有するワーカー
    /// AppWorkerContextはアプリごとにワーカーで必要なプロパティを定義すること。
    /// </summary>
    public static  ChannelWorker<AppWorkerContext> Worker { get; }
        = new(new AppWorkerContext());

    // 将来、後始末が必要になったら以下コードを有効化する。
    //
    // 例:
    // - 設定ファイルの保存
    // - ワーカーの停止(Channel.Complete)
    // - 外部リソースの解放
    // - ログのフラッシュ
    //
    // protected override void OnExit(ExitEventArgs e)
    // {
    //     TODO:
    //     - Worker.Stop()
    //     - Channel.Complete()
    //     - 実行中処理の終了待ち
    //     base.OnExit(e);
    // }
}

ファイル名:AppWorkerContext.cs

// アプリで使用するワーカーに引数として渡したいプロパティを定義するクラス。
// アプリごとに内容が異なることになります。
// 
namespace WpfChannelWorkerFuncDemo;

public sealed class AppWorkerContext
{
    public int Counter {get; set;} = 0;
}

ファイル名:MainWindow.xaml.cs

using System.Windows;
using MwChannelWorker;

namespace WpfChannelWorkerFuncDemo;

public partial class MainWindow : Window
{
    // コンストラクタ
    public MainWindow()
    {
        InitializeComponent();

        // イベントとメソッドを紐づけ
        BtnCountUp.Click += BtnCountUp_Click;
        BtnReset.Click += BtnReset_Click;
    }
    // カウントアップボタン
    private async void BtnCountUp_Click(object sender, RoutedEventArgs e)
    {
        // 実行中を表示
        SetBusy(true, "Counting...");

        try
        {
            // ワーカーに処理を投げる
            WorkerResult<int> result
                = await App.Worker.Enqueue(async ctx =>
            {
                await Task.Delay(300);          // 疑似的な重い処理
                ctx.Counter++;                  // カウントアップ
                return new WorkerResult<int>    // 結果を返し await 後にUIスレッドへ復帰
                {
                    Value = ctx.Counter,
                };
            });

            // UIスレッドに戻りViewを書き換え
            TxtCounter.Text = result.Value.ToString();
            TxtStatus.Text = "Done.";
        }
        catch (Exception ex)
        {
            // 例外をメッセージ表示
            TxtStatus.Text = ex.Message;
        }
        finally
        {
            // 表示をリセット
            SetBusy(false, "");
        }
    }
    // リセットボタン
    private async void BtnReset_Click(object sender, RoutedEventArgs e)
    {
        // リセット中を表示
        SetBusy(true, "Resetting...");

        try
        {
            // ワーカーに処理を投げる
            WorkerResult<int> result
                = await App.Worker.Enqueue(async ctx =>
            {
                ctx.Counter = 0;                // 0にリセット
                return new WorkerResult<int>    // 結果を返し await 後にUIスレッドへ復帰
                {
                    Value = ctx.Counter,
                };
            });

            // UIスレッドに戻りViewを書き換え
            TxtCounter.Text = result.Value.ToString();
            TxtStatus.Text = "Reset done.";
        }
        catch (Exception ex)
        {
            TxtStatus.Text = ex.Message;
        }
        finally
        {
            SetBusy(false, "");
        }
    }

    // メッセージ表示
    private void SetBusy(bool busy, string message)
    {
        BtnCountUp.IsEnabled = !busy;   // busyの場合ボタンを無効化
        BtnReset.IsEnabled = !busy;     // busyの場合ボタンを無効化
        TxtStatus.Text = message;
    }
}

ファイル名:Workers\ChannelWorker.cs

using System.Threading.Channels;

namespace MwChannelWorker;

// このコードはライブラリなので基本変更しない

/// <summary>
/// Channel + Func による直列ワーカー
///
/// - 処理は Enqueue によりキューイングされる
/// - ワーカーは単一スレッドで順次実行される
/// - TContext に状態を集約することで lock を不要にする
///
/// 想定用途:
/// - GUI アプリのアプリケーションロジック
/// - ツールアプリのバックグラウンド処理
/// - 状態を持つ非同期ワーカー
///
/// 非想定:
/// - 高並列 CPU 処理
/// - 並列実行が必要な処理
/// </summary>
public sealed class ChannelWorker<TContext>
    where TContext : class
{
    // ワーカーで処理するactionの引数。
    private readonly TContext _context;

    // ワーカーに対するリクエストのキュー
    private readonly Channel<Func<TContext, Task>> _channel =
        Channel.CreateUnbounded<Func<TContext, Task>>(
            new UnboundedChannelOptions
            {
                SingleReader = true,
                SingleWriter = false
            });
    /*
        Channel
        Queue<T> を、マルチスレッドや async/await で使いやすく進化させた『上位互換の非同期キュー』
    */
    /*
        Func<TContext, Task>
        Func ... デリゲート(関数ポインタみたいなもの)
        TContext ... 引数のジェネリック型
        Task ... 戻り値、タスクを返す。
    */
    /*
        UnboundedChannelOptions (Channelの設定オプション)
        オプション内容
        SingleReader読み取る側が常に 1 つだけなのでtrue。trueの場合、読み取り処理が最適化されます。
        SingleWriter書き込む側が複数なので false。trueの場合、内部のロックが簡略化され高速になります。
    */

    // コンストラクタ
    public ChannelWorker(TContext context)
    {
        // 引数でcontextをメンバーの_contextにセット
        // 引数がnull(無効)の場合例外を投げる。
        _context = context ?? throw new ArgumentNullException(nameof(context));
        // ワーカーループの開始
        _ = Task.Run(WorkerLoop);
    }

    // ワーカー(メッセージループみたいなもの)
    private async Task WorkerLoop()
    {
        // _channelにactionが溜まっていたら一つとりだし
        await foreach (var action in _channel.Reader.ReadAllAsync())
        {
            try
            {
                // actionを実行する。
                await action(_context);
            }
            catch (Exception ex)
            {
                // TODO:
                // - ILogger 連携
                // - イベント通知
                // - 統一エラーハンドラ
                System.Diagnostics.Debug.WriteLine(ex);
            }
        }
    }

    /// <summary>
    /// 戻り値なし処理をキューに追加
    /// </summary>
    public Task Enqueue(Func<TContext, Task> action)
    {
        var tcs = new TaskCompletionSource(
            TaskCreationOptions.RunContinuationsAsynchronously);
        /*
          TaskCompletionSource
          SetResult()やSetException()でタスクの終了を元スレッドに伝えることが出来ます。

          TaskCreationOptions.RunContinuationsAsynchronously(オプション)
          結果を元スレッドに通知するだけで、すぐ制御を戻す。
        */

        // _channelに処理を積む
        _channel.Writer.TryWrite(async ctx =>
        {
            // ラムダ式の内容がワーカーで処理する内容になる。
            try
            {
                // ctx ... コンテキストを引数にactionを実行する。
                await action(ctx);
                // 終了を元スレッドへ伝える。
                tcs.SetResult();
            }
            catch (Exception ex)
            {
                // 例外を元スレッドへ伝える。
                tcs.SetException(ex);
            }
        });

        // タスクを返す。
        return tcs.Task;
    }

    /// <summary>
    /// 戻り値あり処理をキューに追加
    /// </summary>
    public Task<TResult> Enqueue<TResult>(
        Func<TContext, Task<TResult>> action)
    {
        var tcs = new TaskCompletionSource<TResult>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        _channel.Writer.TryWrite(async ctx =>
        {
            try
            {
                // actionからの結果をresultで受け取り
                var result = await action(ctx);
                // 結果を元スレッドへ伝える。
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        });

        // タスクを返す。
        return tcs.Task;
    }
}

ファイル名:Workers\WorkerResult.cs

namespace MwChannelWorker;

// このコードはライブラリなので基本変更しない

/// <summary>
/// ワーカー処理結果を表す汎用結果クラス
///
/// UI層とワーカー層の責務分離のため、
/// 単純な値ではなく Result オブジェクトで返却する。
///
/// 将来的に情報を追加しても API 破壊が起きにくい。
/// </summary>
public sealed class WorkerResult<T>
{
    /// <summary>
    /// 実行結果の値
    /// </summary>
    public required T Value { get; init; }

    /// <summary>
    /// 成功フラグ(未使用でも将来拡張用)
    /// </summary>
    public bool Success { get; init; } = true;

    /// <summary>
    /// UI 表示用メッセージ等
    /// </summary>
    public string? Message { get; init; }

    // 拡張候補:
    // public Exception? Error { get; init; }
    // public object? Tag { get; init; }
}

ファイル名:MainWindow.xaml

<Window x:Class="WpfChannelWorkerFuncDemo.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        Title="ChannelWorker Func Demo"
        Height="220"
        Width="420">

    <Grid Margin="12">
        <Grid.RowDefinitions>
            <RowDefinition Height="Auto"/>
            <RowDefinition Height="Auto"/>
            <RowDefinition Height="*"/>
        </Grid.RowDefinitions>

        <TextBlock FontSize="14"
                   Text="ChannelWorker Func方式"/>

        <StackPanel Grid.Row="1"
                    Orientation="Horizontal"
                    Margin="0,12,0,0">

            <Button x:Name="BtnCountUp"
                    Width="140"
                    Height="40"
                    Content="カウントアップ"
                    Margin="0,0,10,0"/>

            <Button x:Name="BtnReset"
                    Width="140"
                    Height="40"
                    Content="リセット"/>
        </StackPanel>

        <Border Grid.Row="2"
                Margin="0,12,0,0"
                Padding="10"
                BorderBrush="Gray"
                BorderThickness="1">

            <StackPanel>
                <TextBlock Text="Counter:"/>
                <TextBlock x:Name="TxtCounter"
                           FontSize="24"
                           Text="0"/>
                <TextBlock x:Name="TxtStatus"
                           Margin="0,8,0,0"
                           Foreground="DimGray"/>
            </StackPanel>

        </Border>
    </Grid>
</Window>

実行例

カウントアップボタンを押すと、数値が増えます。
その際、ウェイトを入れているので、若干待ちが発生しますが、ボタンが無効化される用にしています。

UI側でも二重実行を禁止していますが、もし二重実行をしたとしても、ワーカー側で順番に処理されるようなコードになっています。

ファイル構成

WpfChannelWorkerFuncDemo
│
├─ App.xaml
│   └─ StartupUri で MainWindow.xaml を起動
│
├─ App.xaml.cs
│   └─ アプリ全体で共有する ChannelWorker を保持
│
├─ MainWindow.xaml
│   └─ 画面レイアウト(ボタン・表示)
│
├─ MainWindow.xaml.cs
│   └─ UIイベント処理
│      └─ ワーカーへ処理を Enqueue
│
├─ AppWorkerContext.cs
│   └─ ワーカーが保持するアプリ状態
│
└─ Workers
    ├─ ChannelWorker.cs
    │   └─ Func + Channel による直列ワーカー本体
    │
    └─ WorkerResult.cs
        └─ ワーカー処理結果の共通戻り値クラス

使い方

この、ワーカーは任意の処理を行う為のライブラリです。

処理の内容はメインスレッド側のコードにラムダ式で記述することが出来るので、

アプリのプログラミング中に、ワーカー側に任せたい処理を、

ラムダ式で記述しワーカーに処理するように依頼することが出来ます。

 

まず、ワーカーを動かすにあたり、ワーカー全体で共通な引数を定義する必要があります。

サンプルコードでは、AppWorkerContext.cs部分です。

 

次に、ワーカーをアプリにメンバーとして所属させます。

サンプルコードでは App.xaml.csで行っています。

Appに所属することで、ワーカーがアプリのどこからでも呼び出すことが出来るようになります。

 

実際、ワーカーに処理を依頼している部分はMainWindow.xaml.csになります。

引数はAppWorkerContextという形でワーカー全体で共有する設計ですが、

戻り値は処理ごとに任意の型を設定する設計になっています。

 

実際ワーカーに依頼する処理を記述している部分ですが、

非同期処理ほど簡潔では無いですが、基本的に同じような考え方に成るように作りました。

            // ワーカーに処理を投げる
            WorkerResult<int> result
                = await App.Worker.Enqueue(async ctx =>
            {
                await Task.Delay(300);          // 疑似的な重い処理
                ctx.Counter++;                  // カウントアップ
                return new WorkerResult<int>    // 結果を返し await 後にUIスレッドへ復帰
                {
                    Value = ctx.Counter,
                };
            });

 

 

Workers下はライブラリですので、使い方だけ理解できればよいです。

また、ライブラリですので、他のアプリに流用することも出来ます。

コメント