Asakusa Framework0.2.5のDirect I/OでWordCountを作ってみた。
AsakusaFWはWindGate(AsakusaFW 0.2.4)でCSVファイルに対応したが、これはローカルマシン上のファイルをCSVとして扱うものだった。
Direct I/O(AsakusaFW 0.2.5)はHDFS(やAWS(Amazon Web Services)のS3)のファイルを直接CSVとして扱うことが出来る。
そこで、Direct I/Oを利用してテキストファイルを対象としたWordCountを作ってみた。
ただし、カンマは項目区切りとして認識されてしまうので、カンマが入ったファイルは対象外とする。
基本的な作り方はWindGate版WordCountと変わらないが、DMDLの指定の仕方とInputDescription/OutputDescription(Importer/Exporterから名前が変わった)が違う。
一番最初に、Eclipseで扱えるWordCount用プロジェクトを作成する。
> mvn archetype:generate -DarchetypeCatalog=http://asakusafw.s3.amazonaws.com/maven/archetype-catalog.xml
Choose archetype: |
4 | Direct I/Oを選択。 |
Choose com.asakusafw:asakusa-archetype-directio
version: |
2 | AsakusaFW 0.2.5を使用。 |
Define value for property 'groupId': : |
dio-wordcount | |
Define value for property 'artifactId': : |
dio-wordcount | プロジェクトのディレクトリー名。 |
Define value for property 'version':
1.0-SNAPSHOT: : |
1.0 | 今回作る自分のプロジェクトのバージョン。 どんな値でもいいが、ファイル名の一部に使われる。 |
Define value for property 'package':
afw-wordcount:
: |
dio.wordcount | パッケージ名。 |
最初に、入力データ・出力データを表すクラス(モデル)を作成する。
今回はテキストファイルを入力とするが、カンマは無いものとして、1項目だけのCSVファイルという扱いにする。
出力は単語と個数というCSVファイルとする。
中間のデータモデルとして、1行のテキストを分割した単語のみを表すデータモデルも用意する。
モデルはdmdlというファイルに定義を書き、モデルジェネレーターによってJavaソースに変換される。
入力ファイル・出力ファイルの一行分のデータ構造をDMDL(Data Model Definition Language)という言語で記述する。
このDMDLファイル内にはまずモデル名を書くが、これがクラス名に変換される。
ダブルクォーテーションで囲んだ部分が、JavaソースのJavadocコメントになる。
ひとつのDMDLファイルの中に複数のモデルを記述することが出来る。
モデル名や項目名は、Javaソースに変換する都合で(だと思う)英小文字と数字とアンダーバーだけが使える。(→名前の形式)
参考: Asakusa FrameworkのDMDLユーザーガイド
インストール時にmodels.dmdlというサンプルのファイルが作られているので、その中を書き換える。
"入力ファイル" @directio.csv text_model = { "テキスト" text : TEXT; }; "単語" word_model = { "単語" word : TEXT; }; "単語数ファイル" @directio.csv summarized word_count_model = word_model => { "単語" any word -> word; "件数" count word -> count; } % word;
Direct I/OでCSVファイルを扱う場合、(Javaのアノテーションのような)@directio.csvという印を付ける。
(WindGate版で@windgate.csvを付けるのと同様)
これにより、データモデルクラスの他にImporter/Exporterクラス・CsvSupportクラス等が生成される。
summarizedは、集計を行うモデルを表す。
このデータモデルは「summarized モデル名 = 集計元モデル名 => { 定義 } % 集計キー項目名,…;
」という形式で書く。
各項目の定義は「集計方法 集計元項目名 -> 集計結果項目名;
」という書式で記述する。
(AsakusaFW 0.2.1の頃はcountにバグがあって、集計元項目にキー項目を指定すると変な集計のされ方をしたが、0.2.3以降では大丈夫)
DMDLファイルを修正したら、モデルクラスの生成を行う。
コンソールからMavenのコマンドを実行する。
$ mvn generate-resources
生成したら、EclipseでF5を押してソースを反映させるのを忘れずに。
※この時点ではOperatorクラス等は初期サンプルの状態なので、初期のモデルクラスが無くなってコンパイルエラーになる。
次に、オペレーター(演算子)を作成する。
オペレーターは、MapReduceのMapper#map()やReducer#reduce()に相当する部分。
オペレーター用のクラスを用意し、その中にメソッドを定義して処理を記述していく。(このメソッドは、実行時に実際に呼ばれる)
メソッドには、どのような処理を行うかに応じて“Asakusa Frameworkで用意されているアノテーション”を付ける必要がある。
そして、アノテーションの種類(というか処理の種類)によって、メソッドの書き方(引数や戻り値など)が変わってくる。
参考: Asakusa Frameworkの演算子リファレンス
まず、初期サンプルのオペレーター関連クラスを消しておく。
オペレーターはJavaの抽象クラス内に定義する決まりになっているので、抽象クラスを作成する。
package dio.wordcount.operator; import java.util.StringTokenizer; import dio.wordcount.modelgen.dmdl.model.TextModel; import dio.wordcount.modelgen.dmdl.model.WordCountModel; import dio.wordcount.modelgen.dmdl.model.WordModel; import com.asakusafw.runtime.core.Result; import com.asakusafw.vocabulary.flow.processor.PartialAggregation; import com.asakusafw.vocabulary.operator.Extract; import com.asakusafw.vocabulary.operator.Summarize;
public abstract class WordCountOperator {
テキストを単語に分割するには、Extract(抽出演算子)を使う。
WordModel wm = new WordModel(); @Extract public void split(TextModel line, Result<WordModel> out) { String text = line.getTextAsString(); StringTokenizer tokenizer = new StringTokenizer(text); while (tokenizer.hasMoreTokens()) { String word = tokenizer.nextToken(); wm.setWordAsString(word); out.add(wm); } }
Operatorクラスでは、メソッド名がジョブフローで使えるメソッド名(AsakusaDSL的には“語彙”と呼ばれる)になる。
Resultの変数名は、ジョブフローでの出力フィールド名になる。
(デフォルトで用意されている演算子(core演算子)では、出力が1つの場合はoutという名前になっている事が多いようなので、それを踏襲した)
DMDL上でTEXTとして宣言した項目は、Javaのクラス上はHadoopのTextクラスになっている。
そしてセッター・ゲッターには「asString」という接尾辞が付いたメソッドが用意されているので、Stringクラスで読み書きできる。
DMDLでsummarizedを使った場合、ジョブフローで使えるメソッド名(語彙)を決める為に(だと思う)、OperatorクラスにSummarize演算子(メソッド名と入出力のデータモデルクラス)を記述する。
@Summarize(partialAggregation = PartialAggregation.PARTIAL) public abstract WordCountModel count(WordModel wm);
集計方法自体はDMDLの方に書かれているので、Operatorのメソッドとしては特に記述するものは無い。
したがって抽象メソッドになっている。
SummarizeにPARTIALを指定すると、部分集約(中間集計)が行われる(つまりCombinerが動く)ようになる。
Operatorのソースを記述して保存すると、EclipseのAsakusaFWプラグイン(Ashigelコンパイラー)が自動的に起動して、FactoryクラスやOperatorImplクラスを生成する。
(DMDLと違って、何らかのコマンドを実行する必要は無い)
もしエラーがあったら、通常のEclipseでのコンパイルエラーと同様にソース上に赤い印が出る。
オペレーター(演算子)はHadoopでの実行時に普通に呼び出されるクラスなので、単体テストは普通にJUnit4で行う。
ただしOperatorは抽象クラスなので、直接インスタンス化することが出来ない。
Operatorクラスを保存(コンパイル)した際に自動生成されるOperatorImplというクラスを使用する。
参考: Asakusa Frameworkのアプリケーションテストスタートガイド
package dio.wordcount.operator; import static org.hamcrest.Matchers.*; import static org.junit.Assert.*; import java.util.ArrayList; import java.util.List; import org.junit.Test; import com.asakusafw.runtime.testing.MockResult; import dio.wordcount.modelgen.dmdl.model.TextModel; import dio.wordcount.modelgen.dmdl.model.WordModel;
public class WordCountOperatorTest {
Extractのメソッドの入力データは単なるListだからいいとして、出力データを入れるResultにはMockResultを使う。
ただしデフォルトではadd()で追加されたインスタンスをそのまま保持する為、自分でコピーを作って保持するように修正する必要がある。
@Test public void split() { WordCountOperator operator = new WordCountOperatorImpl(); // 入力データ List<TextModel> list = new ArrayList<TextModel>(); list.add(createText("word1")); list.add(createText("")); list.add(createText("word3 blank")); list.add(createText("word4 blank")); list.add(createText("word5\ttab")); list.add(createText("word6\t tab-blank")); list.add(createText("word7\ttab blank")); // 出力を保持するクラス MockResult<WordModel> result = new MockResult<WordModel>() { @Override protected WordModel bless(WordModel result) { WordModel clone = new WordModel(); clone.copyFrom(result); return clone; } }; for (TextModel tm : list) { operator.split(tm, result); //テスト実行 } // 結果確認 List<WordModel> r = result.getResults(); String[] expected = { "word1", "word3", "blank", "word4", "blank", "word5", "tab", "word6", "tab-blank", "word7", "tab", "blank" }; assertThat(r.size(), is(expected.length)); int i = 0; for (WordModel wc : r) { assertThat(wc.getWordAsString(), is(expected[i++])); } } TextModel createText(String text) { TextModel tm = new TextModel(); tm.setTextAsString(text); return tm; } }
countメソッドは抽象メソッドであり、実体はAsakusaFWが生成するので、単体テストの対象外。
OperatorImplでのcountメソッドの実装は、例外が発生するようになっている(使えないようになっている)。
次に、ジョブフローを定義する。
ここで「どのファイルを入力とし、どのオペレーターを呼んで、どのファイルへ出力するか」を記述する。
参考: Asakusa FrameworkのAsakusa DSLスタートガイド#データフローを記述する
まず、初期サンプルのジョブフロー関連クラスを消しておく。
まずは入力ファイルを定義する。
@directio.csvを付けたデータモデルでは モデル名に応じたInputDescriptionクラス(以前はImporterという名前だった)が自動生成されるので、それを使う。
プログラマーはプロファイル名と入力ファイル名だけ記述する。
package dio.wordcount.jobflow; import dio.wordcount.modelgen.dmdl.csv.AbstractTextModelCsvInputDescription;
public class TextFromCsv extends AbstractTextModelCsvInputDescription { @Override public String getBasePath() { return "wordcount/dio/input"; } @Override public String getResourcePattern() { return "hello.txt"; } @Override public DataSize getDataSize() { return DataSize.LARGE; } }
自動生成されたAbstractTextModelCsvInputDescriptionの一番上の親クラスは、DirectFileInputDescription。
Direct I/Oを使ったInputDescriptionは、全てこのクラスを継承している。
getBasePath()にディレクトリー、getResourcePattern()にファイル名を指定する。
入力のgetResourcePattern()ではファイル名直書きだけでなく、パターン(「${バッチ引数}
」やワイルドカード「*
」「**
」、「{|}
」による組み合わせ)も指定できる。(→入力ファイル名のパターン)
→Direct
I/Oのベースパスとリソースパターンの使い分け
getDataSize()はデフォルトではDataSize.UNKOWNになっているので、(最適化に使われると思うので)何かしら指定した方がいいと思う。
次いで出力ファイル。これも同様にOutputDescriptionクラス(旧Exporter)を記述する。
package dio.wordcount.jobflow; import java.util.Arrays; import java.util.List; import dio.wordcount.modelgen.dmdl.csv.AbstractWordCountModelCsvOutputDescription;
public class WordCountToCsv extends AbstractWordCountModelCsvOutputDescription { @Override public String getBasePath() { return "wordcount/dio/output"; } @Override public String getResourcePattern() { return "wordcount.csv"; } @Override public List<String> getOrder() { return Arrays.asList("-count", "+word"); } }
出力のgetResourcePattern()でもパターンが指定できるが、指定できる内容は入力とは若干異なる。(→出力ファイル名のパターン)
→Direct
I/Oのベースパスとリソースパターンの使い分け
なお、入力と出力で同じディレクトリー(ベースパス)を指定することは出来ない。(テスト実行時に「出力が別の入力を上書きします」というエラーになる)
Direct I/Oでは、getOrder()で出力ファイルのソートが指定できる。
項目名のリストを返すようなっている。項目名の先頭に「+」を付けると昇順、「-」を付けると降順。
(ドキュメントには書かれていないが、"count desc"
や"word asc"
といった書き方も可能)
Arrays.asList()
は可変長引数なので、カンマ区切りで項目を並べられる。
getOrder()でnullを返すとエラーになる。
InputDescription/OutputDescriptionではgetBasePath()でディレクトリーを指定する。
これが具体的にどこを指すかは、$ASAKUSA_HOME/core/conf/asakusa-resources.xml内の記述に依存する。
→Direct I/Oのベースパスと実際のパス
<property>
<name>com.asakusafw.directio.root.fs.path</name>
<value>target/testing/directio</value>
</property>
Hadoopの単独環境(スタンドアローン)では、ユーザーのホームディレクトリー($HOME)にこのパスを加え、さらにgetBasePath()で指定した場所になる。
例えばユーザーが「asakusa025」でgetBasePath()が「wordcount/dio」の場合、実際の場所は「/home/asakusa025/target/testing/directio/wordcount/dio/」となる。
InputDescription/OutputDescriptionを用意したら、入力→処理→出力のつながり(順序・フロー)を記述する。
package dio.wordcount.jobflow; import dio.wordcount.modelgen.dmdl.model.TextModel; import dio.wordcount.modelgen.dmdl.model.WordCountModel; import dio.wordcount.operator.WordCountOperatorFactory; import dio.wordcount.operator.WordCountOperatorFactory.Count; import dio.wordcount.operator.WordCountOperatorFactory.Split; import com.asakusafw.vocabulary.flow.Export; import com.asakusafw.vocabulary.flow.FlowDescription; import com.asakusafw.vocabulary.flow.Import; import com.asakusafw.vocabulary.flow.In; import com.asakusafw.vocabulary.flow.JobFlow; import com.asakusafw.vocabulary.flow.Out;
JobFlowというアノテーションを付け、ジョブ名を指定する。
クラス自体はFlowDescriptionを継承する。
@JobFlow(name = "WordCountJob") public class WordCountJob extends FlowDescription {
ジョブフローでは、コンストラクターで入力データと出力データを受け取る。
それをフィールドで保持しておき、使用する。
private In<TextModel> text; private Out<WordCountModel> wordCount; public WordCountJob( @Import(name = "text" , description = TextFromCsv.class ) In<TextModel> text, @Export(name = "wordCount", description = WordCountToCsv.class) Out<WordCountModel> wordCount) { this.text = text; this.wordCount = wordCount; }
FlowDescriptionのdescribe()メソッドをオーバーライドし、フローを記述する。
@Override protected void describe() { WordCountOperatorFactory operators = new WordCountOperatorFactory(); Split split = operators.split(text); Count count = operators.count(split.out); wordCount.add(count.out); } }
operators.split()やoperators.count()が、自分で作ったオペレーター(演算子)を表している。
split.out
の「out」は、Extractを使って自分で記述したメソッドの引数のResultの変数名。
→ジョブフローのグラフ化(可視化)
AsakusaFWでは、ジョブフロー(とFlowPartとバッチ)のテスト用クラスが用意されている。
それらを使ってJUnit4として実行できる。(実際にHadoopを起動して実行される)
それと、DMDLをコンパイルした際に(Javaソースの他に)Excelファイルが生成される。
このExcelファイルを所定の場所にコピーして、テスト用の入力データと出力結果(検証用)データを記入する。
参考: Asakusa Frameworkのアプリケーションテストスタートガイド#ジョブフローのテスト
package dio.wordcount.jobflow; import org.junit.Test; import dio.wordcount.modelgen.dmdl.model.TextModel; import dio.wordcount.modelgen.dmdl.model.WordCountModel; import com.asakusafw.testdriver.JobFlowTester;
public class WordCountJobTest { @Test public void describe() { String text = "text_model.xls"; String wordCount = "word_count_model.xls"; JobFlowTester tester = new JobFlowTester(getClass()); tester.input("text", TextModel.class).prepare(text + "#input"); tester.output("wordCount", WordCountModel.class).verify(wordCount + "#output", wordCount + "#rule"); tester.runTest(WordCountJob.class); } }
tester.input()で入力データを指定する。
第1引数はジョブフローで指定した入力データの名前。つまりWordCountJobのコンストラクターの「@Import(name = "text"
」。この名前を使って入力データのファイル名を取得している(一致しているものが無いと、入力ファイルを作れない)。
そのinput()メソッドの後に続いているprepare()で、入力データのExcelファイル名(text_model.xls)を指定する。
tester.output()もtester.input()と同様。
第1引数はジョブフローの出力データの名前「@Export(name = "wordCount"
」。検証時にこの名前を使ってデータを取得する(一致しているものが無いと、実行が終わった後のチェックでエラーになる)。
verify()メソッドの第1引数が出力データのExcelファイル名、第2引数がチェック用ルールのExcelファイル名。
「#input」「#output」「#rule」は、そのExcelファイル内のシート名を表している。
数値を使って「#:1」や「#:2」の様に記述することもでき、その場合は「#:1」が左から2番目のoutputシート、「#:2」が3番目のruleシートを表す。
省略した場合は「#:0」扱いで、すなわち一番左にあるinputシートを指す。
InputDescription/OutputDescriptionのgetResourcePattern()でバッチ引数を指定している場合は、テスト実行時にバッチ引数を指定する必要がある。
JobFlowTester tester = new JobFlowTester(getClass()); tester.setBatchArg("IN_FILE", "hello.txt"); tester.setBatchArg("OUT_FILE", "wordcount.csv");
ジョブフローのテストで使用するテストデータをExcelファイルで(またはJSON形式で)用意する必要がある。
参考: Asakusa FrameworkのExcelによるテストデータ定義
target/excelの下にExcelファイルが自動生成されているので、それをsrc/test/resources/dio/wordcount/jobflow/の下にコピーし、その中にデータを記述する。
A | B | C | |
1 | text | ||
2 | Hello Hadoop World | ||
3 | Hello Asakusa | ||
4 | |||
5 |
入力データはinputシートに書く。
1行目(色付きのセル)は自動で入っている。
A | B | C | |
1 | word | count | |
2 | Asakusa | 1 | |
3 | Hadoop | 1 | |
4 | Hello | 2 | |
5 | World | 1 | |
6 |
期待される出力データをoutputシートに書く。
1行目(色付きのセル)は自動で入っている。
A | B | C | D | E | |
1 | Format | EVR-1.0.0 | |||
2 | 全体の比較 | 全てのデータを検査 [Strict] | |||
3 | プロパティ | 値の比較 | NULLの比較 | コメント | |
4 | word | 検査キー [Key] | NULLなら常に失敗 [DA] | 単語 | |
5 | count | 完全一致 [=] | NULLなら常に失敗 [DA] | 件数 | |
6 |
outputシートに書かれているデータと実際のデータをどう比較してどういう状態ならテストOK(あるいはNG)とするかをruleシートに書く。
どのセルも初期値は自動で入っている。
(「コメント」の“単語”や“件数”は、DMDLに自分が記述した各項目のコメント)
今回のケースでは出力項目がwordとcountの二項目なので、それぞれどういう比較をするかを書く。(書くというか、実際はプルダウンになっているので、選択する)
※ExcelファイルをEclipse外のエディターで編集したら、Eclipse上でF5を押して反映させる(srcからclassesへコピーさせる)必要がある。
→Excelファイルを使用せず、独自のデータを入力データにする方法
ただ、Direct I/OはOutputDescriptionのgetOrder()でソート順を指定できるが、ちゃんとソートされているかどうかの確認には対応していないようだ。
最後にBatch DSLで「どのジョブフローを実行するか」を記述する。
(WordCountだとシンプルすぎて、ありがたみが無いかも^^;)
参考: Asakusa FrameworkのAsakusa DSLスタートガイド#バッチを記述する
初期サンプルのバッチ関連クラスを消しておく。
package dio.wordcount.batch; import dio.wordcount.jobflow.WordCountJob; import com.asakusafw.vocabulary.batch.Batch; import com.asakusafw.vocabulary.batch.BatchDescription;
@Batch(name = "WordCountBatch") public class WordCountBatch extends BatchDescription { @Override protected void describe() { run(WordCountJob.class).soon(); } }
実行するジョブのClassをrun()で指定し、soon()で“すぐ(前提条件が無いので最初に)実行する”という意味になる。
@Batchアノテーションで付けたバッチ名が、分散環境で実行する際に指定する名前となる。
AsakusaFW 0.2.5の初期サンプルでは、バッチのテストが入っていない。
という事は、1つしかジョブを実行しないようなシンプルなバッチでは特にテストは要らないという事だろうか。
たぶん記述方法はbatchappのバッチのテストと同じなので、今回は省略する。
分散環境で実行するには、開発環境で作ったAsakusaFWアプリをアーカイブ化して分散環境(実行するマシン)に持っていく。
<property> <name>com.asakusafw.directio.root.fs.path</name> <value>/user/hishidama</value> ←HDFS上のパス </property>
$ cd $HOME/workspace/dio-wordcount $ mvn package
$ cd $ASAKUSA_HOME/batchapps $ jar xf $HOME/Desktop/dio-wordcount-batchapps-1.0.jar $ rm -rf META-INF $ find WordCountBatch -name "*.sh" | xargs chmod u+x …AsakusaFW 0.5では不要
$ vi hello.txt $ hadoop fs -put hello.txt /user/hishidama/wordcount/dio/input/hello.txt
$ WordCountBatch/bin/experimental.sh …AsakusaFW 0.5ではexperimental.shは存在しないので、YAESSを使う または $ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh WordCountBatchバッチ引数を指定する場合は以下の様にして実行する。
$ export ASAKUSA_BATCH_ARGS='IN_FILE=hello.txt,OUT_FILE=wordcount.csv' $ WordCountBatch/bin/experimental.sh または $ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh WordCountBatch -A IN_FILE=hello.txt -A OUT_FILE=wordcount.csv
$ hadoop fs -cat /user/hishidama/wordcount/dio/output/wordcount.csv