S-JIS[2013-11-09/2016-02-11] 変更履歴

Asakusa Framework 抽出演算子

Asakusa FrameworkOperator DSLの抽出演算子(@Extract)のメモ。


概要

抽出演算子は、入力データを元に複数種類データモデル・複数レコードの出力をする演算子。
性能特性はExtract(旧ドキュメントではMap)。[/2016-02-11]

入力
ポート数
入力データモデル
の制約
イメージ 出力
ポート数
出力データモデル
の制約
入力1レコード
に対する
出力レコード数
1   任意   任意。
(0〜∞レコード)

抽出演算子は、複数種類のデータモデルを出力するのに使う。

入力データモデルを(1種類の)別のデータモデルに変換して出力したい場合は変換演算子(@Convert)を使う。

抽出演算子は性能特性がMapの演算子の中ではオールマイティー(何でも記述できる)の演算子なので、他の演算子の組み合わせで処理が出来るようなら、そちらにすることを検討すべき。
(極端に言えば、Map系演算子は抽出演算子だけあれば全て書けてしまう。しかし抽出演算子だけでフローを構成すると最適化されない)


hogeというデータモデルからデータモデルaとデータモデルbを出力する例。
なおかつ、hogeの1レコードにつきbは2レコード出力する。
(この図はToad Editorを用いて作っています)

入力データ例   出力データ例
in
hoge
value_a value_b0 value_b1
100 1000 0
1200 12100 123
310 3333 456
outA
a
value
100
1200
310
outB
b
value
1000
0
12100
123
3333
456

example.dmdl(DMDL):

hoge = {

    value_a : INT;

    value_b0 : LONG;

    value_b1 : LONG;
};

a = {

    value : INT;
};

b = {

    value : LONG;
};

ExampleOperator.java(Operator DSL):

import com.asakusafw.runtime.core.Result;
import com.asakusafw.vocabulary.operator.Extract;

import com.example.modelgen.dmdl.model.A;
import com.example.modelgen.dmdl.model.B;
import com.example.modelgen.dmdl.model.Hoge;
public abstract class ExampleOperator {

	private final A a = new A();
	private final B b = new B();

	/**
	 * Hogeに含まれるAとBを抽出して出力する
	 * 
	 * @param hoge
	 *         抽出対象のデータモデル
	 * @param outA
	 *         aの抽出結果
	 * @param outB
	 *         bの抽出結果
	 */
	@Extract
	public void extractFields(Hoge hoge, Result<A> outA, Result<B> outB) {
		// aへの出力
		a.reset(); // 初期化
		a.setValue(hoge.getValueA());
		outA.add(a);

		// bへの出力
		b.reset(); // 初期化
		b.setValue(hoge.getValueB0());
		outB.add(b);

		b.reset(); // 初期化
		b.setValue(hoge.getValueB1());
		outB.add(b);
	}
}

第1引数に抽出対象のデータモデルを指定する。
第2引数以降に出力ポートを示すResult<データモデル>を指定する。
戻り型は無し。

Operatorクラスに定義したメソッドはスレッドセーフになる(別スレッドから同時に呼ばれることは無い)ので、フィールドにオブジェクトを定義して使い回してもよい。
(ただし、以前のレコードの値の保持には使えない。実行時には各マシンに分散して処理されるので、レコードの入力順も どのレコードが来るのかも保証されない為)

なお、Resultにaddしたオブジェクトは次に使うときは内容が破壊されている(可能性がある)ので、毎回初期化する必要がある。[2014-12-21]

ExampleJob.java(Flow DSL):

import com.example.modelgen.dmdl.model.A;
import com.example.modelgen.dmdl.model.B;
import com.example.modelgen.dmdl.model.Hoge;

import com.example.operator.ExampleOperatorFactory;
import com.example.operator.ExampleOperatorFactory.ExtractFields;
	private final In<Hoge> in;

	private final Out<A> outA;
	private final Out<B> outB;
	@Override
	public void describe() {
		ExampleOperatorFactory operators = new ExampleOperatorFactory();

		// AとBを抽出する
		ExtractFields ab = extractOperator.extractFields(this.in);

		this.outA.add(ab.outA);
		this.outB.add(ab.outB);
	}

Flow DSLでは、自分が作ったOperatorのFactoryクラス(AsakusaFWのコンパイラーによって生成される)を使用する。
メソッド名はOperatorクラスに書いたメソッド名と同じ。
戻り値の型はAsakusaFWのコンパイラーによって生成されたクラス。(メソッド名を先頭が大文字のキャメルケースに変換したもの)

