S-JIS[2011-07-25/2012-01-17] 変更履歴

AsakusaFWでWordCount

Asakusa Framework0.2.1(batchapp)でWordCountを作ってみる。


前提

marblejenkaさんのexample-wordcountを参考に、Asakusa Framework0.2.1でWordCountを作ってみた。
(→AsakusaFW 0.2.4 WindGate版WordCount

WindowsXPにAsakusaFWをインストールして、Eclipse3.6上でコーディングしている。
単体テスト(JUnit)でジョブ(バッチ)を実行するところまで出来た。

通常のWordCountであればベタのテキストを入力とするところだが、AsakusaFWで扱えるファイルはデフォルトではSequenceFileのみ。出力ファイルも同じ。
ただし単体テストでは入力・出力ファイルも自動で作ってくれるので、あまり気にする必要は無い。


WordCountプロジェクトの作成

一番最初に、Eclipseで扱えるWordCount用プロジェクトを作成する。

コマンドプロンプトから以下のコマンドを実行する。

  1. Eclipseで使用するワークスペースへ移動する。
    (このディレクトリーの下にAsakusaFW用のディレクトリーを作成する。別の場所でもいいけど、最終的にワークスペースにインポートする際は同じ場所の方が(コピーが無い分だけ)楽だと思う)
    > cd /d C:\workspace
  2. AsakusaFWアーキタイプ(テンプレート)のプロジェクトを作成する。
    > mvn archetype:generate -DarchetypeCatalog=http://asakusafw.s3.amazonaws.com/maven/archetype-catalog.xml
    Choose archetype:
    1: http://asakusafw.s3.amazonaws.com/maven/archetype-catalog.xml -> asakusa-archetype-batchapp (-)
    1  
    Choose version:
    1: 0.1.0
    2: 0.2-SNAPSHOT
    3: 0.2.0
    4: 0.2.1
    5: 0.3-SNAPSHOT
    4 AsakusaFW 0.2.1を使用。
    Define value for property 'groupId': : afw-wordcount  
    Define value for property 'artifactId': : afw-wordcount プロジェクトのディレクトリー名。
    Define value for property 'version': 1.0-SNAPSHOT: : 0.1 今回作る自分のプロジェクトのバージョン。
    どんな値でもいいが、ファイル名の一部に使われる。
    Define value for property 'package': afw-wordcount: : sample パッケージ名。
    Confirm properties configuration:
    groupId: example
    artifactId: batchapp
    version: 1.0
    package: example
    Y: :
    y 上で入力した値の確認。
    これで良ければYを入れる。
    nとか入れると再度groupIdから入力し直せる。
    これで、C:\wordspace\afw-wordcountというディレクトリーが作成される。
  3. 不要ファイル(サンプルとして最初から入っているソース類)を削除する。
    > cd afw-wordcount
    > del /s/q *.java
    > del /s ex*.sql
    > del /s *.xls
  4. MySQLを使わないでモデルを作成する設定」に変更する。
    afw-wordcount\build.propertiesのasakusa.database.enabledをfalseにする。
    # The option whether to use a database for Model Generator and ThunderGate (true|false). 
    asakusa.database.enabled=false
  5. モデルが1個も無いとコンパイルが失敗するので、ダミーのモデルを作っておく。
    > echo dummy={dummy:TEXT;}; > src\main\dmdl\dummy.dmdl
  6. Eclipse用の設定ファイル生成を行う。(ここでコンパイルが行われる)
    > mvn eclipse:eclipse
    これにより、.projectとか.classpathとか、Eclipseのプロジェクトに必要なファイルが生成される。
  7. 作成したプロジェクト(afw-wordcount)をEclipseにインポートする。
  8. Eclipseのプロジェクトの設定で、ファイルのエンコードをUTF-8にしておく。
  9. ASAKUSA_HOME配下のHadoop起動用のブリッジシェルがクリアされていたら、再度用意する(afw-initのBridgeShellを実行する)。

モデル

最初に、入力データ・出力データを表すクラス(モデル)を作成する。

モデルはdmdlというファイルに定義を書き、モデルジェネレーターによってJavaソースに変換される。
入力ファイル・出力ファイルの一行分のデータ構造をDMDL(Data Model Definition Language)という言語で記述する。(JSONぽい感じ。見ればすぐ分かる)

このdmdlファイル内にはまずモデル名を書くが、これがクラス名に変換される。ファイル名と異なっていてもよい。
ダブルクォーテーションで囲んだ部分が、JavaソースのJavadocコメントになる。

参考: Asakusa FrameworkのDMDLユーザーガイド


WordCountであれば、入力は単なる文字列(text)、出力は単語(word)と出現数(count)。

afw-wordcount/src/main/dmdl/line.dmdl:

"テキストファイルの一行に対応するエンティティ"
line_model = {
  "テキスト"
  text : TEXT;
};

afw-wordcount/src/main/dmdl/word_count.dmdl

"単語と発生回数の対を表現するエンティティ"
word_count_model = {
  "発生した単語"
  word : TEXT;

  "発生回数"
  count : INT;
};

※最初に作ったdummy.dmdlは もう要らないので削除しておこう。もしくは、dummy.dmdlを改名して使う。


コマンドプロンプトからコンパイルを行い、モデルクラス(Javaソース)が生成されればOK。

> mvn compile

> dir /s/b *.java | findstr dmdl
C:\workspace\afw-wordcount\target\generated-sources\modelgen\sample\modelgen\dmdl\io\LineModelInput.java
C:\workspace\afw-wordcount\target\generated-sources\modelgen\sample\modelgen\dmdl\io\LineModelOutput.java
C:\workspace\afw-wordcount\target\generated-sources\modelgen\sample\modelgen\dmdl\io\WordCountModelInput.java
C:\workspace\afw-wordcount\target\generated-sources\modelgen\sample\modelgen\dmdl\io\WordCountModelOutput.java
C:\workspace\afw-wordcount\target\generated-sources\modelgen\sample\modelgen\dmdl\model\LineModel.java
C:\workspace\afw-wordcount\target\generated-sources\modelgen\sample\modelgen\dmdl\model\WordCountModel.java

もしくは「mvn generate-resources」でも良い。

M2Eclipseから「generate-resources」を実行するのも便利。


オペレーター

次に、オペレーター(演算子)を作成する。

オペレーターは、MapReduceのMapper#map()やReducer#reduce()に相当する部分。
つまりデータ変換や集計といった実際の処理を記述する。

オペレーター用のクラスを用意し、その中にメソッドを定義して処理を記述していく。(このメソッドは、実行時に実際に呼ばれる)
メソッドには、どのような処理を行うかに応じて“Asakusa Frameworkで用意されているアノテーション”を付ける必要がある。
そして、アノテーションの種類(というか処理の種類)によって、メソッドの書き方(引数や戻り値など)が変わってくる。
アノテーション一覧・Asakusa Frameworkの演算子リファレンス


オペレーターはJavaの抽象クラス内に定義する決まりになっているので、最初に空のクラスを作る。

afw-wordcount/src/main/java/sample/operator/WordCountOperator.java

package sample.operator;

public abstract class WordCountOperator {
}

MapReduceのMapperやReducerではクラスの中に1メソッド(map()やreduce())しか書かないが、
AsakusaFWのOperatorクラスは、その1つのクラスの中に複数のメソッド(テキスト分割のメソッド集計のメソッド)を記述する。


テキストの分割処理

まず、入力データを単語ごとに分割する処理を考える。

入力データは一種類で、出力は入力1件(テキスト1行)に対して複数件(複数の単語)になる。
このように複数件の出力を行うには、メソッドの出力部分にResultというクラスが指定できる演算でなければならないようだ。(Result#add()を何度も呼び出してデータを出力する為)
各アノテーションのサンプルを見てみると、Resultが指定できそうなのはCoGroupGroupSortSplit。 (Extractが抜けていたorz[2012-01-17]
Splitは名前からして今回の分割に使えそうかと思ったが、結合されたデータを分割するものであって、内部の処理を自由には記述できないので、却下。
GroupSortも入力が複数件になるようだし、そもそも今の時点でソートは不要(冗長な処理になる)なので、却下。
CoGroupは複数種類のデータを入力としてキーで結合し、複数種類・複数件のデータを出力するもののようだ。しかし入力が一種類でも使えるようなので、これを採用する。

CoGroupでは複数種類のデータが入ってくることを想定している為、結合キーとなる項目を指定する必要があるようだ。
今回はベタのテキストが入ってくるだけだし一種類のデータしかないので、キーには特に何も指定しない。

import java.util.List;
import java.util.StringTokenizer;

import sample.modelgen.dmdl.model.LineModel;
import sample.modelgen.dmdl.model.WordCountModel;

import com.asakusafw.runtime.core.Result;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.CoGroup;
	@CoGroup
	public void split(@Key(group = {}) List<LineModel> lineList,
			Result<WordCountModel> wordCountResult) {

		for (LineModel line : lineList) {
			String text = line.getTextAsString();

			StringTokenizer tokenizer = new StringTokenizer(text);
			while (tokenizer.hasMoreTokens()) {
				String word = tokenizer.nextToken();

				WordCountModel wc = new WordCountModel();
				wc.setWordAsString(word);
				wc.setCount(1);

				wordCountResult.add(wc);
			}
		}
	}

MapReduceのMapper#map()だと入力の1行につき1回ずつ呼ばれるのだが、CoGroupでは(DMDLで定義したクラスの)Listとして渡ってくるようだ。

DMDL上でTEXTとして宣言した項目は、Javaのクラス上はHadoopのTextクラスになっていて、
しかもセッター・ゲッターには「asString」という接尾辞が付いたメソッドが用意されているので、Stringクラスで読み書きできる。

出力は、MapReduceのcontext.write()の代わりにResultにaddしていく。
(ここで付けたResultの引数名は、後でジョブフローを定義する際に使われる)


なお、ここで下記の様な中途半端な状態(コーディング途中)にすると、コンパイルエラーが発生する。(プロジェクトを自動的にビルドする設定になっている場合)

import com.asakusafw.vocabulary.operator.CoGroup;

public class WordCountOperator {
	@CoGroup
	public void split() {}
}

エラー位置はクラス名部分、エラーメッセージは「演算子クラスsample.operator.WordCountOperatorはabstractとして宣言する必要があります」。
他にもCoGroupアノテーションの場合はメソッドの引数にListが要るとかResultが要るとか、エラーメッセージを出してくれる。

このエラーは、AsakusaFW(Ashigelコンパイラー)が出しているもの。
AsakusaFWのインストール(Eclipseの設定)をすると、Asakusa DSLの文法に沿っているかどうかをチェックするプラグインが入る。すごい!


単語数の集計処理

次に単語数の集計処理。

集計のアノテーションはSummarizeかなぁと思いつつも、これはモデルクラス内の各項目全ての合算をするように見える。
そこで、ひとまずmarblejenkaさんの例に倣ってCoGroupを使うことにする。
(→Summarizeを使った集計処理Foldを使った集計処理

入力リストのキーに単語項目を指定すれば、単語ごとにメソッドが呼ばれるようになるみたいだ。

import org.apache.hadoop.io.Text;
	@CoGroup
	public void sum(@Key(group = { "word" }) List<WordCountModel> wordCountList,
			Result<WordCountModel> wordCountResult) {

		Text word = wordCountList.get(0).getWord();

		int count = 0;
		for (WordCountModel wc : wordCountList) {
			count += wc.getCount();
		}

		WordCountModel wc = new WordCountModel();
		wc.setWord(word);
		wc.setCount(count);

		wordCountResult.add(wc);
	}

オペレータークラスが書けたところで、コマンドプロンプトからコンパイルしてやる。(Eclipseがプロジェクトを自動的にビルドする設定になっていない場合)
すると、オペレーターファクトリークラスが生成される。
ジョブフローを記述する際に使うのはファクトリークラス)

> mvn compile

> dir/s/b *factory.java
C:\workspace\afw-wordcount\target\generated-sources\annotations\sample\operator\WordCountOperatorFactory.java

Eclipseがプロジェクトを自動的にビルドする設定になっている場合は、自動的に生成される。
なお、Operatorのソースを変更(して保存)する度に自動的にOperatorFactory(やOperatorImpl)も更新されるが、
Operatorを削除した場合はOperatorFactory(とOperatorImpl)は削除されず、(Operatorクラスが見つからなくて)コンパイルエラーになる。

オペレーター(演算子)の単体テスト方法


ジョブフロー

次に、ジョブフローを定義する。
ここで「どのファイルを入力とし、どのオペレーターを呼んで、どのファイルへ出力するか」を記述する。

参考: Asakusa FrameworkのAsakusa DSLスタートガイド#データフローを記述する


まずは入力ファイルを定義する。ファイル名とそのデータの型(モデルクラス)を記述する。

afw-wordcount/src/main/java/sample/jobflow/gateway/LineFromFile.java:

package sample.jobflow.gateway;

import java.util.HashSet;
import java.util.Set;

import com.asakusafw.vocabulary.external.FileImporterDescription;

import sample.modelgen.dmdl.model.LineModel;
public class LineFromFile extends FileImporterDescription {

	@Override
	public Set<String> getPaths() {
		Set<String> set = new HashSet<String>();
//		set.add("file:/C:/cygwin/home/hadoop/wordcount/input/file01");
		set.add("input/file01");
		return set;
	}

	@Override
	public Class<?> getModelType() {
		return LineModel.class;
	}
}

FileImporterDescriptionというクラスを継承し、メソッドをオーバーライドするだけ。通常のJavaと全く同じコーディング。

ファイルを絶対パスで指定する場合は「file:/」で始める。たぶん「hdfs:」で指定することも出来るだろう。
スキーマを付けずに指定した場合、Hadoopのデフォルトの場所(単独環境ならユーザーのホームディレクトリー(Windowsだとuser.homeで指定した場所))からの相対パスになる模様。
「input/file01」は、inputというディレクトリーの下のfile01という名前のファイルを指す。
ジョブフロー・バッチの単体テストを実行すると、このファイルが自動的に作られる(上書きされる))

なお、作成されるファイルはSequenceFileとなる。
テキストファイルを扱う方法


次いで出力ファイル。これも同様にモデルクラスとファイル名(の一部)を記述する。

afw-wordcount/src/main/java/sample/jobflow/gateway/WordCountToFile.java:

package sample.jobflow.gateway;

import com.asakusafw.vocabulary.external.FileExporterDescription;

import sample.modelgen.dmdl.model.WordCountModel;
public class WordCountToFile extends FileExporterDescription {

	@Override
	public Class<?> getModelType() {
		return WordCountModel.class;
	}

	@Override
	public String getPathPrefix() {
		return "result/wc-*";
	}
}

getPathPrefix()で出力ディレクトリーとファイル名を指定する。
出力先はユーザーのホームディレクトリーになる。つまりWindowsだと「user.homeで指定した場所/result」というディレクトリーが作られ、その下に「wc-r-00000」といったファイルが作られる。
中間ファイル「part-r-00000」といったファイルもここに作られる。


そして、入力→処理→出力のつながり(順序・フロー)を記述する。

afw-wordcount/src/main/java/sample/jobflow/WordCountJob.java

package sample.jobflow;

import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.JobFlow;
@JobFlow(name = "WordCountJob")
public class WordCountJob extends FlowDescription {
}

JobFlowというアノテーションを付け、ジョブ名を指定する。
クラス自体はFlowDescriptionを継承する。


ジョブフローでは、コンストラクターで入力データと出力データを受け取るようだ。
それをフィールドで保持しておき、使用する。

import sample.jobflow.gateway.LineFromFile;
import sample.jobflow.gateway.WordCountToFile;
import sample.modelgen.dmdl.model.LineModel;
import sample.modelgen.dmdl.model.WordCountModel;

import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.Out;
	In<LineModel> in;
	Out<WordCountModel> out;

	public WordCountJob(
			@Import(name = "in", description = LineFromFile.class) In<LineModel> in,
			@Export(name = "out", description = WordCountToFile.class) Out<WordCountModel> out) {
		this.in = in;
		this.out = out;
	}

FlowDescriptionのdescribe()メソッドをオーバーライドし、フローを記述する。

import sample.operator.WordCountOperatorFactory;

import com.asakusafw.vocabulary.flow.Source;
	WordCountOperatorFactory operator = new WordCountOperatorFactory();

	@Override
	protected void describe() {
		Source<WordCountModel> splitResult = operator.split(in).wordCountResult;
		Source<WordCountModel> sumResult = operator.sum(splitResult).wordCountResult;
		out.add(sumResult);
	}

operator.split()やoperator.sum()が、自分で作ったオペレーターを表している。
自分でオペレーターを記述した際に、メソッドの引数で出力用のResultを定義したが、その変数名(上記のwordCountResult)で出力データを受け取るイメージ。
(自分でコーディングしたのはOperatorクラスで、それを元にOperatorFactoryが生成されているので、変数名とかを流用するようになっているのだろう)
そうして受け取ったデータを次の演算子(オペレーター)に渡し、最終的にout(出力ファイル)に渡す。

「Source」は、次の演算子への入力データ(の型)を意味すると思えばいいだろう。

なお、WordCountOperatorFactory.javaの入っているディレクトリーがビルドパスに含まれていないとEclipse上でコンパイルエラーになるかもしれない。
その場合はtarget/generated-sources/annotationsをEclipseのビルドパスに含める。

※このdescribe()メソッドはAsakusaFWがジョブフローをコンパイルする際に実行される(実行することによってフロー(グラフ)を作成している)。

ジョブフロークラスのテスト方法
ジョブフローのグラフ化(可視化)


バッチ

最後にBatch DSLで「どのジョブフローを実行するか」を記述する。
(WordCountだとシンプルすぎて、ありがたみが無いかも^^;)

参考: Asakusa FrameworkのAsakusa DSLスタートガイド#バッチを記述する

afw-wordcount/src/main/java/sample/batch/WordCountBatch.java:

package sample.batch;

import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
@Batch(name = "WordCountBatch")
public class WordCountBatch extends BatchDescription {
}

BatchDescriptionのdescribe()メソッドをオーバーライドし、どのジョブを実行するかを記述する。

import sample.jobflow.WordCountJob;
	@Override
	protected void describe() {
		run(WordCountJob.class).soon();
	}

実行するジョブのClassをrun()で指定し、soon()で“すぐ(前提条件が無いので最初に)実行する”という意味になる。

バッチクラスのテスト方法


オペレーター(演算子)のテスト

オペレーター(演算子)はHadoopでの実行時に普通に呼び出されるクラスなので、単体テストは普通にJUnit4で行う。[2011-08-01]

ただしOperatorは抽象クラスなので、直接インスタンス化することが出来ない。
Operator(演算子)クラスを保存(コンパイル)すると自動的にOperatorFactoryが作られるが、同時にOperatorImplというクラスも作られる。テストにはこれを使用する。

今回のメソッドの入力データは単なるListだからいいとして、出力データを入れるResultにはMockResultを使う。

参考: Asakusa Frameworkのアプリケーションテストスタートガイド

afw-wordcount/src/test/java/sample/operator/WordCountOperatorTest.java:

package sample.operator;

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;

import java.util.*;

import org.junit.Test;

import com.asakusafw.runtime.testing.MockResult;

import sample.modelgen.dmdl.model.LineModel;
import sample.modelgen.dmdl.model.WordCountModel;

public class WordCountOperatorTest {
	@Test
	public void testSplit() {
		WordCountOperatorImpl operator = new WordCountOperatorImpl();

		List<LineModel> list = new ArrayList<LineModel>();
		list.add(createLine("word1"));
		list.add(createLine(""));
		list.add(createLine("word3 blank"));
		list.add(createLine("word4 blank"));
		list.add(createLine("word5\ttab"));
		list.add(createLine("word6\t tab-blank"));
		list.add(createLine("word7\ttab blank"));

		MockResult<WordCountModel> result = new MockResult<WordCountModel>();
		operator.split(list, result);

		List<WordCountModel> r = result.getResults();
		String[] expected = { "word1", "word3", "blank", "word4", "blank", "word5", "tab", "word6", "tab-blank", "word7", "tab", "blank" };
		assertThat(r.size(), is(expected.length));
		for (int i = 0; i < expected.length; i++) {
			WordCountModel wc = r.get(i);
			assertThat(wc.getWordAsString(), is(expected[i]));
			assertThat(wc.getCount(), is(1));
		}
	}

	LineModel createLine(String text) {
		LineModel m = new LineModel();
		m.setTextAsString(text);
		return m;
	}
	@Test
	public void testSum() {
		WordCountOperatorImpl operator = new WordCountOperatorImpl();

		{ // リストの要素が1個
			List<WordCountModel> list = new ArrayList<WordCountModel>();
			list.add(createWordCount("word", 99));

			MockResult<WordCountModel> result = new MockResult<WordCountModel>();
			operator.sum(list, result);

			List<WordCountModel> r = result.getResults();
			assertThat(r.size(), is(1));
			assertThat(r.get(0).getWordAsString(), is("word"));
			assertThat(r.get(0).getCount(), is(99));
		}

		{ // リストの要素が複数
			List<WordCountModel> list = new ArrayList<WordCountModel>();
			String word = "word1";
			list.add(createWordCount(word, 1));
			list.add(createWordCount(word, 2));
			list.add(createWordCount(word, 3));

			MockResult<WordCountModel> result = new MockResult<WordCountModel>();
			operator.sum(list, result);

			List<WordCountModel> r = result.getResults();
			assertThat(r.size(), is(1));
			assertThat(r.get(0).getWordAsString(), is(word));
			assertThat(r.get(0).getCount(), is(6));
		}
	}

	WordCountModel createWordCount(String word, int count) {
		WordCountModel m = new WordCountModel();
		m.setWordAsString(word);
		m.setCount(count);
		return m;
	}
}

ジョブ・バッチのテスト

AsakusaFWでは、ジョブフロー(とFlowPart)とバッチのテスト用クラスが用意されている。
それらを使ってJUnit4として実行できる。

それと、DMDLをコンパイルした際に(Javaソースの他に)Excelファイルが生成されている!CSVファイルじゃなくて正真正銘のxlsファイルだぜー!?

> dir /s/b *.xls
C:\workspace\afw-wordcount\target\excel\line_model.xls
C:\workspace\afw-wordcount\target\excel\word_count_model.xls

このExcelファイルを所定の場所にコピーして、テスト用の入力データと出力結果(検証用)データを記入する。

参考: Asakusa Frameworkのアプリケーションテストスタートガイド#データフローのテスト


Windows(Cygwin)上でジョブのテストバッチのテストを実行する際の注意点として、VM引数にホームディレクトリーを指定する必要がある。 (Hadoop単独環境で、入力ファイル出力ファイルの場所を相対パスで指定している場合)

なぜかと言うと、ジョブ・バッチのテストの実行では実際にHadoopを実行することになるのだが、Windows上のEclilpseから起動されるテストクラス(AsakusaFW)とCygwin経由で起動されるHadoopではホームディレクトリーの位置が異なる為。
AsakusaFWはテストの実行開始前に所定の場所に入力ファイルを作成し、Hadoopの出力場所をクリアする(Hadoopでは出力先ディレクトリーが既に存在しているとエラーになるから)。
しかしホームディレクトリーの位置が違ってしまっているので、双方が一致しなくなり、エラーになる。
(Hadoopの単独環境だとホームディレクトリーからの相対パスとなるのでこの問題が起こるが、分散環境ならHDFS上のユーザーディレクトリーからの相対パスとなるはずなので、たぶんこういった不一致は起こらない)

  1. 一度テストクラスを実行する。(エラーになる)
  2. Eclipseのメニューバーの「実行(R)」→「実行構成(N)」で実行構成ダイアログを開く。
  3. 左側の一覧から、対象のテストを選択する。
  4. 右側のペインで「引数」タブを選択する。
  5. 「VM引数(G)」に「-Duser.home=C:/cygwin/home/ユーザー」を指定する。(Cygwinでログインした際のユーザー)

ジョブフローのテスト

参考: Asakusa Frameworkのアプリケーションテストスタートガイド#ジョブフローのテスト

パッケージエクスプローラー上でジョブフロークラスのソース(WordCountJob)を右クリックして、「新規(W)」→「JUnitテスト・ケース」でJUnit4のテスト用クラスを作成する。
その際、作成先のディレクトリーは「afw-wordcount/src/test/java」とする。

afw-wordcount/src/test/java/sample/jobflow/WordCountJobTest.java

package sample.jobflow;

import org.junit.Test;

import sample.modelgen.dmdl.model.LineModel;
import sample.modelgen.dmdl.model.WordCountModel;

import com.asakusafw.testdriver.JobFlowTester;
public class WordCountJobTest {

	@Test
	public void testDescribe() {
		String inDataSheet = "line_model.xls";
		String outDataSheet = "word_count_model.xls";

		JobFlowTester tester = new JobFlowTester(getClass());

		tester.input("in", LineModel.class).prepare(inDataSheet + "#input");
		tester.output("out", WordCountModel.class).verify(outDataSheet + "#output", outDataSheet + "#rule");

		tester.runTest(WordCountJob.class);
	}
}

driver.input()で入力データを指定する。
第1引数はジョブフローで指定した入力データの名前。つまりWordCountJobのコンストラクターの「@Import(name = "in"」。 この名前を使って入力データのファイル名を取得しているようだ。(一致しているものが無いと、入力ファイルを作れない)
そのinput()メソッドの後に続いているprepare()で、入力データのExcelファイル名(line_model.xml)を指定する。

driver.output()もdriver.input()と同様。
第1引数はジョブフローの出力データの名前「@Export(name = "out"」。検証時にこの名前を使ってデータを取得するようだ。(一致しているものが無いと、実行が終わった後のチェックでエラーになる)
verify()メソッドの第1引数が出力データのExcelファイル名、第2引数がチェック用ルールのExcelファイル名

「#input」「#output」「#rule」は、そのExcelファイル内のシート名を表している。
数値を使って 「#:1」や「#:2」の様に記述することもでき、その場合は「#:1」が左から2番目のoutputシート、「#:2」が3番目のruleシートを表す。
省略した場合は「#:0」扱いで、すなわち一番左にあるinputシートを指す。

※Windowsで実行する際は、ホームディレクトリーの指定を忘れずに!


ジョブフローのテストで使用するテストデータをExcelファイルで(またはJSON形式で)用意する必要がある。

参考: Asakusa FrameworkのExcelによるテストデータ定義

afw-wordcount/target/excelの下にExcelファイルが自動生成されているので、それをsrc/test/resources/sample/jobflow/の下にコピーし、その中にデータを記述する。

afw-wordcount/src/test/resources/sample/jobflow/line_model.xlsのinputシート:

  A B C
1 text    
2 Hello Hadoop World    
3 Hello Asakusa    
4      
5      

入力データはinputシートに書く。
1行目(色付きのセル)は自動で入っている。

afw-wordcount/src/test/resources/sample/jobflow/word_count_model.xlsのoutputシート:

  A B C
1 word count  
2 Asakusa 1  
3 Hadoop 1  
4 Hello 2  
5 World 1  
6      

期待される出力データをoutputシートに書く。
1行目(色付きのセル)は自動で入っている。

afw-wordcount/src/test/resources/sample/jobflow/word_count_model.xlsのruleシート:

  A B C D E
1 Format EVR-1.0.0      
2 全体の比較 全てのデータを検査 [Strict]      
3 プロパティ 値の比較 NULLの比較 コメント  
4 word 検査キー [Key] NULLなら常に失敗 [DA] 発生した単語  
5 count 完全一致 [=] NULLなら常に失敗 [DA] 発生回数  
6          

outputシートに書かれているデータと実際のデータをどう比較してどういう状態ならテストOK(あるいはNG)とするかをruleシートに書く。
どのセルも初期値は自動で入っている。
(「コメント」の“発生した単語”や“発生回数”は、DMDLに自分が記述した各項目のコメント)

今回のケースでは出力項目がwordとcountの二項目なので、それぞれどういう比較をするかを書く。(書くというか、実際はプルダウンになっているので、選択する)

このシートを見ただけで予想が付くようになっているのは素晴らしい。

Excelファイルを使用せず、独自のデータを入力データにする方法


バッチのテスト

バッチクラスのテスト方法は、ジョブフローのテスト方法とほとんど同じ。[2011-08-04]

参考: Asakusa Frameworkのアプリケーションテストスタートガイド#バッチのテスト

Excelデータについては、ジョブフローのテスト用と全く同じものでも問題ない。

Excelファイルのコピー元・コピー先
コピー元 afw-wordcount/target/excel/ line_model.xls
word_count_model.xls
ジョブフローのテスト用 afw-wordcount/src/test/resources/sample/jobflow/ line_model.xls
word_count_model.xls
バッチのテスト用 afw-wordcount/src/test/resources/sample/batch/ line_model.xls
word_count_model.xls

afw-wordcount/src/test/java/sample/jobflow/WordCountBatchTest.java:

package sample.batch;

import org.junit.Test;

import sample.modelgen.dmdl.model.LineModel;
import sample.modelgen.dmdl.model.WordCountModel;

import com.asakusafw.testdriver.BatchTester;
public class WordCountBatchTest {

	@Test
	public void testDescribe() {
		String inDataSheet = "line_model.xls";
		String outDataSheet = "word_count_model.xls";

		BatchTester tester = new BatchTester(getClass());

		tester.jobflow("WordCountJob").input("in", LineModel.class).prepare(inDataSheet + "#input");
		tester.jobflow("WordCountJob").output("out", WordCountModel.class).verify(outDataSheet + "#output", outDataSheet + "#rule");

		tester.runTest(WordCountBatch.class);
	}
}

tester.jobflow()には、ジョブフローのアノテーション「@JobFlow(name = "WordCountJob")」で付けた名前を指定する。
そこ以降のinput()・output()については、ジョブフローのテストと全く同じ書き方。

※実行する際は、ホームディレクトリーの指定を忘れずに!



単純集計演算子(Summarize)による集計

単語の集計をするオペレーターを単純集計演算子Summarizeで作ってみる。[2011-08-01]
参考: Asakusa FrameworkのAsakusa DSLスタートガイド#単純集計演算子の実装例

afw-wordcount/src/main/java/sample/operator/WordCountOperator.java

import com.asakusafw.vocabulary.operator.Summarize;
	@Summarize
	public abstract WordCountTotal sum2(WordCountModel wc);

Summarizeの場合、メソッド本体は記述しない(抽象メソッドにする)。
引数には集計対象のモデル、戻り型には集計結果のモデルを指定する。
集計結果のクラス(集計モデル)はとりあえずWordCountTotalという名前を付けてみた。この時点ではWordCountTotalクラスは存在しないのでコンパイルエラーになる。
(集計モデルは専用のDMDLを記述する必要があるので、既存のWordCountModelを戻り型に指定することは出来ない)


word_count.dmdlにでも集計モデルの定義を追加してやる(一ファイルの中に複数の定義を記述できる)。
参考: Asakusa FrameworkのDMDLユーザーガイド#集計モデルを定義する

"WordCountの集計モデル"
summarized word_count_total = word_count_model => {
  any word -> word;
  sum count -> count;
} % word;

summarizedを先頭に付け、集計モデル名(word_count_total)を指定する。
その後ろに集計元となるモデル名(word_count_model)を書く。
そこから「=>」で続けて、演算の種類を書く。sumで集計する(countで件数カウントも出来る模様)。集計キーとなる項目にはanyを付ける。
「->」で項目の別名(というか集計モデルでの項目名)を付けられる。同じ名前でいい場合でも省略できない。
「%」で集計キーとなる項目を指定する。(SQLのgroup byのような感じか)

※この記述上では型を指定していないので同じ型になると期待していたのだが、word_count_modelのcountはINTなのに、word_count_totalのcountはLONGになるようだ。


DMDLを書いたら、Javaソースを生成する。
コマンドプロンプトからMavenのコマンドを実行する。(もしくはM2Eclipseから「generate-resources」を実行する)

> cd /d C:\workspace\afw-wordcount
> mvn generate-resources

これでクラスが生成されたので、WordCountOperator.javaに戻って、インポート文を追加してやる。

import sample.modelgen.dmdl.model.WordCountTotal;

ジョブフローを定義する。

afw-wordcount/src/main/java/sample/jobflow/WordCountJob.java

import sample.modelgen.dmdl.model.WordCountTotal;
	@Override
	protected void describe() {
		Source<WordCountModel> splitResult = operator.split(in).wordCountResult;

//		Source<WordCountModel> sumResult = operator.sum(splitResult).wordCountResult;
		Source<WordCountTotal> sumResult = operator.sum2(splitResult).out;

		out.add(sumResult);
	}

sumResultの型が変わった(Model→Total)ので、out.add()がコンパイルエラーになる。
WordCountJobのコンストラクターをWordCountTotalに変える必要がある。

	In<LineModel> in;
	Out<WordCountTotal> out;

	public WordCountJob(
			@Import(name = "in", description = LineFromFile.class) In<LineModel> in,
			@Export(name = "out", description = WordCountToFile.class) Out<WordCountTotal> out) {
		this.in = in;
		this.out = out;
	}

また、WordCountToFileの型もそれに合わせて変える必要がある。

	@Override
	public Class<?> getModelType() {
		return WordCountTotal.class;
	}

テストの実行

Operatorクラスの単体テストとしては、Summarizeはメソッド内容が自動的に生成されるので、テストケースを作る必要は無い(という考え方のようだ)。

Operatorクラスのテストに使うOperatorImplを見ると、以下のようになっている。

	@Override public WordCountTotal sum2(WordCountModel wc) {
		throw new UnsupportedOperationException("単純集計演算子は組み込みの方法で処理されます");
	}

つまり、OperatorImplを使ってインスタンスを生成しても、このメソッドを呼び出してテストすることは出来ないわけだ。

ジョブフローのテストは出力対象クラスが変わったので修正しておく必要がある。

public class WordCountJobTest {

	@Test
	public void testDescribe() {
		String inDataSheet = "line_model.xls";
		String outDataSheet = "word_count_model.xls";

		JobFlowTester driver = new JobFlowTester(this.getClass());

		driver.input("in", LineModel.class).prepare(inDataSheet + "#input");
		driver.output("out", WordCountTotal.class).verify(outDataSheet + "#output", outDataSheet + "#rule");

		driver.runTest(WordCountJob.class);
	}
}

本来なら出力データを記述しているExcelファイルも作り直すべきなのかもしれないが、今回の例では項目名は全く変わっていないので、そのまま使える。


部分集約(partialAggregation)

単純集計演算子(Summarize)のリファレンスを見ていたら、部分集約(partialAggregation)というものが載っていた。[2011-08-03]
「部分集約を行う場合、〜、ネットワークの転送量を削減しようとします」と説明されているので、MapReduceのCombinerのような中間集計のことなのだろう。

部分集約は、Summarizeアノテーションの引数として指定する。

afw-wordcount/src/main/java/sample/operator/WordCountOperator.java

import com.asakusafw.vocabulary.flow.processor.PartialAggregation;
	@Summarize(partialAggregation=PartialAggregation.PARTIAL)
	public abstract WordCountTotal sum2(WordCountModel wc);

PartialAggregation.TOTAL(部分集約(中間集計)を行わない)を指定したら、ちゃんとログ上のcombine入出力件数が0件になったよ。面白いw


変換演算子(Convert)による変換

Summarizeを使う例としてジョブフローの出力の型(モデル)をWordCountTotalに変更したが、
最終的に出力する前にWordCountTotalからWordCountModelへ変換してやれば、フロー以降(WordCountJobのExportクラスとWordCountToFileとテスト用データのクラス指定)は修正しなくても済む。[2011-08-02]

モデル(クラス)を変換するには、Convert(変換演算子)を使用する。

afw-wordcount/src/main/java/sample/operator/WordCountOperator.java

import com.asakusafw.vocabulary.operator.Convert;
	private WordCountModel wordCountModel = new WordCountModel();

	@Convert
	public WordCountModel wordCountTotal2Model(WordCountTotal t) {
		wordCountModel.setWord(t.getWord());
		wordCountModel.setCount((int) t.getCount());
		return wordCountModel;
	}

ただ単にフィールドを移し替えているだけ。
AsakusaFWはスレッドセーフ(Operatorのインスタンスがスレッド毎に作られるという意味だろう)なので、移し先のインスタンスをフィールドに置いておいて使いまわしても大丈夫らしい。
(ここで初めて気付いたんだけど、WordCountModelの件数項目はint型なのに、集計モデルの集計フィールドはlong型だった)

afw-wordcount/src/main/java/sample/jobflow/WordCountJob.java

	@Override
	protected void describe() {
		Source<WordCountModel> splitResult = operator.split(in).wordCountResult;

		Source<WordCountTotal> sumResult = operator.sum2(splitResult).out;
		Source<WordCountModel> convertResult = operator.wordCountTotal2Model(sumResult).out;

		out.add(convertResult);
	}

フローの出力の型はWordCountModelで行ける。

これでOK!と思ってWordCountJobTestを実行したら、こんなエラーが…。

java.lang.RuntimeException: java.io.IOException: 実行計画の作成に失敗しました ([WordCountOperator.wordCountTotal2Modelの出力originalは結線されていません ([WordCountOperator.wordCountTotal2Model(operator#15136722)])])
	at com.asakusafw.testdriver.JobFlowTester.runTest(JobFlowTester.java:95)
〜
Caused by: java.io.IOException: 実行計画の作成に失敗しました ([WordCountOperator.wordCountTotal2Modelの出力originalは結線されていません ([WordCountOperator.wordCountTotal2Model(operator#15136722)])])
	at com.asakusafw.compiler.flow.FlowCompiler.plan(FlowCompiler.java:172)
〜

「出力originalが結線していない」とな? と思ってWordCountOperatorFactoryのソースを見たり変換演算子の説明を改めて見たりしたら、Convertではoutに変換したデータが出力され、originalに元々の入力データがそのまま出力される。
つまり、originalという出力データをフロー上でどこにも渡していないからこのエラーになったようだ。


データをどこにも渡す必要が無い(データを捨てていい)場合、フロー上で停止演算子stopへデータを渡す。(逆に0件の入力が欲しい場合は空演算子emptyが使える)
stop(やempty)はCoreOperatorFactoryというクラスでAsakusaFWによって定義されているので、それを使う。

afw-wordcount/src/main/java/sample/jobflow/WordCountJob.java

import sample.operator.WordCountOperatorFactory.WordCountTotal2Model;

import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory;
	CoreOperatorFactory core = new CoreOperatorFactory();

	@Override
	protected void describe() {
		Source<WordCountModel> splitResult = operator.split(in).wordCountResult;
		Source<WordCountTotal> sumResult = operator.sum2(splitResult).out;

		WordCountTotal2Model convert = operator.wordCountTotal2Model(sumResult);
		out.add(convert.out);
		core.stop(convert.original);
	}

再構築演算子(restructure)による変換

先ほどは変換演算子ConvertによってWordCountTotalからWordCountModelへ変換したが、AsakusaFW 0.2.1では、プロパティー(項目名と型)が同一な場合は再構築演算子restructureを使うのが推奨されている。[2011-08-02]

restructureもCoreOperatorFactoryで定義されているので、自分でOperatorを作る必要は無い。

afw-wordcount/src/main/java/sample/jobflow/WordCountJob.java

import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory;
	CoreOperatorFactory core = new CoreOperatorFactory();

	@Override
	protected void describe() {
		Source<WordCountModel> splitResult = operator.split(in).wordCountResult;
		Source<WordCountTotal> sumResult = operator.sum2(splitResult).out;

		out.add(core.restructure(sumResult, WordCountModel.class));
	}

restructure()の第2引数には、変換先のクラスを指定する。


で、実行してみたら、エラーが出た。

ERROR c.a.c.flow.FlowCompilingEnvironment - restructureにおいて、DataModelClass(sample.modelgen.dmdl.model.WordCountTotal).countとDataModelClass(sample.modelgen.dmdl.model.WordCountModel).countのプロパティ型が一致しません

「WordCountTotalのcountとWordCountModelのcountのプロパティ型が一致しない」って。
WordCountModelのcountはint型なのだが、それの集計モデルであるWordCountTotalは何故かlong型になってしまっているんだよね。

集計モデルのDMDLで型を指定する方法は無いようだ。
sum count -> count : INT;」とか「sum count : INT -> count;」を試してみたが、駄目だった。

仕方が無いので、word_count_modelをINTからLONGに変えてやったら、動いた。

※併せてWordCountOperatorTestの「assertThat(wc.getCount(), is(1));」とかも「is(1L)」(long型)にしないとコンパイルエラーになる。


畳み込み演算子(Fold)による集計

AsakusaFWのOperator DSLには畳み込み演算子(Fold)というアノテーションがある。[2011-08-03]
これを使って集計処理をすることが出来る。

ScalafoldLeftという関数を勉強してなかったら「たたみこみ?fold?意味不明」と思ってスルーしてたところだ^^;
「畳み込み」とは、コレクション(Listとか)内の要素同士を計算して結果を出すもの。
例えばScalaで「List(1,10,100).foldLeft(0){ _ + _ }」と書くと、1・10・100が入っているListに対し、各要素に演算を行っていく。foldLeft直後の括弧内が初期値で、最初に初期値と先頭要素で演算を行う。すなわち「0+1」を行う。そしてその結果の1と次の10で「1+10」を行い、その結果の11と次の100で「11+100」を行い、最終的に111という値が返る。
ScalaではfoldLeftと似た関数にreduceLeftというものがあり、そちらは「List(1,10,100).reduceLeft{ _ + _ }」の様に書く。foldLeftと違って初期値が与えられていないので、最初に先頭2つの要素同士で計算する。後は同じで、最終的に111が返る。
AsakusaFWのFold演算子はScalaのreduceLeftに近い。

afw-wordcount/src/main/java/sample/operator/WordCountOperator.java

import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.Fold;
	@Fold
	public void sum3(@Key(group = "word") WordCountModel left, WordCountModel right) {
		left.setCount(left.getCount() + right.getCount());
	}

Foldでは、leftとrightの2つの引数をとる。@Keyで集計キーを指定する。(キー毎に処理する為)
そして集計(演算)結果をleftに入れるようにする。

ちなみに、Foldも部分集約(中間集計)を行える。

	@Fold(partialAggregation=PartialAggregation.PARTIAL)
	public void sum3(@Key(group = "word") WordCountModel left, WordCountModel right) {
		left.setCount(left.getCount() + right.getCount());
	}

afw-wordcount/src/main/java/sample/jobflow/WordCountJob.java

	@Override
	protected void describe() {
		Source<WordCountModel> splitResult = operator.split(in).wordCountResult;
		Source<WordCountModel> sumResult = operator.sum3(splitResult).out;
		out.add(sumResult);
	}

うん、オペレーターもジョブフローも、Foldが一番シンプルに書ける^^


抽出演算子(Extract)による分割

最初はCoGroupで単語分割を行っていたが、Extractという演算子があるのに気付いた![2012-01-17]

CoGroupでも動作はするが、CoGroupはReduceタイプなので、Reducerで動く。つまり事前にShuffleフェーズでソートされることになり、WordCountとしては非常に効率が悪い。
ExtractはMapタイプなので、Mapperで動く。WordCountの単語分割にとってはこちらの方が効率が良い。

import com.asakusafw.vocabulary.operator.Extract;
	@Extract
	public void split(LineModel line, Result<WordCountModel> wordCountResult) {
		String text = line.getTextAsString();

		StringTokenizer tokenizer = new StringTokenizer(text);
		while (tokenizer.hasMoreTokens()) {
			String word = tokenizer.nextToken();

			WordCountModel wc = new WordCountModel();
			wc.setWordAsString(word);
			wc.setCount(1);

			wordCountResult.add(wc);
		}
	}

AsakusaFW目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま