S-JIS[2010-04-07/2010-05-01] 変更履歴

Cascading Every

CascadingPipeのサブクラスであるEveryクラスについて。


Everyの概要

Cascadingにおいて、キー毎の集計処理を行うのがEvery

import cascading.pipe.Every;

EveryにはSumCountといった集約関数(Aggregator)、あるいはBufferを指定する。
HadoopReducer、あるいはSQLのsumやcount等の集約関数に相当する。

また、Everyを使う前にはGroupByでソートしておく必要がある。
GroupByで指定したソートキーが集計に対するキーとなる。
(デフォルトの出力は、キーと集約関数の結果を合わせたTupleになる)

ソートキーを指定しない、全データに対する集計という処理は行えない。[2010-04-08]
全データに対して集計したい場合は、全データで共通の値となる項目を追加する必要がある。
(Reducer#reduce()に置き換えられるわけだが、入出力時にキーが必要だから)
Insertを使ってダミーのソートキーを追加する方法

Everyの使用例 [/2010-04-13]
説明
pipe = new Every(
 new GroupBy(pipe, new Fields(ソート項目名,…)),
 集約関数オブジェクト
);
ソート項目毎に集約を行う。
集約関数に渡されるのはGroupByの出力の全項目Fields.ALL)で、
Everyとしての出力は入力キー集約関数の出力結果を合わせた全項目(Fields.ALL)。
pipe = new Every(
 new GroupBy(pipe, new Fields(ソート項目名,…)),
 new Fields(項目名,…),
 集約関数オブジェクト
);
指定された項目argumentSelector)を集約対象とする。
SumAverage入力データTuple)の第1項目を対象とするので、
Tupleに複数の項目がある場合は対象項目を指定する必要がある。
pipe = new Every(
 new GroupBy(pipe, new Fields(ソート項目名,…)),
 集約関数オブジェクト,
 new Fields(項目名,…)
);
指定された項目outputSelector)を出力する。
Fields dummyKey = new Fields("dummyKey");
pipe = new Each(pipe, new Insert(dummyKey, "dummyValue"), Fields.ALL);
pipe = new GroupBy(pipe, dummyKey);
pipe = new Every(pipe, 〜);
全データに対する集計を行う例。[2010-04-22]
全データにInsert関数を使って同一の固定値を挿入し、それをキーとしてソートする。

Aggregator

Everyに指定し、キー毎の実際の処理を記述するのがAggregator(集約関数)。

	pipe = new Every(pipe, new MyAggregator());

