CascadingのPipeのサブクラスであるEveryクラスについて。
|
|
Cascadingにおいて、キー毎の集計処理を行うのがEvery。
import cascading.pipe.Every;
EveryにはSumやCountといった集約関数(Aggregator)、あるいはBufferを指定する。
HadoopのReducer、あるいはSQLのsumやcount等の集約関数に相当する。
また、Everyを使う前にはGroupByでソートしておく必要がある。
GroupByで指定したソートキーが集計に対するキーとなる。
(デフォルトの出力は、キーと集約関数の結果を合わせたTupleになる)
ソートキーを指定しない、全データに対する集計という処理は行えない。[2010-04-08]
全データに対して集計したい場合は、全データで共通の値となる項目を追加する必要がある。
(Reducer#reduce()に置き換えられるわけだが、入出力時にキーが必要だから)
→Insertを使ってダミーのソートキーを追加する方法
| 例 | 説明 |
|---|---|
pipe = new Every( |
ソート項目毎に集約を行う。 集約関数に渡されるのはGroupByの出力の全項目(Fields.ALL)で、 Everyとしての出力は入力キーと集約関数の出力結果を合わせた全項目(Fields.ALL)。 |
pipe = new Every( |
指定された項目(argumentSelector)を集約対象とする。 SumやAverageは入力データ(Tuple)の第1項目を対象とするので、 Tupleに複数の項目がある場合は対象項目を指定する必要がある。 |
pipe = new Every( |
指定された項目(outputSelector)を出力する。 |
Fields dummyKey = new Fields("dummyKey"); |
全データに対する集計を行う例。[2010-04-22] 全データにInsert関数を使って同一の固定値を挿入し、それをキーとしてソートする。 |
Everyに指定し、キー毎の実際の処理を記述するのがAggregator(集約関数)。
pipe = new Every(pipe, new MyAggregator());
自分でAggregatorを作るには、Aggregatorインターフェースを実装すればよい。(Functionと同様)
さらにBaseOperationを親クラスにすれば、実際に実装するのは最小限のメソッドだけで済む。
Aggregator#start()メソッドで初期処理を行い、aggregate()で1データずつ集計処理を行う。そしてcomplete()で出力する。
これら3つのメソッドが
Hadoop本来のReducer#reduce()に相当する。
(参考: Average)
import cascading.flow.FlowProcess; import cascading.operation.Aggregator; import cascading.operation.AggregatorCall; import cascading.operation.BaseOperation;
public class MyAggregator extends BaseOperation<MyAggregator.Context> implements Aggregator<MyAggregator.Context> {
private static final long serialVersionUID = 1L;
/** コンストラクター */
public MyAggregator() {
// 出力項目名を指定
super(new Fields("count", "sum"));
}
protected static class Context {
protected int count;
protected int sum;
public void reset() { count = 0; sum = 0; }
}
@Override
public void start(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
Context context = aggregatorCall.getContext();
if (context != null) {
context.reset();
} else {
aggregatorCall.setContext(new Context());
}
}
@Override
public void aggregate(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
TupleEntry entry = aggregatorCall.getArguments(); //入力データ
// データの集計
Context context = aggregatorCall.getContext();
context.count++;
context.sum += entry.getInteger("data");
}
@Override
public void complete(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
//出力データの作成
Context context = aggregatorCall.getContext();
Tuple tuple = new Tuple(context.count, context.sum);
aggregatorCall.getOutputCollector().add(tuple); //集計結果の出力
}
}
BaseOperationやAggregatorの型引数は、メソッド呼び出しの間で保持して受け渡すデータの型(Contextクラス)を指定する。
Aggregatorはstart()で初期化してaggregate()で集計してcomplete()で出力するので、その間のデータの受け渡しにContextを使用する。
(キーの値が変わる度にstart()が呼ばれる)
(complete()以外でタプルを出力してはいけない(→単体テスト方法参照)。そういう事がしたい場合はBufferを使う。[2010-04-17])
引数のAggregatorCallの使い方は、FunctionCallと同様。
| メソッド | 説明 | |
|---|---|---|
TupleEntry |
aggregatorCall.getGroup() |
グルーピングのキーとなっているタプルを取得する。 |
TupleEntry |
aggregatorCall.getArguments() |
入力データとなるタプルを取得する。 |
Fields |
aggregatorCall.getArgumentFields() |
入力データのフィールド名を取得する。Cascading1.1以降。[2010-05-01] |
TupleEntryCollector |
aggregatorCall.getOutputCollector() |
データの出力先を取得する。 |
Context |
aggregatorCall.getContext() |
Aggregatorの型引数で指定したContextを取得・設定する。 Contextインスタンスを生成・初期化するのはプログラマーの責任。 |
void |
aggregatorCall.setContext(Context) |
|
Cascading1.0.18のAggregatorには以下のようなクラスがある。
| クラス名 | 説明 | データ型 | デフォルトの 出力項目名 |
|---|---|---|---|
| cascading.operation. Aggregator |
集約関数のインターフェース。 | ||
| cascading.operation.aggregator. Count |
キー毎のデータ数をカウントする。 →CountBy |
Long | count |
| cascading.operation.aggregator. Sum |
キー毎のデータ(Tupleの第1項目)の集計を行う。 データはデフォルトではdoubleとして扱われる。 →SumBy |
Double | sum |
| cascading.operation.aggregator. Average |
キー毎のデータ(Tupleの第1項目)の平均を算出する。 合計をdouble、件数をlongで計算する。 →AverageBy |
Double | average |
| cascading.operation.aggregator. Min |
同一キー内の(Tupleの第1項目の)最小/最大データを出力する。 データはdoubleとして比較する。 |
引数の型 | min |
| cascading.operation.aggregator. Max |
引数の型 | max | |
| cascading.operation.aggregator. First |
同一キー内の先頭/末尾データ(Tupleの全項目)を出力する。 | 引数 | 引数と同じ |
| cascading.operation.aggregator. Last |
引数 | 引数と同じ |
データをdoubleとして扱う場合、TupleEntry#getDouble()を使ってdouble型の値を取得している。
(つまりNumberであればdoubleValue()、それ以外だとDouble.toDouble(obj.toString())で変換する)
Everyには、Aggregator以外にBufferというものも指定できる。[2010-04-17]
pipe = new Every(pipe, new MyBuffer());
Bufferのコーディングは、Hadoop本来のReducerとそっくりになる。
BufferだけあればAggregatorなんか要らないんじゃないかと思うが、Cascadingとしては集約・集計はAggregatorを使うのが標準のようだ。
ではBufferの目的は何かと言うと、前後のレコード(タプル)の値を参照したい場合に使用する。
import cascading.flow.FlowProcess; import cascading.operation.BaseOperation; import cascading.operation.Buffer; import cascading.operation.BufferCall;
/**
* 時刻項目から前レコードとの差分(時間)を算出する.
*/
public class TimeDeltaBuffer extends BaseOperation<DateFormat> implements Buffer<DateFormat> {
private static final long serialVersionUID = 1L;
// コンストラクター
public TimeDeltaBuffer(Fields fieldDeclaration) {
super(1, fieldDeclaration);
}
@Override
public void prepare(FlowProcess flowProcess, OperationCall<DateFormat> operationCall) {
super.prepare(flowProcess, operationCall);
if (operationCall.getContext() == null) {
operationCall.setContext(new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"));
}
}
@Override
public void operate(FlowProcess flowProcess, BufferCall<DateFormat> bufferCall) {
TupleEntryCollector collector = bufferCall.getOutputCollector();
DateFormat df = bufferCall.getContext();
// 前データ(前レコードの時刻)
long prev = 0;
// 現在時刻(現在のレコードの時刻)
long now;
for (Iterator<TupleEntry> i = bufferCall.getArgumentsIterator(); i.hasNext(); prev = now) {
try {
Tuple value = i.next().getTuple();
String s = value.getString(0); //第1項目をStringとして取得
now = df.parse(s).getTime();
} catch (ParseException e) {
throw new RuntimeException(e);
}
if (prev == 0) { //初回(先頭レコード)用の初期化
prev = now;
}
// 前の時刻との差分(経過時間)の算出
long delta = now - prev;
// 時間の出力
collector.add(new Tuple(delta));
}
}
}
SimpleDateFormatインスタンスをフィールドで保持せずにわざわざContext経由で渡しているのは、SimpleDateFormatはMTセーフにする必要がある為。
(現時点のHadoopではマルチスレッドで動作することは無いようだが、将来的には分からないという理由でContextが存在しているらしい)
(まぁ、Contextの使い方としては、集計値と直接関係ないDateFormatのようなオブジェクトを渡すのはイレギュラーな使い方なんじゃないかという気はしなくもないが(苦笑))
Pipe pipe = new Pipe("pipe");
pipe = new Each(pipe, new RegexSplitter(new Fields("key", "time"), ","));
// keyでグルーピング、timeで2次ソート
pipe = new GroupBy(pipe, new Fields("key"), new Fields("time"));
pipe = new Every(pipe, new Fields("time"), new TimeDeltaBuffer(new Fields("delta")));
まず入力データをカンマ区切りでkeyとtimeという2つの項目に分割している。
次にkeyでグルーピングする。この際、key毎にtimeでさらにソートしておく。
で、TimeDeltaBufferにtime項目を渡し、deltaという項目名で時間を出力する。
なお、Aggregatorではグループキー項目と集約結果のペアが出力されるが、Bufferでは入力の全項目(collector.add()時の処理対象の(Iterator#next()で取得した)入力タプル)と編集結果がペアで出力される。
| 入力データの例 | → | 出力結果 |
|---|---|---|
key1,2010/04/15 01:02:03.000 key1,2010/04/15 01:02:04.000 key1,2010/04/15 01:02:04.001 key1,2010/04/15 01:02:05.000 key1,2010/04/15 01:03:03.000 key2,2010/04/15 01:03:04.000 key1,2010/04/15 01:03:05.000 key2,2010/04/15 01:03:06.000 key1,2010/04/16 01:00:00.000 key2,2010/04/16 01:00:01.000 |
→ |
key1 2010/04/15 01:02:03.000 0 key1 2010/04/15 01:02:04.000 1000 key1 2010/04/15 01:02:04.001 1 key1 2010/04/15 01:02:05.000 999 key1 2010/04/15 01:03:03.000 58000 key1 2010/04/15 01:03:05.000 2000 key1 2010/04/16 01:00:00.000 86215000 key2 2010/04/15 01:03:04.000 0 key2 2010/04/15 01:03:06.000 2000 key2 2010/04/16 01:00:01.000 86215000 |
| keyとtimeのカンマ区切り | key・time・deltaをタブ区切り |