vaguely

和歌山に戻りました。ふらふらと色々なものに手を出す毎日。

【C#】Task の話 その 3

はじめに

前回満を持して?登場した ThreadPool クラスを追いかけますよ、というお話。

ThreadPool について

まず ThreadPool とは?という話から。

スレッドの生成には時間・リソース的な負荷が大きい、という問題があります。

プログラミング .NET Framework によると、Thread の生成時に 400 もの dll がロードされるとか。。。

これをスレッドが必要になるたびに繰り返すのは無駄が大きいため、生成したスレッドを使用後も破棄せず置いておき、必要に応じて再利用する、ためのもの、と理解しています。

Microsoft Docs によると、「タスクの実行、作業項目の送信、非同期 I/O の処理、他のスレッドの代理で行う待機、およびタイマーの処理に使用できるスレッドのプールを提供します」とのこと。

なおスレッドにはメインスレッド( UI スレッド)と、バックグラウンドで動くバックグラウンドスレッドがありますが、ここでプールされるのはバックグラウンドスレッドです。

ThreadPool クラスのコードを読む

では話を戻して ThreadPool.UnsafeQueueUserWorkItemInternal から追いかけてみましょう。

ThreadPool.cs

~省略~
internal static void UnsafeQueueUserWorkItemInternal(object callBack, bool preferLocal)
{
    Debug.Assert((callBack is IThreadPoolWorkItem) ^ (callBack is Task));

    EnsureVMInitialized();

    ThreadPoolGlobals.workQueue.Enqueue(callBack, forceGlobal: !preferLocal);
}
~省略~

名前からすれば処理が実行されるのは ThreadPoolGlobals.workQueue.Enqueue でしょう。

が、その前に気になるものがありますね。

VM 。。。?

EnsureVMInitialized()

ThreadPool.cs

~省略~
// The thread pool maintains a per-appdomain managed work queue.
// New thread pool entries are added in the managed queue.
// The VM is responsible for the actual growing/shrinking of 
// threads. 
private static void EnsureVMInitialized()
{
    if (!ThreadPoolGlobals.vmTpInitialized)
    {
        EnsureVMInitializedCore(); // separate out to help with inlining
    }
}

[MethodImpl(MethodImplOptions.NoInlining)]
private static void EnsureVMInitializedCore()
{
    InitializeVMTp(ref ThreadPoolGlobals.enableWorkerTracking);
    ThreadPoolGlobals.vmTpInitialized = true;
}
~省略~

VM の初期化が終わっていなければ初期化をする、というところは特に疑問もないところです。

コメントから、 VM というのはスレッドを増やしたり減らしたりをコントロールする役割を担っているようです。

ThreadPool.cs

~省略~
[DllImport(JitHelpers.QCall, CharSet = CharSet.Unicode)]
private static extern void InitializeVMTp(ref bool enableWorkerTracking);
~省略~

dll へと処理が移ってしまいました。

QCall 、というクラスがあるのか。。。?と思ったのですが、該当のメソッドは見つかりませんでした。

メソッド名から考えると、下記の ThreadPoolNative クラスが呼ばれているようです。

comthreadpool.cpp

~省略~
void QCALLTYPE ThreadPoolNative::InitializeVMTp(CLR_BOOL* pEnableWorkerTracking)
{
    QCALL_CONTRACT;

    BEGIN_QCALL;
    ThreadpoolMgr::EnsureInitialized();
    *pEnableWorkerTracking = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_EnableWorkerTracking) ? TRUE : FALSE;

    END_QCALL;
}
~省略~

で、この ThreadPoolMgr クラスというのは下記にあります。

。。。だんだん自分が何しているのかわからなくなってきましたが、まだまだ進めますよ。

win32threadpool.cpp

~省略~
void ThreadpoolMgr::EnsureInitialized()
{
    CONTRACTL
    {
        THROWS;         // Initialize can throw
        MODE_ANY;
        GC_NOTRIGGER;
    }
    CONTRACTL_END;

    if (IsInitialized())
        return;

    DWORD dwSwitchCount = 0;

retry:
    if (InterlockedCompareExchange(&Initialization, 1, 0) == 0)
    {
        if (Initialize())
            Initialization = -1;
        else
        {
            Initialization = 0;
            COMPlusThrowOM();
        }
    }
    else // someone has already begun initializing.
    {
        // wait until it finishes
        while (Initialization != -1)
        {
            __SwitchToThread(0, ++dwSwitchCount);
            goto retry;
        }
    }
}
~省略~

… ThreadpoolMgr::Initialize()

そっ

全く終わる気がしなくなってきたので、この先は次の機会にしたいと思います。

結局 VM とは誰なのか

プログラミング .NET Framework によると、 ThreadPool でスレッドを扱うのは CLR ということでした。

ということは、 .NET Core では CoreCLR ということになります。

そういえば途中で CLRConfig とか見たような気がしますね。

ThreadPoolGlobals.workQueue.Enqueue

ThreadPoolGlobals > ThreadPoolWorkQueue > Enqueue を見てみます。

ThreadPool.cs

~省略~
public void Enqueue(object callback, bool forceGlobal)
{
    Debug.Assert((callback is IThreadPoolWorkItem) ^ (callback is Task));

    if (loggingEnabled)
        System.Diagnostics.Tracing.FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback);

    ThreadPoolWorkQueueThreadLocals tl = null;
    if (!forceGlobal)
        tl = ThreadPoolWorkQueueThreadLocals.threadLocals;

    if (null != tl)
    {
        tl.workStealingQueue.LocalPush(callback);
    }
    else
    {
        workItems.Enqueue(callback);
    }

    EnsureThreadRequested();
}
~省略~

ThreadLocal

変数を全スレッドで共有するのではなく、スレッドごとに保持できるようにするもののようです。

今回は forceGlobal が true (のハズ)なので、 workItems.Enqueue が実行されます。

workItems.Enqueue

workItems というのは ThreadPoolGlobals > ThreadPoolWorkQueue > ConcurrentQueue< object> です。

この Enqueue を見てみます。

Queue クラスは以前触れてみましたが、 ConcurrentQueue はマルチスレッドで利用できる Queue クラスです。

スレッドセーフ

ConcurrentQueue はスレッドセーフとのことですが、複数のスレッドから同時に呼び出されても正しい順番でデータの追加や取り出しができるようです。

この仕組みについても気になるところではありますが、今回はスキップすることにします。

ThreadPool.cs

~省略~
internal void EnsureThreadRequested()
{
    //
    // If we have not yet requested #procs threads from the VM, then request a new thread
    // as needed
    //
    // Note that there is a separate count in the VM which will also be incremented in this case, 
    // which is handled by RequestWorkerThread.
    //
    int count = numOutstandingThreadRequests;
    while (count < ThreadPoolGlobals.processorCount)
    {
        int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count + 1, count);
        if (prev == count)
        {
            ThreadPool.RequestWorkerThread();
            break;
        }
        count = prev;
    }
}
~省略~

ThreadPoolGlobals.processorCount の中身は Environment.ProcessorCount であり、実行している PC のプロセッサー数が取得できます。

ちなみに私の PC で実行したところ、 8 と返ってきました。

Interlocked.CompareExchange

Microsoft Docks によると、 numOutstandingThreadRequests と count が等しい場合、numOutstandingThreadRequests に count + 1 の値が入る、という処理とのこと。

戻り値は numOutstandingThreadRequests の元の値です。

よくわからなかったので実際に試してみることにしました。

var numOutstandingThreadRequests = 0;
var count = 0;
while (count < Environment.ProcessorCount)
{
    int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count + 1, count);
    Console.WriteLine("pre " + prev + " num " + numOutstandingThreadRequests + " cou " + count);
    count = prev;
}

結果は下記のとおりです。

pre 0 num 1 cou 0
pre 1 num 1 cou 0
pre 1 num 2 cou 1
pre 2 num 2 cou 1
pre 2 num 3 cou 2
pre 3 num 3 cou 2
pre 3 num 4 cou 3
pre 4 num 4 cou 3
pre 4 num 5 cou 4
pre 5 num 5 cou 4
pre 5 num 6 cou 5
pre 6 num 6 cou 5
pre 6 num 7 cou 6
pre 7 num 7 cou 6
pre 7 num 8 cou 7
pre 8 num 8 cou 7

prev と count が等しければ ThreadPool.RequestWorkerThread() が実行されて終了となるので、ループは 3 回回ることになります。

ThreadPool.RequestWorkerThread() の中身は。。。

ThreadPool.cs

~省略~
[DllImport(JitHelpers.QCall, CharSet = CharSet.Unicode)]
internal static extern bool RequestWorkerThread();
~省略~

そっ

※これも今後追いかけてみたいところです。

おわりに

ということで、 Task.Run で始まった処理は ConcurrentQueue に Queue として Task.Run の引数である delegate メソッドを追加し、バックグラウンドスレッドをリクエストしたところで終わりました。

このあと OS 側でスレッドの処理が実行される、ということになると考えられます。

Task クラス完全に理解した

先ほどのコードの中にも色々端折った部分としてエラー処理や、 Task 同士に親子関係を持たせる、といったところがあったり、今 Task を使うとなればほぼ確実に登場する、 async/await も気になるところ。

ということで引き続き Task クラスに関連して色々調べてみたいと思います。

参照