Asakusa on SparkのIterative Extensions(反復機能)のメモ。
|
Asakusa on Spark 0.3.0(2016/4/8)でIterative Extensions(反復機能)が加わった。
Iterative Extensionsは、バッチ引数(YAESSの-A相当)の値を複数指定して同時に実行するようなイメージ。
Iterative Extensionsの機能で実行できるバッチを「反復バッチ」と呼ぶ。
例えば、バッチ引数dateを持つバッチがあったとして、複数の日付で処理を実行したい場合は、今までは以下の様に個別に実行していた。
$ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh spark.ExampleBatch -A date=20160401 $ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh spark.ExampleBatch -A date=20160402 $ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh spark.ExampleBatch -A date=20160403
これを反復バッチにすると、以下のように1回のバッチで複数の日付の処理を実行できる。
$ cat $HOME/date.json { "date" : "20160401" } { "date" : "20160402" } { "date" : "20160403" } $ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh spark.IterativeExampleBatch -X-parameter-table $HOME/date.json
バッチを個別に実行するのと反復バッチにして1回で実行するのとの大きな違いは、
反復バッチにした場合、Sparkから見ると1つの処理になること。空いたワーカーがあれば他の引数の処理を詰め込むことが出来るので、リソースの使用効率が上がる。
また、異なる引数の場合でも全く同一になる処理が含まれていれば、1回だけ処理することにより、効率が良くなる。
残念ながら、反復バッチにする為には、どの部分を反復可能にするかをAsakusaDSL上で指定する必要があるので、今までのバッチがアプリケーション修正なしで反復可能になるわけではない。
どの部分が反復可能で、どの部分が共通なのかをプログラマーが指定する必要がある。(一番細かい粒度だとOperatorのメソッド、一番大きい粒度はジョブフロー)
このために新たなAsakusaDSLの構文(アノテーション)が追加された。(今までのAsakusaFWの歴史の中で、DSLの拡張が行われたのは今回が初)
アプリケーション内で引数の値を取得する方法は、-Aの場合も-X-parameter-tableの場合もBatchContext.get("引数名")
であり、変わらない。
(-X-parameter-tableで指定された引数であればその値が取れるし、そうでなければ-Aで指定された値が取れる)
ただし、Exporterの${引数名}
では-X-parameter-tableの値が取得できないので注意が必要。
「反復可能」と聞くと、繰り返し処理する→順番に処理するというイメージを(自分は)持つが、Asakusa on Sparkの反復バッチはそうではない。
引数自体は複数(ファイル内に記述する都合上、順番に)指定するが、Sparkにまとめて渡されるので、処理自体は(Spark内のスケジューリングとクラスター内のリソースの空き具合次第だが、)複数の引数の処理が同時に実行される。
反復バッチを定義する為には、ジョブフロークラスに対して@IterativeBatchアノテーションを付ける。
import com.asakusafw.vocabulary.iterative.IterativeBatch;
@IterativeBatch(name = "IterativeExampleBatch") public class ExampleJob extends FlowDescription { 〜 }
これをコンパイル(sparkCompileBatchapps)すると、「spark.IterativeExampleBatch
」というバッチが生成される。
(反復バッチでない通常のバッチを作る必要が無い場合は、@JobFlowを付ける必要は無いし、Batchクラスを作る必要も無い)
ジョブフローに付けるのに「Batch」というアノテーション名なのはどうしたことか^^;
と思ったが、この構文だとBatchクラスを作る必要が無いので、コーディングが少し楽になる。
Importerに${引数名}
を使っていて、それが反復対象の引数(-X-parameter-tableで指定する)の場合は、ジョブフローの該当ポートに@Iterativeアノテーションを付ける。
import com.asakusafw.vocabulary.iterative.Iterative;
public ExampleJob( @Iterative({ "date" }) @Import(name = "example1", description = Example1FromCsv.class) In<Example1> example1, @Export(name = "example2", description = Example2ToCsv.class) Out<Example2> example2 ) { this.example1 = example1; this.example2 = example2; }
もしくは、インポータークラスに付けてもよいようだ。
import com.asakusafw.vocabulary.iterative.Iterative;
@Iterative({ "date" }) public class Example1FromCsv extends AbstractExample1CsvInputDescription { @Override public String getBasePath() { return "${date}/example1"; } 〜 }
BatchContext.get("引数名")
を使っていて、それが反復対象の引数(-X-parameter-tableで指定する)の場合は、それを使っているOperatorのメソッドに@Iterativeアノテーションを付ける。
import com.asakusafw.vocabulary.iterative.Iterative;
public abstract class ExampleOperator { @Iterative({ "date" }) @Update public void update(Example2 in) { String date = BatchContext.get("date"); in.setDateAsString(date); } }
@Iterativeの引数には、反復対象となる引数名を書く。
引数名を省略すると、全反復引数に対する反復メソッドになる。
Operatorクラス内の全メソッドが反復対象の場合は、メソッド個別ではなく、Operatorクラスに@Iterativeアノテーションを付けても良い。
@Iterative({ "date" }) public abstract class ExampleOperator { 〜 }
また、反復対象となったメソッドがあると、フロー上で後続の処理も自動的に反復対象になる。
もしBatchContextから取得した値をstaticフィールドに保持するようなコーディングをしていたら、その方法は反復処理では使えないので注意。
(-Aで取得した値だけなら、バッチの実行中は変わるはずがないので、staticフィールドに保持していても問題なかった)
なお、BatchContext.get()
はstaticメソッドだが、どの反復処理中なのかを判別して値を返してくれるらしい。