AZAREA-Clusterでは、複数のフローをまとめて一つのアプリケーションとして実行することが出来る。
アプリケーションは以下のような手順で作成する。
「アプリケーションの編集」ダイアログの上部にある「フロー図を表示」ボタンを押すと「エンティティフローの編集」ダイアログが開き、そのアプリケーション内に登録されているフローが全て表示される。
ここで「MapReduceジョブを表示」を選ぶと“どの処理がまとまってMapReduceジョブになるか”が表示される。
package example.app; import example.flow.WordCountFlow; import java.io.IOException; import jp.co.cac.azarea.cluster.Main; import jp.co.cac.azarea.cluster.planner.job.EntityFlowManager; import jp.co.cac.azarea.cluster.util.Generated;
@Generated("AZAREA-Cluster 1.0") public class WordCountApp extends EntityFlowManager { @Override protected void initializeEntityFlow() throws IOException { addEntityFlow(new WordCountFlow()); } public static void main(String[] args) throws Exception { Main.execute(WordCountApp.class.getName(), args); } }
アプリケーションに複数のフローが含まれていると、それら全てを合わせたMapReduceジョブが生成される。
例えばフロー1のConversionとフロー2のConversionをひとつのMapReduceで実行するような合成(最適化)が行われる。
原因がよく分からないが、アプリケーションにフローを追加する為の「エンティティフロークラスを選択してください」ダイアログに、フロークラスの候補が一切出ないことがある。
パッケージの中に複数のフローがあると駄目とか、srcとtestフォルダーが分かれてると駄目とか、UNIXだと駄目とか?(原因追求してないので不明)
AZAREAでは、アプリケーションの引数を処理実行時に使用することが出来る。
これにはApplicationContextを使用する。
→類似:Asakusa FrameworkのBatchContext
initialize()の先頭でgetApplicationContext()を用いてApplicationContextを取得し、そこから値を取り出す。
@Generated("AZAREA-Cluster 1.0")
public class SalesFlow extends EntityFlow {
@Override
protected void initialize() {
final int LIMIT = getApplicationContext().getInt("LIMIT");
EntityFile<SalesEntity> entity1 = getInput(SalesEntity.class);
〜
GroupSort<Result4Entity, Result4Entity> entity9 = new GroupSort<Result4Entity, Result4Entity>(sum4, "date", GroupSort.DELIMITER, "amount DESC", "code") {
@Override
protected void merge(List<Result4Entity> entities) {
int n = 0;
for (Result4Entity entity : entities) {
if (++n > LIMIT) break;
Result4Entity result = new Result4Entity();
result.copyFrom(entity);
output(result);
}
}
};
setOutput(entity9);
}
シミュレーターで実行する場合は、testerからApplicationContextが取得できる。
このApplicationContextに値を設定しておく。
@Generated("AZAREA-Cluster 1.0")
public class SalesFlowTester {
public static void main(String[] args) throws IOException {
MapReduceJobManagerTester tester = new MapReduceJobManagerTester(new SimpleEntityFlowManager());
tester.getApplicationContext().setInt("LIMIT", 10);
tester.test("../data", SalesFlow.class.getName());
}
}
package example.hadoop2.azarea.app;
import java.io.IOException; import java.util.List; import example.hadoop2.azarea.flow.SalesFlow; import jp.co.cac.azarea.cluster.Main; import jp.co.cac.azarea.cluster.planner.job.EntityFlowManager; import jp.co.cac.azarea.cluster.util.Generated;
@Generated("AZAREA-Cluster 1.0")
public class AzareaSalesApp extends EntityFlowManager {
@Override
protected void initializeContext(List<String> args) throws IOException {
if (args.size() < 1) {
throw new IllegalArgumentException("引数は1個指定してください。");
}
getApplicationContext().setInt("LIMIT", Integer.parseInt(args.get(0)));
}
@Override
protected void initializeEntityFlow() throws IOException {
addEntityFlow(new SalesFlow());
}
public static void main(String[] args) throws Exception {
Main.execute(AzareaSalesApp.class.getName(), args);
}
}
自分でinitializeContext()をオーバーライドする(initializeEntityFlow()とmain()は自動生成される)。
そして、実行時引数を解釈・変換してApplicationContextに登録する。
ここで注意。
この状態でアプリケーションクラスの編集画面を開いて「エンティティフローの編集」から「MapReduceジョブを表示」しようとすると、以下のような例外が発生する。
java.lang.NumberFormatException: For input string: "" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Integer.parseInt(Integer.java:493) at java.lang.Integer.parseInt(Integer.java:514) at example.hadoop2.azarea.app.AzareaSalesApp.initializeContext(AzareaSalesApp.java:17) 〜
MapReduceジョブを計画するに当たり、アプリケーションクラスを実行しているらしい。
args.get(0)が空文字列を返しているのでparseInt()が失敗しているという事のようだ。
「エンティティフローの編集」ダイアログの「MapReduceジョブを表示」の右側に「アプリケーション引数」という入力エリアがあり、ここに引数を書くとargsに渡される。
(abcと書いたら「java.lang.NumberFormatException: For input string: "abc"
」に変わった)
分かりにくいよ!(苦笑) MapReduceの実行計画を表示するのに引数を実際に入れないといけないとは思わないし。
Hadoopで実行する際には、「hadoop jar」コマンドの最後尾に引数を記述する。
hadoop jar AzareaSalesApp.jar -i=inputdir -o=outputdir 10