出力ポートの名前は、Operatorクラスのメソッドに出力ポート(Result<データモデル>)として指定した変数名が使われる。


単体テスト

抽出演算子の単体テストの実装例。

ExampleOperatorTest.java:

package com.example.operator;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import org.junit.Test;

import com.asakusafw.runtime.testing.MockResult;

import com.example.modelgen.dmdl.model.A;
import com.example.modelgen.dmdl.model.B;
import com.example.modelgen.dmdl.model.Hoge;
/**
 * {@link ExampleOperator}のテスト.
 */
public class ExampleOperatorTest {

	@Test
	public void extractFields() {
		ExampleOperator operator = new ExampleOperatorImpl();

		Hoge hoge = new Hoge();
		hoge.setValueA(123);
		hoge.setValueB0(456);
		hoge.setValueB1(789);

		MockResult<A> outA = new AMockResult();
		MockResult<B> outB = new BMockResult();

		operator.extractFields(hoge, outA, outB);

		{
			List<A> result = outA.getResults();
			assertThat(result.size(), is(1));

			A a = result.get(0);
			assertThat(a.getValue(), is(123));
		}
		{
			List<B> result = outB.getResults();
			assertThat(result.size(), is(2));

			B b0 = result.get(0);
			assertThat(b0.getValue(), is(456L));

			B b1 = result.get(1);
			assertThat(b1.getValue(), is(789L));
		}
	}
}

Operatorのテストクラスは、通常のJavaのJUnitのテストケースクラスとして作成する。

テスト対象のOperatorクラス自身は抽象クラスだが、Operatorクラス名の末尾に「Impl」の付いた具象クラスがAsakusaFWによって生成されるので、それを使う。

抽出演算子では引数にResultが使われているので、その引数にはMockResultを渡す。
OperatorメソッドでResultに追加したデータモデルは、MockResult内のリストに追加されていく。MockResult#getResults()でそのリストを取得できる。
ただし、Operatorクラス側でデータモデルのインスタンスを使い回している場合は、MockResultのリストにそのインスタンスを追加するだけだと、後から値を確認したくても最後の値しか取れない。
そこで、MockResult#bless()をオーバーライドし、インスタンスのコピーを返すようにする。こうすると、MockResultのリストにはコピーしたインスタンスが保持される。

public class AMockResult extends MockResult<A> {

	@Override
	protected A bless(A result) {
		A copy = new A();
		copy.copyFrom(result);
		return copy;
	}
}
public class BMockResult extends MockResult<B> {

	@Override
	protected B bless(B result) {
		B copy = new B();
		copy.copyFrom(result);
		return copy;
	}
}

値引数の例

抽出演算子では、抽出対象と出力のデータモデルの他にも引数を指定することが出来る。
こうすると、引数の内容分だけ異なる同じ演算子(処理内容)をいくつかの場所で使用できるようになる。

先の例に値引数を追加する例。

ExampleOperator.java(Operator DSL):

public abstract class ExampleOperator {

	private final A a = new A();
	private final B b = new B();

	/**
	 * Hogeに含まれるAとBを抽出して出力する
	 * 
	 * @param hoge
	 *         抽出対象のデータモデル
	 * @param outA
	 *         aの抽出結果
	 * @param outB
	 *         bの抽出結果
	 * @param valueB
	 *         bに加算する値
	 */
	@Extract
	public void extractFields(Hoge hoge, Result<A> outA, Result<B> outB, int valueB) {
		// aへの出力
		a.setValue(hoge.getValueA());
		outA.add(a);

		// bへの出力
		b.setValue(hoge.getValueB0() + valueB);
		outB.add(b);

		b.setValue(hoge.getValueB1() + valueB);
		outB.add(b);
	}
}

出力ポートを示す引数以降を通常のJavaメソッドと同様に自由な引数とすることが出来る。(ただしプリミティブのみ)

ExampleJob.java(Flow DSL):

	@Override
	public void describe() {
		ExampleOperatorFactory operators = new ExampleOperatorFactory();

		// AとBを抽出する
		ExtractFields ab = extractOperator.extractFields(this.in, 100);

		this.outA.add(ab.outA);
		this.outB.add(ab.outB);
	}

演算子を使用するFlow DSLで、引数に具体的な値を指定する。


