S-JIS[2013-11-18/2017-12-11] 変更履歴

Asakusa Framework グループ整列演算子

Asakusa FrameworkOperator DSLのグループ整列演算子(@GroupSort)のメモ。


概要

グループ整列演算子は、集計キーでグルーピング、ソートキーでソートし、キーが一致するデータ毎に処理して複数種類(複数データモデル)・複数レコードの出力を行う。
性能特性はCoGroup(旧ドキュメントではReduce)。[/2016-02-11]

入力
ポート数
入力データモデル
の制約
イメージ 出力
ポート数
出力データモデル
の制約
入力1レコード
に対する
出力レコード数
1   任意   任意。
(0〜∞レコード)

グループ整列演算子はグルーピングした複数レコードにまたがって処理をしたい場合に使う。
更新演算子(@Update)変換演算子(@Convert)は1レコード内のデータしか処理できない)
例えば、ソートした先頭n件を出力したいとか、1時間1レコードで24レコードにしたいが24時間の中にレコードが存在しない時間帯を見つけて空レコードを作るとか。
(24レコードだとAsakusaFW(Hadoop)で扱うには少なすぎて、Hadoop税が相対的に大きくなる(MapReduceジョブの起動時間の方が長くなる)可能性が高いが^^;)

複数レコードを集計したい場合は、単純集計演算子(@Summarize)畳み込み演算子(@Fold)を使う。


グループ整列演算子はグループ結合演算子(@CoGroup)の入力ポートが1つしかないバージョン。
したがって、乱用しないように、という注意点はグループ結合演算子(@CoGroup)と同じ。


hogeをIDでグループ化し、ageが一番小さいレコードと大きいレコードを1レコードずつ出力する例。
すなわち、ageでソートして、先頭と末尾のレコードを1つずつ出力する。
(この図はToad Editorを用いて作っています)

入力データ例   出力データ例
in
hoge
id age
aaa 17
aaa 18
aaa 19
bbb 18
bbb 18
bbb 21
ccc 30
first
hoge
id age
aaa 17
bbb 18
ccc 30
last
hoge
id age
aaa 19
bbb 21
ccc 30

example.dmdl(DMDL):

hoge = {

    id : TEXT;

    age : INT;
};

ExampleOperator.java(Operator DSL):

import java.util.List;

import com.asakusafw.runtime.core.Result;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.GroupSort;

import com.example.modelgen.dmdl.model.Hoge;
public abstract class ExampleOperator {

	/**
	 * 先頭と末尾のレコードを出力する
	 * 
	 * @param in    グループ毎のリスト
	 * @param first グループ内の先頭要素
	 * @param last  グループ内の末尾要素
	 */
	@GroupSort
	public void firstLast(
		@Key(group = "id", order = "age ASC") List<Hoge> in,
		Result<Hoge> first,
		Result<Hoge> last
	) {
		if (in.size() == 1) { // [2017-12-10]
			Hoge hoge = in.get(0);
			Hoge temp = new Hoge();
			temp.copyFrom(hoge);
			first.add(hoge);
			last.add(temp);
		} else {
			first.add(in.get(0));
			last.add(in.get(in.size() - 1));
		}
	}
}

第1引数で、入力ポートを表すList<データモデル>を指定する。
この引数には@Keyアノテーションを付けて、集計キー(グループ化するキー)を指定する。
複数の項目を集計キーとする場合は「@Key(group = { "key1", "key2" })」のように波括弧でくくってカンマ区切りで指定する。
また、@Keyアノテーションのorderでソートキーを指定できる。
このListには必ず1件以上データが入ってくる。(たぶん自明なので、ドキュメントには明記されていない)[2017-02-11]

第2引数以降に、出力ポートを表すResult<データモデル>を指定する。

Resultにaddする場合、一度addしたオブジェクトは使用することは出来ない(内容が破壊される可能性がある)。[2017-12-10]
そのため、2回addしたい場合はオブジェクトをコピーして取っておく必要がある。

ExampleJob.java(Flow DSL):

import com.example.modelgen.dmdl.model.Foo;
import com.example.modelgen.dmdl.model.Hoge;

