複数プロセス処理と分散計算
分散メモリ型の並列計算の実装は,Julia同梱の標準ライブラリの一部としてDistributed
モジュールによって提供されます.
ほとんどの現代の計算機は2つ以上のCPUを搭載しており,クラスタ内で複数の計算機をまとめて用いることができます. これらの複数CPUの力を使いこなすことで,より多くの計算をより高速に行うことが可能となります. 性能に影響を及ぼすのは2つの大きな要因があります: CPUのスピードそのものと,CPUがメモリへアクセスする速度です. クラスタにおいては,ある特定のCPUが同じコンピュータ(ノード)内のRAMに最速でアクセスできることはほぼ自明です. おそらくもっと驚くべきことに,メインメモリとキャッシュの速度 が異なることが原因で,同様の問題が一般的なマルチコアラップトップにも関連してきます. したがって,優れたマルチプロセシング環境では,特定のCPUによってメモリチャンクの「所有権」を制御できるべきです. Juliaはメッセージパッシングに基づいてマルチプロセシング環境を提供し,別々のメモリドメイン内の複数プロセス上で プログラムを一度に実行することを可能にします.
Juliaのメッセージパッシングの実装は,MPI[1]などほかの環境とは異なるものとなっています. Julia内の通信は一般的には「一方向的(one-sided)」です,すなわちプログラマは2プロセスの操作の内,1つのプロセス のみを明示的に管理する必要があります.さらには,これらの操作は典型的には「メッセージ送信」や「メッセージ受信」 のようには見えず,ユーザ関数を呼び出すような高レベルな操作に似たものとなります.
Juliaにおける分散計算は,2つのプリミティブによって構成されます: リモートリファレンスとリモートコールです. リモートリファレンスは,任意のプロセスから,特定のプロセスに格納されているオブジェクトを参照するために使うことができるものです.リモートコールは1つのプロセスによるリクエストで,別の(同じでも良い)プロセス上で関数と引数を指定しながら呼び出すためのものです.
リモートリファレンスには2つのフレーバーがあります: Future
and RemoteChannel
です.
リモートコールは,Future
をその結果に返します.リモートコールは直ちに結果を返します, すなわちリモート呼び出しが別の場所で発生している間に,呼び出しを行ったプロセスは次の操作に進みます. あなたはリモートコールが終わるのをwait
を返されるFuture
上で呼ぶことで待つことができ, またfetch
を用いて結果のすべての値を取得することができます.
一方で,RemoteChannel
は上書き可能です.例えば,複数のプロセスが同じリモートのChannel
を参照することにより それらの処理を組み合わせることができます.
各プロセスは識別子を持ちます.インタラクティブなJuliaプロンプトを提供するプロセスは常に1というid
を持ちます. 並列操作に用いられるプロセスはデフォルトでは「ワーカ」として参照されます.プロセスが一つだけの時は,プロセス1が ワーカとしてとらえられます.そうでない場合は,ワーカはプロセス1以外のすべてのプロセスであるととらえられます. 結果として,pmap
のような並列処理メソッドから恩恵を得るためには,2つ以上のプロセスが必要となります. 長い計算がワーカ上で実行されている間にメインプロセスで他のことをやらせたい場合は,シングルプロセスを1つ足すことで恩恵を得られます.
これを試してみましょう.julia -p n
で始めると,n
はローカルマシン上にn個のワーカプロセスを提供します. 一般的にn
はマシン上のCPUスレッド数(論理コア数)と同じにするのが理にかなっています.-p
引数は,暗黙の内に Distributed
モジュールをロードすることに注意してください.
$ ./julia -p 2
julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)
julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.18526 1.50912
1.16296 1.60607
remotecall
の第一引数は呼び出される関数です.Juliaにおける並列プログラミングのほとんどは, 特定のプロセスや利用可能なプロセス数を参照しませんが,remotecall
はより細かい制御を 提供する低レベルのインタフェースと考えられています.remotecall
の第二引数は処理を行う プロセスのid
で,残りの引数は呼び出される関数に渡されます.
1行目ではプロセス2に2x2のランダム行列を構築するように求め,2行目ではこれに1を加えるように求めていることが 見て取れます.両方の計算結果は,2つのフューチャ,r
とs
で利用可能です.@spawnat
マクロは, 第一引数で指定されたプロセス上で第二引数内の表現を評価します.
リモートで計算された値がすぐに必要になることがあるかもしれません.これは典型的には,次のローカル操作で必要な データを取得するために,リモートオブジェクトから読み出しを行う時に起こります.この目的のために,remotecall_fetch
関数が存在します.これはfetch(remotecall(...))
と等価ですが,より効率的です.
julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.18526337335308085
getindex(r,1,1)
はr[1,1]
とequivalentであるため,この呼び出しはフューチャr
の 最初の要素をフェッチすることを覚えておいてください.
より簡単にするために,シンボル:any
を[@spawnat
]に渡すことができ,これにより操作を行う場所を選択します.
julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)
julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.38854 1.9098
1.20939 1.57158
ここで,私たちが1 .+ r
ではなく,1 .+ fetch(r)
を用いていることに注意してください.これはコードがどこで 実行されるのかを知ることができないため,一般的には,加算を行うプロセスにr
を移動させるのにfetch
が 必要になる場合があるからです.この場合,@spawnat
はr
を所有しているプロセス上で計算を実行するのに 十分賢いので,fetch
はno-opです(処理は行われません).
(@spawnat
は組み込みではなく,Juliaでmacroとして定義されているのは注記に値します. このような構造体を独自に定義することも可能です.)
覚えておくべき重要なことは,一度フェッチされると,Future
はその値をローカルにキャッシュする ということです.さらなるfetch
の呼び出しは,ネットワークホップを必要としません.すべての参照する Future
sをフェッチされると,リモートに格納されている値は削除される.
@async
は@spawnat
と似ていますが,ローカルプロセス上でしかタスクを動かしません. これを使って,各プロセスに「フィーダ」タスクを作成します.各タスクは計算が必要な次のインデックスを指定し, そのプロセスが終了するのを待ち,インデックスが無くなるまでこれを繰り返します.メインタスクが@sync
の 最終ブロック,すなわち制御を放棄し関数から戻る前にすべてのローカルタスクが完了するのを末ポイントに到達するまで, フィーダタスクは実行を開始しないことに注意してください. v0.7以降では,フィーダタスクは,すべて同じプロセス上で実行されるため,nextidx
を介して状態を共有することが できます.Task
が協調的にスケジュールされていたとしても,asynchronous I/Oのように, コンテキストによってはロックが必要になる場合があります. これは,コンテキストスイッチは良く定義されたポイント,この場合は remotecall_fetch
が呼ばれた時のみ発生する ことを意味します.これは現在の実装の状態であり,将来のJuliaのバージョンでは,M個のProcess
上でN個までTasks
を実行 する,M:N Threadingを可能にするために変更される 可能性があります.その場合,複数のプロセスに同時に単一のリソースへの読み書きを行わせるのはセーフでないため, nextidx
用のロック獲得/再解放モデルが必要になります.
コードの利用可能性とパッケージの読み込み
あなたのコードは,それを実行する全てのプロセスで利用可能でなければなりません.例えば, Juliaプロンプトに以下のように入力します:
julia> function rand2(dims...)
return 2*rand(dims...)
end
julia> rand2(2,2)
2×2 Array{Float64,2}:
0.153756 0.368514
1.15119 0.918912
julia> fetch(@spawnat :any rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))
Stacktrace:
[...]
プロセス1は関数rand2
を知っていましたが,プロセス2は知りませんでした.
ほとんどの場合,あなたはファイルやパッケージからコードをロードすることになりますが,どのプロセスが コードをロードするのかはかなり柔軟に制御することができます.以下のようなコードを含むDummyModule.jl
というファイルを考えてみましょう:
module DummyModule
export MyType, f
mutable struct MyType
a::Int
end
f(x) = x^2+1
println("loaded")
end
全てのプロセスにわたってMyType
を参照するためには,全てのプロセスでDummyModule.jl
をロードする 必要があります.include("DummyModule.jl")
を呼び出すと,単一のプロセス上でのみロードされます. 全てのプロセスでロードするには,@everywhere
マクロを使用します.(julia -p 2
でJuliaを開始します.):
julia> @everywhere include("DummyModule.jl")
loaded
From worker 3: loaded
From worker 2: loaded
いつものように,これはDummyModule
をどのプロセスのスコープにも入れません,using
またはimport
を 必要とします.さらに,DummyModule
を1つのプロセスのスコープに入れると,他のプロセスではスコープに入れません:
julia> using .DummyModule
julia> MyType(7)
MyType(7)
julia> fetch(@spawnat 2 MyType(7))
ERROR: On worker 2:
UndefVarError: MyType not defined
⋮
julia> fetch(@spawnat 2 DummyModule.MyType(7))
MyType(7)
しかしながら,例えば,スコープに入っていないとしても,DummyModule
をロードしたプロセスに MyType
を送ることは可能です.
julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)
-L
フラグを使って起動時に複数のプロセスにファイルをプリロードしたり,ドライバスクリプトを 使って計算を駆動したりすることもできます:
julia -p <n> -L file1.jl -L file2.jl driver.jl
上の例のドライバスクリプトを実行しているJuliaプロセスは,対話型プロンプトを提供するプロセスと 同じようにid
として1を持ちます.
最後に,DummyModule.jl
がスタンドアロンファイルではなくパッケージである場合,using DummyModule
は全てのプロセスでDummyModule.jl
をロードしますが,using
が呼ばれたプロセスでのみスコープに入ります.
ワーカプロセスの開始と管理
基本となるJuliaのインストールでは,2種類のクラスタがサポートされています:
- 上で示した通り,
-p
オプションで指定されたローカルクラスタ. --machine-file
オプションを使ったマシンをまたいだクラスタ.これはパスワードなしのssh
ログインを
使用して,指定されたマシン上で(現在のホストと同じパスから)Juliaワーカプロセスを起動します.
addprocs
, rmprocs
, workers
などの関数が,クラスタ内のプロセスを追加, 削除,クエリするためのプログラム的な手段として利用できます.
julia> using Distributed
julia> addprocs(2)
2-element Array{Int64,1}:
2
3
モジュールDistributed
はaddprocs
を呼び出す前に,マスタプロセス上で明示的にロードされなければなりません. ワーカプロセス上では自動的に利用可能になります.
ワーカは~/.julia/config/startup.jl
スタートアップスクリプトを実行せず,またワーカはグローバル状態 (グローバル変数,新しいメソッド定義,ロードされたモジュール)を他の実行中のプロセスと同期させません. 特定の環境でワーカを初期化するために,addprocs(exeflags="--project")
を使用し,その後@everywhere using <modulename>
または@everywhere include("file.jl")
を使用することができます.
他のタイプのクラスタは,以下のClusterManagersで説明されているように,独自のカスタムClusterManager
を書くことでサポートすることができます.
データ移動
メッセージの送信とデータの移動は,分散プログラムのオーバーヘッドの大部分を占めています. メッセージの数と送信されるデータの量を減らすことは,パフォーマンスとスケーラビリティを達成するために非常に重要です. この目的のために,Juliaの様々な分散プログラミング構造によって実行されるデータ移動を理解することが重要です.
考えることができます.@spawnat
(といくつかの関連する構造)もデータを移動しますが,これは明らかではないので, 暗黙のデータ移動操作と呼ぶことができます.ランダム行列を構築して二乗するための2つのアプローチを考えてみましょう:
メソッド1:
julia> A = rand(1000,1000);
julia> Bref = @spawnat :any A^2;
[...]
julia> fetch(Bref);
メソッド2:
julia> Bref = @spawnat :any rand(1000,1000)^2;
[...]
julia> fetch(Bref);
これらの違いは些細なように見えますが,実際には@spawnat
の振る舞いによってかなり大きな違いがあります. 1つめのメソッドでは,ランダム行列が局所的に構築され,別のプロセスに送られて二乗されます.2つめのメソッドでは, ランダム行列は,別のプロセスで構築も二乗もされます.ゆえに,2つめのメソッドは1つめのメソッドよりもはるかに少ないデータを送信します.
このおもちゃの例では,この2つの方法は簡単に区別して選択することができます.しかし,実際のプログラムでは, データ移動の設計をするにはより多くの考えが必要であり,おそらく何らかの測定が必要になる場合があります. 例えば,1つめのプロセスが行列A
を必要とする場合,1つめのメソッドが良いかもしれません.あるいは,A
の 計算コストが高く現在のプロセスだけがそれを持っている場合は,他のプロセスへの移動は避けられないかもしれません. あるいは,現在のプロセスが@spawnat
とfetch(Bref)
の間にほとんど何もしない場合には,並列性を完全に 排除した方が良いかもしれません.あるいは,rand(1000,1000)
がより計算コストのかかる処理に置き換えられることを 想像してみてください.その場合,このステップのためだけに,別の@spawnat
文を追加するのが理にかなっている かもしれません.
グローバル変数
@spawnat
経由でリモート実行される式や,remotecall
を使ってリモートで実行するために指定されたクロージャは, グローバル変数を参照することがあります.Main
モジュールの下のグローバルバインディングは, 他のモジュールのグローバルバインディングとは少し違った扱いになります.以下のコードスニペットを考えてみましょう:
A = rand(10,10)
remotecall_fetch(()->sum(A), 2)
この場合,sum
はリモートプロセスで定義されなければなりません.A
はローカルのワークスペースで定義された グローバル変数であることに注意してください.ワーカ2はMain
の下にA
という変数を持っていません.クロージャ()->sum(A)
をワーカ2に送る行為はMain.A
がワーカ2に定義される結果となります.remotecall_fetch
の呼び出しがリターンされた後も, ワーカ2の上にMain.A
は存在し続けます.グローバル参照が埋め込まれたリモート呼び出し(Main
モジュールの下でのみ)は, 以下のようにグローバルを管理します:
リモートコールの一部といして参照されている場合,宛先ワーカに新しいグローバルバインディングが作成されます
グローバル定数はリモートノード上でも定数として宣言されます.
グローバル変数が宛先ワーカに再送信されるのは,リモート呼び出しのコンテキストのみで,その値が変更された 場合のみです.また,クラスタはノード間でグローバルバインディングを同期化しません.例えば以下のようになります:
A = rand(10,10) remotecall_fetch(()->sum(A), 2) # worker 2 A = rand(10,10) remotecall_fetch(()->sum(A), 3) # worker 3 A = nothing
上記のスニペットを実行すると,ワーカ2の
Main.A
はワーカ3のMain.A
とは異なる値を持ち, ノード1のMain.A
の値は何も設定されません.
お気付きかもしれませんが,マスタ上で再割り当てられたされたときにグローバルに関連付けられたメモリが収集される 場合がありますが,バインディングが有効であり続けるため,ワーカにはそのようなアクションは実行されません. clear!
を使用すると,リモートのノード上の特定のグローバルが不要になったら,手動でそれらをnothing
へ 再割り当てすることができます.これにより,通常のガベージコレクションサイクルの一部として,それらに関連付けられた メモリが解放されます.
したがって,プログラムはリモート呼び出しの際のグローバルの参照に注意する必要があります.実際には,可能であれば 完全に避けることが望ましいです.グローバルを参照する必要がある場合は,グローバル変数をローカライズするために, let
ブロックを使用することを検討してください.
以下は例です:
julia> A = rand(10,10);
julia> remotecall_fetch(()->A, 2);
julia> B = rand(10,10);
julia> let B = B
remotecall_fetch(()->B, 2)
end;
julia> @fetchfrom 2 InteractiveUtils.varinfo()
name size summary
––––––––– ––––––––– ––––––––––––––––––––––
A 800 bytes 10×10 Array{Float64,2}
Base Module
Core Module
Main Module
このように,グローバル変数A
はワーカ2上で定義されていますが,B
はローカル変数として捉えられているため, ワーカ2上にはB
のバインディングが存在しません.
並列マップとループ
幸いなことに多くの有用な並列計算はデータ移動を必要としません.一般的な例としては, 複数のプロセスが独立したシミュレーション試行を同時に処理することができるモンテカルロシミュレーションがあります. ここでは,@spawnat
を使って,2つのプロセスでコインを反転させることができます.まず,count_heads.jl
に 以下のような関数を書きます:
function count_heads(n)
c::Int = 0
for i = 1:n
c += rand(Bool)
end
c
end
関数count_heads
は,単純にn
個のランダムビットを加算します.ここでは,2つのマシンでいくつかの 試行を行い,その結果を足し合わせる方法を示します.
julia> @everywhere include_string(Main, $(read("count_heads.jl", String)), "count_heads.jl")
julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)
julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)
julia> fetch(a)+fetch(b)
100001564
この例は,今日六でよく使われる並列プログラミングパターンを示しています.多くの反復処理はいくつかの プロセスで独立して実行され,その結果が何らかの関数を使って結合されます. この組み合わせのプロセスはreductionと呼ばれます,なぜならそれは一般的にtensor-rank-reducingだからです: あるベクトルが1つの数に削減されたり,行列が1つの行や列に削減されたりすることから,こう呼びます. コードでは,これは通常,x = f(x,v[i])
というパターンのように見えます.ここでx
はアキュムレータ, f
はリダクション関数,v[i]
はリデュースされる要素です.演算がどのような順序で実行されても問題ないように, f
は結合律を満たしていることが望ましいです.
count_heads
でのこのパターンの使用は一般化できることに注意してください.2つの明示的な@spawnat
文を 使用しているので,並列処理は2つのプロセスに制限されています.任意の数のプロセスで実行するためには, 分散メモリで実行するparallel for loopを使うことができ,これはJuliaでは@distributed
を 使って以下のように書くことができます:
nheads = @distributed (+) for i = 1:200000000
Int(rand(Bool))
end
この構文は複数のプロセスに反復処理を割り当て,指定されたリダクション(ここでは(+)
)と組み合わせるパターンを 実装しています.各反復の結果は,ループ内の最後の式の値として取られます.並列ループ全体の式自体は, 最終的な答えとして評価されます.
並列ループはシリアルループのように見えますが,動作は劇的に異なることに注意してください.特に, 反復は指定された順序では行われず,変数や配列への書き込みは,反復が異なるプロセスで実行されるため, グローバルには表示されません.並列ループ内で使用される変数は全てコピーされ,各プロセスにブロードキャストされます.
例えば,以下のようなコードは意図通りには動きません:
a = zeros(100000)
@distributed for i = 1:100000
a[i] = i
end
このコードでは,各プロセスが個別のコピーを持つことになるので,全てのa
を初期化することはできません. このようなループのための並列化は避けなければなりません.幸いなことに,Shared Arrays を使うことで,この制限を回避することができます:
using SharedArrays
a = SharedArray{Float64}(10)
@distributed for i = 1:10
a[i] = i
end
変数が読み取り専用であれば,並列ループで「外部」変数を使用するのは完全に合理的です:
a = randn(1000)
@distributed (+) for i = 1:100000
f(a[rand(1:end)])
end
ここでは,各反復処理は,全ての処理で共有されるベクトルa
からランダムに選択されたサンプルに対してf
を適用します.
ここで見た通り,リダクション演算子は必要なければ省略することができます.その場合,ループは非同期に実行される. つまり,利用可能な全てのワーカ上で独立したタスクを生成し,完了を待たずに直ちにFuture
の 配列を返します.呼び出し元は,Future
の完了を後のポイントでfetch
を呼び出すことで 待つか,ループの最後に@sync @distributed for
のように@sync
を接頭辞としてつけることにより完了を待つことができる.
場合によってはリダクション演算子は必要とされず,ある範囲の全ての整数(またはより一般的には,あるコレクションの全ての要素) に関数を適用したいだけの場合もあります.これはparallel mapと呼ばれるもう一つの便利な操作で,Juliaでは pmap
関数として実装されています.例えば,以下のようにいくつかの大きな乱数行列の特異値を並列に計算することができます:
julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];
julia> pmap(svdvals, M);
Juliaのpmap
は各関数呼び出しが大量の作業を行う場合のために設計されています. 対照的に@distributed for
は,それぞれの反復が小さなもので,おそらく2つの数値を合計するだけのような状況を 扱うことができます.pmap
と@distributed for
は並列計算のためにワーカプロセスのみを使用します. @distributed for
を使う場合には,最終的なリダクションは呼び出したプロセスで行われます.
リモートリファレンスとアブストラクトチャネル
リモートリファレンスは常にAbstractChannel
の実装を参照します.
(Channel
のような)AbstractChannel
の具体的な実装はput!
,take!
, fetch
, isready
およびwait
を実装するのに必要とされます. Future
によって参照されるリモートオブジェクトは,Channel{Any}(1)
,すなわち Any
タイプのオブジェクトを保持することのできるサイズ1のChannel
に格納されます.
RemoteChannel
は上書き可能ですが,任意の型やサイズのチャネル,あるいはAbstractChannel
の 他の実装を指定することができます.
コンストラクタRemoteChannel(f::Function, pid)()
を使用すると,特定の型の複数の値を保持するチャネルへの 参照を作成することができます.f
はpid
上で実行される関数であり,AbstractChannel
を返さなければなりません.
例えば,RemoteChannel(()->Channel{Int}(10), pid)
はInt型でサイズ10のチャネルへの参照を返します. このチャネルはワーカpid
上に存在します.
RemoteChannel
上のメソッドput!
,take!
,fetch
,isready
およびwait
は,リモートプロセス上のバッキングストアにプロキシされます.
このように,RemoteChannel
はユーザが実装したAbstractChannel
オブジェクトを参照するために 使用することができます.この単純な例は,Examples repository のdictchannel.jl
で提供されており,リモートストアとして辞書を使用しています.
チャネルとリモートチャネル
Channel
はプロセスに対してローカルなものです.ワーカ2がワーカ3のChannel
を直接参照することはできませんが,RemoteChannel
はワーカ間で値を入れたり出したりすることができます.RemoteChannel
はChannel
のhandleと考えることができます.RemoteChannel
に関連付けられたプロセスidpid
は,バッキングストアが存在するプロセス言い換えるとバッキングChannel
が存在するプロセスを識別します.RemoteChannel
への参照を持つ全てのプロセスは,チャネルからアイテムを入れたり出したりできます.データはRemoteChannel
が関連付けられているプロセスに自動的に送信されます(またはそこから取得されます).Channel
をシリアライズすると,チャネル内に存在する全てのデータもシリアライズされます.そのため,チャネルをデシリアライズすると,元のオブジェクトのコピーが効果的に作成されます.- 一方,
RemoteChannel
をシリアライズすると,ハンドルが参照しているChannel
の場所とインスタンスを識別する識別子のシリアライズのみが行われます.したがって,(任意のワーカ上の)デシリアライズされたRemoteChannel
オブジェクトはオリジナルと同じバッキングストアを指すことになります
上記のチャネルの例は,以下のようにプロセス間通信のために変更することができます.
単一のjobs
リモートチャネルを処理するために4つのワーカを起動します.Jobsはjob_id
によって識別され, そのチャネルに書き込まれます.このシミュレーションでは各リモート実行タスクはjob_id
を読み込み, ランダムな時間だけ待機し,job_id
,かかった時間,自身のpid
のタプルを結果チャネルに書き戻します. 最後に,全ての結果がマスタプロセスに出力されます.
julia> addprocs(4); # add worker processes
julia> const jobs = RemoteChannel(()->Channel{Int}(32));
julia> const results = RemoteChannel(()->Channel{Tuple}(32));
julia> @everywhere function do_work(jobs, results) # define work function everywhere
while true
job_id = take!(jobs)
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
put!(results, (job_id, exec_time, myid()))
end
end
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> @async make_jobs(n); # feed the jobs channel with "n" jobs
julia> for p in workers() # start tasks on the workers to process requests in parallel
remote_do(do_work, p, jobs, results)
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time, where = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
global n = n - 1
end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741
リモートリファレンスと分散ガベージコレクション
リモートリファレンスによって参照されるオブジェクトはクラスタ内で保持されている全ての参照が 削除されたときにのみ解放されることができます.
値が格納されているノードは,どのワーカがその値への参照を持っているかを追跡します. RemoteChannel
や(フェッチされていない)Future
がワーカにシリアライズされるたびに, 参照先のノードが通知されます.また,RemoteChannel
や(フェッチされていない)Future
が ローカルでガベージコレクションされるたびに,値を所有するノードは再度通知される.これは内部クラスタを意識した シリアライザで実装されています.リモート参照は実行中のクラスタのコンテキストでのみ有効です.通常のIO
オブジェクトへの, または通常のIO
オブジェクトからの参照のシリアライズとデシリアライズはサポートされていません.
この通知は参照が別のプロセスにシリアライズされた場合は,「参照を追加」メッセージ,参照がローカルで ガベージコレクションされた場合には「参照を削除」メッセージという「トラッキングメッセージ」の送信によって 行われます.
Future
は一度限りの書き込みでローカルにキャッシュされるので, Future
をfetch
する行為は,値を所有しているノードの参照トラッキング情報も更新する.
値を所有しているノードは,値への全ての参照がクリアされると,値を解放します.
Future
では,オリジナルのリモートストアがこの時点までに値を収集している場合があるので, すでに別のノードへフェッチされたFuture
をシリアライズした時も値を送信します.
オブジェクトがいつローカルでガベージコレクションされるのかが,オブジェクトのサイズとシステム内の 現在のメモリプレッシャに依存することに注意することは重要です.
リモートリファレンスの場合,ローカルリファレンスオブジェクトのサイズはかなり小さいですが,リモートノードに 格納されている値はかなり大きいかもしれません.ローカルオブジェクトはすぐに収集されない可能性があるので, RemoteChannel
のローカルインスタンスや,フェッチされていないFuture
に 対して明示的にfinalize
を呼び出すのが良い方法です.Future
に対して, fetch
を呼び出すと,リモートストアからの参照も削除されるので,フェッチされたFuture
sに 対してはこれは必要ありません.明示的にfinalize
を呼び出すと,リモートノードに値への参照を削除するための 即時メッセージが送信されます.
一度ファイナライズされると,参照は無効になり,それ以降の呼び出しでは使用できなくなります.
ローカルな呼び出し
実行のためにデータは必然敵にリモートノードにコピーされる.これはリモートコールの場合と, データが別のノードのRemoteChannel
/ Future
に格納されている場合の両方に当てはまる. 予想通り,これはリモートノード上のシリアライズされたオブジェクトのコピーになります.しかし,宛先ノードが ローカルノードである場合,つまり呼び出し元のプロセスIDがリモートノードIDと同じである場合は,ローカルコールとして 実行されます.通常は(常にではありませんが)別のタスクで実行されますが,データのシリアライズ/デシリアライズは 行われません.その結果,コールは渡されたものと同じオブジェクトインスタンスを参照します,この時コピーは作成されません. この動作を以下に示します.
julia> using Distributed;
julia> rc = RemoteChannel(()->Channel(3)); # RemoteChannel created on local node
julia> v = [0];
julia> for i in 1:3
v[1] = i # Reusing `v`
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[3], [3], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 1
julia> addprocs(1);
julia> rc = RemoteChannel(()->Channel(3), workers()[1]); # RemoteChannel created on remote node
julia> v = [0];
julia> for i in 1:3
v[1] = i
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[1], [2], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 3
見て取られるように,ローカルに所有されているRemoteChannel
に呼び出しの間に修正された 同一のオブジェクトv
をput!
すると,同じ単一のオブジェクトインスタンスが格納されます. これは,rc
を所有しているノードが別のノードである場合にv
のコピーが作成されるのとは対照的です.
これは一般的には問題ではないことに注意してください.これは,オブジェクトがローカルに保存されている場合と, 呼び出し後に変更されている場合にのみ考慮すべきことです.そのような場合は,オブジェクトのdeepcopy
を 保存するのが適切かもしれません.
これは,次の例のようにローカルノード上のリモートコールにも当てはまります:
julia> using Distributed; addprocs(1);
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v); # Executed on local node
julia> println("v=$v, v2=$v2, ", v === v2);
v=[1], v2=[1], true
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), workers()[1], v); # Executed on remote node
julia> println("v=$v, v2=$v2, ", v === v2);
v=[0], v2=[1], false
再度見て取れるように,ローカルノードへのリモート呼び出しは,直接呼び出しと同じように動作します. 呼び出しは引数として渡されたローカルオブジェクトを変更します.リモート呼び出しでは,引数のコピーを操作します.
繰り返しになりますが,これは一般的には問題になりません.ローカルノードが計算ノードとしても使用されており, 呼び出し後に引数が使用されている場合,要求された引数のディープコピーをローカルノードで実行するコールへ渡さねば ならないときには,このディープコピーの中で,この動作を考慮する必要があります.
Shared Arrays
Shared Arraysはシステムの共有メモリを使用して,多くのプロセスにわたって同じ配列をマッピングします. DArray
といくつかの類似点がありますが, SharedArray
の動作はかなり異なります.DArray
では, 各プロセスはデータのチャンクへのローカルアクセス権を持ち,2つのプロセスが同じチャンクを共有することはありません: 対照的に,SharedArray
では各「参加」プロセスは,配列全体にアクセスすることができます. SharedArray
は,同じマシン上の2つ以上のプロセスが共同でアクセスできる大量のデータを持ちたい場合に良い選択です.
Shared Arrayのサポートは,参加している全てのワーカ上で明示的にロードされなければならないSharedArrays
モジュールを介して利用可能です.
SharedArray
のインデクシング(値の代入とアクセス)は通常の配列と同じように動作し, ローカルプロセスで利用可能なメモリ上で動作しているので効率的です.したがって,シングルプロセスモードとはいえ, ほとんどのアルゴリズムは自然にSharedArray
s上で動作します.アルゴリズムがArray
入力を要求 する場合,sdata
を呼ぶことによって,SharedArray
からその下にある配列を取得することができます. 他のAbstractArray
型の場合,sdata
はオブジェクト自体を返すだけなので,どんなArray
型のオブジェクト上で sdata
を使ってもセーフです.
Shared Arrayのコンストラクタは以下の形式です:
SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])
これはpids
で指定されたプロセス間でビット型T
とサイズdims
のN
次元のshared arrayを作成します. 分散配列とは異なり,shared arrayは引数に指定されたpids
で指定された参加ワーカからのみアクセス可能です( 同じホスト上にある場合は作成プロセスからもアクセス可能です.)SharedArrayでは,isbits
である 要素のみがサポートされていることに注意してください.
シグネチャinitfn(S::SharedArray)
のinit
関数が指定された場合,参加している全てのワーカ上で呼び出されます. 各ワーカが配列の異なる部分でinit
関数を実行できるように指定することで,初期化を並列化することができます.
以下に簡単な例を示します:
julia> using Distributed
julia> addprocs(3)
3-element Array{Int64,1}:
2
3
4
julia> @everywhere using SharedArrays
julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 3 4 4
julia> S[3,2] = 7
7
julia> S
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 7 4 4
SharedArrays.localindices
はインデックスの不連続な一次元の範囲を提供し,プロセス間でタスクを分割するのに 便利なことがあります.もちろん好きなように作業を分割することができます:
julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = repeat([myid()], length( indexpids(S):length(procs(S)):length(S))))
3×4 SharedArray{Int64,2}:
2 2 2 2
3 3 3 3
4 4 4 4
全てのプロセスが下にあるデータにアクセスできるので,コンフリクトを起こさないように気を付けなければなりません. 例えば:
@sync begin
for p in procs(S)
@async begin
remotecall_wait(fill!, p, S, p)
end
end
end
これは定義されていない挙動を生む結果となります.各々のプロセスは自身のpid
で配列全体を埋めるので, 最後に(Sの任意の特定の要素に対して)実行したプロセスがいずれであっても,そのpid
を保持することになります.
より拡張された複雑な例として,以下の「カーネル」を並列に実行することを考えてみましょう:
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
この場合,1次元のインデックスを使って作業を分割しようとすると,問題が発生する可能性があります: q[i,j,t]
があるワーカに割り当てられたブロックの終わり近くにあり,q[i,j,t+1]
が別のワーカに割り当て られたブロックの始まり近くにある場合,q[i,j,t]
がq[i,j,t+1]
を計算するのに必要な時間に準備できていない 可能性が高いです.このような場合には,手動で配列をチャンクした方が良いでしょう.2つ目の次元に沿って 分割してみましょう.このワーカに割り当てられた(irange, jrange)
インデックスを返す関数を定義します:
julia> @everywhere function myrange(q::SharedArray)
idx = indexpids(q)
if idx == 0 # This worker is not assigned a piece
return 1:0, 1:0
end
nchunks = length(procs(q))
splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
1:size(q,1), splits[idx]+1:splits[idx+1]
end
次に,カーネルを定義します:
julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
@show (irange, jrange, trange) # display so we can see what's happening
for t in trange, j in jrange, i in irange
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
end
q
end
SharedArray
実装のために便利なラッパも定義します:
julia> @everywhere advection_shared_chunk!(q, u) =
advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)
では3つの異なるバージョンを比べてみましょう.シングルプロセスで動作させた場合:
julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);
@distributed
を使った場合:
julia> function advection_parallel!(q, u)
for t = 1:size(q,3)-1
@sync @distributed for j = 1:size(q,2)
for i = 1:size(q,1)
q[i,j,t+1]= q[i,j,t] + u[i,j,t]
end
end
end
q
end;
そしてチャンクに委譲した場合:
julia> function advection_shared!(q, u)
@sync begin
for p in procs(q)
@async remotecall_wait(advection_shared_chunk!, p, q, u)
end
end
q
end;
SharedArray
sを作成してこれらの関数を実行すると,以下のような結果が得られます(julia -p 4
を用いた場合):
julia> q = SharedArray{Float64,3}((500,500,500));
julia> u = SharedArray{Float64,3}((500,500,500));
関数を一度実行してJITコンパイルし,2回目の実行時に @time
で計測します:
julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
830.220 milliseconds (216 allocations: 13820 bytes)
julia> @time advection_parallel!(q, u);
2.495 seconds (3999 k allocations: 289 MB, 2.09% gc time)
julia> @time advection_shared!(q,u);
From worker 2: (irange,jrange,trange) = (1:500,1:125,1:499)
From worker 4: (irange,jrange,trange) = (1:500,251:375,1:499)
From worker 3: (irange,jrange,trange) = (1:500,126:250,1:499)
From worker 5: (irange,jrange,trange) = (1:500,376:500,1:499)
238.119 milliseconds (2264 allocations: 169 KB)
advection_shared!
の最大の利点は,ワーカ間のトラフィックを最小限に抑え,各ワーカが割り当てられたピースで 長時間計算することを可能にすることです.
Shared Arraysと分散ガベージコレクション
リモートリファレンスと同様に,shared arraysもまた,参加している全てのワーカから参照を解放をするのに, 作成ノードのガベージコレクションに依存しています.短期間で多くのshared arrayオブジェクトを作成するコードでは, これらのオブジェクトをできるだけ早く明示的にファイナライズすることが有益です.これにより,共有セグメントを マッピングするメモリとファイルハンドルの両方がより早く解放されるようになります.
ClusterManagers
論理クラスタへのJuliaプロセスの起動,管理,ネットワーキングは,クラスタマネージャを介して行われます. ClusterManager
は以下を担当します:
- クラスタ環境下におけるワーカプロセスの起動
- 各ワーカのライフタイムの間のイベント管理
- オプションで,データ転送を提供
Juliaクラスタは以下のような特徴を持ちます:
- 最初のJuliaプロセスは
master
とも呼ばれますが,これは特別なもので,id
として1を持ちます. master
プロセスだけが,ワーカプロセスを追加したり削除したりできます.- 全てのプロセスはお互いに直接通信することができます.
ワーカ間の接続(ビルトインのTCP/IP転送を利用)は,以下のような形で確立されます:
- マスタプロセス上で,
ClusterManager
とともに,addprocs
を呼び出します. addprocs
は適切なlaunch
を呼び,適切なマシン上で要求された数のワーカを生成します.- 各ワーカはフリーなポートをリスンし始め,
stdout
にそのホストとポート情報を書き出します. - クラスタマネージャは各ワーカの
stdout
をキャプチャし,マスタプロセスで利用できるようにします. - マスタプロセ薄は個の情報をパース氏,各ワーカとのTCP/IP接続をセットアップします.
- 全てのワーカはクラスタ内の他のワーカにも通知されます.
- 各ワーカは自分自身の
id
よりも小さいid
を持つ全てのワーカに接続します. - このようにして,メッシュネットワークが確立され,そこでは全てのワーカが全ての他のワーカと直接つなげられています.
デフォルトのトランスポートレイヤはTCPSocket
を用いていますが,Juliaクラスタでは独自のトランスポートを 提供することができます.
Juliaは2つのビルトインなクラスタマネージャを提供します:
addprocs()
またはaddprocs(np::Integer)
が呼ばれたときに使用されるLocalManager
addprocs(hostnames::Array)
がホストネームのリスト共に呼び出された時に使用されるSSHManager
LocalManager
は同じホスト上でワーカを追加して起動するのに用いられ,それによりマルチコア,マルチプロセッサ なハードウェアを有効活用します.
Thus, a minimal cluster manager would need to: したがって,最小限のクラスタマネージャは以下のようである必要があります:
- アブストラクトな
ClusterManager
のサブタイプであること - 新しいワーカを起動することを担当するメソッドである
launch
を実装すること - ワーカのライフタイムの間の様々なイベント(例えば,割り込み信号の送信)の際に呼ばれる,
manage
を実装すること
addprocs(manager::FooManager)
はFooManager
が実装されていることを必要とします:
function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
例として,同じホスト上でワーカの起動を担当するマネージャであるLocalManager
がどのように実装されているかを見てみましょう:
struct LocalManager <: ClusterManager
np::Integer
end
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
launch
メソッドは以下のような引数を取ります:
manager::ClusterManager
:addprocs
が呼び出されるクラスタマネージャparams::Dict
:addprocs
に渡される全てのキーワード引数launched::Array
: 1つ以上のWorkerConfig
オブジェクトをアペンドするための配列c::Condition
: ワーカの起動時に通知される条件変数
launch
メソッドは,別のタスクで非同期的に呼び出されます.このタスクの終了は, 要求された全てのワーカが起動されたことを示しています.したがって,要求された全てのワーカが 起動されたらすぐに,launch
関数は終了されなければなりません.
新たに起動されたワーカは,お互いとマスタプロセスにall-to-allに接続されます.コマンドライン引数--worker[=<cookie>]
を指定すると,起動されたプロセスがワーカとして初期化され,TCP/IPソケットを介して接続がセットアップされます.
クラスタ内の全てのワーカはマスタと同じcookieを共有します.クッキーが指定されていない場合, つまり--worker
を指定した場合,ワーカはそれを標準入力から読み込もうとします. LocalManager
とSSHManager
はどちらも自らの標準入力を介して,新しく起動されたワーカにクッキーを渡します.
デフォルトではワーカはgetipaddr()
の呼び出しで返されたアドレスの空きポートをリスンします. リスンする特定のアドレスはオプションの引数--bind-to bind_addr[:port]
で指定できます.これはマルチホームホストに便利です.
非TCP/IPトランスポートの例として,実装はMPIを使用することを選ぶことができるが,その場合は--worker
は指定してはならない. 代わりに,新しく起動されたワーカは,並列構成を使う前にinit_worker(cookie)
を呼び出さねばなりません.
起動された全てのワーカに対して,launch
はWorkerConfig
オブジェクトを(適切なフィールドを初期化しながら)launched
に追加しなければなりません.
mutable struct WorkerConfig
# Common fields relevant to all cluster managers
io::Union{IO, Nothing}
host::Union{AbstractString, Nothing}
port::Union{Integer, Nothing}
# Used when launching additional workers at a host
count::Union{Int, Symbol, Nothing}
exename::Union{AbstractString, Cmd, Nothing}
exeflags::Union{Cmd, Nothing}
# External cluster managers can use this to store information at a per-worker level
# Can be a dict if multiple fields need to be stored.
userdata::Any
# SSHManager / SSH tunnel connections to workers
tunnel::Union{Bool, Nothing}
bind_addr::Union{AbstractString, Nothing}
sshflags::Union{Cmd, Nothing}
max_parallel::Union{Integer, Nothing}
# Used by Local/SSH managers
connect_at::Any
[...]
end
WorkerConfig
のフィールドのほとんどは,ビルトインのマネージャで使用されます.カスタムクラスタマネージャは通常, io
またはhost
/ port
のみを指定します:
io
が指定された場合,それはホスト/ポート情報を読み込むために使用されます.Juliaワーカは起動時にバインドアドレスとポートを出力します.これにより,ワーカのポートを手動で設定することを要求する代わりにJuliaワーカは空いている任意ポートをリスンすることができます.io
が指定されていない場合,host
とport
が接続に用いられます.count
,exename
,およびexeflags
は,ワーカから追加のワーカを起動する際に関連します.例えばクラスタマネージャはノードごとに単一のワーカを起動し,それを使用して追加のワーカを起動することができます.count
に整数値n
を指定すると合計n
個のワーカが起動されます.count
に:auto
の値を指定すると,そのマシンのCPUスレッド(論理コア)の数と同じ数のワーカを起動します.exename
はフルパスを含むjulia
実行ファイルの名前です.exeflags
は新しいワーカに必要なコマンドライン引数に設定してください.
tunnel
,bind_addr
,sshflags
およびmax_parallel
はマスタプロセスからワーカに接続するためにsshトンネルが必要な場合に使用されます.userdata
はカスタムクラスタマネージャが独自のワーカの固有の情報を保存するために提供されます.
manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
は以下のようなop
値を指定して, ワーカのライフタイム中に異なるタイミングで呼び出されます:
- Juliaワーカプールでワーカが追加削除されたときに指定する
:register
/:deregister
interrupt(workers)
が呼び出されたときに指定する:interrupt
.ClusterManager
は適切なワーカに対して,割り込み信号を送信しなければなりません.- クリーンアップのために指定する
:finalize
.
カスタムトランスポートを使用したクラスタマネージャ
デフォルトのTCP/IPのall-to-allなソケット接続をカスタムのトランスポートレイヤに置き換えるのはもう少し複雑です. 各Juliaプロセスは,接続されているワーカの数だけ通信タスクを持っています.例えば,32プロセスからなるJulia クラスタがall-to-allなメッシュネットワークにあるとします:
- 各Juliaプロセスは31個の通信タスクを持っています.
- 各タスクは,メッセージ処理のループで単一のリモートワーカからの全ての着信メッセージを処理します.
- The message-processing loop waits on an
IO
object (for example, aTCPSocket
in the default implementation), reads an entire message, processes it and waits for the next one. - メッセージ処理ループは,
IO
オブジェクト(例えば,デフォルトの実装ではTCPSocket
)上で待機し,メッセージ全体を読み込んで処理し,次のメッセージを待ちます. - プロセスへのメッセージの送信は,通信タスクだけでなく,Juliaタスクからも,適切な
IO
オブジェクトを介して直接行われます.
デフォルトのトランスポートを置き換えるには,新しい実装でリモートワーカへの接続を設定し,メッセージ処理ループが 待機できる適切なIO
オブジェクトを提供する必要があります.マネージャ固有のコールバックは以下の通り実装されます:
connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)
デフォルトの実装(TCP/IPソケットを使用)は,connect(manager::ClusterManager, pid::Integer, config::WorkerConfig)
として実装されています.
connect
はワーカpid
から送信されたデータを読み込むためのIO
オブジェクトと,ワーカpid
に送信する必要が あるデータを書き込むためのIO
オブジェクトのペアを返す必要があります.カスタムクラスタマネージャは, カスタマイズされたおそらくIO
ではないトランスポートと,Juliaのビルトインな並列インフラストラクチャの間で, データをプロキシするための配管として,インメモリのBufferStream
を使用することができます.
BufferStream
はインメモリのIOBuffer
で,IO
のように振舞います,すなわち非同期に処理できるストリームです.
Examples repositoryレポジトリのclustermanager/0mq
フォルダには, ZeroMQを利用して0MQブローカを中央に配置したスター型のトポロジでJuliaワーカを接続する例があります. 注意: Juliaプロセスは全て,まだ論理的に接続されています,つまりどのワーカもトランスポートレイヤとして使用されている 0MQを意識することなく,他のワーカに直接メッセージを送ることができます.
When using custom transports: カスタムトランスポートを使用する場合は:
- Juliaワーカは
--worker
で開始してはなりません.--worker
をつけて起動すると,新しく起動されたワーカはTCP/IPソケットトランスポートの実装をデフォルトにします. - ワーカとの論理接続を受信するたびに,
Base.process_messages(rd::IO, wr::IO)()
が呼び出されなければなりません.これはIO
オブジェクトであらわされるワーカとの間のメッセージの読み書きを処理する新しいタスクを起動します. init_worker(cookie, manager::FooManager)
はワーカプロセスの初期化の一部として呼び出されなければなりません.WorkerConfig
の中のconnect_at::Any
フィールドは,launch
が呼ばれた時にクラスタマネージャによって設定することができます.このフィールドの値は全てのconnect
コールバックで渡されます.通常,このフィールドはワーカへの接続方法に関する情報を保持します.例えば,TCP/IPソケットトランスポートはこのフィールドを利用して,ワーカに接続するための(host, port)
タプルを指定します.
kill(manager, pid, config)
はクラスタからワーカを削除するために呼び出されます.マスタプロセス上では, 適切なクリーンアップを確実にするために,対応するIO
オブジェクトは実装によってクローズされなければなりません. デフォルトの実装では,指定されたリモートワーカに対して,exit()
を実行するだけです.
Examplesフォルダのclustermanager/simple
は,クラスタのセットアップにUNIXドメインソケットを使用した簡単な実装を示す例です.
LocalManagerとSSHManagerのネットワーク要件
Juliaクラスタはローカルのラップトップ,部署のクラスタ,クラウドといった,インフラストラクチャ上にすでに セキュアにされている環境下で実行されるように設計されています.このセクションでは,ビルトインである LocalManager
とSSHManager
のネットワークセキュリティ要件について説明します:
マスタプロセスはどのポートもリスンせず,ワーカに対して外向きにのみ接続します.
各ワーカはローカルインタフェースの1つだけにバインドし,OSによって割り当てられた,エフェメラルなポート番号でリスンします.
addprocs(N)
が使用するLocalManager
は,デフォルトでは,ループバックインタフェースにのみバインドします.これはあとからリモートホスト上で起動されたワーカが(あるいは悪意のある者によって起動されたものが)クラスタに接続できないことを意味します.addprocs(["remote_host"])
の後にaddprocs(4)
を実行しても失敗します.ユーザによってはローカルシステムといくつかのリモートシステムからなるクラスタを作成する必要があるかもしれません.これは外部ネットワークインタフェースをバインドするように,キーワード引数restrict
をaddprocs(4; restrict=false)
と設定しながら,明示的にLocalManager
に要求することで行うことができます.addprocs(list_of_remote_hosts)
によって使用されるSSHManager
は,SSH経由でリモートホスト上のワーカを起動します.デフォルトでは,SSHはJuliaのワーカを起動するためのみに使用されます.その後のマスタ-ワーカ間およびワーカ-ワーカ間の接続では,プレーンで暗号化されていないTCP/IPソケットを使用します.リモートホストはパスワードなしのログインが有効になっている必要があります.追加のSSHフラグや認証情報は,キーワード引数sshflags
で指定できます.addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>)
はマスタ-ワーカ間にもSSH接続を使用したい場合に便利です.典型的なシナリオは,Julia REPL(すなわちマスタ)を実行しているローカルのラップトップと,クラウド上のクラスタの残りの部分,例えばAmazon EC2上のものですが,それらが一緒に動いているような場合です.この場合,公開鍵インフラストラクチャ(PKI)を介して認証されたSSHクライアントと組み合わせてリモートクラスタでポート22を開く必要があるだけです.認証のための認証情報は,sshflags
を使って,例えばsshflags=`-i <keyfile>`
のようにして提供することができます.all-to-allなトポロジ(デフォルト)では,全てのワーカはプレーンなTCPソケットを介して互いに接続します.したがって,クラスタノードのセキュリティポリシは,(OSによって異なりますが)エフェメラルポート範囲のワーカ間の自由な接続を保証する必要があります.
全てのワーカ-ワーカ間のトラフィックをSSH経由でセキュアにして暗号化したり,個々のメッセージを暗号化したりすることは,カスタム
ClusterManager
を介して行うことができます.addprocs
のオプションとしてmultiplex=true
を指定した場合,SSH多重化はマスタとワーカの間にトンネルを作成するために使用されます.独自にSSH多重化を設定しており,接続が既に確立されている場合は,multiplex
オプションに関係なく,SSH多重化が使用されます.多重化が有効になっている場合は,既存の接続を使用して転送が設定されます(sshの-O forward
オプション).これは,サーバがパスワード認証を必要とするときに有効です.:addprocs
の前にサーバにログインすることで,Juliaでの認証を回避することができます.既存の多重化接続が使用されていない限り,セッションの間制御ソケットは,~/.ssh/julia-%r@%h:%p
に置かれます.ノード上に複数のプロセスを作成して多重化を有効にすると,帯域幅が制限される可能性があることに注意してください,なぜならこの場合,プロセスは単一の多重化TCP接続を共有するからです.
Cluster Cookie
クラスタ内の全てのプロセスは同じクッキーを共有しますが,これはデフォルトでは,マスタプロセス上でランダムに生成された文字列です:
cluster_cookie()
はクッキーを返し,cluster_cookie(cookie)()
はクッキーを設定して新しいクッキーを返します.- 全ての接続は双方で認証され,マスタによって起動されたワーカだけがお互いに接続を許可されることを確実にします.
- クッキーは引数
--worker=<cookie>
で起動時にワーカに渡すことができます.--worker
がクッキーなしで指定された場合,ワーカは標準入力(stdin
)からクッキーを読み込もうとします.stdin
はクッキーが取得された直後に閉じられます. ClusterManager
sは,cluster_cookie()
を呼び出すことでマスタ上のクッキーを取得できます.デフォルトのTCP/IPトランスポートを使用していない(つまり,--worker
を指定していない)クラスタマネージャは,マスタ上のものと同じクッキーでinit_worker(cookie, manager)
を呼び出さなければなりません.
より高いレベルのセキュリティを必要とする環境では,カスタムのClusterManager
を通してこれを実装できることに注意してください. 例えば,クッキーは事前に共有することができ,それゆえに起動時の引数として指定されません.
ネットワークトポロジの指定 (Experimental)
The keyword argument topology
passed to addprocs
is used to specify how the workers must be connected to each other: addprocs
に渡されるキーワード引数topology
は,ワーカがどのように相互に接続されなければならないかを指定するために使用されます:
:all_to_all
,デフォルト: 全てのワーカが相互に接続されます.:master_worker
: ドライバプロセス,すなわちpid
1のもののみがワーカに対して接続します.:custom
: クラスタマネージャのlaunch
方法はWorkerConfig
のident
フィールドとconnect_idents
フィールドで接続トポロジを指定します.クラスタマネージャが提供するアイデンティティident
を持つワーカは,connect_idents
で指定された全てのワーカに接続します.
キーワード引数lazy=true|false
はtopology
オプションの:all_to_all
のみに影響します.true
に設定されている場合, クラスタはマスタが全てのワーカに接続された状態で開始します.特定のワーカ-ワーカ間の接続は2つのワーカ間の最初の リモート呼び出し時に確立されます.これにより,クラスタ内通信に割り当てられる初期リソースを削減するのに役立ちます. 接続は並列プログラムのランタイム要件に応じて設定されます.lazy
のデフォルト値はtrue
です.
現在,接続されていないワーカ間でメッセージを送信するとエラーになります.この動作は,機能やインタフェースと 同様に,実験的なものであり,将来のリリースで変更される可能性があります.
注目すべき外部パッケージ
Juliaの並列化以外にも,言及すべき外部パッケージはたくさんあります.例えば,MPI.jl はMPI
プロトコルのJuliaラッパであり,DistributedArrays.jlは Shared Arraysで紹介した通りです.またJuliaのGPUプログラミングエコシステムについても言及しなければなりません:
低レベル(Cカーネル)ベースの操作OpenCL.jlとCUDAdrv.jlはそれぞれ,OpenCLインタフェースとCUDAラッパです.
CUDAnative.jlのような低レベル(Juliaカーネル)インタフェースは,JuliaネイティブなCUDA実装です.
CuArrays.jlやCLArrays.jlのような高レベルのベンダ固有の抽象化
ArrayFire.jlやGPUArrays.jlのような高レベルのライブラリ
以下の例では,DistributedArrays.jl
とCuArrays.jl
の両方を使用して,最初にdistribute()
とCuArray()
を通して配列をキャスト することで,複数のプロセスに配列を分散させます.
DistributedArrays.jl
を全てのプロセスにわたってインポートする時には,@everywhere
を使用することを忘れないでください
$ ./julia -p 4
julia> addprocs()
julia> @everywhere using DistributedArrays
julia> using CuArrays
julia> B = ones(10_000) ./ 2;
julia> A = ones(10_000) .* π;
julia> C = 2 .* A ./ B;
julia> all(C .≈ 4*π)
true
julia> typeof(C)
Array{Float64,1}
julia> dB = distribute(B);
julia> dA = distribute(A);
julia> dC = 2 .* dA ./ dB;
julia> all(dC .≈ 4*π)
true
julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}
julia> cuB = CuArray(B);
julia> cuA = CuArray(A);
julia> cuC = 2 .* cuA ./ cuB;
julia> all(cuC .≈ 4*π);
true
julia> typeof(cuC)
CuArray{Float64,1}
いくつかのJuliaの昨日は現在CUDAnative.jl[2]ではサポートされていないことに注意してください.特にsin
のような関数は,CUDAnative.sin
(cc: @maleadt)で置き換える必要があります.
次の例では,DistributedArrays.jl
とCuArrays.jl
の両方を使って,複数のプロセスに配列を分散させ,その上で汎用関数を呼び出すようにします.
function power_method(M, v)
for i in 1:100
v = M*v
v /= norm(v)
end
return v, norm(M*v) / norm(v) # or (M*v) ./ v
end
power_method
は新しいベクトルの生成と正規化を繰り返しています.関数宣言では型の指定をしていませんでしたが, 前述のデータ型で動作するかを見てみましょう:
julia> M = [2. 1; 1 1];
julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877
julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)
julia> cuM = CuArray(M);
julia> cuv = CuArray(v);
julia> curesult = power_method(cuM, cuv);
julia> typeof(curesult)
CuArray{Float64,1}
julia> dM = distribute(M);
julia> dv = distribute(v);
julia> dC = power_method(dM, dv);
julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}
外部パッケージの簡単な紹介の最後として,MPIプロトコルのJuliaラッパであるMPI.jl
について考えてみましょう. 全ての内部関数を考慮するには時間がかかりすぎるので,プロトコルを実装するために使用されているアプローチを 単純に評価する方が良いでしょう.
単純に各サブプロセスを呼び出し,そのランクをインスタンス化し,マスタプロセスに到達したらランクの合計を計算する, このおもちゃのスクリプトを考えてみましょう
import MPI
MPI.Init()
comm = MPI.COMM_WORLD
MPI.Barrier(comm)
root = 0
r = MPI.Comm_rank(comm)
sr = MPI.Reduce(r, MPI.SUM, root, comm)
if(MPI.Comm_rank(comm) == root)
@printf("sum of ranks: %s\n", sr)
end
MPI.Finalize()
mpirun -np 4 ./julia example.jl
- 1MPIはこの文脈ではMPI-1標準を指しています.MPI-2以降,MPI標準化委員会は,リモートメモリアクセス(RMA)と総称される新しい通信メカニズムのセットを導入しました.MPI標準にrmaを追加した動機は,一方向的な通信パターンを容易にすることでした.最新のMPI企画についての詳細は,https://mpi-forum.org/docsを参照してください.
- 2Julia GPU man pages