ExampleOperatorTest.java:

	@Test
	public void extractFields() {
		ExampleOperator operator = new ExampleOperatorImpl();

		Hoge hoge = new Hoge();
		hoge.setValueA(123);
		hoge.setValueB0(456);
		hoge.setValueB1(789);
		int valueB = 100;

		AMockResult outA = new AMockResult();
		BMockResult outB = new BMockResult();

		operator.extractFields(hoge, outA, outB, valueB);

		{
			List<A> result = outA.getResults();
			assertThat(result.size(), is(1));

			A a = result.get(0);
			assertThat(a.getValue(), is(123));
		}
		{
			List<B> result = outB.getResults();
			assertThat(result.size(), is(2));

			B b0 = result.get(0);
			assertThat(b0.getValue(), is(456L + valueB));

			B b1 = result.get(1);
			assertThat(b1.getValue(), is(789L + valueB));
		}
	}

横持ちを縦持ちに変換する例

抽出演算子を使って、いわゆる横持ちデータを縦持ちに変換する例。[2013-11-30]
(→縦持ちを横持ちに変換する例

入力データ例   出力データ例
in
hoge_h
key value0 value1 value2
aaa 123 456 789
bbb 1 2  
out
hoge_v
key value
aaa 123
aaa 456
aaa 789
bbb 1
bbb 2

example.dmdl(DMDL):

hoge_v = {

    key : TEXT;

    value : INT;
};

hoge_h = {

    key : TEXT;

    value0 : INT;

    value1 : INT;

    value2 : INT;
};

ExampleOperator.java(Operator DSL):

import java.util.List;

import com.asakusafw.runtime.core.Result;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.GroupSort;

import com.example.modelgen.dmdl.model.HogeH;
import com.example.modelgen.dmdl.model.HogeV;
	private HogeV hogeV = new HogeV();

	/**
	 * 横持ちデータを縦持ちに変換
	 * 
	 * @param in
	 *         入力
	 * @param out
	 *         出力
	 */
	@Extract
	public void convert(HogeH in, Result<HogeV> out) {
		hogeV.setKey(in.getKey());

		if (!in.getValue0Option().isNull()) {
			hogeV.setValue(in.getValue0());
			out.add(hogeV);
		}
		if (!in.getValue1Option().isNull()) {
			hogeV.setValue(in.getValue1());
			out.add(hogeV);
		}
		if (!in.getValue2Option().isNull()) {
			hogeV.setValue(in.getValue2());
			out.add(hogeV);
		}
	}

値がセット されていない項目はnullが入っている。そのままgetするとNullPointerExceptionが発生するので、使う前にnullチェックする必要がある。
末尾に「Option」の付いたgetterメソッドを使って「hogeH.getValue2Option().isNull()」の様にしてnullかどうかをチェックすることが出来る。


ExampleOperatorTest.java:

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;

import com.asakusafw.runtime.testing.MockResult;

import com.example.modelgen.dmdl.model.HogeH;
import com.example.modelgen.dmdl.model.HogeV;
public class ExampleOperatorTest {

	@Test
	public void convert3() {
		HogeH in = new HogeH();
		in.setKeyAsString("aaa");
		in.setValue0(123);
		in.setValue1(456);
		in.setValue2(789);

		List<HogeV> expected = new ArrayList<HogeV>();
		expected.add(create("aaa", 123));
		expected.add(create("aaa", 456));
		expected.add(create("aaa", 789));

		test(in, expected);
	}
	@Test
	public void convert2() {
		HogeH in = new HogeH();
		in.setKeyAsString("bbb");
		in.setValue0(1);
		in.setValue1(2);

		List<HogeV> expected = new ArrayList<HogeV>();
		expected.add(create("bbb", 1));
		expected.add(create("bbb", 2));

		test(in, expected);
	}
	private HogeV create(String key, int value) {
		HogeV hogeV = new HogeV();
		hogeV.setKeyAsString(key);
		hogeV.setValue(value);
		return hogeV;
	}
	private void test(HogeH in, List<HogeV> expected) {
		ExampleOperator operator = new ExampleOperatorImpl();

		MockResult<HogeV> out = new MockResult<HogeV>() {
			@Override
			protected HogeV bless(HogeV result) {
				HogeV copy = new HogeV();
				copy.copyFrom(result);
				return copy;
			}
		};

		operator.convert(in, out);

		assertThat(out.getResults(), is(expected));
	}
}

類似

抽出演算子は、ScalaflatMapメソッドに似ている部分がある。
(入力1レコードに対して出力レコード数が不定な部分が似ている)

  case class Hoge(valueB0: Long, valueB1: Long)
  case class B(value: Long)

  def extractB(hoge: Hoge) = List(B(hoge.valueB0), B(hoge.valueB1))

  val in : List[Hoge] = 〜
  val out: List[B]    = in.flatMap(extractB)

Operator DSLへ戻る / AsakusaFW目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま