Asakusa FrameworkのOperator DSLの抽出演算子(@Extract)のメモ。
|
抽出演算子は、入力データを元に複数種類データモデル・複数レコードの出力をする演算子。
性能特性はExtract(旧ドキュメントではMap)。[/2016-02-11]
| 入力 ポート数 |
入力データモデル の制約 |
イメージ | 出力 ポート数 |
出力データモデル の制約 |
入力1レコード に対する 出力レコード数 |
|---|---|---|---|---|---|
| 1 |
![]() |
任意 | 任意。 (0〜∞レコード) |
抽出演算子は、複数種類のデータモデルを出力するのに使う。
入力データモデルを(1種類の)別のデータモデルに変換して出力したい場合は変換演算子(@Convert)を使う。
抽出演算子は性能特性がMapの演算子の中ではオールマイティー(何でも記述できる)の演算子なので、他の演算子の組み合わせで処理が出来るようなら、そちらにすることを検討すべき。
(極端に言えば、Map系演算子は抽出演算子だけあれば全て書けてしまう。しかし抽出演算子だけでフローを構成すると最適化されない)
hogeというデータモデルからデータモデルaとデータモデルbを出力する例。
なおかつ、hogeの1レコードにつきbは2レコード出力する。
(この図はToad Editorを用いて作っています)

| 入力データ例 | 出力データ例 | |||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| in |
|
→ | outA |
|
||||||||||||||||
| → | outB |
|
||||||||||||||||||
hoge = {
value_a : INT;
value_b0 : LONG;
value_b1 : LONG;
};
a = {
value : INT;
};
b = {
value : LONG;
};
import com.asakusafw.runtime.core.Result; import com.asakusafw.vocabulary.operator.Extract; import com.example.modelgen.dmdl.model.A; import com.example.modelgen.dmdl.model.B; import com.example.modelgen.dmdl.model.Hoge;
public abstract class ExampleOperator {
private final A a = new A();
private final B b = new B();
/**
* Hogeに含まれるAとBを抽出して出力する
*
* @param hoge
* 抽出対象のデータモデル
* @param outA
* aの抽出結果
* @param outB
* bの抽出結果
*/
@Extract
public void extractFields(Hoge hoge, Result<A> outA, Result<B> outB) {
// aへの出力
a.reset(); // 初期化
a.setValue(hoge.getValueA());
outA.add(a);
// bへの出力
b.reset(); // 初期化
b.setValue(hoge.getValueB0());
outB.add(b);
b.reset(); // 初期化
b.setValue(hoge.getValueB1());
outB.add(b);
}
}
第1引数に抽出対象のデータモデルを指定する。
第2引数以降に出力ポートを示すResult<データモデル>を指定する。
戻り型は無し。
Operatorクラスに定義したメソッドはスレッドセーフになる(別スレッドから同時に呼ばれることは無い)ので、フィールドにオブジェクトを定義して使い回してもよい。
(ただし、以前のレコードの値の保持には使えない。実行時には各マシンに分散して処理されるので、レコードの入力順も どのレコードが来るのかも保証されない為)
なお、Resultにaddしたオブジェクトは次に使うときは内容が破壊されている(可能性がある)ので、毎回初期化する必要がある。[2014-12-21]
import com.example.modelgen.dmdl.model.A; import com.example.modelgen.dmdl.model.B; import com.example.modelgen.dmdl.model.Hoge; import com.example.operator.ExampleOperatorFactory; import com.example.operator.ExampleOperatorFactory.ExtractFields;
private final In<Hoge> in; private final Out<A> outA; private final Out<B> outB;
@Override
public void describe() {
ExampleOperatorFactory operators = new ExampleOperatorFactory();
// AとBを抽出する
ExtractFields ab = extractOperator.extractFields(this.in);
this.outA.add(ab.outA);
this.outB.add(ab.outB);
}
Flow DSLでは、自分が作ったOperatorのFactoryクラス(AsakusaFWのコンパイラーによって生成される)を使用する。
メソッド名はOperatorクラスに書いたメソッド名と同じ。
戻り値の型はAsakusaFWのコンパイラーによって生成されたクラス。(メソッド名を先頭が大文字のキャメルケースに変換したもの)
出力ポートの名前は、Operatorクラスのメソッドに出力ポート(Result<データモデル>)として指定した変数名が使われる。
抽出演算子の単体テストの実装例。
package com.example.operator;
import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import org.junit.Test; import com.asakusafw.runtime.testing.MockResult; import com.example.modelgen.dmdl.model.A; import com.example.modelgen.dmdl.model.B; import com.example.modelgen.dmdl.model.Hoge;
/**
* {@link ExampleOperator}のテスト.
*/
public class ExampleOperatorTest {
@Test
public void extractFields() {
ExampleOperator operator = new ExampleOperatorImpl();
Hoge hoge = new Hoge();
hoge.setValueA(123);
hoge.setValueB0(456);
hoge.setValueB1(789);
MockResult<A> outA = new AMockResult();
MockResult<B> outB = new BMockResult();
operator.extractFields(hoge, outA, outB);
{
List<A> result = outA.getResults();
assertThat(result.size(), is(1));
A a = result.get(0);
assertThat(a.getValue(), is(123));
}
{
List<B> result = outB.getResults();
assertThat(result.size(), is(2));
B b0 = result.get(0);
assertThat(b0.getValue(), is(456L));
B b1 = result.get(1);
assertThat(b1.getValue(), is(789L));
}
}
}
Operatorのテストクラスは、通常のJavaのJUnitのテストケースクラスとして作成する。
テスト対象のOperatorクラス自身は抽象クラスだが、Operatorクラス名の末尾に「Impl」の付いた具象クラスがAsakusaFWによって生成されるので、それを使う。
抽出演算子では引数にResultが使われているので、その引数にはMockResultを渡す。
OperatorメソッドでResultに追加したデータモデルは、MockResult内のリストに追加されていく。MockResult#getResults()でそのリストを取得できる。
ただし、Operatorクラス側でデータモデルのインスタンスを使い回している場合は、MockResultのリストにそのインスタンスを追加するだけだと、後から値を確認したくても最後の値しか取れない。
そこで、MockResult#bless()をオーバーライドし、インスタンスのコピーを返すようにする。こうすると、MockResultのリストにはコピーしたインスタンスが保持される。
public class AMockResult extends MockResult<A> {
@Override
protected A bless(A result) {
A copy = new A();
copy.copyFrom(result);
return copy;
}
}
public class BMockResult extends MockResult<B> {
@Override
protected B bless(B result) {
B copy = new B();
copy.copyFrom(result);
return copy;
}
}
抽出演算子では、抽出対象と出力のデータモデルの他にも引数を指定することが出来る。
こうすると、引数の内容分だけ異なる同じ演算子(処理内容)をいくつかの場所で使用できるようになる。
先の例に値引数を追加する例。

