非同期プログラミング

プログラムが外界と対話する必要がある場合,例えば,インターネットを介して他のマシンと 通信するような場合,プログラム内の操作は予測できない順序で行われる必要がある場合があります. 例えば,プログラムがファイルをダウンロードする必要があるとします.ダウンロード操作を開始し, それが完了するのを待つ間に他の操作を行い,ダウンロードしたファイルが利用可能になったら ダウロードしたファイルを必要とするコードを再開したいとします.この種のシナリオは非同期 プログラミングの領域に該当しますが,(概念的には複数のことが同時に起こるので)並行プログラミングと呼ばれることもあります.

これらのシナリオに対応するために,JuliaはTasksを提供しています (対称的コルーチン,軽量スレッド,協調型マルチタスク,あるいはワンショット連続処理 など,他にもいくつかの名前で知られています).ある計算作業(実際には特定の関数の実行) がTaskとして指定された場合,別のTaskに切り替えることで,その作業を中断 することが可能になります.元のTaskは後で再開することができ,中断したところから再開されます.最初はこれは関数呼び出しと似ているように見えるかもしれません.しかし2つの重要な 違いがあります.第一に,タスクの切り替えはスペースを使わないので,コールスタックを消費せず に何度でもタスクを切り替えることができます.第二に,タスク間の切り替えは任意の順序で行う ことができます.関数呼び出しとは異なり,呼び出された関数は呼び出した関数に制御が戻る前に 実行を終了しなければなりません.

基本的なTask操作

Taskは実行される計算作業の単位のハンドルと考えることができます.これは作成・開始・ 実行・終了のライフサイクルを持っています.タスクは,実行する0引数関数で,Task コンストラクタを呼び出すか,@taskマクロを使用して作成されます.

julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0

@task xTask(()->x)と等価です.

このタスクは5秒間待ったのち,doneを表示しますが,まだ実行を開始していません. 準備ができたらscheduleを呼び出すことでいつでも実行できます.

julia> schedule(t);

これをREPLで試してみると,scheduleがすぐに戻ってくるのがわかると思います. これは実行するタスクの内部キューに単純にtを追加するだけだからです. そして,REPLは次のプロンプトを表示して,さらなる入力を待ちます. キーボード入力を待つことで,他のタスクを実行する機会を提供するので,その時点でt が開始されます.tsleepを呼び出し,タイマを設定して実行を停止します. 他のタスクがスケジュールされている場合は,そのタスクを実行することができます. 5秒後,タイマが作動してtが再起動されると,doneが表示されているのを見ることができます. tはこれで終了されます.

wait関数は,他のタスクが終了するまで呼び出したタスクをブロックします. したがって,例えば,scheduleだけを呼び出す代わりに,以下を入力すると,次の 入力プロンプトが表示される前に5秒間の一時停止が表示されます.

julia> schedule(t); wait(t)

これはREPLが先に進む前にtが終わるのを待っているからです.

タスクを作成してすぐにスケジュールしたいというのはよくあることなので, そのために@asyncマクロが用意されています.@async xschedule(@task x) と等価です.

チャネルとの通信

いくつかの問題では,必要とされる作業の様々な部分は,当然のことながら関数の呼び出しによって関連 づけられてはいません,すなわち必要なジョブの間には,明らかな「呼び出し元」や「呼び出し先」は 存在しません.例えば,あるプロシージャが値を生成し,別のプロシージャが値が消費しているような, プロデューサ-コンシューマ問題があります.コンシューマは,単純にプロデューサ関数を呼び出して値を 取得することはできません,なぜならプロデューサは生成すべき値を更に多く持っている可能性があり, まだ返す準備ができていない可能性があるためです.タスクでは,プロデューサとコンシューマは必要に 応じて値を前後に渡しながら,必要なだけ実行することができます.

Juliaはこの問題を解決するためにChannelメカニズムを提供します.Channelとは FIFOの待ち行列で,複数のタスクが読み書きできるようになっています.

put!呼び出しで値を生成するプロデューサタスクを定義してみましょう.値をコンシューム するには,新しいタスクを実行するようにプロデューサをスケジュールする必要があります. 1-引数関数を引数として受け付ける特殊なChannelコンストラクタを使用して,チャネルに バインドされたタスクを実行することができます.そして,チャネルオブジェクトから繰り返しtake!することができます.

julia> function producer(c::Channel)
           put!(c, "start")
           for n=1:4
               put!(c, 2n)
           end
           put!(c, "stop")
       end;

julia> chnl = Channel(producer);

julia> take!(chnl)
"start"

julia> take!(chnl)
2

julia> take!(chnl)
4

julia> take!(chnl)
6

julia> take!(chnl)
8

julia> take!(chnl)
"stop"

この動作を考える一つの方法は,producerが複数回返すことができたということです. put!の呼び出しの間に,プロデューサの実行は中断され,コンシューマが制御を持ちます.

返されたChannelは,forループ内で反復可能なオブジェクトとして使用することができ, その場合,ループ変数は生成された全ての値を引き継ぎます.チャネルがクローズされると,ループが終了します.

julia> for x in Channel(producer)
           println(x)
       end
start
2
4
6
8
stop

プロデューサでチャネルを明示的に閉じる必要はありませんでした.これはChannelTaskにバインドするという行為が,チャネルのオープンライフタイムとタスクのオープンライフタイム を関連付けているからです.タスクが終了すると,チャネルオブジェクトは自動的に閉じられます. 複数のチャネルをタスクにバインドすることもできますし,その逆も可能です.

Taskのコンストラクタは,0-引数の関数を期待しますが,タスクにバインドされたチャネルを 生成するChannelメソッドは,Channel型の1つの引数を受け入れる関数を期待します. 一般的なパターンは,プロデューサがパラメータ化されている場合で,この場合0または1引数の anonymous functionを作成するために,部分的な関数アプリケーションが 必要になります.

Taskオブジェクトの場合,これは直接または便利なマクロを使用して行うことができます:

function mytask(myarg)
    ...
end

taskHdl = Task(() -> mytask(7))
# or, equivalently
taskHdl = @task mytask(7)

より高度な処理分配パターンを編成するために,bindscheduleは, TaskChannelのコンストラクタと組み合わせて使用し,チャネルのセットと プロデューサ/コンシューマのタスクのセットを明示的にリンクさせることができます.

チャネルの詳細

チャネルはパイプとして可視化することができます.言い換えれば書き込み側と読み込み側があります:

  • 異なるタスクの複数のライタが同じチャネルに対して,put!を呼び出して,並行に書き込みを行うことができます

  • 異なるタスクの複数のリーダがtake!を呼び出して,並行にデータを読み込めます.

  • 以下は例です:

    # Given Channels c1 and c2,
    c1 = Channel(32)
    c2 = Channel(32)
    
    # and a function `foo` which reads items from c1, processes the item read
    # and writes a result to c2,
    function foo()
        while true
            data = take!(c1)
            [...]               # process data
            put!(c2, result)    # write out result
        end
    end
    
    # we can schedule `n` instances of `foo` to be active concurrently.
    for _ in 1:n
        @async foo()
    end
  • チャネルはChannel{T}(sz)コンストラクタで作成されます.チャネルはT型のオブジェクトのみを保持します.タイプが指定されていない場合は,任意のタイプのオブジェクトを保持することができます.szはチャネルに保持できる要素の最大数を指定します.例えば,Channel(32)は任意の型のオブジェクトを最大32個保持できるチャネルを作成します.Channel{MyType}(64)MyType型のオブジェクトをいつでも最大64個まで保持することができます.

  • Channelが空であれば,データが利用可能になるまで,(take!呼び出し内の)リーダはブロックします.

  • Channelがいっぱいのとき,利用可能なスペースができるまで,(put!呼び出し内の)ライタはブロックします.

  • isreadyはチャネル内にオブジェクトが存在するかどうかをテストし,waitはオブジェクトが利用可能になるのを待ちます.

  • Channelは初期状態ではオープン状態になっています.これはtake!put!の呼び出しで自由に読み書きできることを意味します.closeChannelをクローズします.クローズされたChannelでは,put!は失敗します.例えば:

    julia> c = Channel(2);
    
    julia> put!(c, 1) # `put!` on an open channel succeeds
    1
    
    julia> close(c);
    
    julia> put!(c, 2) # `put!` on a closed channel throws an exception.
    ERROR: InvalidStateException("Channel is closed.",:closed)
    Stacktrace:
    [...]
  • クローズされたチャネル上のtake!と(値を取得するが削除はしない)fetchは,それが空になるまで既存の値を返すことに成功しています.以下は上記の例の続きです:

    julia> fetch(c) # Any number of `fetch` calls succeed.
    1
    
    julia> fetch(c)
    1
    
    julia> take!(c) # The first `take!` removes the value.
    1
    
    julia> take!(c) # No more data available on a closed channel.
    ERROR: InvalidStateException("Channel is closed.",:closed)
    Stacktrace:
    [...]

タスク間通信にチャネルを使用した簡単な例を考えてみましょう.単一のjobsチャネルからデータを処理 するために4つのタスクを開始します.id (job_id)で識別されたジョブがチャネルに書き込まれます. このシミュレーションでは,各タスクはjob_idを読み込み,ランダムな時間だけ待機し,job_idと シミュレートされた時間のタプルを結果チャネルに書き戻します.最後に全てのresultsが出力されます.

julia> const jobs = Channel{Int}(32);

julia> const results = Channel{Tuple}(32);

julia> function do_work()
           for job_id in jobs
               exec_time = rand()
               sleep(exec_time)                # simulates elapsed time doing actual work
                                               # typically performed externally.
               put!(results, (job_id, exec_time))
           end
       end;

julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

julia> n = 12;

julia> @async make_jobs(n); # feed the jobs channel with "n" jobs

julia> for i in 1:4 # start 4 tasks to process requests in parallel
           @async do_work()
       end

julia> @elapsed while n > 0 # print out results
           job_id, exec_time = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds")
           global n = n - 1
       end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311

より多くのタスク操作

タスク操作はyieldtoと呼ばれる低レベルのプリミティブで構築されています. yieldto(task, value)は現在のタスクを一時停止し,指定されたtaskに切り替え,その最後の yieldto呼び出しで指定されたvalueを返します.yieldtoはタスクスタイルの 制御フローをしようするために必要な唯一の操作であることに注意してください,すなわち呼び出して 返すのではなく,常に別のタスクに切り替えているだけなのです.これが,この機能が「対称型コルーチン」 と呼ばれる理由です.それぞれのタスクは同じメカニズムを使って別のタスクへ切り替えたり,別のタスクからきりかえられたりしているということです.

yieldtoは強力ですが,タスクのほとんどの用途では直接呼び出されません.その理由を考えて みましょう.現在のタスクから離れて切り替えた場合,どこかの時点で元のタスクに戻りたいと思うでしょうが, いつ元のタスクに悖るのか,どのタスクが元のタスクに戻るのかを知るには,かなりの調整が必要になります. 例えば,put!take!はブロッキング操作で,チャネルのコンテキストで使用される場合, コンシューマが誰であるのかを記憶するために状態を維持します.put!が低レベルのyieldto よりも使いやすいのは,手動でコンシューマタスクを追跡する必要がないからです.

yieldtoに加えて,タスクを効率的に使うためには,いくつかの基本的な機能が必要です.

  • current_taskは現在実行されているタスクの参照を取得します.
  • istaskdoneはタスクが終了したかどうかを検索します.
  • istaskstartedはタスクがまだ実行されているかを検索します.
  • task_local_storageは現在のタスクに固有のキーバリューストアを操作します.

タスクとイベント

ほとんどのタスクの切り替えはI/Oリクエストなどのイベントを待つ結果として発生し,Julia Baseに含まれる スケジューラによって実行されます.スケジューラは実行可能なタスクのキューを保持し,メッセージ到着など の外部イベントに基づいてタスクを再起動するイベントループを実行します.

イベントを待つための基本的な関数はwaitです.いくつかのオブジェクトがwaitを 実装しています.例えば,Processオブジェクトが与えらえた時,waitはそれが終了するのを 待ちます.waitは暗黙の裡に実装されることが多く,例えばデータが利用可能になるのを待つ ためのread呼び出しの中で,waitが発生することがあります.

これら全ての場合において,waitは最終的に,キューイングとタスクの再起動を担当している Conditionオブジェクト上で動作します.タスクがCondition上でwaitを 呼び出すと,そのタスクは実行不可能とマークされ,コンディションのキューに追加され,スケジューラに 切り替わります.スケジューラは実行する別のタスクを選択したり,外部イベントの待ち受けをブロックしたり します.全てがうまくいけば,最終的にはイベントハンドラがその条件のnotifyを呼び出し, その条件を待っているタスクが再び実行可能になります.

Taskを呼び出して明示的に作成されたタスクは,はじめはスケジューラに知られていません. これにより必要に応じてyieldtoを使って手動でタスクを管理することができます. しかし,このようなタスクがイベントを待っていても,イベントが発生すると,想定通りに自動的に再起動されます.