S-JIS[2012-01-07/2012-01-17] 変更履歴

Asakusa Framework WindGateでWordCount

Asakusa Framework0.2.4のWindGateでWordCountを作ってみた。


前置き

WindGate(AsakusaFW 0.2.4)より前のAsakusaFWはテキストファイルを扱う為に自分でInputFormat等を記述する必要があったが、
WindGateはCSVファイルに対応して、記述するのが楽になった。
(WindGateはHDFS上でなくローカルのCSVファイルを扱う。)

そこで、WindGateを利用してテキストファイルを対象としたWordCountを作ってみる。
ただし、カンマは項目区切りとして認識されてしまうので、カンマが入ったファイルは対象外とする。

WindGate版のAsakusaFWはWindowsでは動かせないので、動作確認はLinux(CentOS5.7)。

基本的な作り方はbatchapp版WordCountと変わらないが、DMDLの指定の仕方とImporter/Exporterが大きく違う。
また、プロジェクト作成直後の初期サンプルを見ると変数等の命名ルールが変更になっているようなので、概ね新ルールに合わせる。


WordCountプロジェクトの作成

一番最初に、Eclipseで扱えるWordCount用プロジェクトを作成する。

  1. WindGateの開発環境を構築する。今回の設定内容は以下の通り。
    > mvn archetype:generate -DarchetypeCatalog=http://asakusafw.s3.amazonaws.com/maven/archetype-catalog.xml
    Choose archetype:
    1: http://asakusafw.s3.amazonaws.com/maven/archetype-catalog.xml -> com.asakusafw:asakusa-archetype-batchapp (-)
    2: http://asakusafw.s3.amazonaws.com/maven/archetype-catalog.xml -> com.asakusafw:asakusa-archetype-thundergate (-)
    3: http://asakusafw.s3.amazonaws.com/maven/archetype-catalog.xml -> com.asakusafw:asakusa-archetype-windgate (-)
    Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): :
    3 WindGateを選択。
    Choose version:
    1: 0.2-SNAPSHOT
    2: 0.2.2
    3: 0.2.3
    4: 0.2.4
    Choose a number: 4:
    4 AsakusaFW 0.2.4を使用。
    Define value for property 'groupId': : wgate-wordcount  
    Define value for property 'artifactId': : wgate-wordcount プロジェクトのディレクトリー名。
    Define value for property 'version': 1.0-SNAPSHOT: : 1.0 今回作る自分のプロジェクトのバージョン。
    どんな値でもいいが、ファイル名の一部に使われる。
    Define value for property 'package': afw-wordcount: : wgate.wordcount パッケージ名。

モデル

最初に、入力データ・出力データを表すクラス(モデル)を作成する。

今回はテキストファイルを入力とするが、カンマは無いものとして、1項目だけのCSVファイルという扱いにする。
出力は単語と個数というCSVファイルとする。
中間のデータモデルとして、1行のテキストを分割した単語のみを表すデータモデルも用意する。

モデルはdmdlというファイルに定義を書き、モデルジェネレーターによってJavaソースに変換される。
入力ファイル・出力ファイルの一行分のデータ構造をDMDL(Data Model Definition Language)という言語で記述する。

このDMDLファイル内にはまずモデル名を書くが、これがクラス名に変換される。
ダブルクォーテーションで囲んだ部分が、JavaソースのJavadocコメントになる。
ひとつのDMDLファイルの中に複数のモデルを記述することが出来る。
モデル名や項目名は、Javaソースに変換する都合で(だと思う)英小文字と数字とアンダーバーだけが使える。(→名前の形式

参考: Asakusa FrameworkのDMDLユーザーガイド


インストール時にmodels.dmdlというサンプルのファイルが作られているので、その中を書き換える。

wgate-wordcount/src/main/dmdl/models.dmdl:

"入力ファイル"
@windgate.csv
text_model = {
	"テキスト"
	text : TEXT;
};

"単語"
word_model = {
	"単語"
	word : TEXT;
};

"単語数ファイル"
@windgate.csv
summarized word_count_model = word_model => {
	"単語"
	any   word -> word;
	"件数"
	count word -> count;
} % word;

WindGateでCSVファイルを扱う場合、(Javaのアノテーションのような)@windgate.csvという印を付ける。
これにより、データモデルクラスの他にImporter/Exporterクラス・CsvSupportクラス等が生成される。

summarizedは、集計を行うモデルを表す。
このデータモデルは「summarized モデル名 = 集計元モデル名 => { 定義 } % 集計キー項目名,…;」という形式で書く。
各項目の定義は「集計方法 集計元項目名 -> 集計結果項目名;」という書式で記述する。
(AsakusaFW 0.2.1の頃はcountにバグがあって、集計元項目にキー項目を指定すると変な集計のされ方をしたが、0.2.3以降では大丈夫)


DMDLファイルを修正したら、モデルクラスの生成を行う。
コンソールからMavenのコマンドを実行する。

$ mvn generate-resources

生成したら、EclipseでF5を押してソースを反映させるのを忘れずに。
※この時点ではOperatorクラス等は初期サンプルの状態なので、初期のモデルクラスが無くなってコンパイルエラーになる。


オペレーター(演算子)

次に、オペレーター(演算子)を作成する。

オペレーターは、MapReduceのMapper#map()やReducer#reduce()に相当する部分。

オペレーター用のクラスを用意し、その中にメソッドを定義して処理を記述していく。(このメソッドは、実行時に実際に呼ばれる)
メソッドには、どのような処理を行うかに応じて“Asakusa Frameworkで用意されているアノテーション”を付ける必要がある。
そして、アノテーションの種類(というか処理の種類)によって、メソッドの書き方(引数や戻り値など)が変わってくる。

参考: Asakusa Frameworkの演算子リファレンス

Operatorのテスト


まず、初期サンプルのオペレーター関連クラスを消しておく。


オペレーターはJavaの抽象クラス内に定義する決まりになっているので、抽象クラスを作成する。

src/main/java/wgate/wordcount/operator/WordCountOperator.java

package wgate.wordcount.operator;

import java.util.StringTokenizer;

import wgate.wordcount.modelgen.dmdl.model.TextModel;
import wgate.wordcount.modelgen.dmdl.model.WordCountModel;
import wgate.wordcount.modelgen.dmdl.model.WordModel;

import com.asakusafw.runtime.core.Result;
import com.asakusafw.vocabulary.flow.processor.PartialAggregation;
import com.asakusafw.vocabulary.operator.Extract;
import com.asakusafw.vocabulary.operator.Summarize;
public abstract class WordCountOperator {

テキストを単語に分割するには、Extract(抽出演算子)を使う。[2012-01-17]
(最初はExtractがあるのに気付かずCoGroupを使っていたが、CoGroupはReduceフェーズで処理されExtractはMapフェーズで処理されるので、Extractの方が効率が良い)

	WordModel wm = new WordModel();

	@Extract
	public void split(TextModel line, Result<WordModel> out) {
		String text = line.getTextAsString();

		StringTokenizer tokenizer = new StringTokenizer(text);
		while (tokenizer.hasMoreTokens()) {
			String word = tokenizer.nextToken();

			wm.setWordAsString(word);

			out.add(wm);
		}
	}

Operatorクラスでは、メソッド名がジョブフローで使えるメソッド名(AsakusaDSL的には“語彙”と呼ばれる)になる。
Resultの変数名は、ジョブフローでの出力フィールド名になる。
(デフォルトで用意されている演算子(core演算子)では、出力が1つの場合はoutという名前になっている事が多いようなので、それを踏襲した)

DMDL上でTEXTとして宣言した項目は、Javaのクラス上はHadoopのTextクラスになっている。
そしてセッター・ゲッターには「asString」という接尾辞が付いたメソッドが用意されているので、Stringクラスで読み書きできる。


DMDLでsummarizedを使った場合、ジョブフローで使えるメソッド名(語彙)を決める為に(だと思う)、OperatorクラスにSummarize演算子(メソッド名と入出力のデータモデルクラス)を記述する。

	@Summarize(partialAggregation = PartialAggregation.PARTIAL)
	public abstract WordCountModel count(WordModel wm);

集計方法自体はDMDLの方に書かれているので、Operatorのメソッドとしては特に記述するものは無い。
したがって抽象メソッドになっている。

SummarizeにPARTIALを指定すると、部分集約(中間集計)が行われる(つまりCombinerが動く)ようになる。[2012-01-17]


Operatorのソースを記述して保存すると、EclipseのAsakusaFWプラグイン(Ashigelコンパイラー)が自動的に起動して、FactoryクラスやOperatorImplクラスを生成する。
(DMDLと違って、何らかのコマンドを実行する必要は無い)
もしエラーがあったら、通常のEclipseでのコンパイルエラーと同様にソース上に赤い印が出る。


オペレーター(演算子)のテスト

オペレーター(演算子)はHadoopでの実行時に普通に呼び出されるクラスなので、単体テストは普通にJUnit4で行う。

ただしOperatorは抽象クラスなので、直接インスタンス化することが出来ない。
Operatorクラスを保存(コンパイル)した際に自動生成されるOperatorImplというクラスを使用する。

参考: Asakusa Frameworkのアプリケーションテストスタートガイド


src/test/java/wgate/wordcount/operator/WordCountOperatorTest.java:

package wgate.wordcount.operator;

import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;

import com.asakusafw.runtime.testing.MockResult;

import wgate.wordcount.modelgen.dmdl.model.TextModel;
import wgate.wordcount.modelgen.dmdl.model.WordModel;
public class WordCountOperatorTest {

Extractのメソッドの入力データは単なるListだからいいとして、出力データを入れるResultにはMockResultを使う。
ただしデフォルトではadd()で追加されたインスタンスをそのまま保持する為、自分でコピーを作って保持するように修正する必要がある。

	@Test
	public void split() {
		WordCountOperator operator = new WordCountOperatorImpl();

		// 入力データ
		List<TextModel> list = new ArrayList<TextModel>();
		list.add(createText("word1"));
		list.add(createText(""));
		list.add(createText("word3 blank"));
		list.add(createText("word4 blank"));
		list.add(createText("word5\ttab"));
		list.add(createText("word6\t tab-blank"));
		list.add(createText("word7\ttab blank"));

		// 出力を保持するクラス
		MockResult<WordModel> result = new MockResult<WordModel>() {
			@Override
			protected WordModel bless(WordModel result) {
				WordModel clone = new WordModel();
				clone.copyFrom(result);
				return clone;
			}
		};

		for (TextModel tm : list) {
			operator.split(tm, result);	//テスト実行
		}

		// 結果確認
		List<WordModel> r = result.getResults();
		String[] expected = { "word1", "word3", "blank", "word4", "blank", "word5", "tab", "word6", "tab-blank", "word7", "tab", "blank" };
		assertThat(r.size(), is(expected.length));
		int i = 0;
		for (WordModel wc : r) {
			assertThat(wc.getWordAsString(), is(expected[i++]));
		}
	}

	TextModel createText(String text) {
		TextModel tm = new TextModel();
		tm.setTextAsString(text);
		return tm;
	}
}

countメソッドは抽象メソッドであり、実体はAsakusaFWが生成するので、単体テストの対象外。

OperatorImplでのcountメソッドの実装は、例外が発生するようになっている(使えないようになっている)。


ジョブフロー

次に、ジョブフローを定義する。
ここで「どのファイルを入力とし、どのオペレーターを呼んで、どのファイルへ出力するか」を記述する。

参考: Asakusa FrameworkのAsakusa DSLスタートガイド#データフローを記述する

ジョブフローのテスト


まず、初期サンプルのジョブフロー関連クラスを消しておく。


Importerクラス

まずは入力ファイルを定義する。
@windgate.csvを付けたデータモデルでは モデル名に応じたImporterクラスが自動生成されるので、それを使う。
プログラマーはプロファイル名と入力ファイル名だけ記述する。

src/main/java/wgate/wordcount/jobflow/TextFromCsv.java:

package wgate.wordcount.jobflow;

import wgate.wordcount.modelgen.dmdl.csv.AbstractTextModelCsvImporterDescription;
public class TextFromCsv extends AbstractTextModelCsvImporterDescription {

	@Override
	public String getProfileName() {
		return "asakusa";	//プロファイル名
	}

	@Override
	public String getPath() {
		return "input/hello.txt";	//ローカルマシン上の相対パス
	}
}

自動生成されたAbstractTextModelCsvImporterDescriptionの一番上の親クラスは、WindGateImporterDescription。
WindGateを使ったImporterは、全てこのクラスを継承している。

getPath()にはバッチ引数を指定することも出来る。

	@Override
	public String getPath() {
		return "input/${IN_FILE}";
	}

これはOperatorバッチ引数(BatchContext)を取得するのと同等。
(つまりジョブフローのテスト運用環境での実行時にバッチ引数を指定する必要がある)


Exporterクラス

次いで出力ファイル。これも同様にExporterクラスを記述する。

src/main/java/wgate/wordcount/jobflow/WordCountToCsv.java:

package wgate.wordcount.jobflow;

import wgate.wordcount.modelgen.dmdl.csv.AbstractWordCountModelCsvExporterDescription;
public class WordCountToCsv extends AbstractWordCountModelCsvExporterDescription {

	@Override
	public String getProfileName() {
		return "asakusa";	//プロファイル名
	}

	@Override
	public String getPath() {
		return "output/wordcount.csv";	//ローカルマシン上の相対パス
	}
}

プロファイル

プロファイルとは、WindGateの環境依存情報を記述しておくファイル。
例えばWindGateのImporter/Exporterでプロファイル名を「hoge」としておくと、$ASAKUSA_HOME/windgate/profile/hoge.propertiesというファイルが参照される。
プロファイル名asakusaのファイルはインストール時に用意されている。デフォルトでは中身は以下のような形。

# Local File System
resource.local=com.asakusafw.windgate.stream.file.FileResourceProvider
resource.local.basePath=/tmp/windgate-${USER}
# Hadoop File System
resource.hadoop=com.asakusafw.windgate.hadoopfs.HadoopFsProvider

@windgate.csvで生成されるCsvImporterやCsvExporterの場合、入出力ファイルはローカルマシン上のファイルとなる。
例えばCsvImporterで指定したパスが「input/hello.csv」で実行するユーザーがhdfsの場合、プロファイルのresource.local.basePathと合わせて「/tmp/windgate-hdfs/input/hello.csv」が入力ファイルのフルパスとなる。

CsvImporterは、ジョブの入り口部分でローカルファイルをHDFSに転送する。
CsvExporterは、結果出力部分でHDFS上のファイルをローカルに転送する。
その転送の際に使う具体的なクラス(プロバイダークラス)を指定するのがプロファイル。

例えば入力と出力でルートディレクトリーを変えたい場合、プロファイルを2つ用意してImporterとExporterで別々のプロファイル名を指定すればいい。
また、テスト環境と本番環境でパスを変えておくことも出来るだろう。


Flowクラス

Importer/Exporterを用意したら、入力→処理→出力のつながり(順序・フロー)を記述する。

src/main/java/wgate/wordcount/jobflow/WordCountJob.java

package wgate.wordcount.jobflow;

import wgate.wordcount.modelgen.dmdl.model.TextModel;
import wgate.wordcount.modelgen.dmdl.model.WordCountModel;
import wgate.wordcount.operator.WordCountOperatorFactory;
import wgate.wordcount.operator.WordCountOperatorFactory.Count;
import wgate.wordcount.operator.WordCountOperatorFactory.Split;

import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;

JobFlowというアノテーションを付け、ジョブ名を指定する。
クラス自体はFlowDescriptionを継承する。

@JobFlow(name = "WordCountJob")
public class WordCountJob extends FlowDescription {

ジョブフローでは、コンストラクターで入力データと出力データを受け取る。
それをフィールドで保持しておき、使用する。

	private In<TextModel> text;
	private Out<WordCountModel> wordCount;

	public WordCountJob(
			@Import(name = "text"     , description = TextFromCsv.class   ) In<TextModel> text,
			@Export(name = "wordCount", description = WordCountToCsv.class) Out<WordCountModel> wordCount) {
		this.text      = text;
		this.wordCount = wordCount;
	}

FlowDescriptionのdescribe()メソッドをオーバーライドし、フローを記述する。

	@Override
	protected void describe() {
		WordCountOperatorFactory operators = new WordCountOperatorFactory();

		Split split = operators.split(text);
		Count count = operators.count(split.out);
		wordCount.add(count.out);
	}
}

operators.split()やoperators.count()が、自分で作ったオペレーター(演算子)を表している。

split.outの「out」は、Extractを使って自分で記述したメソッドの引数のResultの変数名

ジョブフローのグラフ化(可視化)


ジョブフローのテスト

AsakusaFWでは、ジョブフロー(とFlowPartとバッチ)のテスト用クラスが用意されている。
それらを使ってJUnit4として実行できる。(実際にHadoopを起動して実行される)

それと、DMDLをコンパイルした際に(Javaソースの他に)Excelファイルが生成される。
このExcelファイルを所定の場所にコピーして、テスト用の入力データと出力結果(検証用)データを記入する。

参考: Asakusa Frameworkのアプリケーションテストスタートガイド#ジョブフローのテスト


src/test/java/wgate/wordcount/jobflow/WordCountJobTest.java

package wgate.wordcount.jobflow;

import org.junit.Test;

import wgate.wordcount.modelgen.dmdl.model.TextModel;
import wgate.wordcount.modelgen.dmdl.model.WordCountModel;

import com.asakusafw.testdriver.JobFlowTester;
public class WordCountJobTest {

	@Test
	public void describe() {
		String text      = "text_model.xls";
		String wordCount = "word_count_model.xls";

		JobFlowTester tester = new JobFlowTester(getClass());

		tester.input("text", TextModel.class).prepare(text + "#input");
		tester.output("wordCount", WordCountModel.class).verify(wordCount + "#output", wordCount + "#rule");

		tester.runTest(WordCountJob.class);
	}
}

tester.input()で入力データを指定する。
第1引数はジョブフローで指定した入力データの名前。つまりWordCountJobのコンストラクターの「@Import(name = "text"」。この名前を使って入力データのファイル名を取得している(一致しているものが無いと、入力ファイルを作れない) 。
そのinput()メソッドの後に続いているprepare()で、入力データのExcelファイル名(text_model.xml)を指定する。

tester.output()もtester.input()と同様。
第1引数はジョブフローの出力データの名前「@Export(name = "wordCount"」。検証時にこの名前を使ってデータを取得する(一致しているものが無いと、実行が終わった後のチェックでエラーになる) 。
verify()メソッドの第1引数が出力データのExcelファイル名、第2引数がチェック用ルールのExcelファイル名

「#input」「#output」「#rule」は、そのExcelファイル内のシート名を表している。
数値を使って 「#:1」や「#:2」の様に記述することもでき、その場合は「#:1」が左から2番目のoutputシート、「#:2」が3番目のruleシートを表す。
省略した場合は「#:0」扱いで、すなわち一番左にあるinputシートを指す。

Importer/ExporterのgetPath()でバッチ引数を指定している場合は、テスト実行時にバッチ引数を指定する必要がある。

		JobFlowTester tester = new JobFlowTester(getClass());
		tester.setBatchArg("IN_FILE",  "hello.txt");
		tester.setBatchArg("OUT_FILE", "result.csv");

ジョブフローのテストで使用するテストデータをExcelファイルで(またはJSON形式で)用意する必要がある。

参考: Asakusa FrameworkのExcelによるテストデータ定義

target/excelの下にExcelファイルが自動生成されているので、それをsrc/test/resources/wgate/wordcount/jobflow/の下にコピーし、その中にデータを記述する。

src/test/resources/sample/jobflow/text_model.xlsのinputシート:

  A B C
1 text    
2 Hello Hadoop World    
3 Hello Asakusa    
4      
5      

入力データはinputシートに書く。
1行目(色付きのセル)は自動で入っている。

src/test/resources/sample/jobflow/word_count_model.xlsのoutputシート:

  A B C
1 word count  
2 Asakusa 1  
3 Hadoop 1  
4 Hello 2  
5 World 1  
6      

期待される出力データをoutputシートに書く。
1行目(色付きのセル)は自動で入っている。

src/test/resources/sample/jobflow/word_count_model.xlsのruleシート:

  A B C D E
1 Format EVR-1.0.0      
2 全体の比較 全てのデータを検査 [Strict]      
3 プロパティ 値の比較 NULLの比較 コメント  
4 word 検査キー [Key] NULLなら常に失敗 [DA] 単語  
5 count 完全一致 [=] NULLなら常に失敗 [DA] 件数  
6          

outputシートに書かれているデータと実際のデータをどう比較してどういう状態ならテストOK(あるいはNG)とするかをruleシートに書く。
どのセルも初期値は自動で入っている。
(「コメント」の“単語”や“件数”は、DMDLに自分が記述した各項目のコメント)

今回のケースでは出力項目がwordとcountの二項目なので、それぞれどういう比較をするかを書く。(書くというか、実際はプルダウンになっているので、選択する)

※ExcelファイルをEclipse外のエディターで編集したら、Eclipse上でF5を押して反映させる(srcからclassesへコピーさせる)必要がある。

Excelファイルを使用せず、独自のデータを入力データにする方法


バッチ

最後にBatch DSLで「どのジョブフローを実行するか」を記述する。
(WordCountだとシンプルすぎて、ありがたみが無いかも^^;)

参考: Asakusa FrameworkのAsakusa DSLスタートガイド#バッチを記述する


初期サンプルのバッチ関連クラスを消しておく。


src/main/java/sample/batch/WordCountBatch.java:

package wgate.wordcount.batch;

import wgate.wordcount.jobflow.WordCountJob;

import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
@Batch(name = "WordCountBatch")
public class WordCountBatch extends BatchDescription {

	@Override
	protected void describe() {
		run(WordCountJob.class).soon();
	}
}

実行するジョブのClassをrun()で指定し、soon()で“すぐ(前提条件が無いので最初に)実行する”という意味になる。

@Batchアノテーションで付けたバッチ名が、分散環境で実行する際に指定する名前となる。


バッチのテスト

AsakusaFW 0.2.4の初期サンプルでは、バッチのテストが入っていない。
という事は、1つしかジョブを実行しないようなシンプルなバッチでは特にテストは要らないという事だろうか。

たぶん記述方法はbatchappのバッチのテストと同じなので、今回は省略する。


分散環境での実行

分散環境で実行するには、開発環境で作ったAsakusaFWアプリをアーカイブ化して分散環境(実行するマシン)に持っていく。

  1. アーカイブを作成する。
    $ cd $HOME/workspace/wgate-wordcount
    $ mvn package
  2. アーカイブ(target/wgate-wordcount-batchapps-1.0.jar)を起動するマシンへ転送し、解凍する。
    $ cd $ASAKUSA_HOME/batchapps
    $ jar xf $HOME/Desktop/wgate-wordcount-batchapps-1.0.jar
    $ rm -rf META-INF
    $ find WordCountBatch -name "*.sh" | xargs chmod u+x
  3. 起動するマシン上でテストデータを作成する。
    $ mkdir -p /tmp/windgate-$USER/input
    $ vi /tmp/windgate-$USER/input/hello.txt
  4. 実行する。
    $ WordCountBatch/bin/experimental.sh
    または
    $ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh WordCountBatch
    バッチ引数を指定する場合は以下の様にして実行する。
    $ export ASAKUSA_BATCH_ARGS='IN_FILE=hello.txt,OUT_FILE=wordcount.csv'
    $ WordCountBatch/bin/experimental.sh
    または
    $ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh WordCountBatch -A IN_FILE=hello.txt -A OUT_FILE=wordcount.csv
  5. 結果を確認する。
    $ cat /tmp/windgate-$USER/output/wordcount.csv

WindGateへ戻る / AsakusaFW目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま