S-JIS[2016-04-10/2021-06-08] 変更履歴

Asakusa on Spark使用方法

Asakusa on Sparkの使い方のメモ。


概要

Asakusa Frameworkの実行環境としてApache Sparkを使う機能が、Asakusa on Spark 0.3.0で正式版となった。
build.gradleの書き方は開発者プレビュー版から多少変わっているが、基本的な使い方は変わっていない。


開発環境の構築

Asakusa on SparkのAsakusaアプリケーションを開発するには、build.gradleにAsakusa on Sparkの設定を追加する。

以下、EclipseにShafuがインストールされている前提。


開発環境の構築(AsakusaFW 0.8)

  1. Shafuの設定で、Gradleのバージョンを2.12にしておく。
  2. 新規プロジェクトを作る場合は、Shafuを使ってAsakusaFW 0.8.0のプロジェクトを作成(テンプレートアーカイブやサンプルアーカイブをダウンロード)する。
    従来のプロジェクトをSpark用に変更する場合は、AsakusaFW 0.8.0にマイグレーションしておく。
  3. build.gradleを確認する。(Spark用のテンプレートをダウンロードした場合やAsakusaFW 0.8.0のサンプルの場合は、デフォルトでAsakusa on Sparkの設定が入っているので、特に変更する必要は無い)
    group 'com.example'
    buildscript {
        repositories {
            maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
        }
        dependencies {
            classpath group: 'com.asakusafw.spark', name: 'asakusa-spark-gradle', version: '0.3.0'
        }
    }
    apply plugin: 'asakusafw-sdk'
    apply plugin: 'asakusafw-organizer'
    apply plugin: 'asakusafw-mapreduce'
    apply plugin: 'asakusafw-spark'
    apply plugin: 'eclipse'
    dependencies {
        compile group: 'com.asakusafw.sdk', name: 'asakusa-sdk-core', version: asakusafw.asakusafwVersion
        compile group: 'com.asakusafw.sdk', name: 'asakusa-sdk-directio', version: asakusafw.asakusafwVersion
        compile group: 'com.asakusafw.sdk', name: 'asakusa-sdk-windgate', version: asakusafw.asakusafwVersion
        testRuntime group: 'com.asakusafw.sdk', name: 'asakusa-sdk-test-emulation', version: asakusafw.asakusafwVersion
    }
    ※AsakusaFW 0.8.0以降は、Asakusa on Sparkのバージョンに応じたAsakusaFWバージョンが自動的に設定されるようになったので、asakusafwVersionは明示しないようになった。 なお、0.3.0に対応しているのはAsakusaFW 0.8.0。
  4. build.gradleを変えたのであれば、Shafuを使ってEclipseプロジェクトの再構成を行う。

開発環境の構築(AsakusaFW 0.9)

  1. Shafuの設定で、Gradleのバージョンを3.1にしておく。[2016-12-10]
  2. 新規プロジェクトを作る場合は、Shafuを使ってAsakusaFW 0.9.0のプロジェクトを作成(テンプレートアーカイブやサンプルアーカイブをダウンロード)する。
    従来のプロジェクトをSpark用に変更する場合は、AsakusaFW 0.9.0にマイグレーションしておく。
  3. build.gradleを確認する。(Spark用のテンプレートやサンプルの場合はデフォルトでAsakusa on Sparkの設定が入っているので、特に変更する必要は無い)
    buildscript {
        repositories {
            maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
        }
        dependencies {
            //AsakusaFW 0.9以降はディストリビューション方式になったので、Spark用に書き換える必要は無い
            classpath group: 'com.asakusafw.gradle', name: 'asakusa-distribution', version: '0.9.0'
        }
    }
    apply plugin: 'asakusafw-sdk'
    apply plugin: 'asakusafw-organizer'
    apply plugin: 'asakusafw-mapreduce'
    apply plugin: 'asakusafw-spark'
    apply plugin: 'eclipse'
  4. Shafuを使ってEclipseプロジェクトの再構成を行う。

コンパイル方法

開発環境にはSparkやScalaをインストールしておく必要は無い。
(Scalaのソースを生成するわけではないので、scalac等のコンパイラーは必要ない。コンパイルすると、バイトコードをclassファイルに直接出力しているらしい?!)

