S-JIS[2011-07-25/2016-07-31] 変更履歴

AsakusaFWでWordCount

Asakusa Framework0.8.1でWordCountを作ってみる。


前提

入力はベタのテキストファイルとし、出力をCSVファイルとする。
(AsakusaFW 0.8.1より前はベタのテキストを扱うには工夫が必要だったが、0.8.1でベタのテキストを読み込むDirect I/O lineが加わったので、それを使用する)

開発環境としては、EclipseにShafuDMDL EditorXがインストールされているものとする。


WordCountプロジェクトの作成

一番最初に、Eclipseで扱えるWordCount用プロジェクトを作成する。(Shafuの新規プロジェクト作成機能を使用する)

  1. Eclipseのメニューバーの「File」→「New」→「Gradleプロジェクトをテンプレートから生成」を選択し、ダイアログを開く。
  2. 「新規プロジェクト情報」ページで、プロジェクト名を入力する。
    今回の例では「afw-wordcount」とする。
  3. 「プロジェクトテンプレートの生成」ページでテンプレートを選択する。
    1. 「URLを指定してプロジェクトテンプレートをダウンロードする」を選択し、右側の「選択」ボタンを押す。
      1. 「Asakusa Project Template <MapReduce>」の最新バージョンを選択する。
    2. 「Gradleの設定」の「プロジェクトをインポートする前にビルドを実行する」にチェックを付けておく。
  4. 「Finish」ボタンを押すと、プロジェクトが作成される。

モデル

最初に、入力データ・出力データを表すクラス(モデル)を作成する。

モデルはdmdlというファイルに定義を書き、モデルジェネレーターによってJavaソースに変換される。
入力ファイル・出力ファイルの一行分のデータ構造をDMDL(Data Model Definition Language)という言語で記述する。(JSONぽい感じ。見ればすぐ分かる)

このDMDLファイル内にはまずモデル名を書くが、これがクラス名に変換される。
モデル名はファイル名と異なっていてもいいし、1つのファイル内に複数のモデルを書いてもよい。
ダブルクォーテーションで囲んだ部分が、JavaソースのJavadocコメントになる。

参考: Asakusa FrameworkのDMDLユーザーガイド


ここではmodels.dmdlというファイルを作成し、データモデルを記述する。

WordCountであれば、入力は単なる文字列(text)、出力は単語(word)と出現数(count)。

afw-wordcount/src/main/dmdl/models.dmdl:

"テキストファイルの一行に対応するエンティティ"
@directio.line
line_model = {

  "テキスト"
  text : TEXT;
};

"単語と発生回数の対を表現するエンティティ"
@directio.csv
word_count_model = {

  "発生した単語"
  word : TEXT;

  "発生回数"
  count : INT;
};

「@directio.line」というのは、そのデータモデルをHDFS上のテキストファイルとして扱うという印。
「@directio.csv」というのは、そのデータモデルをHDFS上のCSVファイルとして扱うという印。


作成したDMDLファイルからデータモデルクラス(Javaソース)を生成する。
また、@directio.lineや@directio.csv属性が付いていると、入出力ファイルとしての定義クラス(Importer/Exporterの抽象クラス)も生成されるようになる。(ジョブフローの作成時に関係してくる)

Shafuを使う場合は以下のようにして生成する。

  1. パッケージエクスプローラー上でプロジェクト(afw-wordcount)を選択し、右クリックしてコンテキストメニューを出す。
  2. 「Jinrikisha(人力車)」→「DMDLからデータモデルクラスを作成」を実行する。

あるいは、DMDL EditorXを使う場合は以下のようにする。

  1. パッケージエクスプローラー上でプロジェクト内のファイルを(適当に)選択する。
  2. ツールバーの「Asakusa FrameworkのDMDLコンパイラーを起動してJavaクラスを生成」ボタンを押す。

オペレーター

次に、オペレーター(演算子)を作成する。

オペレーターは、HadoopのMapReduceのMapper#map()やReducer#reduce()に相当する部分。
つまりデータ変換や集計といった実際の処理を記述する。

オペレーター用のクラスを用意し、その中にメソッドを定義して処理を記述していく。(このメソッドは、実行時に実際に呼ばれる)
メソッドには、どのような処理を行うかに応じてAsakusa Frameworkで用意されているアノテーションを付ける必要がある。
そして、アノテーションの種類(というか処理の種類)によって、メソッドの書き方(引数や戻り値など)が変わってくる。


オペレーターはJavaの抽象クラス内に定義する決まりになっているので、最初に空のクラスを作る。

afw-wordcount/src/main/java/com/example/operator/WordCountOperator.java

package com.example.operator;

public abstract class WordCountOperator {

}

HadoopのMapReduceのMapperやReducerではクラスの中に1メソッド(map()やreduce())しか書かないが、
AsakusaFWのOperatorクラスは、その1つのクラスの中に複数のメソッド(テキスト分割のメソッド集計のメソッド)を記述する。


テキストの分割処理

まず、入力データを単語ごとに分割する処理を考える。

入力データは一種類で、出力は入力1件(テキスト1行)に対して複数件(複数の単語)になる。

