Asakusa Frameworkの実行環境としてSparkを使う機能。
|
|
2015/7/8にAsakusa Frameworkの実行環境としてApache
Sparkを使う機能(開発者向けプレビュー版)が公開され、
2016/4/8のAsakusa on Spark 0.3.0(AsakusaFW 0.8.0)で正式版となった。[/2016-04-10]
AsakusaFWが対象とするような複雑なバッチ処理だと、Hadoop+スモールジョブ実行エンジンを実行環境として使うより3〜5倍程度速くなるようだ。
この機能は運用環境でSparkを使って実行するものであり、Asakusaアプリケーションの開発方法は従来のAsakusaFWと全く同じ。
(単体テストの実行やコンパイルにはSparkを使用しない)
AsakusaDSLをコンパイルすると、従来のHadoop版バッチとSpark版バッチの両方のバイナリーが作られる。
(既存のAsakusaアプリケーションも、ソース(AsakusaDSLで記述したもの)を変更せずリコンパイルのみでSpark版バッチを生成できる。(AsakusaFWのバージョンを上げる必要はあるが))
YAESSでバッチを実行する際に引数を変えることで、どちらで実行するのかを切り替える。
(スモールジョブ実行エンジンはHadoopのMapReduceジョブの代替として動くものなので、スモールジョブ実行エンジンとSpark版との共存は出来ない)
Hadoop版バッチよりSpark版バッチの方が速い主な理由は、
Hadoop版バッチは複数のMapReduce(Hadoopジョブ)を実行することになるので、毎回ジョブを起動する時間(いわゆるHadoop税)がかかること。
スモールジョブ実行エンジンの起動はHadoopジョブほどではないが、やはりJavaVMを毎回起動することになるので、少し時間がかかる。
Spark版バッチは全てSparkが制御する(Sparkジョブの実行中はJavaVMが使い回される)ので、Hadoop版バッチより速くなる。
また、Sparkはステージ間のオーバーヘッドが小さいし、ステージ内で処理を並列実行できるので、CPUやメモリーの利用率が高くなるのだそうだ。
(よく「Sparkはインメモリーで処理されるから速い」と言われるが、「Sparkはデータをメモリーにキャッシュしておいて使い回すことが出来る」というのが正しい。つまり、インメモリーで処理したかったらデータをキャッシュするようにコーディングする必要があるし、データを使い回す処理でないと意味が無い。
しかしAsakusaFWが対象とするようなバッチ処理では、同じデータを使い回すことはあまり無い。したがって、インメモリーであることは、Asakusaバッチにはあまり関係が無い)
Hadoopの方が速いのは、単純な集計処理で、集計キーの種類が非常に多いようなケース。
HadoopのMapReduceはソートベースのシャッフルなので(Combinerを使って中間処理を行えるので)、キーの種類が多くても問題ない。
Sparkはハッシュベースのシャッフルなので(HashMapで保持するので)、キーの種類が多いのは不利。
ということらしい。
Spark用のコーディングというものは特に無く、基本的に従来通りのコーディングのままでいい。のだが。
Operator DSLの実装はマルチスレッドセーフ(MTセーフ)になっていなければならない。
(Hadoopのタスクはマルチプロセスで実行するので、MTセーフでなくても動作した。が、Sparkはマルチスレッドで動くので、きちんとMTセーフである必要がある)
特に、SimpleDateFormatをOperatorクラスのstaticフィールドに保持して使っていると、Spark版バッチでは変になる(例外が発生する)可能性が非常に大orz
Hadoop版バッチで実行した結果もSpark版バッチで実行した結果も、出力結果は基本的に同じになるはずである。
ただし、データの並び順に依存するプログラムになっていた場合は、HadoopとSparkでは処理方法が異なるので、同じ結果になるとは限らない。
例えば畳み込み演算子(@Fold)で先頭レコードの値を有効にするコーディングになっている場合、データの順序が変わって、先頭レコードが別のものになってしまう可能性がある。
また、(単純集計演算子(@Summarize)等で)DOUBLE型(浮動小数点)のデータを集計する場合は、処理順が変わって、微小な桁の範囲で値が異なってしまうことも有り得る。
どちらの環境で動かしても完全に値を一致させたいと思ったら、上記のようなことに気を配ってプログラミングする必要がある。