Gradleを使ってSpark用のコンパイルを行う。
(ただし、デプロイメントアーカイブを作れば自動的にSpark用のコンパイルも行われるので、普通はこの作業は不要)

$ ./gradlew sparkCompileBatchapps

$ ls build/spark-batchapps/		←出来上がったバッチの確認
spark.example.summarizeSales

コンパイルされたバッチディレクトリー名(バッチ名)は、Batch DSLで付けた名前に接頭辞「spark.」が付いたものになる。
(Batch DSL上の名前が「example.summarizeSales」の場合、「spark.example.summarizeSales」となる)


接頭辞を変えたい場合は、build.gradleにspark.batchIdPrefixの設定を追加する。(規約プロパティ拡張を参照)

asakusafw {
    spark {
        batchIdPrefix 'spark.'
    }
}

生成されたバッチディレクトリー中には以下のようにYAESSで実行するためのjarファイルや設定ファイルが入っている。

   spark.example.summarizeSales
   ├─etc
   │  │  build.log
   │  │  yaess-script.properties
   │  │
   │  └─inspection
   │          dsl.json
   │          task.json
   │
   └─lib
           jobflow-byCategory.jar

従来のHadoop用のバッチディレクトリーの場合、opt/dsl-analysisというディレクトリーも作られ、その中にGraphviz用のdotファイルが生成されていた。(→ジョブフローの可視化
Spark版の場合、Graphviz用のファイルは生成されないが、dsl.jsonやtask.jsonがその情報を持っている。(ので、それを上手く解析すればフローを生成できるらしい^^;)
また、jobflow-byCategory.jarの中にMETA-INF/asakusa-spark/plan.jsonというファイルもある。


デプロイメントアーカイブの生成

Gradleのassemble(あるいはShafuの「Asakusaデプロイメントアーカイブを生成」)を実行すると、バッチコンパイルも自動的に行われる。

もしSpark以外のバッチも作成する設定になっている(build.gradleにapply plugin 'asakusafw-mapreduce'等が書かれている)と、それらのバッチも作られるので、生成されるデプロイメントアーカイブには、 複数の環境のバッチが含まれる状態になる。

$ ./gradlew assemble

$ ls build/*.tar.gz		←生成されたアーカイブファイルの確認
build/asakusafw-0.8.0.tar.gz

どちらかのバッチしか作らなくていい場合は、assembleの依存タスクであるコンパイルタスク(mapreduceCompileBatchappやsparkCompileBatchapps等)を抑制すればいい。

Hadoopバッチの生成を抑制する例

$ ./gradlew assemble -x mapreduceCompileBatchapp

Spark用バッチを作る対象クラスを指定することも出来る。
build.gradleにspark.incluespark.excludeを設定する。(規約プロパティ拡張を参照)

asakusafw {
    spark {
        include 'com.example.batch.SummarizeBatch'
    }
}

実行環境の構築

実行環境にはSparkをインストールしておく必要がある。[2017-01-18]
→Asakusa on Spark ユーザーガイドのSparkのセットアップ

その上で、ASAKUSA_HOME/spark/conf/env.shにspark-submitコマンドのパスを設定する。
(というか、SPARK_CMDが設定されていれば良い)

ASAKUSA_HOME/spark/conf/env.shの例:

export SPARK_CMD=/opt/spark-2.1.0-bin-hadoop2.7/bin/spark-submit

Sparkの実行モード(Sparkスタンドアローンとかyarn-clusterとかyarn-clientとか)は、Sparkのデフォルト設定が使われる。
($SPARK_HOME/conf/spark-defaults.conf等のspark.masterで設定する)
localで実行する場合の設定例

spark-submitコマンドのオプションを指定したい場合は、環境変数ASAKUSA_SPARK_OPTSに設定してからYAESSを実行する。

export ASAKUSA_SPARK_OPTS='--master local[*]'

バッチの実行方法

デプロイメントアーカイブを運用環境のクライアントマシン上で解凍すると、YAESSを使ってHadoopバッチあるいはSparkバッチを実行できる。
example.summarizeSalesというバッチの場合、batchappsディレクトリーの下に「example.summarizeSales」と「spark.example.summarizeSales」という2種類のディレクトリーがある。YAESSからはこのディレクトリー名を指定する)

バッチをHadoopで実行する場合(従来通りのMapReduceバッチ)

$ cd $ASAKUSA_HOME
$ yaess/bin/yaess-batch.sh example.summarizeSales -A date=2011-04-01

バッチをSparkで実行する場合

$ cd $ASAKUSA_HOME
$ yaess/bin/yaess-batch.sh spark.example.summarizeSales -A date=2011-04-01

(Sparkバッチを実行するためには、バッチを実行するクライアントマシン上にSparkがインストールされている必要がある。→実行環境の構築

なお、チューニングをちゃんとしないとパフォーマンスが出ないことが多い。[/2016-12-10]


ちなみに、batchappsの下のSpark用バッチのディレクトリー名を改名し、その変更した名前をYAESSの引数に渡して実行することも出来る。
(ただし、コンパイル時点で設定ファイル内にSpark版のバッチ名が書かれているようなので、ログとかに出るバッチ名は変わらないかも)

$ cd $ASAKUSA_HOME/batchapps
$ rm -rf example.summarizeSales
$ mv spark.example.summarizeSales example.summarizeSales
$ cd $ASAKUSA_HOME
$ yaess/bin/yaess-batch.sh example.summarizeSales -A date=2011-04-01	←Spark版バッチが動く

※Asakusa on Sparkでは改名して動いたが、これは偶然だったらしく、batchappsの下のディレクトリー名を変更したときに動く保証は無いようなので注意。[2017-06-03]


チューニング

Sparkの場合、ちゃんとチューニングしてやらないと、ちょっと大き目のデータでも遅くなったりOutOfMemoryErrorになったりする。[2016-12-10]
Asakusa on Sparkの最適化設定

特に、フラグメントバッファーサイズは入れておくのが超お勧め。


タスク分割数

アプリケーションのステージ毎の標準的なタスク分割数を指定する。

ASAKUSA_HOME/spark/conf/spark.properties:

com.asakusafw.spark.parallelism=40

SparkのExecutorに割り当てた全コア数の1〜4倍程度が良いらしい。

この値はDirect I/OのExporterのリソースパターンに「*」を指定したときの出力ファイル分割数に影響する。[2021-06-08]
デフォルト値は2なので、出力ファイルが2分割しかされず、むしろ2ファイルに集約する処理が入るので実行が遅くなってしまう。


フラグメントバッファーサイズ

各演算子の出力のデータモデルのオブジェクトをメモリー上に保持する個数を指定する。

ASAKUSA_HOME/spark/conf/spark.properties:

com.asakusafw.spark.fragment.bufferSize=256

このプロパティーを設定していない(あるいは負の数を指定した)場合、演算子から出力されたデータモデルオブジェクトは、全てメモリー上に保持される。[2016-12-15]
したがって、演算子からの出力データが多い場合、メモリーが足りなくなってGCが頻発して実行が極端に遅くなる、あるいはOutOfMemoryErrorが発生することになる。
(ただし「出力の全てをメモリーに保持する」と言っても、シリアライズされて(Sparkの)次の処理へ渡されるまでの間のことだと思う。
例えばUpdate演算子であれば1レコードずつ処理するので、1レコード分しか保持しないと思う。
問題になりそうなのは出力レコード数が多くなりうる、Resultに出力する演算子(Extract等)だと思う。
特にGroupSortCoGroup演算子では1グループ分の出力は全てメモリーに保持されるので、(1グループ分の入力が多いせいで)1グループの出力レコード数が多いという事が一番ありうる(というかよくある^^;)

このプロパティーを設定すると、その個数までメモリー上に保持し、その個数を超えたらファイルに退避する。
ファイルに退避する分、実行は遅くなることになる(が、経験上、GCが頻発する方が遥かに遅い)。

したがって、出力するデータ量が多くてGCが頻発している場合は、このバッファーサイズを指定してみる。
ただしこの値をあまり大きくしすぎると、デフォルト(無限)と同様の効果になってしまうので意味が無い。
バッファーサイズという名称だから大きな値を設定すればいいと誤解しそうだが(爆)

なお、退避するファイルの場所はJavaのプロパティーjava.io.tmpdirで定義されている場所、すなわちデフォルトであれば/tmpの下になる。
なので、/tmpのディスク容量に注意が必要。


Asakusa on Sparkへ戻る / AsakusaFW目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま