CascadingのPipeのサブクラスであるEveryクラスについて。
|
|
Cascadingにおいて、キー毎の集計処理を行うのがEvery。
import cascading.pipe.Every;
EveryにはSumやCountといった集約関数(Aggregator)、あるいはBufferを指定する。
HadoopのReducer、あるいはSQLのsumやcount等の集約関数に相当する。
また、Everyを使う前にはGroupByでソートしておく必要がある。
GroupByで指定したソートキーが集計に対するキーとなる。
(デフォルトの出力は、キーと集約関数の結果を合わせたTupleになる)
ソートキーを指定しない、全データに対する集計という処理は行えない。[2010-04-08]
全データに対して集計したい場合は、全データで共通の値となる項目を追加する必要がある。
(Reducer#reduce()に置き換えられるわけだが、入出力時にキーが必要だから)
→Insertを使ってダミーのソートキーを追加する方法
例 | 説明 |
---|---|
pipe = new Every( |
ソート項目毎に集約を行う。 集約関数に渡されるのはGroupByの出力の全項目(Fields.ALL)で、 Everyとしての出力は入力キーと集約関数の出力結果を合わせた全項目(Fields.ALL)。 |
pipe = new Every( |
指定された項目(argumentSelector)を集約対象とする。 SumやAverageは入力データ(Tuple)の第1項目を対象とするので、 Tupleに複数の項目がある場合は対象項目を指定する必要がある。 |
pipe = new Every( |
指定された項目(outputSelector)を出力する。 |
Fields dummyKey = new Fields("dummyKey"); |
全データに対する集計を行う例。[2010-04-22] 全データにInsert関数を使って同一の固定値を挿入し、それをキーとしてソートする。 |
Everyに指定し、キー毎の実際の処理を記述するのがAggregator(集約関数)。
pipe = new Every(pipe, new MyAggregator());
自分でAggregatorを作るには、Aggregatorインターフェースを実装すればよい。(Functionと同様)
さらにBaseOperationを親クラスにすれば、実際に実装するのは最小限のメソッドだけで済む。
Aggregator#start()メソッドで初期処理を行い、aggregate()で1データずつ集計処理を行う。そしてcomplete()で出力する。
これら3つのメソッドが
Hadoop本来のReducer#reduce()に相当する。
(参考: Average)
import cascading.flow.FlowProcess; import cascading.operation.Aggregator; import cascading.operation.AggregatorCall; import cascading.operation.BaseOperation;
public class MyAggregator extends BaseOperation<MyAggregator.Context> implements Aggregator<MyAggregator.Context> { private static final long serialVersionUID = 1L; /** コンストラクター */ public MyAggregator() { // 出力項目名を指定 super(new Fields("count", "sum")); } protected static class Context { protected int count; protected int sum; public void reset() { count = 0; sum = 0; } } @Override public void start(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) { Context context = aggregatorCall.getContext
(); if (context != null) { context.reset(); } else { aggregatorCall.setContext
(new Context()); } } @Override public void aggregate(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) { TupleEntry entry = aggregatorCall.getArguments(); //入力データ // データの集計 Context context = aggregatorCall.getContext
(); context.count++; context.sum += entry.getInteger("data"); } @Override public void complete(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) { //出力データの作成 Context context = aggregatorCall.getContext
(); Tuple tuple = new Tuple(context.count, context.sum); aggregatorCall.getOutputCollector().add(tuple); //集計結果の出力 } }
BaseOperationやAggregatorの型引数は、メソッド呼び出しの間で保持して受け渡すデータの型(Contextクラス)を指定する。
Aggregatorはstart()で初期化してaggregate()で集計してcomplete()で出力するので、その間のデータの受け渡しにContextを使用する。
(キーの値が変わる度にstart()が呼ばれる)
(complete()以外でタプルを出力してはいけない(→単体テスト方法参照)。そういう事がしたい場合はBufferを使う。[2010-04-17])
引数のAggregatorCallの使い方は、FunctionCallと同様。
メソッド | 説明 | |
---|---|---|
TupleEntry |
aggregatorCall.getGroup() |
グルーピングのキーとなっているタプルを取得する。 |
TupleEntry |
aggregatorCall.getArguments() |
入力データとなるタプルを取得する。 |
Fields |
aggregatorCall.getArgumentFields() |
入力データのフィールド名を取得する。Cascading1.1以降。[2010-05-01] |
TupleEntryCollector |
aggregatorCall.getOutputCollector() |
データの出力先を取得する。 |
Context |
aggregatorCall.getContext() |
Aggregatorの型引数で指定したContextを取得・設定する。 Contextインスタンスを生成・初期化するのはプログラマーの責任。 |
void |
aggregatorCall.setContext(Context) |
Cascading1.0.18のAggregatorには以下のようなクラスがある。
クラス名 | 説明 | データ型 | デフォルトの 出力項目名 |
---|---|---|---|
cascading.operation. Aggregator |
集約関数のインターフェース。 | ||
cascading.operation.aggregator. Count |
キー毎のデータ数をカウントする。 →CountBy |
Long | count |
cascading.operation.aggregator. Sum |
キー毎のデータ(Tupleの第1項目)の集計を行う。 データはデフォルトではdoubleとして扱われる。 →SumBy |
Double | sum |
cascading.operation.aggregator. Average |
キー毎のデータ(Tupleの第1項目)の平均を算出する。 合計をdouble、件数をlongで計算する。 →AverageBy |
Double | average |
cascading.operation.aggregator. Min |
同一キー内の(Tupleの第1項目の)最小/最大データを出力する。 データはdoubleとして比較する。 |
引数の型 | min |
cascading.operation.aggregator. Max |
引数の型 | max | |
cascading.operation.aggregator. First |
同一キー内の先頭/末尾データ(Tupleの全項目)を出力する。 | 引数 | 引数と同じ |
cascading.operation.aggregator. Last |
引数 | 引数と同じ |
データをdoubleとして扱う場合、TupleEntry#getDouble()を使ってdouble型の値を取得している。
(つまりNumberであればdoubleValue()、それ以外だとDouble.toDouble(obj.toString())で変換する)
Everyには、Aggregator以外にBufferというものも指定できる。[2010-04-17]
pipe = new Every(pipe, new MyBuffer());
Bufferのコーディングは、Hadoop本来のReducerとそっくりになる。
BufferだけあればAggregatorなんか要らないんじゃないかと思うが、Cascadingとしては集約・集計はAggregatorを使うのが標準のようだ。
ではBufferの目的は何かと言うと、前後のレコード(タプル)の値を参照したい場合に使用する。
import cascading.flow.FlowProcess; import cascading.operation.BaseOperation; import cascading.operation.Buffer; import cascading.operation.BufferCall;
/** * 時刻項目から前レコードとの差分(時間)を算出する. */ public class TimeDeltaBuffer extends BaseOperation<DateFormat> implements Buffer<DateFormat> { private static final long serialVersionUID = 1L; // コンストラクター public TimeDeltaBuffer(Fields fieldDeclaration) { super(1, fieldDeclaration); } @Override public void prepare(FlowProcess flowProcess, OperationCall<DateFormat> operationCall) { super.prepare(flowProcess, operationCall); if (operationCall.getContext() == null) { operationCall.setContext(new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS")); } } @Override public void operate(FlowProcess flowProcess, BufferCall<DateFormat> bufferCall) { TupleEntryCollector collector = bufferCall.getOutputCollector(); DateFormat df = bufferCall.getContext(); // 前データ(前レコードの時刻) long prev = 0; // 現在時刻(現在のレコードの時刻) long now; for (Iterator<TupleEntry> i = bufferCall.getArgumentsIterator(); i.hasNext(); prev = now) { try { Tuple value = i.next().getTuple(); String s = value.getString(0); //第1項目をStringとして取得 now = df.parse(s).getTime(); } catch (ParseException e) { throw new RuntimeException(e); } if (prev == 0) { //初回(先頭レコード)用の初期化 prev = now; } // 前の時刻との差分(経過時間)の算出 long delta = now - prev; // 時間の出力 collector.add(new Tuple(delta)); } } }
SimpleDateFormatインスタンスをフィールドで保持せずにわざわざContext経由で渡しているのは、SimpleDateFormatはMTセーフにする必要がある為。
(現時点のHadoopではマルチスレッドで動作することは無いようだが、将来的には分からないという理由でContextが存在しているらしい)
(まぁ、Contextの使い方としては、集計値と直接関係ないDateFormatのようなオブジェクトを渡すのはイレギュラーな使い方なんじゃないかという気はしなくもないが(苦笑))
Pipe pipe = new Pipe("pipe"); pipe = new Each(pipe, new RegexSplitter(new Fields("key", "time"), ",")); // keyでグルーピング、timeで2次ソート pipe = new GroupBy(pipe, new Fields("key"), new Fields("time")); pipe = new Every(pipe, new Fields("time"), new TimeDeltaBuffer(new Fields("delta")));
まず入力データをカンマ区切りでkeyとtimeという2つの項目に分割している。
次にkeyでグルーピングする。この際、key毎にtimeでさらにソートしておく。
で、TimeDeltaBufferにtime項目を渡し、deltaという項目名で時間を出力する。
なお、Aggregatorではグループキー項目と集約結果のペアが出力されるが、Bufferでは入力の全項目(collector.add()時の処理対象の(Iterator#next()で取得した)入力タプル)と編集結果がペアで出力される。
入力データの例 | → | 出力結果 |
---|---|---|
key1,2010/04/15 01:02:03.000 key1,2010/04/15 01:02:04.000 key1,2010/04/15 01:02:04.001 key1,2010/04/15 01:02:05.000 key1,2010/04/15 01:03:03.000 key2,2010/04/15 01:03:04.000 key1,2010/04/15 01:03:05.000 key2,2010/04/15 01:03:06.000 key1,2010/04/16 01:00:00.000 key2,2010/04/16 01:00:01.000 |
→ |
key1 2010/04/15 01:02:03.000 0 key1 2010/04/15 01:02:04.000 1000 key1 2010/04/15 01:02:04.001 1 key1 2010/04/15 01:02:05.000 999 key1 2010/04/15 01:03:03.000 58000 key1 2010/04/15 01:03:05.000 2000 key1 2010/04/16 01:00:00.000 86215000 key2 2010/04/15 01:03:04.000 0 key2 2010/04/15 01:03:06.000 2000 key2 2010/04/16 01:00:01.000 86215000 |
keyとtimeのカンマ区切り | key・time・deltaをタブ区切り |