このように複数件の出力を行うには、メソッドの出力部分にResultというクラスが指定できる演算でなければならない。(Result#add()を何度も呼び出してデータを出力する為)
各演算子のサンプルを見てみると、Resultが指定できそうなのはExtractSplitGroupSortCoGroup
Splitは名前からして分割に使えそうに思えるが、結合されたデータモデルを分割するものなので、 全く目的に合わない。
GroupSortも入力が複数件になり、そもそも今の時点でソートは不要(冗長な処理になる)なので、却下。
CoGroupは複数種類のデータを入力としてキーで結合するものであり、今回の目的には冗長なので、却下。
という訳で、1件読み込んで複数件出力できるExtractを採用する。

import java.util.StringTokenizer;

import com.asakusafw.runtime.core.Result;
import com.asakusafw.vocabulary.operator.Extract;
import com.example.modelgen.dmdl.model.LineModel;
import com.example.modelgen.dmdl.model.WordCountModel;
	/**
	 * 単語と発生回数の対を表現するエンティティ
	 */
	private final WordCountModel wordCount = new WordCountModel();
	/**
	 * 単語分割
	 * 
	 * @param line
	 * 	テキストファイルの一行
	 * @param wordCountResult
	 * 	単語と発生回数の対
	 */
	@Extract
	public void split(LineModel line, Result<WordCountModel> wordCountResult) {
		String text = line.getTextAsString();

		StringTokenizer tokenizer = new StringTokenizer(text);
		while (tokenizer.hasMoreTokens()) {
			String word = tokenizer.nextToken();

			wordCount.setWordAsString(word);
			wordCount.setCount(1);

			wordCountResult.add(wordCount);
		}
	}

Extract演算子の場合、MapReduceのMapper#map()と同様に、入力の1行につき1回ずつ呼ばれる。

DMDL上でTEXTとして宣言した項目は、Javaのクラス上はHadoopのTextクラスになっている。
が、セッター・ゲッターには「asString」という接尾辞が付いたメソッドが用意されているので、Stringクラスで読み書きできる。

出力は、MapReduceのcontext.write()の代わりにResultにaddしていく。
出力に使うインスタンスは、フィールドに定義しておいて使い回すことが出来る。
(ここで付けたResultの引数名は、後でジョブフローを定義する際に使われる 。ひとつしか出力が無い場合は、大抵はoutという名前を付ける。(今回は使われていることを分かり易くする為に長ったらしい名前にしてみた)


なお、ここで下記の様な中途半端な状態(コーディング途中)にすると、コンパイルエラーが発生する。(プロジェクトを自動的にビルドする設定になっている場合)

import com.asakusafw.vocabulary.operator.Extract;

public class WordCountOperator {

	@Extract
	public void split() {}
}

エラー位置はクラス名部分、エラーメッセージは「演算子クラスcom.example.operator.WordCountOperatorはabstractとして宣言する必要があります」。
他にもExtractアノテーションの場合はメソッドの引数にResultが要る等のエラーメッセージを出してくれる。

このエラーは、AsakusaFW(Ashigelコンパイラー)が出しているもの。
AsakusaFWのインストール(Eclipseの設定)をすると、Asakusa DSLの文法に沿っているかどうかをチェックする機能が入る。すごい!


単語数の集計処理

次に単語数の集計処理。

HadoopのMapReduceに倣ってCoGroupを使うことにする。(今回は入力が1種類しか無いので、GroupSortでも構わないが)
なお、AsakusaFWとしては「CoGroupを使うのは最後の手段」とされているので、今回の例ならSummarizeを使った集計処理Foldを使った集計処理の方がよい。

入力リストのキーに単語項目を指定すれば、単語ごとにメソッドが呼ばれるようになる。

import java.util.List;

import com.asakusafw.runtime.core.Result;
import com.asakusafw.runtime.value.StringOption;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.CoGroup;
	@CoGroup
	public void sum(
		@Key(group = { "word" }) List<WordCountModel> wordCountList,
		Result<WordCountModel> wordCountResult
	) {
		StringOption word = wordCountList.get(0).getWordOption();

		int count = 0;
		for (WordCountModel wc : wordCountList) {
			count += wc.getCount();
		}

		wordCount.setWordOption(word);
		wordCount.setCount(count);

		wordCountResult.add(wordCount);
	}

出力に使うインスタンスは使い回すことが出来るので、分割処理(split)に使ったのと同じwordCountフィールドを使っている。
(実際に実行される際には、演算子メソッドが異なる場合はOperatorインスタンス自体が別のものになるので、同じwordCountフィールドを使っているように見えても、分割処理と同一のインスタンスが使われるわけではない)


オペレータークラスを書く(保存する)と、(Eclipseがプロジェクトを自動的にビルドする設定になっている場合は)自動的にオペレーターファクトリークラスが生成される。
ジョブフローを記述する際に使うのはファクトリークラス)

なお、Operatorのソースを変更(して保存)する度に自動的にOperatorFactory(やOperatorImpl)も更新されるが、
Operatorを削除した場合はOperatorFactory(とOperatorImpl)は削除されず、(Operatorクラスが見つからなくて)コンパイルエラーになる。

オペレーター(演算子)の単体テスト方法


ジョブフロー

次に、ジョブフローを定義する。
ここで「どのファイルを入力とし、どのオペレーターを呼んで、どのファイルへ出力するか」を記述する。

参考: Asakusa FrameworkのAsakusa DSLスタートガイド#データフローを記述する


まずは入力ファイル(Importer)を定義する。

DMDLファイル上のモデル定義に@directio.という属性を付けておくとImporterの雛形クラス(抽象クラス)が生成されるので、それを継承した具象クラスを作成する。

afw-wordcount/src/main/java/com/example/jobflow/gateway/LineFromFile.java:

package com.example.jobflow.gateway;

import com.example.modelgen.dmdl.csv.AbstractLineModelCsvInputDescription;
public class LineFromFile extends AbstractLineModelLineInputDescription {

	@Override
	public String getBasePath() {
		return "input/file01";
	}

	@Override
	public String getResourcePattern() {
		return "*.csv";
	}
}

line_modelというモデル名に@directio.lineを付けていた場合、AbstractLineModelLineInputDescription(Abstract+モデル名+LineInputDescription)というクラスが生成されているので、それを使う。

ベースパスやリソースパターンはDirect I/O固有の設定。
ベースパスはディレクトリー名相当、リソースパターンがファイル名。
ベースパスは相対パスっぽい指定になる。具体的にどこになるのかは、実行環境の設定によって変わる。


次いで出力ファイル(Exporter)。これもImporterと同様。

afw-wordcount/src/main/java/com/example/jobflow/gateway/WordCountToFile.java:

package com.example.jobflow.gateway;

import com.example.modelgen.dmdl.csv.AbstractWordCountModelCsvOutputDescription;
public class WordCountToFile extends AbstractWordCountModelCsvOutputDescription {

	@Override
	public String getBasePath() {
		return "result/wordcount";
	}

	@Override
	public String getResourcePattern() {
		return "wc-*";
	}
}

word_count_modelというモデル名に@directio.csvを付けていた場合、AbstractWordCountModelCsvOutputDescription(Abstract+モデル名+CsvOutputDescription)というクラスが生成されているので、それを使う。


そして、入力→処理→出力のつながり(順序・フロー)を記述する。

afw-wordcount/src/main/java/com/example/jobflow/WordCountJob.java

package com.example.jobflow;

import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.JobFlow;
@JobFlow(name = "WordCountJob")
public class WordCountJob extends FlowDescription {
}

JobFlowというアノテーションを付け、ジョブ名を指定する。
クラス自体はFlowDescriptionを継承する。


ジョブフローでは、コンストラクターで入力データと出力データを受け取るようだ。
それをフィールドで保持しておき、使用する。

import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.Out;

import com.example.jobflow.gateway.LineFromFile;
import com.example.jobflow.gateway.WordCountToFile;
import com.example.modelgen.dmdl.model.LineModel;
import com.example.modelgen.dmdl.model.WordCountModel;
	private final In<LineModel> in;
	private final Out<WordCountModel> out;

	public WordCountJob(
			@Import(name = "in", description = LineFromFile.class) In<LineModel> in,
			@Export(name = "out", description = WordCountToFile.class) Out<WordCountModel> out) {
		this.in = in;
		this.out = out;
	}

FlowDescriptionのdescribe()メソッドをオーバーライドし、フローを記述する。

import com.example.operator.WordCountOperatorFactory;

import com.asakusafw.vocabulary.flow.Source;
	WordCountOperatorFactory operator = new WordCountOperatorFactory();

	@Override
	protected void describe() {
		Source<WordCountModel> splitResult = operator.split(in).wordCountResult;
		Source<WordCountModel> sumResult = operator.sum(splitResult).wordCountResult;
		out.add(sumResult);
	}

operator.split()やoperator.sum()が、自分で作ったオペレーターを表している。
自分でオペレーターを記述した際に、メソッドの引数で出力用のResultを定義したが、その変数名(上記のwordCountResult)で出力データを受け取るイメージ。
(自分でコーディングしたのはOperatorクラスで、それを元にOperatorFactoryが生成されており、変数名等が流用されている)
そうして受け取ったデータを次の演算子(オペレーター)に渡し、最終的にout(出力ファイル)に渡す。

「Source」は、次の演算子への入力データ(の型)を意味すると思えばいいだろう。

※このdescribe()メソッドはAsakusaFWがジョブフローをコンパイルする際に実行される(実行することによってフロー(グラフ)を作成している)。

ジョブフロークラスのテスト方法
ジョブフローのグラフ化(可視化)


バッチ

最後にBatch DSLで「どのジョブフローを実行するか」を記述する。
(WordCountだとシンプルすぎて、ありがたみが無いかも^^;)

参考: Asakusa FrameworkのAsakusa DSLスタートガイド#バッチを記述する

afw-wordcount/src/main/java/com/example/batch/WordCountBatch.java:

package com.example.batch;

import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
@Batch(name = "WordCountBatch")
public class WordCountBatch extends BatchDescription {
}

BatchDescriptionのdescribe()メソッドをオーバーライドし、どのジョブを実行するかを記述する。

import com.example.jobflow.WordCountJob;
	@Override
	protected void describe() {
		run(WordCountJob.class).soon();
	}

実行するジョブのClassをrun()で指定し、soon()で“すぐ(前提条件が無いので最初に)実行する”という意味になる。

バッチクラスのテスト方法


オペレーター(演算子)のテスト

オペレーター(演算子)はバッチアプリケーションとしての実行時に普通に呼び出されるクラスなので、単体テストは普通にJUnit4で行う。

ただしOperatorは抽象クラスなので、直接インスタンス化することが出来ない。
Operator(演算子)クラスを保存(コンパイル)すると自動的にOperatorFactoryが作られるが、同時にOperatorImplというクラスも作られる。テストにはこれを使用する。

今回のメソッドの入力データは単なるListだからいいとして、出力データを入れるResultにはMockResultを使う。

参考: Asakusa Frameworkのアプリケーションテストスタートガイド

afw-wordcount/src/test/java/com/example/operator/WordCountOperatorTest.java:

package com.example.operator;

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;

import java.util.*;

import org.junit.Test;

import com.asakusafw.runtime.testing.MockResult;

import com.example.modelgen.dmdl.model.LineModel;
import com.example.modelgen.dmdl.model.WordCountModel;

public class WordCountOperatorTest {
	@Test
	public void testSplit() {
		testSplit("word1", "word1");
		testSplit("");
		testSplit("word3 blank", "word3", "blank");
		testSplit("word4 blank", "word4", "blank");
		testSplit("word5\ttab", "word5", "tab");
		testSplit("word6\t tab-blank", "word6", "tab-blank");
		testSplit("word7\ttab blank", "word7", "tab", "blank");
	}

	private void testSplit(String text, String... expected) {
		LineModel line = new LineModel();
		line.setTextAsString(text);

		MockResult<WordCountModel> result = new MockResult<WordCountModel>() {
			@Override
			protected WordCountModel bless(WordCountModel result) {
				WordCountModel copy = new WordCountModel();
				copy.copyFrom(result);
				return copy;
			}
		};

		WordCountOperatorImpl operator = new WordCountOperatorImpl();
		operator.split(line, result);

		List<WordCountModel> r = result.getResults();
		assertThat(r.size(), is(expected.length));
		for (int i = 0; i < expected.length; i++) {
			WordCountModel wc = r.get(i);
			assertThat(wc.getWordAsString(), is(expected[i]));
			assertThat(wc.getCount(), is(1));
		}
	}
	@Test
	public void testSum() {
		WordCountOperatorImpl operator = new WordCountOperatorImpl();

		{ // リストの要素が1個
			List<WordCountModel> list = new ArrayList<WordCountModel>();
			list.add(createWordCount("word", 99));

			MockResult<WordCountModel> result = new MockResult<WordCountModel>();
			operator.sum(list, result);

			List<WordCountModel> r = result.getResults();
			assertThat(r.size(), is(1));
			assertThat(r.get(0).getWordAsString(), is("word"));
			assertThat(r.get(0).getCount(), is(99));
		}

		{ // リストの要素が複数
			List<WordCountModel> list = new ArrayList<WordCountModel>();
			String word = "word1";
			list.add(createWordCount(word, 1));
			list.add(createWordCount(word, 2));
			list.add(createWordCount(word, 3));

			MockResult<WordCountModel> result = new MockResult<WordCountModel>();
			operator.sum(list, result);

			List<WordCountModel> r = result.getResults();
			assertThat(r.size(), is(1));
			assertThat(r.get(0).getWordAsString(), is(word));
			assertThat(r.get(0).getCount(), is(6));
		}
	}

	WordCountModel createWordCount(String word, int count) {
		WordCountModel m = new WordCountModel();
		m.setWordAsString(word);
		m.setCount(count);
		return m;
	}
}

ジョブ・バッチのテスト

AsakusaFWでは、ジョブフロー(とFlowPart)とバッチのテスト用クラスが用意されている。
それらを使ってJUnit4として実行できる。
(実行にはスモールジョブ実行エンジンが使われる)

参考: Asakusa Frameworkのアプリケーションテストスタートガイド#データフローのテスト


また、AsakusaFWの実行環境も必要なので、作っていない場合は作成する。
Shafuを使ってAsakusaFWの実行環境を作成する。

  1. 環境変数ASAKUSA_HOMEを定義していない場合は、一旦Eclipseを終了し、ASAKUSA_HOMEに実行環境をインストールする場所(ディレクトリーのパス)を定義する。
    Windowsの場合の例:「D:\data\asakusa」
  2. パッケージエクスプローラー上でプロジェクト(afw-wordcount)を選択し、右クリックしてコンテキストメニューを出す。
  3. 「Jinrikisha(人力車)」→「Asakusa開発環境の構築」→「Asakusa Frameworkのインストール」を実行する。[/2015-04-18]
    これで、環境変数ASAKUSA_HOMEで指定された場所にAsakusaFWの実行環境がインストールされる。

ジョブフローのテスト

参考: Asakusa Frameworkのアプリケーションテストスタートガイド#ジョブフローのテスト

パッケージエクスプローラー上でジョブフロークラスのソース(WordCountJob)を右クリックして、「新規(W)」→「JUnitテスト・ケース」でJUnit4のテスト用クラスを作成する。
その際、作成先のディレクトリーは「afw-wordcount/src/test/java」とする。

afw-wordcount/src/test/java/com/example/jobflow/WordCountJobTest.java

package com.example.jobflow;

import org.junit.Test;

import com.asakusafw.testdriver.JobFlowTester;

import com.example.modelgen.dmdl.model.LineModel;
import com.example.modelgen.dmdl.model.WordCountModel;
public class WordCountJobTest {

	@Test
	public void testDescribe() {
		String inDataSheet = "line_model.xls";
		String outDataSheet = "word_count_model.xls";

		JobFlowTester tester = new JobFlowTester(getClass());

		tester.input("in", LineModel.class).prepare(inDataSheet + "#input");
		tester.output("out", WordCountModel.class).verify(outDataSheet + "#output", outDataSheet + "#rule");

		tester.runTest(WordCountJob.class);
	}
}

driver.input()で入力データを指定する。
第1引数はジョブフローで指定した入力データの名前。つまりWordCountJobのコンストラクターの「@Import(name = "in"」。 この名前を使って入力データのファイル名を取得しているようだ。(一致しているものが無いと、入力ファイルを作れない)
そのinput()メソッドの後に続いているprepare()で、入力データのExcelファイル名(line_model.xls)を指定する。

driver.output()もdriver.input()と同様。
第1引数はジョブフローの出力データの名前「@Export(name = "out"」。検証時にこの名前を使ってデータを取得するようだ。(一致しているものが無いと、実行が終わった後のチェックでエラーになる)
verify()メソッドの第1引数が出力データのExcelファイル名、第2引数がチェック用ルールのExcelファイル名

「#input」「#output」「#rule」は、そのExcelファイル内のシート名を表している。
数値を使って 「#:1」や「#:2」の様に記述することもでき、その場合は「#:1」が左から2番目のoutputシート、「#:2」が3番目のruleシートを表す。
省略した場合は「#:0」扱いで、すなわち一番左にあるinputシートを指す。


ジョブフローのテストで使用するテストデータをExcelファイルで(またはJSON形式で)用意する必要がある。

参考: Asakusa FrameworkのExcelによるテストデータ定義

DMDLをコンパイルして(Javaソースの他に) デーモデル毎のExcelファイルが生成させることが出来る。

Shafuの「テストデータ・テンプレートを作成」を実行するとafw-wordcount/build/excelの下にExcelファイルが生成されるので、それをsrc/test/resources/com/example/jobflow/の下にコピーし、その中にデータを記述する。
(コピー先のディレクトリー(パッケージ)は、ジョブフローテストクラスのパッケージに合わせる)

afw-wordcount/src/test/resources/example/jobflow/line_model.xlsのinputシート:

  A B C
1 text    
2 Hello Hadoop World    
3 Hello Asakusa    
4      
5      

入力データはinputシートに書く。
1行目(色付きのセル)は自動で入っている。

afw-wordcount/src/test/resources/example/jobflow/word_count_model.xlsのoutputシート:

  A B C
1 word count  
2 Asakusa 1  
3 Hadoop 1  
4 Hello 2  
5 World 1  
6      

期待される出力データをoutputシートに書く。
1行目(色付きのセル)は自動で入っている。

afw-wordcount/src/test/resources/example/jobflow/word_count_model.xlsのruleシート:

  A B C D E F
1 Format EVR-2.0.0        
2 全体の比較 全てのデータを検査 [Strict]        
3 プロパティ 値の比較 NULLの比較 コメント オプション  
4 word 検査キー [Key] NULLなら常に失敗 [DA] 発生した単語    
5 count 完全一致 [=] NULLなら常に失敗 [DA] 発生回数    
6            

outputシートに書かれているデータと実際のデータをどう比較してどういう状態ならテストOK(あるいはNG)とするかをruleシートに書く。
どのセルも初期値は自動で入っている。
(「コメント」の“発生した単語”や“発生回数”は、DMDLに自分が記述した各項目のコメント)

今回のケースでは出力項目がwordとcountの二項目なので、それぞれどういう比較をするかを書く。(書くというか、実際はプルダウンになっているので、選択する)

このシートを見ただけで予想が付くようになっているのは素晴らしい。

Excelファイルを使用せず、独自のデータを入力データにする方法


バッチのテスト

バッチクラスのテスト方法は、ジョブフローのテスト方法とほとんど同じ。[2011-08-04]

参考: Asakusa Frameworkのアプリケーションテストスタートガイド#バッチのテスト

Excelデータについては、ジョブフローのテスト用と全く同じものでも問題ない。

Excelファイルのコピー元・コピー先
コピー元 afw-wordcount/build/excel/ line_model.xls
word_count_model.xls
ジョブフローのテスト用 afw-wordcount/src/test/resources/com/example/jobflow/ line_model.xls
word_count_model.xls
バッチのテスト用 afw-wordcount/src/test/resources/com/example/batch/ line_model.xls
word_count_model.xls

afw-wordcount/src/test/java/com/example/jobflow/WordCountBatchTest.java:

package com.example.batch;

import org.junit.Test;

import com.example.modelgen.dmdl.model.LineModel;
import com.example.modelgen.dmdl.model.WordCountModel;

import com.asakusafw.testdriver.BatchTester;
public class WordCountBatchTest {

	@Test
	public void testDescribe() {
		String inDataSheet = "line_model.xls";
		String outDataSheet = "word_count_model.xls";

		BatchTester tester = new BatchTester(getClass());

		tester.jobflow("WordCountJob").input("in", LineModel.class).prepare(inDataSheet + "#input");
		tester.jobflow("WordCountJob").output("out", WordCountModel.class).verify(outDataSheet + "#output", outDataSheet + "#rule");

		tester.runTest(WordCountBatch.class);
	}
}

tester.jobflow()には、ジョブフローのアノテーション「@JobFlow(name = "WordCountJob")」で付けた名前を指定する。
そこ以降のinput()・output()については、ジョブフローのテストと全く同じ書き方。



単純集計演算子(Summarize)による集計

単語の集計をするオペレーターを単純集計演算子Summarizeで作ってみる。
参考: Asakusa FrameworkのAsakusa DSLスタートガイド#単純集計演算子の実装例


まず、DMDLファイルに集計モデルを定義する。
集計モデルは、キー毎に集計(合算や件数カウント)を行った結果を表すデータモデル。

参考: Asakusa FrameworkのDMDLユーザーガイド#集計モデルを定義する

"WordCountの集計モデル"
@directio.csv
summarized word_count_total = word_count_model => {

  any word -> word;

  sum count -> count;
} % word;

summarizedを先頭に付け、集計モデル名(word_count_total)を指定する。
その後ろに集計元となるモデル名(word_count_model)を書く。
そこから「=>」で続けて、演算の種類を書く。sumで集計する(countで件数カウントも出来る模様)。集計キーとなる項目にはanyを付ける。
「->」で項目の別名(というか集計モデルでの項目名)を付けられる。同じ名前でいい場合でも省略できない。
「%」で集計キーとなる項目を指定する。(SQLのgroup byのような感じ)

※この記述上では型を指定していないので同じ型になると期待していたのだが、word_count_modelのcountはINTなのに、word_count_totalのcountはLONGになる。

データモデルを記述したら、DMDLのコンパイルを行ってJavaクラスを生成しておく。


afw-wordcount/src/main/java/com/example/operator/WordCountOperator.java

import com.asakusafw.vocabulary.operator.Summarize;

import com.example.modelgen.dmdl.model.WordCountTotal;
	@Summarize
	public abstract WordCountTotal sum2(WordCountModel wc);

集計モデルを定義する他に、Operatorクラスにも演算子メソッドを定義する必要がある。

Summarizeの場合、メソッド本体は記述しない(抽象メソッドにする)。
戻り型には集計結果のモデル(集計モデル)、引数には集計対象(集計元)のモデルを指定する。


また、このデータモデルで出力する為のExporterも作る必要がある。

public class WordCountTotalToFile extends AbstractWordCountTotalCsvOutputDescription {

	@Override
	public String getBasePath() {
		return "result/wordcount";
	}

	@Override
	public String getResourcePattern() {
		return "wc-*";
	}
}

ジョブフローを定義する。

afw-wordcount/src/main/java/com/example/jobflow/WordCountJob.java

import com.example.modelgen.dmdl.model.WordCountTotal;
	@Override
	protected void describe() {
		Source<WordCountModel> splitResult = operator.split(in).wordCountResult;

//		Source<WordCountModel> sumResult = operator.sum(splitResult).wordCountResult;
		Source<WordCountTotal> sumResult = operator.sum2(splitResult).out;

		out.add(sumResult);
	}

sumResultの型が変わった(Model→Total)ので、out.add()がコンパイルエラーになる。
WordCountJobのコンストラクターをWordCountTotalに変える必要がある。

	In<LineModel> in;
	Out<WordCountTotal> out;

	public WordCountJob(
			@Import(name = "in", description = LineFromFile.class) In<LineModel> in,
			@Export(name = "out", description = WordCountTotalToFile.class) Out<WordCountTotal> out) {
		this.in = in;
		this.out = out;
	}

テストの実行

Operatorクラスの単体テストとしては、Summarizeはメソッド内容が自動的に生成されるので、テストケースを作る必要は無い(という考え方のようだ)。

Operatorクラスのテストに使うOperatorImplを見ると、以下のようになっている。

	@Override public WordCountTotal sum2(WordCountModel wc) {
		throw new UnsupportedOperationException("単純集計演算子は組み込みの方法で処理されます");
	}

つまり、OperatorImplを使ってインスタンスを生成しても、このメソッドを呼び出してテストすることは出来ないわけだ。

ジョブフローのテストは出力対象クラスが変わったので修正しておく必要がある。

public class WordCountJobTest {

	@Test
	public void testDescribe() {
		String inDataSheet = "line_model.xls";
		String outDataSheet = "word_count_model.xls";

		JobFlowTester driver = new JobFlowTester(this.getClass());

		driver.input("in", LineModel.class).prepare(inDataSheet + "#input");
		driver.output("out", WordCountTotal.class).verify(outDataSheet + "#output", outDataSheet + "#rule");

		driver.runTest(WordCountJob.class);
	}
}

本来なら出力データを記述しているExcelファイルも作り直すべきなのかもしれないが、今回の例では項目名は全く変わっていないので、そのまま使える。


部分集約(partialAggregation)

単純集計演算子(Summarize)には部分集約(partialAggregation)というものがある。
「部分集約を行う場合、〜、ネットワークの転送量を削減しようとします」と説明されているので、MapReduceのCombinerのような中間集計のことなのだろう。

部分集約は、Summarizeアノテーションの引数として指定する。

afw-wordcount/src/main/java/com/example/operator/WordCountOperator.java

import com.asakusafw.vocabulary.flow.processor.PartialAggregation;
	@Summarize(partialAggregation=PartialAggregation.PARTIAL)
	public abstract WordCountTotal sum2(WordCountModel wc);

デフォルトは、PartialAggregation.TOTAL(部分集約(中間集計)を行わない)。


変換演算子(Convert)による変換

Summarizeを使う例としてジョブフローの出力の型(モデル)をWordCountTotalに変更したが、
最終的に出力する前にWordCountTotalからWordCountModelへ変換してやれば、フロー以降(WordCountJobのExportクラスとWordCountToFileとテスト用データのクラス指定)は修正しなくても済む。

モデル(クラス)を変換するには、Convert(変換演算子)を使用する。

afw-wordcount/src/main/java/com/example/operator/WordCountOperator.java

import com.asakusafw.vocabulary.operator.Convert;
	private WordCountModel wordCountModel = new WordCountModel();

	@Convert
	public WordCountModel wordCountTotal2Model(WordCountTotal t) {
		wordCountModel.setWord(t.getWord());
		wordCountModel.setCount((int) t.getCount());
		return wordCountModel;
	}

ただ単にフィールドを移し替えているだけ。
AsakusaFWはスレッドセーフ(Operatorのインスタンスがスレッド毎に作られるという意味)なので、移し先のインスタンスをフィールドに置いておいて使い回しても大丈夫。

afw-wordcount/src/main/java/com/example/jobflow/WordCountJob.java

	@Override
	protected void describe() {
		Source<WordCountModel> splitResult = operator.split(in).wordCountResult;

		Source<WordCountTotal> sumResult = operator.sum2(splitResult).out;
		Source<WordCountModel> convertResult = operator.wordCountTotal2Model(sumResult).out;

		out.add(convertResult);
	}

フローの出力の型はWordCountModelで行ける。

これでOK!と思ってWordCountJobTestを実行したら、こんなエラーが…。

java.lang.RuntimeException: java.io.IOException: 実行計画の作成に失敗しました ([WordCountOperator.wordCountTotal2Modelの出力originalは結線されていません ([WordCountOperator.wordCountTotal2Model(operator#15136722)])])
	at com.asakusafw.testdriver.JobFlowTester.runTest(JobFlowTester.java:95)
〜
Caused by: java.io.IOException: 実行計画の作成に失敗しました ([WordCountOperator.wordCountTotal2Modelの出力originalは結線されていません ([WordCountOperator.wordCountTotal2Model(operator#15136722)])])
	at com.asakusafw.compiler.flow.FlowCompiler.plan(FlowCompiler.java:172)
〜

「出力originalが結線していない」とな?
実は、Convertではoutに変換したデータが出力され、originalに元々の入力データがそのまま出力される。
つまり、originalという出力データをフロー上でどこにも渡していないからこのエラーになったようだ。


データをどこにも渡す必要が無い(データを捨てていい)場合、フロー上で停止演算子stopへデータを渡す。(逆に0件の入力が欲しい場合は空演算子emptyが使える)
stop(やempty)はCoreOperatorFactoryというクラスでAsakusaFWによって定義されているので、それを使う。

afw-wordcount/src/main/java/com/example/jobflow/WordCountJob.java

import com.example.operator.WordCountOperatorFactory.WordCountTotal2Model;

import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory;
	CoreOperatorFactory core = new CoreOperatorFactory();

	@Override
	protected void describe() {
		Source<WordCountModel> splitResult = operator.split(in).wordCountResult;
		Source<WordCountTotal> sumResult = operator.sum2(splitResult).out;

		WordCountTotal2Model convert = operator.wordCountTotal2Model(sumResult);
		out.add(convert.out);
		core.stop(convert.original);
	}

再構築演算子(restructure)による変換

先ほどは変換演算子ConvertによってWordCountTotalからWordCountModelへ変換したが、AsakusaFWでは、プロパティー(項目名と型)が同一な場合は再構築演算子restructureを使うのが推奨されている。

restructureもCoreOperatorFactoryで定義されているので、自分でOperatorを作る必要は無い。

afw-wordcount/src/main/java/com/example/jobflow/WordCountJob.java

import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory;
	CoreOperatorFactory core = new CoreOperatorFactory();

	@Override
	protected void describe() {
		Source<WordCountModel> splitResult = operator.split(in).wordCountResult;
		Source<WordCountTotal> sumResult = operator.sum2(splitResult).out;

		out.add(core.restructure(sumResult, WordCountModel.class));
	}

restructure()の第2引数には、変換先のクラスを指定する。


で、実行してみたら、エラーが出た。

ERROR c.a.c.flow.FlowCompilingEnvironment - restructureにおいて、DataModelClass(sample.modelgen.dmdl.model.WordCountTotal).countとDataModelClass(sample.modelgen.dmdl.model.WordCountModel).countのプロパティ型が一致しません

「WordCountTotalのcountとWordCountModelのcountのプロパティ型が一致しない」って。
WordCountModelのcountはint型なのだが、それの集計モデルであるWordCountTotalはlong型になってしまうんだよね。

集計モデルのDMDLで型を指定する方法は無い。
( 「sum count -> count : INT;」とか「sum count : INT -> count;」を試してみたが、駄目だった)

仕方が無いので、word_count_modelをINTからLONGに変えてやったら、動いた。

※併せてWordCountOperatorTestの「assertThat(wc.getCount(), is(1));」とかも「is(1L)」(long型)にしないとコンパイルエラーになる。


畳み込み演算子(Fold)による集計

AsakusaFWのOperator DSLには畳み込み演算子(Fold)という演算子がある。
これを使って集計処理をすることが出来る。

ScalafoldLeftという関数を勉強してなかったら「たたみこみ?fold?意味不明」と思ってスルーしてたところだ^^;
「畳み込み」とは、コレクション(Listとか)内の要素同士を計算して結果を出すもの。
例えばScalaで「List(1,10,100).foldLeft(0){ _ + _ }」と書くと、1・10・100が入っているListに対し、各要素に演算を行っていく。foldLeft直後の括弧内が初期値で、最初に初期値と先頭要素で演算を行う。すなわち「0+1」を行う。そしてその結果の1と次の10で「1+10」を行い、その結果の11と次の100で「11+100」を行い、最終的に111という値が返る。
ScalaではfoldLeftと似た関数にreduceLeftというものがあり、そちらは「List(1,10,100).reduceLeft{ _ + _ }」の様に書く。foldLeftと違って初期値が与えられていないので、最初に先頭2つの要素同士で計算する。後は同じで、最終的に111が返る。
AsakusaFWのFold演算子はScalaのreduceLeftに近い。

afw-wordcount/src/main/java/com/example/operator/WordCountOperator.java

import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.Fold;
	@Fold
	public void sum3(@Key(group = "word") WordCountModel left, WordCountModel right) {
		left.setCount(left.getCount() + right.getCount());
	}

Foldでは、leftとrightの2つの引数をとる。@Keyで集計キーを指定する。(キー毎に処理する為)
そして集計(演算)結果をleftに入れるようにする。

ちなみに、Foldも部分集約(中間集計)を行える。

	@Fold(partialAggregation=PartialAggregation.PARTIAL)
	public void sum3(@Key(group = "word") WordCountModel left, WordCountModel right) {
		left.setCount(left.getCount() + right.getCount());
	}

afw-wordcount/src/main/java/com/example/jobflow/WordCountJob.java

	@Override
	protected void describe() {
		Source<WordCountModel> splitResult = operator.split(in).wordCountResult;
		Source<WordCountModel> sumResult = operator.sum3(splitResult).out;
		out.add(sumResult);
	}

うん、オペレーターもジョブフローも、Foldが一番シンプルに書ける^^


単純集計演算子(Summarize)によるWordCount

前述のSummarizeを使う例では集計モデルで件数項目の合算を行う方式を採ったが、集計モデルではそもそもレコード数をカウントすることが出来る。

これを使うと、以下のようになる。

afw-wordcount/src/main/dmdl/models.dmdl:

"テキストファイルの一行"
@directio.csv
line_model = {

  "テキスト"
  text : TEXT;
};

"単語"
word_model = {

  "発生した単語"
  word : TEXT;
};

"単語数"
@directio.csv
summarized word_count_model = word_model => {

  "単語"
  any word -> word;

  "単語数"
  count word -> count;
} % word;

afw-wordcount/src/main/java/com/example/operator/WordCountOperator.java:

public abstract class WordCountOperator {

	/**
	 * 単語
	 */
	private final WordModel wordModel = new WordModel();

	/**
	 * 単語分割
	 * 
	 * @param line
	 * 	テキストファイルの一行
	 * @param out
	 * 	単語
	 */
	@Extract
	public void split(LineModel line, Result<WordModel> out) {
		String text = line.getTextAsString();

		StringTokenizer tokenizer = new StringTokenizer(text);
		while (tokenizer.hasMoreTokens()) {
			String word = tokenizer.nextToken();

			wordModel.setWordAsString(word);

			out.add(wordModel);
		}
	}
	@Summarize
	public abstract WordCountModel sum(WordModel in);
}

afw-wordcount/src/main/java/com/example/jobflow/WordCountJob.java:

/**
 * 単語数カウントジョブ
 */
@JobFlow(name = "WordCountJob")
public class WordCountJob extends FlowDescription {
	/** テキストファイルの一行 */
	private final In<LineModel> in;
	/** 単語数 */
	private final Out<WordCountModel> out;

	/**
	 * 単語数カウントジョブ
	 * 
	 * @param in
	 * 	テキストファイルの一行
	 * @param out
	 * 	単語数
	 */
	public WordCountJob3(
		@Import(name = "in", description = LineFromFile.class) In<LineModel> in,
		@Export(name = "out", description = WordCountToFile.class) Out<WordCountModel> out
	) {
		this.in = in;
		this.out = out;
	}
	WordCountOperatorFactory operator = new WordCountOperatorFactory();

	@Override
	public void describe() {
		Source<WordModel> word = operator.split(in).out;
		Source<WordCountModel> count = operator.sum(word).out;
		out.add(count);
	}
}

AsakusaFW目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま