Asakusa FrameworkのOperator DSLのフロー演算子(FlowPart)のメモ。
|
フロー演算子は、演算子を組み合わせて1つの演算子のように扱う演算子。
性能特性は中身次第。
| 入力 ポート数 |
入力データモデル の制約 |
イメージ | 出力 ポート数 |
出力データモデル の制約 |
入力1レコード に対する 出力レコード数 |
|---|---|---|---|---|---|
| 任意 |
![]() |
任意 | 任意。 (0〜∞レコード) |
フロー演算子はサブルーチンのようなもの。
複数の演算子を1つのフロー演算子にまとめることにより、Flow DSLがシンプルになる。
AsakusaFWによってフローがコンパイルされるときには、フロー演算子の中身が外側に展開される。(各演算子の最適化は展開後に行われる)
したがって、実行時には、フロー演算子があることによる影響は全く無い。
WordCountを行うフロー演算子の例。
WordCountのフロー演算子の定義(中身)は以下のようになる。
(この図はToad Editorを用いて作っています)

| 入力データ例 | 中間データ例 | 出力データ例 | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| in |
|
→ | out |
|
in | → | out |
|
||||||||||||||||
"テキスト"
text_model = {
"テキスト"
text : TEXT;
};
"単語"
word_model = {
"単語"
word : TEXT;
};
"単語数"
summarized word_count_model = word_model => {
"単語"
any word -> word;
"件数"
count word -> count;
} % word;
import java.util.StringTokenizer; import com.asakusafw.runtime.core.Result; import com.asakusafw.vocabulary.operator.Extract; import com.asakusafw.vocabulary.operator.Summarize; import com.example.modelgen.dmdl.model.TextModel; import com.example.modelgen.dmdl.model.WordCountModel; import com.example.modelgen.dmdl.model.WordModel;
public abstract class WordCountOperator {
private WordModel word = new WordModel();
/**
* 単語分割
*
* @param in
* テキスト
* @param out
* 単語
*/
@Extract
public void split(TextModel in, Result<WordModel> out) {
String text = in.getTextAsString();
StringTokenizer tokenizer = new StringTokenizer(text);
while (tokenizer.hasMoreTokens()) {
word.setWordAsString(tokenizer.nextToken());
out.add(word);
}
}
/** * カウント * * @param in * 単語 * @return 単語数 */ @Summarize public abstract WordCountModel count(WordModel in); }
Operatorに書くのはフロー演算子の中から使用するもの。(フロー演算子の定義とは関係ない)
package com.example.wordcount.flowpart;
import com.asakusafw.vocabulary.flow.FlowDescription; import com.asakusafw.vocabulary.flow.FlowPart; import com.asakusafw.vocabulary.flow.In; import com.asakusafw.vocabulary.flow.Out; import com.example.modelgen.dmdl.model.TextModel; import com.example.modelgen.dmdl.model.WordCountModel; import com.example.wordcount.WordCountOperatorFactory; import com.example.wordcount.WordCountOperatorFactory.Count; import com.example.wordcount.WordCountOperatorFactory.Split;
@FlowPart
public class WordCountFlowPart extends FlowDescription {
private final In<TextModel> in;
private final Out<WordCountModel> out;
public WordCountFlowPart(
In<TextModel> in, Out<WordCountModel> out ) { this.in = in; this.out = out; }
@Override
public void describe() {
WordCountOperatorFactory operators = new WordCountOperatorFactory();
// 単語分割
Split split = operators.split(this.in);
// カウント
Count count = operators.count(split.out);
this.out.add(count.out);
}
}
フロー演算子では、Flow DSLで処理を記述する。
フロー演算子の単体テストの実装例。
フローのテストになるので、テストドライバーを使い、入出力データを用意する必要がある。
フローのテストはスモールジョブ実行エンジンで実行される。(AsakusaFW 0.5.1の頃はHadoopを使って実行されていたので、色々環境整備しないとWindowsでは実行できなかった)[/2017-06-03]
入力データおよび出力データの検証用データ・検証ルールをExcelファイルで指定する。
DMDLをコンパイルすると、プロジェクト/target/excel(gradle版の場合はプロジェクト/build/excel)の下にExcelファイルの雛形が生成される。
これをsrc/test/resourcesの下のフロー演算子のパッケージ名と同じ場所にコピーし、その中にデータを記述する。
| A | B | C | |
| 1 | text | ||
| 2 | Hello Hadoop World | ||
| 3 | Hello Asakusa | ||
| 4 | |||
| 5 |
入力データはinputシートに書く。
1行目(色付きのセル)は自動で入っている。
| A | B | C | |
| 1 | word | count | |
| 2 | Asakusa | 1 | |
| 3 | Hadoop | 1 | |
| 4 | Hello | 2 | |
| 5 | World | 1 | |
| 6 |
期待される出力データをoutputシートに書く。
1行目(色付きのセル)は自動で入っている。
| A | B | C | D | E | |
| 1 | Format | EVR-1.0.0 | |||
| 2 | 全体の比較 | 全てのデータを検査 [Strict] | |||
| 3 | プロパティ | 値の比較 | NULLの比較 | コメント | |
| 4 | word | 検査キー [Key] | NULLなら常に失敗 [DA] | 単語 | |
| 5 | count | 完全一致 [=] | NULLなら常に失敗 [DA] | 件数 | |
| 6 |
outputシートに書かれているデータと実際のデータをどう比較してどういう状態ならテストOK(あるいはNG)とするかをruleシートに書く。
どのセルも初期値は自動で入っている。
(「コメント」の“単語”や“件数”は、DMDLに自分が記述した各項目のコメント)
今回のケースでは出力項目がwordとcountの二項目なので、それぞれどういう比較をするかを書く。(書くというか、実際はプルダウンになっているので、選択する)
※ExcelファイルをEclipse外のエディターで編集したら、Eclipse上でF5を押して反映させる(srcからclassesへコピーさせる)必要がある。
package com.example.wordcount.flowpart;
import org.junit.Test; import com.asakusafw.testdriver.FlowPartTester; import com.asakusafw.vocabulary.flow.FlowDescription; import com.asakusafw.vocabulary.flow.In; import com.asakusafw.vocabulary.flow.Out; import com.example.modelgen.dmdl.model.TextModel; import com.example.modelgen.dmdl.model.WordCountModel;
/**
* {@link WordCountFlowPart}のテスト.
*/
public class WordCountFlowPartTest {
@Test
public void describe() {
FlowPartTester tester = new FlowPartTester(getClass());
In<TextModel> in = tester.input("in", TextModel.class).prepare("text_model.xls#input");
Out<WordCountModel> out = tester.output("out", WordCountModel.class).verify("word_count_model.xls#output", "word_count_model.xls#rule");
FlowDescription flowPart = new WordCountFlowPart(in, out);
tester.runTest(flowPart);
}
}
フロー演算子のテストにはFlowPartTesterを使う。
tester.input()で入力データを指定する。
第1引数はフロー演算子で指定した入力データの名前。つまりWordCountFlowPartのコンストラクターの変数名。
そのinput()メソッドの後に続いているprepare()で、入力データのExcelファイル名およびシート名を指定する。
tester.output()で出力データを指定する。
第1引数はフロー演算子で指定した出力データの名前。つまりWordCountFlowPartのコンストラクターの変数名。
verify()メソッドの第1引数が出力データのExcelファイル名およびシート名、第2引数がチェック用ルールのExcelファイル名およびシート名。
tester.runTest()にフロー演算子のオブジェクトを渡すとテストが実行される。
定義したフロー演算子を使う側(ジョブフローや別のフロー演算子)の実装例。

import com.asakusafw.vocabulary.flow.FlowDescription; import com.asakusafw.vocabulary.flow.In; import com.asakusafw.vocabulary.flow.Out; import com.example.modelgen.dmdl.model.TextModel; import com.example.modelgen.dmdl.model.WordCountModel; import com.example.wordcount.flowpart.WordCountFlowPartFactory; import com.example.wordcount.flowpart.WordCountFlowPartFactory.WordCountFlowPart;
@Override
public void describe() {
WordCountFlowPartFactory wordCountFlowPartFactory = new WordCountFlowPartFactory();
// 単語数カウント
WordCountFlowPart wordCount = wordCountFlowPartFactory.create(this.in);
this.out.add(wordCount.out);
}
Flow DSLでは、自分が作ったFlowPartのFactoryクラス(AsakusaFWのコンパイラーによって生成される)を使用する。
メソッド名は常にcreate。
戻り値の型はAsakusaFWのコンパイラーによって生成されたクラス。(フロー演算子のクラス名と同じ。(名前は同じだが別物なので注意))
フロー演算子の出力ポートの名前は、FlowPartのコンストラクターに記述した変数名。