public abstract class ExampleOperator {
private final A a = new A();
private final B b = new B();
/**
* Hogeに含まれるAとBを抽出して出力する
*
* @param hoge
* 抽出対象のデータモデル
* @param outA
* aの抽出結果
* @param outB
* bの抽出結果
* @param valueB
* bに加算する値
*/
@Extract
public void extractFields(Hoge hoge, Result<A> outA, Result<B> outB, int valueB) {
// aへの出力
a.setValue(hoge.getValueA());
outA.add(a);
// bへの出力
b.setValue(hoge.getValueB0() + valueB);
outB.add(b);
b.setValue(hoge.getValueB1() + valueB);
outB.add(b);
}
}
出力ポートを示す引数以降を通常のJavaメソッドと同様に自由な引数とすることが出来る。(ただしプリミティブのみ)
@Override
public void describe() {
ExampleOperatorFactory operators = new ExampleOperatorFactory();
// AとBを抽出する
ExtractFields ab = extractOperator.extractFields(this.in, 100);
this.outA.add(ab.outA);
this.outB.add(ab.outB);
}
演算子を使用するFlow DSLで、引数に具体的な値を指定する。
@Test
public void extractFields() {
ExampleOperator operator = new ExampleOperatorImpl();
Hoge hoge = new Hoge();
hoge.setValueA(123);
hoge.setValueB0(456);
hoge.setValueB1(789);
int valueB = 100;
AMockResult outA = new AMockResult();
BMockResult outB = new BMockResult();
operator.extractFields(hoge, outA, outB, valueB);
{
List<A> result = outA.getResults();
assertThat(result.size(), is(1));
A a = result.get(0);
assertThat(a.getValue(), is(123));
}
{
List<B> result = outB.getResults();
assertThat(result.size(), is(2));
B b0 = result.get(0);
assertThat(b0.getValue(), is(456L + valueB));
B b1 = result.get(1);
assertThat(b1.getValue(), is(789L + valueB));
}
}
抽出演算子を使って、いわゆる横持ちデータを縦持ちに変換する例。[2013-11-30]
(→縦持ちを横持ちに変換する例)
| 入力データ例 | 出力データ例 | |||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| in |
|
→ | out |
|
||||||||||||||||||||||||
hoge_v = {
key : TEXT;
value : INT;
};
hoge_h = {
key : TEXT;
value0 : INT;
value1 : INT;
value2 : INT;
};
import java.util.List; import com.asakusafw.runtime.core.Result; import com.asakusafw.vocabulary.model.Key; import com.asakusafw.vocabulary.operator.GroupSort; import com.example.modelgen.dmdl.model.HogeH; import com.example.modelgen.dmdl.model.HogeV;
private HogeV hogeV = new HogeV();
/**
* 横持ちデータを縦持ちに変換
*
* @param in
* 入力
* @param out
* 出力
*/
@Extract
public void convert(HogeH in, Result<HogeV> out) {
hogeV.setKey(in.getKey());
if (!in.getValue0Option().isNull()) {
hogeV.setValue(in.getValue0());
out.add(hogeV);
}
if (!in.getValue1Option().isNull()) {
hogeV.setValue(in.getValue1());
out.add(hogeV);
}
if (!in.getValue2Option().isNull()) {
hogeV.setValue(in.getValue2());
out.add(hogeV);
}
}
値がセット
されていない項目はnullが入っている。そのままgetするとNullPointerExceptionが発生するので、使う前にnullチェックする必要がある。
末尾に「Option」の付いたgetterメソッドを使って「hogeH.getValue2Option().isNull()」の様にしてnullかどうかをチェックすることが出来る。
import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import java.util.ArrayList; import java.util.List; import org.junit.Test; import com.asakusafw.runtime.testing.MockResult; import com.example.modelgen.dmdl.model.HogeH; import com.example.modelgen.dmdl.model.HogeV;
public class ExampleOperatorTest {
@Test
public void convert3() {
HogeH in = new HogeH();
in.setKeyAsString("aaa");
in.setValue0(123);
in.setValue1(456);
in.setValue2(789);
List<HogeV> expected = new ArrayList<HogeV>();
expected.add(create("aaa", 123));
expected.add(create("aaa", 456));
expected.add(create("aaa", 789));
test(in, expected);
}
@Test
public void convert2() {
HogeH in = new HogeH();
in.setKeyAsString("bbb");
in.setValue0(1);
in.setValue1(2);
List<HogeV> expected = new ArrayList<HogeV>();
expected.add(create("bbb", 1));
expected.add(create("bbb", 2));
test(in, expected);
}
private HogeV create(String key, int value) {
HogeV hogeV = new HogeV();
hogeV.setKeyAsString(key);
hogeV.setValue(value);
return hogeV;
}
private void test(HogeH in, List<HogeV> expected) {
ExampleOperator operator = new ExampleOperatorImpl();
MockResult<HogeV> out = new MockResult<HogeV>() {
@Override
protected HogeV bless(HogeV result) {
HogeV copy = new HogeV();
copy.copyFrom(result);
return copy;
}
};
operator.convert(in, out);
assertThat(out.getResults(), is(expected));
}
}
抽出演算子は、ScalaのflatMapメソッドに似ている部分がある。
(入力1レコードに対して出力レコード数が不定な部分が似ている)
case class Hoge(valueB0: Long, valueB1: Long) case class B(value: Long) def extractB(hoge: Hoge) = List(B(hoge.valueB0), B(hoge.valueB1)) val in : List[Hoge] = 〜 val out: List[B] = in.flatMap(extractB)