Asakusa FrameworkのOperator DSLの抽出演算子(@Extract)のメモ。
|
抽出演算子は、入力データを元に複数種類データモデル・複数レコードの出力をする演算子。
性能特性はExtract(旧ドキュメントではMap)。[/2016-02-11]
入力 ポート数 |
入力データモデル の制約 |
イメージ | 出力 ポート数 |
出力データモデル の制約 |
入力1レコード に対する 出力レコード数 |
---|---|---|---|---|---|
1 |
![]() |
任意 | 任意。 (0〜∞レコード) |
抽出演算子は、複数種類のデータモデルを出力するのに使う。
入力データモデルを(1種類の)別のデータモデルに変換して出力したい場合は変換演算子(@Convert)を使う。
抽出演算子は性能特性がMapの演算子の中ではオールマイティー(何でも記述できる)の演算子なので、他の演算子の組み合わせで処理が出来るようなら、そちらにすることを検討すべき。
(極端に言えば、Map系演算子は抽出演算子だけあれば全て書けてしまう。しかし抽出演算子だけでフローを構成すると最適化されない)
hogeというデータモデルからデータモデルaとデータモデルbを出力する例。
なおかつ、hogeの1レコードにつきbは2レコード出力する。
(この図はToad Editorを用いて作っています)
入力データ例 | 出力データ例 | |||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
in |
|
→ | outA |
|
||||||||||||||||
→ | outB |
|
hoge = { value_a : INT; value_b0 : LONG; value_b1 : LONG; }; a = { value : INT; }; b = { value : LONG; };
import com.asakusafw.runtime.core.Result; import com.asakusafw.vocabulary.operator.Extract; import com.example.modelgen.dmdl.model.A; import com.example.modelgen.dmdl.model.B; import com.example.modelgen.dmdl.model.Hoge;
public abstract class ExampleOperator { private final A a = new A(); private final B b = new B(); /** * Hogeに含まれるAとBを抽出して出力する * * @param hoge * 抽出対象のデータモデル * @param outA * aの抽出結果 * @param outB * bの抽出結果 */ @Extract public void extractFields(Hoge hoge, Result<A> outA, Result<B> outB) { // aへの出力 a.reset(); // 初期化 a.setValue(hoge.getValueA()); outA.add(a); // bへの出力 b.reset(); // 初期化 b.setValue(hoge.getValueB0()); outB.add(b); b.reset(); // 初期化 b.setValue(hoge.getValueB1()); outB.add(b); } }
第1引数に抽出対象のデータモデルを指定する。
第2引数以降に出力ポートを示すResult<データモデル>を指定する。
戻り型は無し。
Operatorクラスに定義したメソッドはスレッドセーフになる(別スレッドから同時に呼ばれることは無い)ので、フィールドにオブジェクトを定義して使い回してもよい。
(ただし、以前のレコードの値の保持には使えない。実行時には各マシンに分散して処理されるので、レコードの入力順も どのレコードが来るのかも保証されない為)
なお、Resultにaddしたオブジェクトは次に使うときは内容が破壊されている(可能性がある)ので、毎回初期化する必要がある。[2014-12-21]
import com.example.modelgen.dmdl.model.A; import com.example.modelgen.dmdl.model.B; import com.example.modelgen.dmdl.model.Hoge; import com.example.operator.ExampleOperatorFactory; import com.example.operator.ExampleOperatorFactory.ExtractFields;
private final In<Hoge> in; private final Out<A> outA; private final Out<B> outB;
@Override public void describe() { ExampleOperatorFactory operators = new ExampleOperatorFactory(); // AとBを抽出する ExtractFields ab = extractOperator.extractFields(this.in); this.outA.add(ab.outA); this.outB.add(ab.outB); }
Flow DSLでは、自分が作ったOperatorのFactoryクラス(AsakusaFWのコンパイラーによって生成される)を使用する。
メソッド名はOperatorクラスに書いたメソッド名と同じ。
戻り値の型はAsakusaFWのコンパイラーによって生成されたクラス。(メソッド名を先頭が大文字のキャメルケースに変換したもの)
出力ポートの名前は、Operatorクラスのメソッドに出力ポート(Result<データモデル>)として指定した変数名が使われる。
抽出演算子の単体テストの実装例。
package com.example.operator;
import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import org.junit.Test; import com.asakusafw.runtime.testing.MockResult; import com.example.modelgen.dmdl.model.A; import com.example.modelgen.dmdl.model.B; import com.example.modelgen.dmdl.model.Hoge;
/** * {@link ExampleOperator}のテスト. */ public class ExampleOperatorTest { @Test public void extractFields() { ExampleOperator operator = new ExampleOperatorImpl(); Hoge hoge = new Hoge(); hoge.setValueA(123); hoge.setValueB0(456); hoge.setValueB1(789); MockResult<A> outA = new AMockResult(); MockResult<B> outB = new BMockResult(); operator.extractFields(hoge, outA, outB); { List<A> result = outA.getResults(); assertThat(result.size(), is(1)); A a = result.get(0); assertThat(a.getValue(), is(123)); } { List<B> result = outB.getResults(); assertThat(result.size(), is(2)); B b0 = result.get(0); assertThat(b0.getValue(), is(456L)); B b1 = result.get(1); assertThat(b1.getValue(), is(789L)); } } }
Operatorのテストクラスは、通常のJavaのJUnitのテストケースクラスとして作成する。
テスト対象のOperatorクラス自身は抽象クラスだが、Operatorクラス名の末尾に「Impl」の付いた具象クラスがAsakusaFWによって生成されるので、それを使う。
抽出演算子では引数にResultが使われているので、その引数にはMockResultを渡す。
OperatorメソッドでResultに追加したデータモデルは、MockResult内のリストに追加されていく。MockResult#getResults()でそのリストを取得できる。
ただし、Operatorクラス側でデータモデルのインスタンスを使い回している場合は、MockResultのリストにそのインスタンスを追加するだけだと、後から値を確認したくても最後の値しか取れない。
そこで、MockResult#bless()をオーバーライドし、インスタンスのコピーを返すようにする。こうすると、MockResultのリストにはコピーしたインスタンスが保持される。
public class AMockResult extends MockResult<A> { @Override protected A bless(A result) { A copy = new A(); copy.copyFrom(result); return copy; } }
public class BMockResult extends MockResult<B> { @Override protected B bless(B result) { B copy = new B(); copy.copyFrom(result); return copy; } }
抽出演算子では、抽出対象と出力のデータモデルの他にも引数を指定することが出来る。
こうすると、引数の内容分だけ異なる同じ演算子(処理内容)をいくつかの場所で使用できるようになる。
先の例に値引数を追加する例。
public abstract class ExampleOperator { private final A a = new A(); private final B b = new B(); /** * Hogeに含まれるAとBを抽出して出力する * * @param hoge * 抽出対象のデータモデル * @param outA * aの抽出結果 * @param outB * bの抽出結果 * @param valueB * bに加算する値 */ @Extract public void extractFields(Hoge hoge, Result<A> outA, Result<B> outB, int valueB) { // aへの出力 a.setValue(hoge.getValueA()); outA.add(a); // bへの出力 b.setValue(hoge.getValueB0() + valueB); outB.add(b); b.setValue(hoge.getValueB1() + valueB); outB.add(b); } }
出力ポートを示す引数以降を通常のJavaメソッドと同様に自由な引数とすることが出来る。(ただしプリミティブのみ)
@Override
public void describe() {
ExampleOperatorFactory operators = new ExampleOperatorFactory();
// AとBを抽出する
ExtractFields ab = extractOperator.extractFields(this.in, 100);
this.outA.add(ab.outA);
this.outB.add(ab.outB);
}
演算子を使用するFlow DSLで、引数に具体的な値を指定する。
@Test public void extractFields() { ExampleOperator operator = new ExampleOperatorImpl(); Hoge hoge = new Hoge(); hoge.setValueA(123); hoge.setValueB0(456); hoge.setValueB1(789); int valueB = 100; AMockResult outA = new AMockResult(); BMockResult outB = new BMockResult(); operator.extractFields(hoge, outA, outB, valueB); { List<A> result = outA.getResults(); assertThat(result.size(), is(1)); A a = result.get(0); assertThat(a.getValue(), is(123)); } { List<B> result = outB.getResults(); assertThat(result.size(), is(2)); B b0 = result.get(0); assertThat(b0.getValue(), is(456L + valueB)); B b1 = result.get(1); assertThat(b1.getValue(), is(789L + valueB)); } }
抽出演算子を使って、いわゆる横持ちデータを縦持ちに変換する例。[2013-11-30]
(→縦持ちを横持ちに変換する例)
入力データ例 | 出力データ例 | |||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
in |
|
→ | out |
|
hoge_v = { key : TEXT; value : INT; }; hoge_h = { key : TEXT; value0 : INT; value1 : INT; value2 : INT; };
import java.util.List; import com.asakusafw.runtime.core.Result; import com.asakusafw.vocabulary.model.Key; import com.asakusafw.vocabulary.operator.GroupSort; import com.example.modelgen.dmdl.model.HogeH; import com.example.modelgen.dmdl.model.HogeV;
private HogeV hogeV = new HogeV(); /** * 横持ちデータを縦持ちに変換 * * @param in * 入力 * @param out * 出力 */ @Extract public void convert(HogeH in, Result<HogeV> out) { hogeV.setKey(in.getKey()); if (!in.getValue0Option().isNull()) { hogeV.setValue(in.getValue0()); out.add(hogeV); } if (!in.getValue1Option().isNull()) { hogeV.setValue(in.getValue1()); out.add(hogeV); } if (!in.getValue2Option().isNull()) { hogeV.setValue(in.getValue2()); out.add(hogeV); } }
値がセット
されていない項目はnullが入っている。そのままgetするとNullPointerExceptionが発生するので、使う前にnullチェックする必要がある。
末尾に「Option」の付いたgetterメソッドを使って「hogeH.getValue2Option().isNull()
」の様にしてnullかどうかをチェックすることが出来る。
import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import java.util.ArrayList; import java.util.List; import org.junit.Test; import com.asakusafw.runtime.testing.MockResult; import com.example.modelgen.dmdl.model.HogeH; import com.example.modelgen.dmdl.model.HogeV;
public class ExampleOperatorTest { @Test public void convert3() { HogeH in = new HogeH(); in.setKeyAsString("aaa"); in.setValue0(123); in.setValue1(456); in.setValue2(789); List<HogeV> expected = new ArrayList<HogeV>(); expected.add(create("aaa", 123)); expected.add(create("aaa", 456)); expected.add(create("aaa", 789)); test(in, expected); }
@Test public void convert2() { HogeH in = new HogeH(); in.setKeyAsString("bbb"); in.setValue0(1); in.setValue1(2); List<HogeV> expected = new ArrayList<HogeV>(); expected.add(create("bbb", 1)); expected.add(create("bbb", 2)); test(in, expected); }
private HogeV create(String key, int value) { HogeV hogeV = new HogeV(); hogeV.setKeyAsString(key); hogeV.setValue(value); return hogeV; }
private void test(HogeH in, List<HogeV> expected) { ExampleOperator operator = new ExampleOperatorImpl(); MockResult<HogeV> out = new MockResult<HogeV>() { @Override protected HogeV bless(HogeV result) { HogeV copy = new HogeV(); copy.copyFrom(result); return copy; } }; operator.convert(in, out); assertThat(out.getResults(), is(expected)); } }
抽出演算子は、ScalaのflatMapメソッドに似ている部分がある。
(入力1レコードに対して出力レコード数が不定な部分が似ている)
case class Hoge(valueB0: Long, valueB1: Long) case class B(value: Long) def extractB(hoge: Hoge) = List(B(hoge.valueB0), B(hoge.valueB1)) val in : List[Hoge] = 〜 val out: List[B] = in.flatMap(extractB)