C#で直列処理とパイプライン処理で測定結果の比較

コンピュータ

直列処理をワーカースレッド+キューで行う多段のパイプライン処理で処理時間がどれほどの差があるか確認してみました。

処理内容は、次の手順になります。
1.画像ファイルを読み込み
2.デコード
3.拡大縮小処理
4.PNG形式で保存

直列処理は1~4までの各処理時間の合計が処理時間となります。
パラレルの場合、1~4が独自のスレッドで連動して処理されるため、連続処理において一番遅い処理工程の処理時間が1~4全体の処理時間になります。
ちなみにデコードが一番遅いので、直列処理は1~4の処理時間のところ、パイプラインは2の処理時間に短縮されることが期待できます。

ファイル名:ImageCacheDto.cs

using System.IO;
using System.Windows.Media.Imaging;

public class ImageCacheDto
{
    public string InputPath { get; init; }
    public string OutputPath { get; init; }
    public int Width { get; init; }
    public int Height { get; init; }

    public ImageCacheDto(string inputPath, string outputPath, int width, int height)
    {
        InputPath = inputPath;
        OutputPath = outputPath;
        Width = width;
        Height = height;
    }

    public MemoryStream? Stream { get; set; }
    public BitmapSource? Bitmap { get; set; }
}

ファイル名:ImageHelper.cs

using System.IO;
using System.IO.Compression;
using System.Windows;
using System.Windows.Media;
using System.Windows.Media.Imaging;

public static class ImageHelper
{
    // --- ① 読み込み・デコード・リサイズはそのまま ---
    public static MemoryStream LoadFile(string path)
    {
        const int bufferSize = 16 * 1024 * 1024;
        byte[] buffer = new byte[bufferSize];
        MemoryStream ms = new();

        if (path.Contains('|'))
        {
            // ZIPファイルのパスと内部ファイル名を分割
            var parts = path.Split('|', 2);
            string zipPath = parts[0];
            string entryName = parts[1];

            using var zip = ZipFile.OpenRead(zipPath);
            var entry = zip.GetEntry(entryName);
            if (entry == null)
                throw new FileNotFoundException($"ZIP内にファイルが見つかりません: {entryName}");

            using var zs = entry.Open();
            int bytesRead;
            while ((bytesRead = zs.Read(buffer, 0, buffer.Length)) > 0)
            {
                ms.Write(buffer, 0, bytesRead);
            }
        }
        else
        {
            // 通常ファイル読み込み
            using var fs = new FileStream(path, FileMode.Open, FileAccess.Read);
            int bytesRead;
            while ((bytesRead = fs.Read(buffer, 0, buffer.Length)) > 0)
            {
                ms.Write(buffer, 0, bytesRead);
            }
        }

        ms.Position = 0;
        return ms;
    }

    public static BitmapSource Decode(MemoryStream ms)
    {
        try {
            var bitmap = new BitmapImage();
            bitmap.BeginInit();
            bitmap.StreamSource = ms;
            bitmap.CacheOption  = BitmapCacheOption.OnLoad;
            bitmap.CreateOptions = BitmapCreateOptions.IgnoreColorProfile
                                | BitmapCreateOptions.PreservePixelFormat;
            bitmap.EndInit();
            bitmap.Freeze();
            return bitmap;

        }
        finally {
            ms.Dispose();
        }
    }

    // スケーリングだけを行う
    public static BitmapSource ScaleToFit(BitmapSource image, int maxWidth, int maxHeight)
    {
        double scale;

        if (maxWidth > 0 && maxHeight > 0)
        {
            // 両方指定:縦横どちらかに収まるように縮小
            scale = Math.Min((double)maxWidth / image.PixelWidth,
                            (double)maxHeight / image.PixelHeight);
        }
        else if (maxWidth > 0)
        {
            // 横だけ指定
            scale = (double)maxWidth / image.PixelWidth;
        }
        else if (maxHeight > 0)
        {
            // 縦だけ指定
            scale = (double)maxHeight / image.PixelHeight;
        }
        else
        {
            // 両方0:スケーリングしない
            image.Freeze();
            return image;
        }

        // scale ≒ 1 のときは変換不要
        if (Math.Abs(scale - 1.0) < 0.0001)
        {
            image.Freeze();
            return image;
        }

        var transformed = new TransformedBitmap(image, new ScaleTransform(scale, scale));
        transformed.Freeze();
        if (maxWidth <= 0 || maxHeight <= 0)
        {
            return transformed;
        }

        int x = (maxWidth  - transformed.PixelWidth ) / 2;
        int y = (maxHeight - transformed.PixelHeight) / 2;

        var dv = new DrawingVisual();
        using (var ctx = dv.RenderOpen())
        {
            ctx.DrawImage(image, new Rect(x, y, transformed.PixelWidth, transformed.PixelHeight));
        }

        var render = new RenderTargetBitmap(maxWidth, maxHeight, 96, 96, PixelFormats.Pbgra32);
        render.Render(dv);
        render.Freeze();
        return render;
    }
    // --- ② PNG へエンコードしてメモリに保持 ---
    public static void SavePng(BitmapSource image, string path)
    {
        var encoder = new PngBitmapEncoder();
        encoder.Frames.Add(BitmapFrame.Create(image));
        using var fs = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None);
        encoder.Save(fs);
    }
}

ファイル名:PipelineStage.cs

// パイプラインステージ
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class PipelineStage<TIn, TOut>
{
    private readonly Func<TIn, Task<TOut>> _handler;
    private Channel<TIn> _input = Channel.CreateUnbounded<TIn>();
    private Channel<TOut> _output = Channel.CreateUnbounded<TOut>();
    private readonly CancellationToken _token;

    public ChannelWriter<TIn> Input => _input.Writer;
    public ChannelReader<TOut> Output => _output.Reader;
    public Task? WorkerTask { get; private set; }

    public PipelineStage(Func<TIn, Task<TOut>> handler, CancellationToken token = default)
    {
        _handler = handler;
        _token = token;
    }

    // 型安全なConnectFrom
    public void ConnectFrom<TPrev>(PipelineStage<TPrev, TIn> prevStage)
    {
        // 中継タスクを定義
        Task.Run(async () =>
        {
            await foreach (var item in prevStage.Output.ReadAllAsync(_token))
            {
                await _input.Writer.WriteAsync(item, _token);
            }
            _input.Writer.Complete();
        });
    }

    public async Task StartAsync()
    {
        WorkerTask = Task.Run(async () =>
        {
            try
            {
                await foreach (var item in _input.Reader.ReadAllAsync(_token))
                {
                    var result = await _handler(item);
                    await _output.Writer.WriteAsync(result, _token);
                }
            }
            catch (Exception ex)
            {
                Debug.Print($"[Error] {typeof(TIn).Name}→{typeof(TOut).Name}: {ex.Message}");
            }
            finally
            {
                _output.Writer.Complete();
            }
        }, _token);

        await Task.CompletedTask;
    }
}

ファイル名:Program.cs

using System.Diagnostics;
using System.IO;
using System.Security.Cryptography;
using System.Text;

class Program
{
    static void Sirial(string zipFile, string outputDir, int width, int height)
    {
        var files = GetImageFiles(zipFile);

        int w = width;
        int h = height;

        var sw = Stopwatch.StartNew();
        sw.Restart();
        foreach (var file in files)
        {
            string inputPath = file;
            string key = GetMD5($"{inputPath}-{w}-{h}");
            string outputPath = Path.Join(outputDir, $"{key}.png");

            var ms = ImageHelper.LoadFile(inputPath);
            var original = ImageHelper.Decode(ms);
            var scaled = ImageHelper.ScaleToFit(original, w, h);
            ImageHelper.SavePng(scaled, outputPath);
        }
        Console.WriteLine($"Sirial : {sw.ElapsedMilliseconds} ms");
        // Sirial : 9434 ms
    }
    static async Task Parallel(string zipFile, string outputDir, int width, int height)
    {
        var files = GetImageFiles(zipFile);

        int w = width;
        int h = height;

 

        var cts = new CancellationTokenSource();
        
        var stage1 = new PipelineStage<ImageCacheDto, ImageCacheDto>(s =>
        {
            s.Stream = ImageHelper.LoadFile(s.InputPath);
            return Task.FromResult(s);
        }, cts.Token);
        var stage2 = new PipelineStage<ImageCacheDto, ImageCacheDto>(s =>
        {
            if (s.Stream is not null)
            {
                s.Bitmap = ImageHelper.Decode(s.Stream);
                s.Stream.Dispose();
            }
            return Task.FromResult(s);
        }, cts.Token);
        var stage3 = new PipelineStage<ImageCacheDto, ImageCacheDto>(s =>
        {
            if (s.Bitmap is not null)
            {
                s.Bitmap = ImageHelper.ScaleToFit(s.Bitmap, s.Width, s.Height);
            }
            return Task.FromResult(s);
        }, cts.Token);
        var stage4 = new PipelineStage<ImageCacheDto, ImageCacheDto>(s =>
        {
            if (s.Bitmap is not null)
            {
                ImageHelper.SavePng(s.Bitmap, s.OutputPath);
            }
            return Task.FromResult(s);
        }, cts.Token);

        // 接続
        stage2.ConnectFrom(stage1);
        stage3.ConnectFrom(stage2);
        stage4.ConnectFrom(stage3);

        // 起動
        await stage1.StartAsync();
        await stage2.StartAsync();
        await stage3.StartAsync();
        await stage4.StartAsync();

        var sw = Stopwatch.StartNew();
        sw.Restart();

        // データ投入
        foreach (var file in files)
        {
            string inputPath = file;
            string key = GetMD5($"{inputPath}-{w}-{h}");
            string outputPath = Path.Join(outputDir, $"{key}.png");
            await stage1.Input.WriteAsync(new ImageCacheDto(inputPath, outputPath, w, h));
        }
        // データ投入終了
        stage1.Input.Complete();

        // 終了待ち
        await Task.WhenAll(
            stage1.WorkerTask!,
            stage2.WorkerTask!,
            stage3.WorkerTask!,
            stage4.WorkerTask!
        );
        Console.WriteLine($"Parallel : {sw.ElapsedMilliseconds} ms");
        // Parallel : 6316ms
    }
    static readonly System.Collections.Generic.HashSet<string>
    _inImagesExtensions = new (
        new string[] {".PNG", ".JPG", ".JPEG", ".GIF", ".BMP"});
    public static bool InImageExtensions(string imgFile)
    {
        var ext = System.IO.Path.GetExtension(imgFile).ToUpper();
        return _inImagesExtensions.Contains(ext);
    }
    public static IEnumerable<string> GetImageFiles(string location)
    {
        if (Path.GetExtension(location).Equals(".zip", StringComparison.CurrentCultureIgnoreCase))
        {
            using var zip = System.IO.Compression.ZipFile.OpenRead(location);
            return zip.Entries
                .Where(x => InImageExtensions(x.FullName))
                .Select(x => $"{location}|{x.FullName}");
        }
        else
        {
            return Directory.EnumerateFiles(location)
                .Where(x => InImageExtensions(x));
        }
    }
    // MD5ハッシュ計算
    public static string GetMD5(string input)
    {

        byte[] inputBytes = Encoding.UTF8.GetBytes(input);
        byte[] hashBytes = MD5.HashData(inputBytes);

        var sb = new StringBuilder();
        foreach (var b in hashBytes)
            sb.Append(b.ToString("x2"));

        return sb.ToString();
    }
    static async Task Main()
    {
        string zipFile = @"J:\testdata\Archive.zip";
        string outputDir = @"./.cache";

        if (!Directory.Exists(outputDir))
            Directory.CreateDirectory(outputDir);
        
        Sirial(zipFile, outputDir, 256, 256);
        // Sirial : 8976 ms
        await Parallel(zipFile, outputDir, 256, 256);
        // Parallel : 5812 ms
        
    }
}

プロジェクトはコンソールですが、.csprojは以下のように変更しています。
ファイル名:ImageResze_WPFBitmap02.csproj

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net8.0-windows</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
    <UseWPF>true</UseWPF>
  </PropertyGroup>

</Project>

速度が直列処理で9秒のところパイプラインでは6秒と概ね60%程度に処理時間が短縮されました。

コメント