自分でAggregatorを作るには、Aggregatorインターフェースを実装すればよい。(Functionと同様)
さらにBaseOperationを親クラスにすれば、実際に実装するのは最小限のメソッドだけで済む。
Aggregator#start()メソッドで初期処理を行い、aggregate()で1データずつ集計処理を行う。そしてcomplete()で出力する。
これら3つのメソッドが Hadoop本来のReducer#reduce()に相当する。
(参考: Average


Aggregatorのコーディング例

import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
public class MyAggregator extends BaseOperation<MyAggregator.Context> implements Aggregator<MyAggregator.Context> {
	private static final long serialVersionUID = 1L;

	/** コンストラクター */
	public MyAggregator() {
		// 出力項目名を指定
		super(new Fields("count", "sum"));
	}

	protected static class Context {
		protected int count;
		protected int sum;

		public void reset() { count = 0; sum = 0; }
	}

	@Override
	public void start(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
		Context context = aggregatorCall.getContext();
		if (context != null) {
			context.reset();
		} else {
			aggregatorCall.setContext(new Context());
		}
	}

	@Override
	public void aggregate(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
		TupleEntry entry = aggregatorCall.getArguments(); //入力データ

		// データの集計
		Context context = aggregatorCall.getContext();
		context.count++;
		context.sum += entry.getInteger("data");
	}

	@Override
	public void complete(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {

		//出力データの作成
		Context context = aggregatorCall.getContext();
		Tuple tuple = new Tuple(context.count, context.sum);

		aggregatorCall.getOutputCollector().add(tuple);	//集計結果の出力
	}
}

BaseOperationやAggregatorの型引数は、メソッド呼び出しの間で保持して受け渡すデータの型(Contextクラス)を指定する。
Aggregatorはstart()で初期化してaggregate()で集計してcomplete()で出力するので、その間のデータの受け渡しにContextを使用する。
(キーの値が変わる度にstart()が呼ばれる)
(complete()以外でタプルを出力してはいけない(→単体テスト方法参照)。そういう事がしたい場合はBufferを使う。[2010-04-17]

引数のAggregatorCallの使い方は、FunctionCallと同様。

メソッド 説明
TupleEntry aggregatorCall.getGroup() グルーピングのキーとなっているタプルを取得する。
TupleEntry aggregatorCall.getArguments() 入力データとなるタプルを取得する。
Fields aggregatorCall.getArgumentFields() 入力データのフィールド名を取得する。Cascading1.1以降。[2010-05-01]
TupleEntryCollector aggregatorCall.getOutputCollector() データの出力先を取得する。
Context aggregatorCall.getContext() Aggregatorの型引数で指定したContextを取得・設定する。
Contextインスタンスを生成・初期化するのはプログラマーの責任。
void aggregatorCall.setContext(Context)

Aggregatorの種類

Cascading1.0.18のAggregatorには以下のようなクラスがある。

クラス名 説明 データ型 デフォルトの
出力項目名
cascading.operation.
Aggregator
集約関数のインターフェース。    
cascading.operation.aggregator.
Count
キー毎のデータ数をカウントする。
CountBy
Long count
cascading.operation.aggregator.
Sum
キー毎のデータ(Tupleの第1項目)の集計を行う。
データはデフォルトではdoubleとして扱われる。
SumBy
Double sum
cascading.operation.aggregator.
Average
キー毎のデータ(Tupleの第1項目)の平均を算出する。
合計をdouble、件数をlongで計算する。
AverageBy
Double average
cascading.operation.aggregator.
Min
同一キー内の(Tupleの第1項目の)最小/最大データを出力する。
データはdoubleとして比較する。
引数の型 min
cascading.operation.aggregator.
Max
引数の型 max
cascading.operation.aggregator.
First
同一キー内の先頭/末尾データ(Tupleの全項目)を出力する。 引数 引数と同じ
cascading.operation.aggregator.
Last
引数 引数と同じ

データをdoubleとして扱う場合、TupleEntry#getDouble()を使ってdouble型の値を取得している。
(つまりNumberであればdoubleValue()、それ以外だとDouble.toDouble(obj.toString())で変換する)


Buffer

Everyには、Aggregator以外にBufferというものも指定できる。[2010-04-17]

	pipe = new Every(pipe, new MyBuffer());

Bufferのコーディングは、Hadoop本来のReducerとそっくりになる。
BufferだけあればAggregatorなんか要らないんじゃないかと思うが、Cascadingとしては集約・集計はAggregatorを使うのが標準のようだ。

ではBufferの目的は何かと言うと、前後のレコード(タプル)の値を参照したい場合に使用する。


Bufferのコーディング例

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
/**
 * 時刻項目から前レコードとの差分(時間)を算出する.
 */
public class TimeDeltaBuffer extends BaseOperation<DateFormat> implements Buffer<DateFormat> {
	private static final long serialVersionUID = 1L;

	// コンストラクター
	public TimeDeltaBuffer(Fields fieldDeclaration) {
		super(1, fieldDeclaration);
	}

	@Override
	public void prepare(FlowProcess flowProcess, OperationCall<DateFormat> operationCall) {
		super.prepare(flowProcess, operationCall);

		if (operationCall.getContext() == null) {
			operationCall.setContext(new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"));
		}
	}

	@Override
	public void operate(FlowProcess flowProcess, BufferCall<DateFormat> bufferCall) {
		TupleEntryCollector collector = bufferCall.getOutputCollector();
		DateFormat df = bufferCall.getContext();

		// 前データ(前レコードの時刻)
		long prev = 0;
		// 現在時刻(現在のレコードの時刻)
		long now;

		for (Iterator<TupleEntry> i = bufferCall.getArgumentsIterator(); i.hasNext(); prev = now) {
			try {
				Tuple value = i.next().getTuple();
				String s = value.getString(0);	//第1項目をStringとして取得
				now = df.parse(s).getTime();
			} catch (ParseException e) {
				throw new RuntimeException(e);
			}

			if (prev == 0) {	//初回(先頭レコード)用の初期化
				prev = now;
			}

			// 前の時刻との差分(経過時間)の算出
			long delta = now - prev;

			// 時間の出力
			collector.add(new Tuple(delta));
		}
	}
}

SimpleDateFormatインスタンスをフィールドで保持せずにわざわざContext経由で渡しているのは、SimpleDateFormatはMTセーフにする必要がある為。
(現時点のHadoopではマルチスレッドで動作することは無いようだが、将来的には分からないという理由でContextが存在しているらしい)
(まぁ、Contextの使い方としては、集計値と直接関係ないDateFormatのようなオブジェクトを渡すのはイレギュラーな使い方なんじゃないかという気はしなくもないが(苦笑))


TimeDeltaBufferの使用例

		Pipe pipe = new Pipe("pipe");
		pipe = new Each(pipe, new RegexSplitter(new Fields("key", "time"), ","));

		// keyでグルーピング、timeで2次ソート
		pipe = new GroupBy(pipe, new Fields("key"), new Fields("time"));
		pipe = new Every(pipe, new Fields("time"), new TimeDeltaBuffer(new Fields("delta")));

まず入力データをカンマ区切りでkeyとtimeという2つの項目に分割している。
次にkeyでグルーピングする。この際、key毎にtimeでさらにソートしておく。
で、TimeDeltaBufferにtime項目を渡し、deltaという項目名で時間を出力する。

なお、Aggregatorではグループキー項目と集約結果のペアが出力されるが、Bufferでは入力の全項目(collector.add()時の処理対象の(Iterator#next()で取得した)入力タプル)と編集結果がペアで出力される。

入力データの例 出力結果
key1,2010/04/15 01:02:03.000
key1,2010/04/15 01:02:04.000
key1,2010/04/15 01:02:04.001
key1,2010/04/15 01:02:05.000
key1,2010/04/15 01:03:03.000
key2,2010/04/15 01:03:04.000
key1,2010/04/15 01:03:05.000
key2,2010/04/15 01:03:06.000
key1,2010/04/16 01:00:00.000
key2,2010/04/16 01:00:01.000
key1    2010/04/15 01:02:03.000	0
key1    2010/04/15 01:02:04.000	1000
key1    2010/04/15 01:02:04.001	1
key1    2010/04/15 01:02:05.000	999
key1    2010/04/15 01:03:03.000	58000
key1    2010/04/15 01:03:05.000	2000
key1    2010/04/16 01:00:00.000	86215000
key2    2010/04/15 01:03:04.000	0
key2    2010/04/15 01:03:06.000	2000
key2    2010/04/16 01:00:01.000	86215000
keyとtimeのカンマ区切り   key・time・deltaをタブ区切り

Cascadingへ戻る / Hadoopへ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま