S-JIS[2016-04-10] 変更履歴

Asakusa on Spark反復機能

Asakusa on SparkのIterative Extensions(反復機能)のメモ。


概要

Asakusa on Spark 0.3.0(2016/4/8)でIterative Extensions(反復機能)が加わった。
Iterative Extensionsは、バッチ引数(YAESSの-A相当)の値を複数指定して同時に実行するようなイメージ。

Iterative Extensionsの機能で実行できるバッチを「反復バッチ」と呼ぶ。


例えば、バッチ引数dateを持つバッチがあったとして、複数の日付で処理を実行したい場合は、今までは以下の様に個別に実行していた。

$ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh spark.ExampleBatch -A date=20160401
$ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh spark.ExampleBatch -A date=20160402
$ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh spark.ExampleBatch -A date=20160403

これを反復バッチにすると、以下のように1回のバッチで複数の日付の処理を実行できる。

$ cat $HOME/date.json
{
    "date" : "20160401"
}
{
    "date" : "20160402"
}
{
    "date" : "20160403"
}
$ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh spark.IterativeExampleBatch -X-parameter-table $HOME/date.json

バッチを個別に実行するのと反復バッチにして1回で実行するのとの大きな違いは、
反復バッチにした場合、Sparkから見ると1つの処理になること。空いたワーカーがあれば他の引数の処理を詰め込むことが出来るので、リソースの使用効率が上がる。
また、異なる引数の場合でも全く同一になる処理が含まれていれば、1回だけ処理することにより、効率が良くなる。


残念ながら、反復バッチにする為には、どの部分を反復可能にするかをAsakusaDSL上で指定する必要があるので、今までのバッチがアプリケーション修正なしで反復可能になるわけではない。
どの部分が反復可能で、どの部分が共通なのかをプログラマーが指定する必要がある。(一番細かい粒度だとOperatorのメソッド、一番大きい粒度はジョブフロー)
このために新たなAsakusaDSLの構文(アノテーション)が追加された。(今までのAsakusaFWの歴史の中で、DSLの拡張が行われたのは今回が初)

アプリケーション内で引数の値を取得する方法は、-Aの場合も-X-parameter-tableの場合もBatchContext.get("引数名")であり、変わらない。
(-X-parameter-tableで指定された引数であればその値が取れるし、そうでなければ-Aで指定された値が取れる)

ただし、Exporterの${引数名}では-X-parameter-tableの値が取得できないので注意が必要。


「反復可能」と聞くと、繰り返し処理する→順番に処理するというイメージを(自分は)持つが、Asakusa on Sparkの反復バッチはそうではない。
引数自体は複数(ファイル内に記述する都合上、順番に)指定するが、Sparkにまとめて渡されるので、処理自体は(Spark内のスケジューリングとクラスター内のリソースの空き具合次第だが、)複数の引数の処理が同時に実行される。


コーディング方法

ジョブフロークラス

反復バッチを定義する為には、ジョブフロークラスに対して@IterativeBatchアノテーションを付ける。

ExampleJob.java:

import com.asakusafw.vocabulary.iterative.IterativeBatch;
@IterativeBatch(name = "IterativeExampleBatch")
public class ExampleJob extends FlowDescription {
	〜
}

これをコンパイル(sparkCompileBatchapps)すると、「spark.IterativeExampleBatch」というバッチが生成される。
(反復バッチでない通常のバッチを作る必要が無い場合は、@JobFlowを付ける必要は無いし、Batchクラスを作る必要も無い)

ジョブフローに付けるのに「Batch」というアノテーション名なのはどうしたことか^^;
と思ったが、この構文だとBatchクラスを作る必要が無いので、コーディングが少し楽になる。


Importerに${引数名}を使っていて、それが反復対象の引数(-X-parameter-tableで指定する)の場合は、ジョブフローの該当ポートに@Iterativeアノテーションを付ける。

ExampleJob.java:

import com.asakusafw.vocabulary.iterative.Iterative;
	public ExampleJob(
		@Iterative({ "date" }) @Import(name = "example1", description = Example1FromCsv.class) In<Example1> example1,
		@Export(name = "example2", description = Example2ToCsv.class) Out<Example2> example2
	) {
		this.example1 = example1;
		this.example2 = example2;
	}

もしくは、インポータークラスに付けてもよいようだ。

Example1FromCsv.java:

import com.asakusafw.vocabulary.iterative.Iterative;
@Iterative({ "date" })
public class Example1FromCsv extends AbstractExample1CsvInputDescription {

	@Override
	public String getBasePath() {
		return "${date}/example1";
	}

	〜
}

オペレータークラス

BatchContext.get("引数名")を使っていて、それが反復対象の引数(-X-parameter-tableで指定する)の場合は、それを使っているOperatorのメソッドに@Iterativeアノテーションを付ける。

ExampleOperator.java:

import com.asakusafw.vocabulary.iterative.Iterative;
public abstract class ExampleOperator {

	@Iterative({ "date" })
	@Update
	public void update(Example2 in) {
		String date = BatchContext.get("date");
		in.setDateAsString(date);
	}
}

@Iterativeの引数には、反復対象となる引数名を書く。
引数名を省略すると、全反復引数に対する反復メソッドになる。

Operatorクラス内の全メソッドが反復対象の場合は、メソッド個別ではなく、Operatorクラスに@Iterativeアノテーションを付けても良い。

@Iterative({ "date" })
public abstract class ExampleOperator {
	〜
}

また、反復対象となったメソッドがあると、フロー上で後続の処理も自動的に反復対象になる。


もしBatchContextから取得した値をstaticフィールドに保持するようなコーディングをしていたら、その方法は反復処理では使えないので注意。
(-Aで取得した値だけなら、バッチの実行中は変わるはずがないので、staticフィールドに保持していても問題なかった)

なお、BatchContext.get()はstaticメソッドだが、どの反復処理中なのかを判別して値を返してくれるらしい。


Asakusa on Sparkへ戻る / AsakusaFW目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま