Asakusa Framework0.8.1でWordCountを作ってみる。
|
|
入力はベタのテキストファイルとし、出力をCSVファイルとする。
(AsakusaFW 0.8.1より前はベタのテキストを扱うには工夫が必要だったが、0.8.1でベタのテキストを読み込むDirect
I/O lineが加わったので、それを使用する)
開発環境としては、EclipseにShafuとDMDL EditorXがインストールされているものとする。
一番最初に、Eclipseで扱えるWordCount用プロジェクトを作成する。(Shafuの新規プロジェクト作成機能を使用する)
最初に、入力データ・出力データを表すクラス(モデル)を作成する。
モデルはdmdlというファイルに定義を書き、モデルジェネレーターによってJavaソースに変換される。
入力ファイル・出力ファイルの一行分のデータ構造をDMDL(Data Model Definition Language)という言語で記述する。(JSONぽい感じ。見ればすぐ分かる)
このDMDLファイル内にはまずモデル名を書くが、これがクラス名に変換される。
モデル名はファイル名と異なっていてもいいし、1つのファイル内に複数のモデルを書いてもよい。
ダブルクォーテーションで囲んだ部分が、JavaソースのJavadocコメントになる。
参考: Asakusa FrameworkのDMDLユーザーガイド
ここではmodels.dmdlというファイルを作成し、データモデルを記述する。
WordCountであれば、入力は単なる文字列(text)、出力は単語(word)と出現数(count)。
"テキストファイルの一行に対応するエンティティ" @directio.line line_model = { "テキスト" text : TEXT; }; "単語と発生回数の対を表現するエンティティ" @directio.csv word_count_model = { "発生した単語" word : TEXT; "発生回数" count : INT; };
「@directio.line」というのは、そのデータモデルをHDFS上のテキストファイルとして扱うという印。
「@directio.csv」というのは、そのデータモデルをHDFS上のCSVファイルとして扱うという印。
作成したDMDLファイルからデータモデルクラス(Javaソース)を生成する。
また、@directio.lineや@directio.csv属性が付いていると、入出力ファイルとしての定義クラス(Importer/Exporterの抽象クラス)も生成されるようになる。(ジョブフローの作成時に関係してくる)
Shafuを使う場合は以下のようにして生成する。
あるいは、DMDL EditorXを使う場合は以下のようにする。
次に、オペレーター(演算子)を作成する。
オペレーターは、HadoopのMapReduceのMapper#map()やReducer#reduce()に相当する部分。
つまりデータ変換や集計といった実際の処理を記述する。
オペレーター用のクラスを用意し、その中にメソッドを定義して処理を記述していく。(このメソッドは、実行時に実際に呼ばれる)
メソッドには、どのような処理を行うかに応じてAsakusa Frameworkで用意されているアノテーションを付ける必要がある。
そして、アノテーションの種類(というか処理の種類)によって、メソッドの書き方(引数や戻り値など)が変わってくる。
オペレーターはJavaの抽象クラス内に定義する決まりになっているので、最初に空のクラスを作る。
package com.example.operator; public abstract class WordCountOperator { }
HadoopのMapReduceのMapperやReducerではクラスの中に1メソッド(map()やreduce())しか書かないが、
AsakusaFWのOperatorクラスは、その1つのクラスの中に複数のメソッド(テキスト分割のメソッドや集計のメソッド)を記述する。
まず、入力データを単語ごとに分割する処理を考える。
入力データは一種類で、出力は入力1件(テキスト1行)に対して複数件(複数の単語)になる。
このように複数件の出力を行うには、メソッドの出力部分にResultというクラスが指定できる演算でなければならない。(Result#add()を何度も呼び出してデータを出力する為)
各演算子のサンプルを見てみると、Resultが指定できそうなのはExtract・Split・GroupSort・CoGroup。
Splitは名前からして分割に使えそうに思えるが、結合されたデータモデルを分割するものなので、
全く目的に合わない。
GroupSortも入力が複数件になり、そもそも今の時点でソートは不要(冗長な処理になる)なので、却下。
CoGroupは複数種類のデータを入力としてキーで結合するものであり、今回の目的には冗長なので、却下。
という訳で、1件読み込んで複数件出力できるExtractを採用する。
import java.util.StringTokenizer; import com.asakusafw.runtime.core.Result; import com.asakusafw.vocabulary.operator.Extract; import com.example.modelgen.dmdl.model.LineModel; import com.example.modelgen.dmdl.model.WordCountModel;
/** * 単語と発生回数の対を表現するエンティティ */ private final WordCountModel wordCount = new WordCountModel();
/** * 単語分割 * * @param line * テキストファイルの一行 * @param wordCountResult * 単語と発生回数の対 */ @Extract public void split(LineModel line, Result<WordCountModel> wordCountResult) { String text = line.getTextAsString(); StringTokenizer tokenizer = new StringTokenizer(text); while (tokenizer.hasMoreTokens()) { String word = tokenizer.nextToken(); wordCount.setWordAsString(word); wordCount.setCount(1); wordCountResult.add(wordCount); } }
Extract演算子の場合、MapReduceのMapper#map()と同様に、入力の1行につき1回ずつ呼ばれる。
DMDL上でTEXTとして宣言した項目は、Javaのクラス上はHadoopのTextクラスになっている。
が、セッター・ゲッターには「asString」という接尾辞が付いたメソッドが用意されているので、Stringクラスで読み書きできる。
出力は、MapReduceのcontext.write()の代わりにResultにaddしていく。
出力に使うインスタンスは、フィールドに定義しておいて使い回すことが出来る。
(ここで付けたResultの引数名は、後でジョブフローを定義する際に使われる
。ひとつしか出力が無い場合は、大抵はoutという名前を付ける。(今回は使われていることを分かり易くする為に長ったらしい名前にしてみた))
なお、ここで下記の様な中途半端な状態(コーディング途中)にすると、コンパイルエラーが発生する。(プロジェクトを自動的にビルドする設定になっている場合)
import com.asakusafw.vocabulary.operator.Extract; public class WordCountOperator { @Extract public void split() {} }
エラー位置はクラス名部分、エラーメッセージは「演算子クラスcom.example.operator.WordCountOperatorはabstractとして宣言する必要があります」。
他にもExtractアノテーションの場合はメソッドの引数にResultが要る等のエラーメッセージを出してくれる。
このエラーは、AsakusaFW(Ashigelコンパイラー)が出しているもの。
AsakusaFWのインストール(Eclipseの設定)をすると、Asakusa DSLの文法に沿っているかどうかをチェックする機能が入る。すごい!
次に単語数の集計処理。
HadoopのMapReduceに倣ってCoGroupを使うことにする。(今回は入力が1種類しか無いので、GroupSortでも構わないが)
なお、AsakusaFWとしては「CoGroupを使うのは最後の手段」とされているので、今回の例ならSummarizeを使った集計処理やFoldを使った集計処理の方がよい。
入力リストのキーに単語項目を指定すれば、単語ごとにメソッドが呼ばれるようになる。
import java.util.List; import com.asakusafw.runtime.core.Result; import com.asakusafw.runtime.value.StringOption; import com.asakusafw.vocabulary.model.Key; import com.asakusafw.vocabulary.operator.CoGroup;
@CoGroup public void sum( @Key(group = { "word" }) List<WordCountModel> wordCountList, Result<WordCountModel> wordCountResult ) { StringOption word = wordCountList.get(0).getWordOption(); int count = 0; for (WordCountModel wc : wordCountList) { count += wc.getCount(); } wordCount.setWordOption(word); wordCount.setCount(count); wordCountResult.add(wordCount); }
出力に使うインスタンスは使い回すことが出来るので、分割処理(split)に使ったのと同じwordCountフィールドを使っている。
(実際に実行される際には、演算子メソッドが異なる場合はOperatorインスタンス自体が別のものになるので、同じwordCountフィールドを使っているように見えても、分割処理と同一のインスタンスが使われるわけではない)
オペレータークラスを書く(保存する)と、(Eclipseがプロジェクトを自動的にビルドする設定になっている場合は)自動的にオペレーターファクトリークラスが生成される。
(ジョブフローを記述する際に使うのはファクトリークラス)
なお、Operatorのソースを変更(して保存)する度に自動的にOperatorFactory(やOperatorImpl)も更新されるが、
Operatorを削除した場合はOperatorFactory(とOperatorImpl)は削除されず、(Operatorクラスが見つからなくて)コンパイルエラーになる。
次に、ジョブフローを定義する。
ここで「どのファイルを入力とし、どのオペレーターを呼んで、どのファイルへ出力するか」を記述する。
参考: Asakusa FrameworkのAsakusa DSLスタートガイド#データフローを記述する
まずは入力ファイル(Importer)を定義する。
DMDLファイル上のモデル定義に@directio.という属性を付けておくとImporterの雛形クラス(抽象クラス)が生成されるので、それを継承した具象クラスを作成する。
package com.example.jobflow.gateway; import com.example.modelgen.dmdl.csv.AbstractLineModelCsvInputDescription;
public class LineFromFile extends AbstractLineModelLineInputDescription { @Override public String getBasePath() { return "input/file01"; } @Override public String getResourcePattern() { return "*.csv"; } }
line_modelというモデル名に@directio.lineを付けていた場合、AbstractLineModelLineInputDescription(Abstract+モデル名+LineInputDescription)というクラスが生成されているので、それを使う。
ベースパスやリソースパターンはDirect I/O固有の設定。
ベースパスはディレクトリー名相当、リソースパターンがファイル名。
ベースパスは相対パスっぽい指定になる。具体的にどこになるのかは、実行環境の設定によって変わる。
次いで出力ファイル(Exporter)。これもImporterと同様。
package com.example.jobflow.gateway; import com.example.modelgen.dmdl.csv.AbstractWordCountModelCsvOutputDescription;
public class WordCountToFile extends AbstractWordCountModelCsvOutputDescription { @Override public String getBasePath() { return "result/wordcount"; } @Override public String getResourcePattern() { return "wc-*"; } }
word_count_modelというモデル名に@directio.csvを付けていた場合、AbstractWordCountModelCsvOutputDescription(Abstract+モデル名+CsvOutputDescription)というクラスが生成されているので、それを使う。
そして、入力→処理→出力のつながり(順序・フロー)を記述する。
package com.example.jobflow; import com.asakusafw.vocabulary.flow.FlowDescription; import com.asakusafw.vocabulary.flow.JobFlow;
@JobFlow(name = "WordCountJob") public class WordCountJob extends FlowDescription { }
JobFlowというアノテーションを付け、ジョブ名を指定する。
クラス自体はFlowDescriptionを継承する。
ジョブフローでは、コンストラクターで入力データと出力データを受け取るようだ。
それをフィールドで保持しておき、使用する。
import com.asakusafw.vocabulary.flow.Export; import com.asakusafw.vocabulary.flow.Import; import com.asakusafw.vocabulary.flow.In; import com.asakusafw.vocabulary.flow.Out; import com.example.jobflow.gateway.LineFromFile; import com.example.jobflow.gateway.WordCountToFile; import com.example.modelgen.dmdl.model.LineModel; import com.example.modelgen.dmdl.model.WordCountModel;
private final In<LineModel> in; private final Out<WordCountModel> out; public WordCountJob( @Import(name = "in", description = LineFromFile.class) In<LineModel> in, @Export(name = "out", description = WordCountToFile.class) Out<WordCountModel> out) { this.in = in; this.out = out; }
FlowDescriptionのdescribe()メソッドをオーバーライドし、フローを記述する。
import com.example.operator.WordCountOperatorFactory; import com.asakusafw.vocabulary.flow.Source;
WordCountOperatorFactory operator = new WordCountOperatorFactory(); @Override protected void describe() { Source<WordCountModel> splitResult = operator.split(in).wordCountResult; Source<WordCountModel> sumResult = operator.sum(splitResult).wordCountResult; out.add(sumResult); }
operator.split()やoperator.sum()が、自分で作ったオペレーターを表している。
自分でオペレーターを記述した際に、メソッドの引数で出力用のResultを定義したが、その変数名(上記のwordCountResult)で出力データを受け取るイメージ。
(自分でコーディングしたのはOperatorクラスで、それを元にOperatorFactoryが生成されており、変数名等が流用されている)
そうして受け取ったデータを次の演算子(オペレーター)に渡し、最終的にout(出力ファイル)に渡す。
「Source」は、次の演算子への入力データ(の型)を意味すると思えばいいだろう。
※このdescribe()メソッドはAsakusaFWがジョブフローをコンパイルする際に実行される(実行することによってフロー(グラフ)を作成している)。
→ジョブフロークラスのテスト方法
→ジョブフローのグラフ化(可視化)
最後にBatch DSLで「どのジョブフローを実行するか」を記述する。
(WordCountだとシンプルすぎて、ありがたみが無いかも^^;)
参考: Asakusa FrameworkのAsakusa DSLスタートガイド#バッチを記述する
package com.example.batch; import com.asakusafw.vocabulary.batch.Batch; import com.asakusafw.vocabulary.batch.BatchDescription;
@Batch(name = "WordCountBatch") public class WordCountBatch extends BatchDescription { }
BatchDescriptionのdescribe()メソッドをオーバーライドし、どのジョブを実行するかを記述する。
import com.example.jobflow.WordCountJob;
@Override protected void describe() { run(WordCountJob.class).soon(); }
実行するジョブのClassをrun()で指定し、soon()で“すぐ(前提条件が無いので最初に)実行する”という意味になる。
オペレーター(演算子)はバッチアプリケーションとしての実行時に普通に呼び出されるクラスなので、単体テストは普通にJUnit4で行う。
ただしOperatorは抽象クラスなので、直接インスタンス化することが出来ない。
Operator(演算子)クラスを保存(コンパイル)すると自動的にOperatorFactoryが作られるが、同時にOperatorImplというクラスも作られる。テストにはこれを使用する。
今回のメソッドの入力データは単なるListだからいいとして、出力データを入れるResultにはMockResultを使う。
参考: Asakusa Frameworkのアプリケーションテストスタートガイド
package com.example.operator; import static org.hamcrest.CoreMatchers.*; import static org.junit.Assert.*; import java.util.*; import org.junit.Test; import com.asakusafw.runtime.testing.MockResult; import com.example.modelgen.dmdl.model.LineModel; import com.example.modelgen.dmdl.model.WordCountModel; public class WordCountOperatorTest {
@Test public void testSplit() { testSplit("word1", "word1"); testSplit(""); testSplit("word3 blank", "word3", "blank"); testSplit("word4 blank", "word4", "blank"); testSplit("word5\ttab", "word5", "tab"); testSplit("word6\t tab-blank", "word6", "tab-blank"); testSplit("word7\ttab blank", "word7", "tab", "blank"); } private void testSplit(String text, String... expected) { LineModel line = new LineModel(); line.setTextAsString(text); MockResult<WordCountModel> result = new MockResult<WordCountModel>() { @Override protected WordCountModel bless(WordCountModel result) { WordCountModel copy = new WordCountModel(); copy.copyFrom(result); return copy; } }; WordCountOperatorImpl operator = new WordCountOperatorImpl(); operator.split(line, result); List<WordCountModel> r = result.getResults(); assertThat(r.size(), is(expected.length)); for (int i = 0; i < expected.length; i++) { WordCountModel wc = r.get(i); assertThat(wc.getWordAsString(), is(expected[i])); assertThat(wc.getCount(), is(1)); } }
@Test public void testSum() { WordCountOperatorImpl operator = new WordCountOperatorImpl(); { // リストの要素が1個 List<WordCountModel> list = new ArrayList<WordCountModel>(); list.add(createWordCount("word", 99)); MockResult<WordCountModel> result = new MockResult<WordCountModel>(); operator.sum(list, result); List<WordCountModel> r = result.getResults(); assertThat(r.size(), is(1)); assertThat(r.get(0).getWordAsString(), is("word")); assertThat(r.get(0).getCount(), is(99)); } { // リストの要素が複数 List<WordCountModel> list = new ArrayList<WordCountModel>(); String word = "word1"; list.add(createWordCount(word, 1)); list.add(createWordCount(word, 2)); list.add(createWordCount(word, 3)); MockResult<WordCountModel> result = new MockResult<WordCountModel>(); operator.sum(list, result); List<WordCountModel> r = result.getResults(); assertThat(r.size(), is(1)); assertThat(r.get(0).getWordAsString(), is(word)); assertThat(r.get(0).getCount(), is(6)); } } WordCountModel createWordCount(String word, int count) { WordCountModel m = new WordCountModel(); m.setWordAsString(word); m.setCount(count); return m; } }
AsakusaFWでは、ジョブフロー(とFlowPart)とバッチのテスト用クラスが用意されている。
それらを使ってJUnit4として実行できる。
(実行にはスモールジョブ実行エンジンが使われる)
参考: Asakusa Frameworkのアプリケーションテストスタートガイド#データフローのテスト
また、AsakusaFWの実行環境も必要なので、作っていない場合は作成する。
Shafuを使ってAsakusaFWの実行環境を作成する。
参考: Asakusa Frameworkのアプリケーションテストスタートガイド#ジョブフローのテスト
パッケージエクスプローラー上でジョブフロークラスのソース(WordCountJob)を右クリックして、「新規(W)」→「JUnitテスト・ケース」でJUnit4のテスト用クラスを作成する。
その際、作成先のディレクトリーは「afw-wordcount/src/test/java」とする。
package com.example.jobflow; import org.junit.Test; import com.asakusafw.testdriver.JobFlowTester; import com.example.modelgen.dmdl.model.LineModel; import com.example.modelgen.dmdl.model.WordCountModel;
public class WordCountJobTest { @Test public void testDescribe() { String inDataSheet = "line_model.xls"; String outDataSheet = "word_count_model.xls"; JobFlowTester tester = new JobFlowTester(getClass()); tester.input("in", LineModel.class).prepare(inDataSheet + "#input"); tester.output("out", WordCountModel.class).verify(outDataSheet + "#output", outDataSheet + "#rule"); tester.runTest(WordCountJob.class); } }
driver.input()で入力データを指定する。
第1引数はジョブフローで指定した入力データの名前。つまりWordCountJobのコンストラクターの「@Import(name = "in"
」。
この名前を使って入力データのファイル名を取得しているようだ。(一致しているものが無いと、入力ファイルを作れない)
そのinput()メソッドの後に続いているprepare()で、入力データのExcelファイル名(line_model.xls)を指定する。
driver.output()もdriver.input()と同様。
第1引数はジョブフローの出力データの名前「@Export(name = "out"
」。検証時にこの名前を使ってデータを取得するようだ。(一致しているものが無いと、実行が終わった後のチェックでエラーになる)
verify()メソッドの第1引数が出力データのExcelファイル名、第2引数がチェック用ルールのExcelファイル名。
「#input」「#output」「#rule」は、そのExcelファイル内のシート名を表している。
数値を使って
「#:1」や「#:2」の様に記述することもでき、その場合は「#:1」が左から2番目のoutputシート、「#:2」が3番目のruleシートを表す。
省略した場合は「#:0」扱いで、すなわち一番左にあるinputシートを指す。
ジョブフローのテストで使用するテストデータをExcelファイルで(またはJSON形式で)用意する必要がある。
参考: Asakusa FrameworkのExcelによるテストデータ定義
DMDLをコンパイルして(Javaソースの他に) デーモデル毎のExcelファイルが生成させることが出来る。
Shafuの「テストデータ・テンプレートを作成」を実行するとafw-wordcount/build/excelの下にExcelファイルが生成されるので、それをsrc/test/resources/com/example/jobflow/の下にコピーし、その中にデータを記述する。
(コピー先のディレクトリー(パッケージ)は、ジョブフローテストクラスのパッケージに合わせる)
A | B | C | |
1 | text | ||
2 | Hello Hadoop World | ||
3 | Hello Asakusa | ||
4 | |||
5 |
入力データはinputシートに書く。
1行目(色付きのセル)は自動で入っている。
A | B | C | |
1 | word | count | |
2 | Asakusa | 1 | |
3 | Hadoop | 1 | |
4 | Hello | 2 | |
5 | World | 1 | |
6 |
期待される出力データをoutputシートに書く。
1行目(色付きのセル)は自動で入っている。
A | B | C | D | E | F | |
1 | Format | EVR-2.0.0 | ||||
2 | 全体の比較 | 全てのデータを検査 [Strict] | ||||
3 | プロパティ | 値の比較 | NULLの比較 | コメント | オプション | |
4 | word | 検査キー [Key] | NULLなら常に失敗 [DA] | 発生した単語 | ||
5 | count | 完全一致 [=] | NULLなら常に失敗 [DA] | 発生回数 | ||
6 |
outputシートに書かれているデータと実際のデータをどう比較してどういう状態ならテストOK(あるいはNG)とするかをruleシートに書く。
どのセルも初期値は自動で入っている。
(「コメント」の“発生した単語”や“発生回数”は、DMDLに自分が記述した各項目のコメント)
今回のケースでは出力項目がwordとcountの二項目なので、それぞれどういう比較をするかを書く。(書くというか、実際はプルダウンになっているので、選択する)
このシートを見ただけで予想が付くようになっているのは素晴らしい。
→Excelファイルを使用せず、独自のデータを入力データにする方法
バッチクラスのテスト方法は、ジョブフローのテスト方法とほとんど同じ。[2011-08-04]
参考: Asakusa Frameworkのアプリケーションテストスタートガイド#バッチのテスト
Excelデータについては、ジョブフローのテスト用と全く同じものでも問題ない。
コピー元 | afw-wordcount/build/excel/ | line_model.xls word_count_model.xls |
ジョブフローのテスト用 | afw-wordcount/src/test/resources/com/example/jobflow/ | line_model.xls word_count_model.xls |
バッチのテスト用 | afw-wordcount/src/test/resources/com/example/batch/ | line_model.xls word_count_model.xls |
package com.example.batch; import org.junit.Test; import com.example.modelgen.dmdl.model.LineModel; import com.example.modelgen.dmdl.model.WordCountModel; import com.asakusafw.testdriver.BatchTester;
public class WordCountBatchTest { @Test public void testDescribe() { String inDataSheet = "line_model.xls"; String outDataSheet = "word_count_model.xls"; BatchTester tester = new BatchTester(getClass()); tester.jobflow("WordCountJob").input("in", LineModel.class).prepare(inDataSheet + "#input"); tester.jobflow("WordCountJob").output("out", WordCountModel.class).verify(outDataSheet + "#output", outDataSheet + "#rule"); tester.runTest(WordCountBatch.class); } }
tester.jobflow()には、ジョブフローのアノテーション「@JobFlow(name = "WordCountJob")
」で付けた名前を指定する。
そこ以降のinput()・output()については、ジョブフローのテストと全く同じ書き方。
単語の集計をするオペレーターを単純集計演算子Summarizeで作ってみる。
参考: Asakusa FrameworkのAsakusa DSLスタートガイド#単純集計演算子の実装例
まず、DMDLファイルに集計モデルを定義する。
集計モデルは、キー毎に集計(合算や件数カウント)を行った結果を表すデータモデル。
参考: Asakusa FrameworkのDMDLユーザーガイド#集計モデルを定義する
"WordCountの集計モデル" @directio.csv summarized word_count_total = word_count_model => { any word -> word; sum count -> count; } % word;
summarizedを先頭に付け、集計モデル名(word_count_total)を指定する。
その後ろに集計元となるモデル名(word_count_model)を書く。
そこから「=>」で続けて、演算の種類を書く。sumで集計する(countで件数カウントも出来る模様)。集計キーとなる項目にはanyを付ける。
「->」で項目の別名(というか集計モデルでの項目名)を付けられる。同じ名前でいい場合でも省略できない。
「%」で集計キーとなる項目を指定する。(SQLのgroup byのような感じ)
※この記述上では型を指定していないので同じ型になると期待していたのだが、word_count_modelのcountはINTなのに、word_count_totalのcountはLONGになる。
データモデルを記述したら、DMDLのコンパイルを行ってJavaクラスを生成しておく。
import com.asakusafw.vocabulary.operator.Summarize; import com.example.modelgen.dmdl.model.WordCountTotal;
@Summarize public abstract WordCountTotal sum2(WordCountModel wc);
集計モデルを定義する他に、Operatorクラスにも演算子メソッドを定義する必要がある。
Summarizeの場合、メソッド本体は記述しない(抽象メソッドにする)。
戻り型には集計結果のモデル(集計モデル)、引数には集計対象(集計元)のモデルを指定する。
また、このデータモデルで出力する為のExporterも作る必要がある。
public class WordCountTotalToFile extends AbstractWordCountTotalCsvOutputDescription { @Override public String getBasePath() { return "result/wordcount"; } @Override public String getResourcePattern() { return "wc-*"; } }
ジョブフローを定義する。
import com.example.modelgen.dmdl.model.WordCountTotal;
@Override protected void describe() { Source<WordCountModel> splitResult = operator.split(in).wordCountResult; // Source<WordCountModel> sumResult = operator.sum(splitResult).wordCountResult; Source<WordCountTotal> sumResult = operator.sum2(splitResult).out; out.add(sumResult); }
sumResultの型が変わった(Model→Total)ので、out.add()がコンパイルエラーになる。
WordCountJobのコンストラクターをWordCountTotalに変える必要がある。
In<LineModel> in; Out<WordCountTotal> out; public WordCountJob( @Import(name = "in", description = LineFromFile.class) In<LineModel> in, @Export(name = "out", description = WordCountTotalToFile.class) Out<WordCountTotal> out) { this.in = in; this.out = out; }
Operatorクラスの単体テストとしては、Summarizeはメソッド内容が自動的に生成されるので、テストケースを作る必要は無い(という考え方のようだ)。
Operatorクラスのテストに使うOperatorImplを見ると、以下のようになっている。
@Override public WordCountTotal sum2(WordCountModel wc) { throw new UnsupportedOperationException("単純集計演算子は組み込みの方法で処理されます"); }つまり、OperatorImplを使ってインスタンスを生成しても、このメソッドを呼び出してテストすることは出来ないわけだ。
ジョブフローのテストは出力対象クラスが変わったので修正しておく必要がある。
public class WordCountJobTest { @Test public void testDescribe() { String inDataSheet = "line_model.xls"; String outDataSheet = "word_count_model.xls"; JobFlowTester driver = new JobFlowTester(this.getClass()); driver.input("in", LineModel.class).prepare(inDataSheet + "#input"); driver.output("out", WordCountTotal.class).verify(outDataSheet + "#output", outDataSheet + "#rule"); driver.runTest(WordCountJob.class); } }
本来なら出力データを記述しているExcelファイルも作り直すべきなのかもしれないが、今回の例では項目名は全く変わっていないので、そのまま使える。
単純集計演算子(Summarize)には部分集約(partialAggregation)というものがある。
「部分集約を行う場合、〜、ネットワークの転送量を削減しようとします」と説明されているので、MapReduceのCombinerのような中間集計のことなのだろう。
部分集約は、Summarizeアノテーションの引数として指定する。
import com.asakusafw.vocabulary.flow.processor.PartialAggregation;
@Summarize(partialAggregation=PartialAggregation.PARTIAL) public abstract WordCountTotal sum2(WordCountModel wc);
デフォルトは、PartialAggregation.TOTAL(部分集約(中間集計)を行わない)。
Summarizeを使う例としてジョブフローの出力の型(モデル)をWordCountTotalに変更したが、
最終的に出力する前にWordCountTotalからWordCountModelへ変換してやれば、フロー以降(WordCountJobのExportクラスとWordCountToFileとテスト用データのクラス指定)は修正しなくても済む。
モデル(クラス)を変換するには、Convert(変換演算子)を使用する。
import com.asakusafw.vocabulary.operator.Convert;
private WordCountModel wordCountModel = new WordCountModel();
@Convert
public WordCountModel wordCountTotal2Model(WordCountTotal t) {
wordCountModel.setWord(t.getWord());
wordCountModel.setCount((int) t.getCount());
return wordCountModel;
}
ただ単にフィールドを移し替えているだけ。
AsakusaFWはスレッドセーフ(Operatorのインスタンスがスレッド毎に作られるという意味)なので、移し先のインスタンスをフィールドに置いておいて使い回しても大丈夫。
@Override protected void describe() { Source<WordCountModel> splitResult = operator.split(in).wordCountResult; Source<WordCountTotal> sumResult = operator.sum2(splitResult).out; Source<WordCountModel> convertResult = operator.wordCountTotal2Model(sumResult).out; out.add(convertResult); }
フローの出力の型はWordCountModelで行ける。
これでOK!と思ってWordCountJobTestを実行したら、こんなエラーが…。
java.lang.RuntimeException: java.io.IOException: 実行計画の作成に失敗しました ([WordCountOperator.wordCountTotal2Modelの出力originalは結線されていません ([WordCountOperator.wordCountTotal2Model(operator#15136722)])]) at com.asakusafw.testdriver.JobFlowTester.runTest(JobFlowTester.java:95) 〜 Caused by: java.io.IOException: 実行計画の作成に失敗しました ([WordCountOperator.wordCountTotal2Modelの出力originalは結線されていません ([WordCountOperator.wordCountTotal2Model(operator#15136722)])]) at com.asakusafw.compiler.flow.FlowCompiler.plan(FlowCompiler.java:172) 〜
「出力originalが結線していない」とな?
実は、Convertではoutに変換したデータが出力され、originalに元々の入力データがそのまま出力される。
つまり、originalという出力データをフロー上でどこにも渡していないからこのエラーになったようだ。
データをどこにも渡す必要が無い(データを捨てていい)場合、フロー上で停止演算子stopへデータを渡す。(逆に0件の入力が欲しい場合は空演算子emptyが使える)
stop(やempty)はCoreOperatorFactoryというクラスでAsakusaFWによって定義されているので、それを使う。
import com.example.operator.WordCountOperatorFactory.WordCountTotal2Model; import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory;
CoreOperatorFactory core = new CoreOperatorFactory();
@Override
protected void describe() {
Source<WordCountModel> splitResult = operator.split(in).wordCountResult;
Source<WordCountTotal> sumResult = operator.sum2(splitResult).out;
WordCountTotal2Model convert = operator.wordCountTotal2Model(sumResult);
out.add(convert.out);
core.stop(convert.original);
}
先ほどは変換演算子ConvertによってWordCountTotalからWordCountModelへ変換したが、AsakusaFWでは、プロパティー(項目名と型)が同一な場合は再構築演算子restructureを使うのが推奨されている。
restructureもCoreOperatorFactoryで定義されているので、自分でOperatorを作る必要は無い。
import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory;
CoreOperatorFactory core = new CoreOperatorFactory(); @Override protected void describe() { Source<WordCountModel> splitResult = operator.split(in).wordCountResult; Source<WordCountTotal> sumResult = operator.sum2(splitResult).out; out.add(core.restructure(sumResult, WordCountModel.class)); }
restructure()の第2引数には、変換先のクラスを指定する。
で、実行してみたら、エラーが出た。
ERROR c.a.c.flow.FlowCompilingEnvironment - restructureにおいて、DataModelClass(sample.modelgen.dmdl.model.WordCountTotal).countとDataModelClass(sample.modelgen.dmdl.model.WordCountModel).countのプロパティ型が一致しません
「WordCountTotalのcountとWordCountModelのcountのプロパティ型が一致しない」って。
WordCountModelのcountはint型なのだが、それの集計モデルであるWordCountTotalはlong型になってしまうんだよね。
集計モデルのDMDLで型を指定する方法は無い。
(
「sum count -> count : INT;
」とか「sum count : INT -> count;
」を試してみたが、駄目だった)
仕方が無いので、word_count_modelをINTからLONGに変えてやったら、動いた。
※併せてWordCountOperatorTestの「assertThat(wc.getCount(), is(1));
」とかも「is(1L)
」(long型)にしないとコンパイルエラーになる。
AsakusaFWのOperator DSLには畳み込み演算子(Fold)という演算子がある。
これを使って集計処理をすることが出来る。
ScalaでfoldLeftという関数を勉強してなかったら「たたみこみ?fold?意味不明」と思ってスルーしてたところだ^^;
「畳み込み」とは、コレクション(Listとか)内の要素同士を計算して結果を出すもの。
例えばScalaで「List(1,10,100).foldLeft(0){ _ + _ }
」と書くと、1・10・100が入っているListに対し、各要素に演算を行っていく。foldLeft直後の括弧内が初期値で、最初に初期値と先頭要素で演算を行う。すなわち「0+1」を行う。そしてその結果の1と次の10で「1+10」を行い、その結果の11と次の100で「11+100」を行い、最終的に111という値が返る。
ScalaではfoldLeftと似た関数にreduceLeftというものがあり、そちらは「List(1,10,100).reduceLeft{ _ + _ }
」の様に書く。foldLeftと違って初期値が与えられていないので、最初に先頭2つの要素同士で計算する。後は同じで、最終的に111が返る。
AsakusaFWのFold演算子はScalaのreduceLeftに近い。
import com.asakusafw.vocabulary.model.Key; import com.asakusafw.vocabulary.operator.Fold;
@Fold public void sum3(@Key(group = "word") WordCountModel left, WordCountModel right) { left.setCount(left.getCount() + right.getCount()); }
Foldでは、leftとrightの2つの引数をとる。@Keyで集計キーを指定する。(キー毎に処理する為)
そして集計(演算)結果をleftに入れるようにする。
ちなみに、Foldも部分集約(中間集計)を行える。
@Fold(partialAggregation=PartialAggregation.PARTIAL) public void sum3(@Key(group = "word") WordCountModel left, WordCountModel right) { left.setCount(left.getCount() + right.getCount()); }
@Override protected void describe() { Source<WordCountModel> splitResult = operator.split(in).wordCountResult; Source<WordCountModel> sumResult = operator.sum3(splitResult).out; out.add(sumResult); }
うん、オペレーターもジョブフローも、Foldが一番シンプルに書ける^^
前述のSummarizeを使う例では集計モデルで件数項目の合算を行う方式を採ったが、集計モデルではそもそもレコード数をカウントすることが出来る。
これを使うと、以下のようになる。
"テキストファイルの一行" @directio.csv line_model = { "テキスト" text : TEXT; }; "単語" word_model = { "発生した単語" word : TEXT; }; "単語数" @directio.csv summarized word_count_model = word_model => { "単語" any word -> word; "単語数" count word -> count; } % word;
public abstract class WordCountOperator { /** * 単語 */ private final WordModel wordModel = new WordModel(); /** * 単語分割 * * @param line * テキストファイルの一行 * @param out * 単語 */ @Extract public void split(LineModel line, Result<WordModel> out) { String text = line.getTextAsString(); StringTokenizer tokenizer = new StringTokenizer(text); while (tokenizer.hasMoreTokens()) { String word = tokenizer.nextToken(); wordModel.setWordAsString(word); out.add(wordModel); } }
@Summarize public abstract WordCountModel sum(WordModel in); }
/** * 単語数カウントジョブ */ @JobFlow(name = "WordCountJob") public class WordCountJob extends FlowDescription {
/** テキストファイルの一行 */ private final In<LineModel> in; /** 単語数 */ private final Out<WordCountModel> out; /** * 単語数カウントジョブ * * @param in * テキストファイルの一行 * @param out * 単語数 */ public WordCountJob3( @Import(name = "in", description = LineFromFile.class) In<LineModel> in, @Export(name = "out", description = WordCountToFile.class) Out<WordCountModel> out ) { this.in = in; this.out = out; }
WordCountOperatorFactory operator = new WordCountOperatorFactory(); @Override public void describe() { Source<WordModel> word = operator.split(in).out; Source<WordCountModel> count = operator.sum(word).out; out.add(count); } }