C#でワーカースレッド+キューを使ってキャッシュファイルの保存を非同期処理する方法

コンピュータ

以前、画像ファイルをキャッシュするプログラムを書きましたが、通常のファイルもキャッシュするプログラムを考えてみます。

ファイル名:FileCacheStore.cs


using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;

class FileCacheStore : IAsyncDisposable
{
    // ハッシュコードの生成
    public static string GetCacheKey(string key)
    {
        using var md5 = MD5.Create();
        byte[] inputBytes = Encoding.UTF8.GetBytes(key);
        byte[] hashBytes = md5.ComputeHash(inputBytes);
        var sb = new StringBuilder();
        foreach (var b in hashBytes)
        {
            sb.Append(b.ToString("x2"));
        }
        return sb.ToString();        
    }

   // ストリームの取得
    public static async Task<MemoryStream> LoadMemoryStreamAsync(string filePath)
    {
        var ms = new MemoryStream();

        using (var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read))
        {
            await fs.CopyToAsync(ms);
        }

        return ms;
    }
    // キャッシュファイルの保存
    public static async Task SaveToCacheAsync(string cacheFile, MemoryStream ms)
    {
        using (var stream = new FileStream(cacheFile, FileMode.Create, FileAccess.Write))
        {
            ms.Seek(0, SeekOrigin.Begin);
            await ms.CopyToAsync(stream);
        }
    }






    // キャッシュファイル保存ディレクトリ
    private readonly string _cacheDir;

    // キャッシュファイルの差台数
    private readonly int _cacheCapacity;

    // リクエストキュー
    private readonly BlockingCollection<string> _queue = [];

    // ワーカータスク
    private Task? _workerTask;

    // キャンセルトークンソース
    private readonly CancellationTokenSource _cts = new();

    // コンストラクタ
    public FileCacheStore(string cacheDir = "./.cacheDir", int cacheCapacity = 1000)
    {
        _cacheDir = cacheDir;
        if (!Directory.Exists(_cacheDir))
            Directory.CreateDirectory(_cacheDir);
        _cacheCapacity = cacheCapacity;

        // プリフェッチ(先読み)用ワーカーの起動
        StartPrefetcher();
    }
    // ワーカスレッドの完了待ち
    public async Task CompleteAsync()
    {
        // 完了を通知してから待つ
        _queue.CompleteAdding();
        try
        {
            if (_workerTask != null)
                await _workerTask;
        }
        catch (OperationCanceledException) { }
    }
    // 開放処理
    public async ValueTask DisposeAsync()
    {
        _cts.Cancel();           // ワーカーにキャンセルを伝える
        _queue.CompleteAdding(); // もう追加しない

        if (_workerTask != null)
        {
            try
            {
                await _workerTask; // 完了を待つ(ここで例外が投げられる可能性あり)
            }
            catch (OperationCanceledException)
            {
                // キャンセルなら無視
            }
            catch (ObjectDisposedException)
            {
                // Dispose済みで GetConsumingEnumerable が例外を投げたケース
            }
        }

        // 完全に使い終わった後に破棄する
        _queue.Dispose();
        _cts.Dispose();

    }
    // キャッシュの削除   
    private Task TrimCacheAsync(int maxFiles)
    {
        return Task.Run(() =>
        {
            var files = new DirectoryInfo(_cacheDir)
                .GetFiles("*.bin")
                .OrderBy(f => f.LastAccessTime)
                .ToList();

            int over = files.Count - maxFiles;
            if (over <= 0) return;

            foreach (var file in files.Take(over))
            {
                try
                {
                    file.Delete();
                }
                catch (Exception ex)
                {
                    System.Diagnostics.Debug.WriteLine($"[WARN] 削除失敗: {file.Name} → {ex.Message}");
                }
            }
        });
    }
    // 先読み開始
    void StartPrefetcher()
    {
        _workerTask = Task.Run(async () =>
        {
            try
            {
                foreach (var reqest in _queue.GetConsumingEnumerable(_cts.Token))
                {
                    // ハッシュコードを生成
                    string key = GetCacheKey(reqest);

                    // キャッシュファイルが有る場合次へ
                    string cacheFile = Path.Join(_cacheDir, $"{key}.bin");
                    if (File.Exists(cacheFile)) continue;

                    // ファイルの読み込み
                    using (var ms = await LoadMemoryStreamAsync(reqest))
                    {
                        // キャッシュファイル保存
                        await SaveToCacheAsync(cacheFile, ms);
                        await TrimCacheAsync(_cacheCapacity);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // キャンセルは正常終了として無視
            }
        }, _cts.Token);
}

    // キューにリクエストを追加
    public void EnqueueFile(string request) => _queue.Add(request);

    // ファイルをロード
    public async Task<MemoryStream> LoadFileAsync(string filePath)
    {
        // ハッシュコードを生成
        string key = GetCacheKey(filePath);

        // キャッシュファイルが有る場合はキャッシュを返す。
        string cacheFile = Path.Join(_cacheDir, $"{key}.bin");
        if (File.Exists(cacheFile))
        {
            System.Diagnostics.Debug.Print("Hit");
            // ファイルの読み込み
            return await LoadMemoryStreamAsync(cacheFile);
        }

        // キャッシュにない場合はオリジナルを返す。
        return await LoadMemoryStreamAsync(filePath);
    }    

}

ファイル名:Program.cs


public class Program
{
    static async Task Main()
    {
        FileCacheStore _fileCacheStore = new();

        string filePath1 = "G:\\csharp\\console\\FileCacheStore01\\hello.txt";
        string filePath2 = "G:\\csharp\\console\\FileCacheStore01\\hello2.txt";
        string filePath3 = "G:\\csharp\\console\\FileCacheStore01\\hello3.txt";

        _fileCacheStore.EnqueueFile(filePath1);
        _fileCacheStore.EnqueueFile(filePath2);
        _fileCacheStore.EnqueueFile(filePath3);

        using (var ms = await _fileCacheStore.LoadFileAsync(filePath3))
        {
            string text = System.Text.Encoding.UTF8.GetString(ms.ToArray());
            Console.WriteLine(text);
        }

        await _fileCacheStore.DisposeAsync();
    }
}

コメント