Asakusa on Sparkの使い方のメモ。
|
|
Asakusa Frameworkの実行環境としてApache Sparkを使う機能が、Asakusa on Spark 0.3.0で正式版となった。
build.gradleの書き方は開発者プレビュー版から多少変わっているが、基本的な使い方は変わっていない。
Asakusa on SparkのAsakusaアプリケーションを開発するには、build.gradleにAsakusa on Sparkの設定を追加する。
以下、EclipseにShafuがインストールされている前提。
group 'com.example'
buildscript {
repositories {
maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
}
dependencies {
classpath group: 'com.asakusafw.spark', name: 'asakusa-spark-gradle', version: '0.3.0'
}
}
apply plugin: 'asakusafw-sdk'
apply plugin: 'asakusafw-organizer'
apply plugin: 'asakusafw-mapreduce'
apply plugin: 'asakusafw-spark'
apply plugin: 'eclipse'
dependencies { compile group: 'com.asakusafw.sdk', name: 'asakusa-sdk-core', version: asakusafw.asakusafwVersion compile group: 'com.asakusafw.sdk', name: 'asakusa-sdk-directio', version: asakusafw.asakusafwVersion compile group: 'com.asakusafw.sdk', name: 'asakusa-sdk-windgate', version: asakusafw.asakusafwVersion testRuntime group: 'com.asakusafw.sdk', name: 'asakusa-sdk-test-emulation', version: asakusafw.asakusafwVersion }※AsakusaFW 0.8.0以降は、Asakusa on Sparkのバージョンに応じたAsakusaFWバージョンが自動的に設定されるようになったので、asakusafwVersionは明示しないようになった。 なお、0.3.0に対応しているのはAsakusaFW 0.8.0。
buildscript {
repositories {
maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
}
dependencies {
//AsakusaFW 0.9以降はディストリビューション方式になったので、Spark用に書き換える必要は無い
classpath group: 'com.asakusafw.gradle', name: 'asakusa-distribution', version: '0.9.0'
}
}
apply plugin: 'asakusafw-sdk'
apply plugin: 'asakusafw-organizer'
apply plugin: 'asakusafw-mapreduce'
apply plugin: 'asakusafw-spark'
apply plugin: 'eclipse'
開発環境にはSparkやScalaをインストールしておく必要は無い。
(Scalaのソースを生成するわけではないので、scalac等のコンパイラーは必要ない。コンパイルすると、バイトコードをclassファイルに直接出力しているらしい?!)
Gradleを使ってSpark用のコンパイルを行う。
(ただし、デプロイメントアーカイブを作れば自動的にSpark用のコンパイルも行われるので、普通はこの作業は不要)
$ ./gradlew sparkCompileBatchapps
$ ls build/spark-batchapps/ ←出来上がったバッチの確認
spark.example.summarizeSales
コンパイルされたバッチディレクトリー名(バッチ名)は、Batch DSLで付けた名前に接頭辞「spark.
」が付いたものになる。
(Batch DSL上の名前が「example.summarizeSales
」の場合、「spark.example.summarizeSales
」となる)
接頭辞を変えたい場合は、build.gradleにspark.batchIdPrefix
の設定を追加する。(規約プロパティ拡張を参照)
asakusafw {
spark {
batchIdPrefix 'spark.'
}
}
生成されたバッチディレクトリー中には以下のようにYAESSで実行するためのjarファイルや設定ファイルが入っている。
spark.example.summarizeSales ├─etc │ │ build.log │ │ yaess-script.properties │ │ │ └─inspection │ dsl.json │ task.json │ └─lib jobflow-byCategory.jar
従来のHadoop用のバッチディレクトリーの場合、opt/dsl-analysisというディレクトリーも作られ、その中にGraphviz用のdotファイルが生成されていた。(→ジョブフローの可視化)
Spark版の場合、Graphviz用のファイルは生成されないが、dsl.jsonやtask.jsonがその情報を持っている。(ので、それを上手く解析すればフローを生成できるらしい^^;)
また、jobflow-byCategory.jarの中にMETA-INF/asakusa-spark/plan.jsonというファイルもある。
Gradleのassemble(あるいはShafuの「Asakusaデプロイメントアーカイブを生成」)を実行すると、バッチコンパイルも自動的に行われる。
もしSpark以外のバッチも作成する設定になっている(build.gradleにapply plugin
'asakusafw-mapreduce'
等が書かれている)と、それらのバッチも作られるので、生成されるデプロイメントアーカイブには、
複数の環境のバッチが含まれる状態になる。
$ ./gradlew assemble
$ ls build/*.tar.gz ←生成されたアーカイブファイルの確認
build/asakusafw-0.8.0.tar.gz
どちらかのバッチしか作らなくていい場合は、assembleの依存タスクであるコンパイルタスク(mapreduceCompileBatchappやsparkCompileBatchapps等)を抑制すればいい。
$ ./gradlew assemble -x mapreduceCompileBatchapp
Spark用バッチを作る対象クラスを指定することも出来る。
build.gradleにspark.inclue
やspark.exclude
を設定する。(規約プロパティ拡張を参照)
asakusafw {
spark {
include 'com.example.batch.SummarizeBatch'
}
}
実行環境にはSparkをインストールしておく必要がある。[2017-01-18]
→Asakusa on Spark ユーザーガイドのSparkのセットアップ
その上で、ASAKUSA_HOME/spark/conf/env.shにspark-submitコマンドのパスを設定する。
(というか、SPARK_CMDが設定されていれば良い)
export SPARK_CMD=/opt/spark-2.1.0-bin-hadoop2.7/bin/spark-submit
Sparkの実行モード(Sparkスタンドアローンとかyarn-clusterとかyarn-clientとか)は、Sparkのデフォルト設定が使われる。
($SPARK_HOME/conf/spark-defaults.conf等のspark.master
で設定する)
→localで実行する場合の設定例
spark-submitコマンドのオプションを指定したい場合は、環境変数ASAKUSA_SPARK_OPTSに設定してからYAESSを実行する。
export ASAKUSA_SPARK_OPTS='--master local[*]'
デプロイメントアーカイブを運用環境のクライアントマシン上で解凍すると、YAESSを使ってHadoopバッチあるいはSparkバッチを実行できる。
(example.summarizeSales
というバッチの場合、batchappsディレクトリーの下に「example.summarizeSales
」と「spark.example.summarizeSales
」という2種類のディレクトリーがある。YAESSからはこのディレクトリー名を指定する)
$ cd $ASAKUSA_HOME $ yaess/bin/yaess-batch.sh example.summarizeSales -A date=2011-04-01
$ cd $ASAKUSA_HOME $ yaess/bin/yaess-batch.sh spark.example.summarizeSales -A date=2011-04-01
(Sparkバッチを実行するためには、バッチを実行するクライアントマシン上にSparkがインストールされている必要がある。→実行環境の構築)
なお、チューニングをちゃんとしないとパフォーマンスが出ないことが多い。[/2016-12-10]
ちなみに、batchappsの下のSpark用バッチのディレクトリー名を改名し、その変更した名前をYAESSの引数に渡して実行することも出来る。
(ただし、コンパイル時点で設定ファイル内にSpark版のバッチ名が書かれているようなので、ログとかに出るバッチ名は変わらないかも)
$ cd $ASAKUSA_HOME/batchapps
$ rm -rf example.summarizeSales
$ mv spark.example.summarizeSales example.summarizeSales
$ cd $ASAKUSA_HOME
$ yaess/bin/yaess-batch.sh example.summarizeSales -A date=2011-04-01 ←Spark版バッチが動く
※Asakusa on Sparkでは改名して動いたが、これは偶然だったらしく、batchappsの下のディレクトリー名を変更したときに動く保証は無いようなので注意。[2017-06-03]
Sparkの場合、ちゃんとチューニングしてやらないと、ちょっと大き目のデータでも遅くなったりOutOfMemoryErrorになったりする。[2016-12-10]
→Asakusa on Sparkの最適化設定
特に、フラグメントバッファーサイズは入れておくのが超お勧め。
アプリケーションのステージ毎の標準的なタスク分割数を指定する。
com.asakusafw.spark.parallelism=40
SparkのExecutorに割り当てた全コア数の1〜4倍程度が良いらしい。
この値はDirect
I/OのExporterのリソースパターンに「*」を指定したときの出力ファイル分割数に影響する。[2021-06-08]
デフォルト値は2なので、出力ファイルが2分割しかされず、むしろ2ファイルに集約する処理が入るので実行が遅くなってしまう。
各演算子の出力のデータモデルのオブジェクトをメモリー上に保持する個数を指定する。
com.asakusafw.spark.fragment.bufferSize=256
このプロパティーを設定していない(あるいは負の数を指定した)場合、演算子から出力されたデータモデルオブジェクトは、全てメモリー上に保持される。[2016-12-15]
したがって、演算子からの出力データが多い場合、メモリーが足りなくなってGCが頻発して実行が極端に遅くなる、あるいはOutOfMemoryErrorが発生することになる。
(ただし「出力の全てをメモリーに保持する」と言っても、シリアライズされて(Sparkの)次の処理へ渡されるまでの間のことだと思う。
例えばUpdate演算子であれば1レコードずつ処理するので、1レコード分しか保持しないと思う。
問題になりそうなのは出力レコード数が多くなりうる、Resultに出力する演算子(Extract等)だと思う。
特にGroupSortやCoGroup演算子では1グループ分の出力は全てメモリーに保持されるので、(1グループ分の入力が多いせいで)1グループの出力レコード数が多いという事が一番ありうる(というかよくある^^;))
このプロパティーを設定すると、その個数までメモリー上に保持し、その個数を超えたらファイルに退避する。
ファイルに退避する分、実行は遅くなることになる(が、経験上、GCが頻発する方が遥かに遅い)。
したがって、出力するデータ量が多くてGCが頻発している場合は、このバッファーサイズを指定してみる。
ただしこの値をあまり大きくしすぎると、デフォルト(無限)と同様の効果になってしまうので意味が無い。
バッファーサイズという名称だから大きな値を設定すればいいと誤解しそうだが(爆)
なお、退避するファイルの場所はJavaのプロパティーのjava.io.tmpdirで定義されている場所、すなわちデフォルトであれば/tmpの下になる。
なので、/tmpのディスク容量に注意が必要。