import com.example.operator.ExampleOperatorFactory;
import com.example.operator.ExampleOperatorFactory.FirstLast;
	private final In<Hoge> in;

	private final Out<Hoge> out1;
	private final Out<Hoge> out2;
	@Override
	public void describe() {
		ExampleOperatorFactory operators = new ExampleOperatorFactory();

		// 先頭と末尾のレコードを出力する
		FirstLast fl = operators.firstLast(this.in);

		this.out1.add(fl.first);
		this.out2.add(fl.last);
	}

Flow DSLでは、自分が作ったOperatorのFactoryクラス(AsakusaFWのコンパイラーによって生成される)を使用する。
メソッド名はOperatorクラスに書いたメソッド名と同じ。
戻り値の型はAsakusaFWのコンパイラーによって生成されたクラス。(メソッド名を先頭が大文字のキャメルケースに変換したもの)
メソッドの引数はOperatorクラスのメソッドに入力ポート(List<データモデル>)として指定したデータ。

出力ポートの名前は、Operatorクラスのメソッドに出力ポート(Result<データモデル>)として指定した変数名が使われる。


単体テスト

グループ整列演算子の単体テストの実装例。

ExampleOperatorTest.java:

package com.example.operator;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;

import com.asakusafw.runtime.testing.MockResult;
import com.example.modelgen.dmdl.model.Hoge;
/**
 * {@link ExampleOperator}のテスト.
 */
public class ExampleOperatorTest {

	@Test
	public void firstLast() {
		ExampleOperator operator = new ExampleOperatorImpl();

		List<Hoge> in = new ArrayList<Hoge>();
		for (int i =0; i < 3; i++) {
			Hoge hoge = new Hoge();
			hoge.setAge(20 + i);
			in.add(hoge);
		}

		MockResult<Hoge> first = new HogeMockResult();
		MockResult<Hoge> last = new HogeMockResult();

		operator.firstLast(in, first, last);

		{
			assertThat(first.getResults().size(), is(1));
			assertThat(first.getResults().get(0).getAge(), is(20));
		}
		{
			assertThat(last.getResults().size(), is(1));
			assertThat(last.getResults().get(0).getAge(), is(22));
		}
	}
}

Operatorのテストクラスは、通常のJavaのJUnitのテストケースクラスとして作成する。

テスト対象のOperatorクラス自身は抽象クラスだが、Operatorクラス名の末尾に「Impl」の付いた具象クラスがAsakusaFWによって生成されるので、それを使う。

グループ結合演算子では引数にResultが使われているので、その引数にはMockResultを渡す。
OperatorメソッドでResultに追加したデータモデルは、MockResult内のリストに追加されていく。MockResult#getResults()でそのリストを取得できる。
ただし、Operatorクラス側でデータモデルのインスタンスを使い回している場合は、MockResultのリストにそのインスタンスを追加するだけだと、後から値を確認したくても最後の値しか取れない。
そこで、MockResult#bless()をオーバーライドし、インスタンスのコピーを返すようにする。こうすると、MockResultのリストにはコピーしたインスタンスが保持される。

public class HogeMockResult extends MockResult<Hoge> {

	@Override
	protected Hoge bless(Hoge result) {
		Hoge copy = new Hoge();
		copy.copyFrom(result);
		return copy;
	}
}

値引数の例

グループ整列演算子では、入出力のデータモデルの他にも引数を指定することが出来る。
こうすると、引数の内容分だけ異なる同じ演算子(処理内容)をいくつかの場所で使用できるようになる。

先の例に値引数を追加し、先頭n件・末尾n件を出力する例。

ExampleOperator.java(Operator DSL):

	@GroupSort
	public void firstLast(
		@Key(group = "id", order = "age ASC") List<Hoge> in,
		Result<Hoge> first,
		Result<Hoge> last,
		int size
	) {
		int end = Math.min(size, in.size());
		for (int i = 0; i < end; i++) {
			first.add(in.get(i));
		}

		int start = Math.max(0, in.size() - size);
		for (int i = start; i < in.size(); i++) {
			last.add(in.get(i));
		}
	}

出力ポートを示す引数以降を通常のJavaメソッドと同様に自由な引数とすることが出来る。(ただしプリミティブのみ)

ExampleJob.java(Flow DSL):

	@Override
	public void describe() {
		ExampleOperatorFactory operators = new ExampleOperatorFactory();

		// 先頭と末尾から10レコードを出力する
		FirstLast fl = operators.firstLast(this.in, 10);
〜
	}

演算子を使用するFlow DSLで、引数に具体的な値を指定する。


レコードを補填する例

グループ整列演算子を使って、抜けているレコードを補填する例。[2013-11-30]

key2が0〜9の10レコードになるように出力する。

入力データ例   出力データ例
in
hoge_fill
key1 key2 value
aaa 4 123
aaa 5 456
aaa 8 789
out
hoge_fill
key2 key2 value
aaa 0 0
aaa 1 0
aaa 2 0
aaa 3 0
aaa 4 123
aaa 5 456
aaa 6 0
aaa 7 0
aaa 8 789
aaa 9 0

example.dmdl(DMDL):

hoge_fill = {

    key1 : TEXT;

    key2 : INT;

    value : INT;
};

ExampleOperator.java(Operator DSL):

import java.util.List;

import com.asakusafw.runtime.core.Result;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.GroupSort;

import com.example.modelgen.dmdl.model.HogeFill;
	/** 穴埋め用レコード */
	private final HogeFill hogeFill = new HogeFill();
	/**
	 * レコード埋め
	 * 
	 * @param in  入力
	 * @param out 出力
	 */
	@GroupSort
	public void fill(@Key(group = "key1") List<HogeFill> in, Result<HogeFill> out) {
		boolean[] exists = new boolean[10];

		HogeFill first = null;

		for (HogeFill record : in) {
			exists[record.getKey2()] = true;

			// 存在するレコードの出力
			out.add(record);

			// 先頭レコードを保持
			if (first == null) {
				first = record;
			}
		}

		// 穴埋めレコードの出力
		for (int i = 0; i < exists.length; i++) {
			if (!exists[i]) {
				hogeFill.reset();
				hogeFill.setKey1Option(first.getKey1Option());
				hogeFill.setKey2(i);
				hogeFill.setValue(0);

				out.add(hogeFill);
			}
		}
	}

GroupSortのメソッドが呼ばれている以上、入力データのListは必ず1レコード以上存在する。
したがって、上記のfirstは必ず初期化される。

Result(上記のout)に出力(add)すると、そのデータモデル(上記のhogeFill)の内容は破壊される(可能性がある)ので、使うときに毎回初期化する必要がある。[2017-04-25]


ExampleOperatorTest.java:

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;

import com.asakusafw.runtime.testing.MockResult;
import com.example.modelgen.dmdl.model.HogeFill;
public class ExampleOperatorTest {

	@Test
	public void fill() {
		ExampleOperator operator = new ExampleOperatorImpl();

		List<HogeFill> in = new ArrayList<HogeFill>(3);
		in.add(create(4, 123));
		in.add(create(5, 456));
		in.add(create(8, 789));

		MockResult<HogeFill> out = new MockResult<HogeFill>() {
			@Override
			protected HogeFill bless(HogeFill result) {
				HogeFill copy = new HogeFill();
				copy.copyFrom(result);
				return copy;
			}
		};

		operator.fill(in, out);

		List<HogeFill> expected = new ArrayList<HogeFill>(10);
		expected.add(create(4, 123));
		expected.add(create(5, 456));
		expected.add(create(8, 789));
		expected.add(create(0, 0));
		expected.add(create(1, 0));
		expected.add(create(2, 0));
		expected.add(create(3, 0));
		expected.add(create(6, 0));
		expected.add(create(7, 0));
		expected.add(create(9, 0));
		assertThat(out.getResults(), is(expected));
	}
	private HogeFill create(int key2, int value) {
		HogeFill record = new HogeFill();
		record.setKey1AsString("aaa");
		record.setKey2(key2);
		record.setValue(value);
		return record;
	}
}

縦持ちを横持ちに変換する例

グループ整列演算子を使って、いわゆる縦持ちデータを横持ちに変換する例。[2013-11-30]
(→横持ちを縦持ちに変換する例

入力データ例   出力データ例
in
hoge_v
key value
aaa 123
aaa 456
aaa 789
bbb 1
bbb 2
out
hoge_h
key value0 value1 value2
aaa 123 456 789
bbb 1 2  

example.dmdl(DMDL):

hoge_v = {

    key : TEXT;

    value : INT;
};

hoge_h = {

    key : TEXT;

    value0 : INT;

    value1 : INT;

    value2 : INT;
};

ExampleOperator.java(Operator DSL):

import java.util.List;

import com.asakusafw.runtime.core.Result;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.GroupSort;

import com.example.modelgen.dmdl.model.HogeH;
import com.example.modelgen.dmdl.model.HogeV;
	/**
	 * 縦持ちデータを横持ちに変換
	 * 
	 * @param in  入力
	 * @param out 出力
	 */
	@GroupSort
	public void convert(@Key(group = "key") List<HogeV> in, Result<HogeH> out) {
		HogeH hogeH = new HogeH();

		HogeV hogeV = in.get(0);
		hogeH.setKey(hogeV.getKey());
		hogeH.setValue0(hogeV.getValue());

		if (in.size() > 1) {
			hogeV = in.get(1);
			hogeH.setValue1(hogeV.getValue());
		}
		if (in.size() > 2) {
			hogeV = in.get(2);
			hogeH.setValue2(hogeV.getValue());
		}

		out.add(hogeH);
	}

値をセットしない場合は、nullがセットされたのと同じ扱いになる。
使う側は、末尾に「Option」の付いたgetterメソッドを使って「hogeH.getValue2Option().isNull()」の様にしてnullかどうかをチェックすることが出来る。


ExampleOperatorTest.java:

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;

import com.asakusafw.runtime.testing.MockResult;

import com.example.modelgen.dmdl.model.HogeH;
import com.example.modelgen.dmdl.model.HogeV;
public class ExampleOperatorTest {

	@Test
	public void convert3() {
		List<HogeV> in = new ArrayList<HogeV>();
		in.add(create("aaa", 123));
		in.add(create("aaa", 456));
		in.add(create("aaa", 789));

		HogeH expected = new HogeH();
		expected.setKeyAsString("aaa");
		expected.setValue0(123);
		expected.setValue1(456);
		expected.setValue2(789);

		test(in, expected);
	}
	@Test
	public void convert2() {
		List<HogeV> in = new ArrayList<HogeV>();
		in.add(create("bbb", 1));
		in.add(create("bbb", 2));

		HogeH expected = new HogeH();
		expected.setKeyAsString("bbb");
		expected.setValue0(1);
		expected.setValue1(2);

		test(in, expected);
	}
	private HogeV create(String key, int value) {
		HogeV record = new HogeV();
		record.setKeyAsString(key);
		record.setValue(value);
		return record;
	}
	private void test(List<HogeV> in, HogeH expected) {
		GroupSortHorizontalOperator operator = new GroupSortHorizontalOperatorImpl();

		MockResult<HogeH> out = new MockResult<HogeH>();

		operator.convert(in, out);

		assertThat(out.getResults().size(), is(1));
		assertThat(out.getResults().get(0), is(expected));
	}
}

この例ではキー1つにつき1レコードしか出力されないので、MockResultの中で複製を作る(bless()をオーバーライドする)必要は無い。


巨大な入力データ

グループ整列演算子のOperatorクラスのメソッドで入力データを表すList<データモデル>は、
デフォルトでは少量のデータ、つまりメモリー内に乗り切るデータ量が想定されている。
その為、1つのグループに大量のデータが来るとOutOfMemoryErrorになってしまう。

1つのグループで大量のデータを扱いたい場合は、@GroupSortアノテーションでInputBuffer.ESCAPEを指定する。
(デフォルトではInputBuffer.EXPANDが指定されている)

ただし、ESCAPEを指定すると、入力のListの使い方に制限がかかる


グループ内の先頭と末尾のレコードの平均を出力する例。

ExampleOperator.java(Operator DSL):

import java.util.List;

import com.asakusafw.vocabulary.flow.processor.InputBuffer;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.GroupSort;
	@GroupSort(inputBuffer = InputBuffer.ESCAPE)
	public void averageFirstLast(
		@Key(group = "id", order = "age ASC") List<Hoge> in,
		Result<Hoge> out
	) {
		int firstAge = in.get(0).getAge();	//先頭の値を保持

		Hoge lastHoge = in.get(in.size() - 1);
		int lastAge = lastHoge.getAge();

		Hoge hoge = new Hoge();
		hoge.setId(lastHoge.getId());
		hoge.setAge((firstAge + lastAge) / 2);

		out.add(hoge);
	}

ソート

グループ整列演算子では、入力データをソートすることが出来る。
ソートキーは、集計キーとは別に指定できる。

ExampleOperator.java(Operator DSL):

	@GroupSort
	public void firstLast(
		@Key(group = "id", order = "age ASC") List<Hoge> in,
		〜
	) {
		〜
	}

@Keyアノテーションのorderにソートキーを指定する。

説明
order = "key"
ソート項目を1つ指定する例。
ソート順は昇順となる。
order = "key ASC"
order = "key DESC"
ASCを指定すると昇順になる。
DESCを指定すると降順になる。
order = { "key1 ASC", "key2 ASC" }
複数のソート項目を指定する例。

このソートは、あくまでもグループ整列演算子の入力データに対するソート。
グループ整列演算子から出力したデータは、必ずしも出力した順序になるとは限らない。
最終的な出力ファイルをソートしたい場合は、Direct I/OだとExporterクラスでソートキーを指定する。


Iterableによる入力

AsakusaFW 0.9.1で、入力データを(Listでなく)Iterableで受け取ることが出来るようになった。[2017-04-30]
(ただし、実行基盤がHadoop(MapReduce)のバッチでは使えないらしい?[2017-05-01]

import com.asakusafw.runtime.core.Result;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.GroupSort;

import com.example.modelgen.dmdl.model.Hoge;
public abstract class ExampleOperator {

	/**
	 * 最小年齢を取得する
	 * 
	 * @param ins グループ毎の一覧
	 * @param out グループ内の先頭要素
	 */
	@GroupSort
	public void getMinAge(
		@Key(group = "id", order = "age ASC") Iterable<Hoge> ins,
		Result<Hoge> out
	) {
		for (Hoge in : ins) {
			out.add(in);
			break;
		}
	}
}

メソッドの引数のListをIterableに変えるだけ。


@Once

AsakusaFW 0.9.1以降では、インスタンスを一回ずつしか使用しない場合は、@Onceを付けて明示できる。[2017-04-30]
(AsakusaFW 0.10.0で正式機能となった。[2017-12-11]

これは、先頭レコードしか使わないような場合、あるいはレコード毎に順次処理だけ行うような場合に有用。

通常のListの場合、同一キーの全データをメモリー上に展開する。
(そのため、データ量が多い場合はOutOfMemoryErrorが発生することがあり、その回避策としてInputBuffer.ESCAPEがあった)
@Onceにすると、データモデル1つのインスタンスが使い回されるようになる(インスタンス1つ分のメモリーしか使用しない)、と思う。(したがって、前のレコードのインスタンス(の参照)をそのまま保持して使うことは出来ない)
全データの(先頭の)一部しか使わないと分かっている場合は、この方がメモリー使用効率が良い。

import com.asakusafw.vocabulary.model.Once;
	@GroupSort
	public void onceExample(
		@Key(group = "id", order = "age ASC") @Once Iterable<Hoge> ins,
		Result<Hoge> out
	) {
		for (Hoge in : ins) {
			out.add(in);
		}
	}

※@OnceはIterableにしか付けられない。(Listには付けられない)


@Spill

AsakusaFW 0.9.1以降では、入力データ(ListやIterable)に@Spillを付けることが出来る。[2017-04-30]
(AsakusaFW 0.10.0で正式機能となった。[2017-12-11]

CoGroupの@Spillの例を参照。(@Spillの使い方はCoGroupと同じ)


Operator DSLへ戻る / AsakusaFW目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま