S-JIS[2010-04-06/2010-05-01] 変更履歴

Cascading Each

CascadingPipeのサブクラスであるEachクラスについて。


Eachの概要

Cascadingにおいて、入力データ(Tuple)1個につき1回ずつ処理を行うのがEach

import cascading.pipe.Each;

Eachには、実際の処理を行うFunctionFilterを指定する。
Hadoop本来Mapper#map()に相当する。

Eachの使用例 [/2010-04-13]
説明
pipe = new Each(pipe, 関数オブジェクト); 前パイプのデータ(Tuple)を関数に渡す。
関数に渡されるのは前パイプの出力の全項目Fields.ALL)で、
Eachとしての出力は関数の出力結果の項目のみ(Fields.RESULTS)。
pipe = new Each(pipe, new Fields(項目名,…), 関数オブジェクト); 指定された項目argumentSelector)だけを前パイプから抽出して(新しいTupleを作り)
関数に渡す。
pipe = new Each(pipe, 関数オブジェクト, new Fields(項目名,…)); 前パイプのデータ(Tuple)を関数に渡し、
指定された項目outputSelector)を出力する。
Fields.ALLを指定すると、入力データ関数の出力結果を結合したTupleを出力する。
pipe = new Each(pipe,
 new Fields(項目名,…),
 関数オブジェクト,
 new Fields(項目名,…)
);
argumentSelectoroutputSelectorの両方を同時に指定できる。

Function

Eachに指定し、1データずつの実際の処理を記述するのがFunction(関数)。

	pipe = new Each(pipe, new MyFunction());

自分でFunctionを作るには、Functionインターフェースを実装すればよい。
さらにBaseOperationを親クラスにすれば、実際に実装するのは1メソッドだけで済む。
Function#operate()メソッドが Hadoop本来のMapper#map()に相当し、1データずつの処理を記述する。

なお、BaseOperationはjava.io.Serializableをimplementsしている。[2010-04-29]
つまり、自作の関数はシリアライズ可能である必要がある。

何故シリアライズ可能である必要があるかと言うと、Cascadingは処理を別マシンで実行するためにPipeシリアライズして転送するから。
すると、当然Pipeの一種であるEachやそこに指定している関数もシリアライズ可能である必要がある。

Functionのコーディング例

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
public class MyFunction extends BaseOperation<MyFunction.Context> implements Function<MyFunction.Context> {
	private static final long serialVersionUID = 1L;

	static class Context {
	}

	@Override
	public void operate(FlowProcess flowProcess, FunctionCall<Context> functionCall) {
		TupleEntry entry = functionCall.getArguments(); //入力データ

		Tuple tuple = 〜;	//出力データの作成

		functionCall.getOutputCollector().add(tuple);	//出力
	}
}

BaseOperationやFunctionの型引数は、複数のoperate()メソッド呼び出しの間で保持して受け渡したいデータがある場合に指定する。
しかしCascading1.0の Functionの場合はあまり使い道が無いと思う。
Aggregatorでは必須
Cascading1.1での使い道


operate()の引数のfunctionCallから、使用する情報を取得する。

メソッド 説明
TupleEntry functionCall.getArguments() 入力データとなるタプルを取得する。
Fields aggregatorCall.getArgumentFields() 入力データのフィールド名を取得する。Cascading1.1以降。[2010-05-01]
TupleEntryCollector functionCall.getOutputCollector() データの出力先を取得する。
Context functionCall.getContext() Functionの型引数で指定したContextを取得・設定する。
Contextインスタンスを生成するのはプログラマーの責任。
(Functionではたぶん使用しない。Aggregatorでは必須)
void functionCall.setContext(Context)

Cascading1.0.18のcollector.add()は、Mapper#map()のoutput.collect()(Hadoop0.20のcontext.write()相当)を呼び出す。
context.write()では引数のオブジェクトはシリアライズ(あるいはコピー)されて保持されるので、引数のオブジェクトは使い回すことが出来る。
しかしcollector.add()では、渡したTupleはunmodifiable(変更不可)にされるので、使い回すことは出来ない。

Cascading1.1で、getArgumentFields()メソッドが追加された。[2010-05-01]
何が嬉しいかと言うと、関数(Operation)内の前処理であるprepare()メソッドで引数のフィールド名を取得できること。
getArguments()は入ってきたデータを取得するものなので、prepare()内では使えない。
事前にフィールド名を取得できれば、Fields内のインデックスに変換しておくことが出来る。
Tupleからデータを取得する際は、フィールド名を指定するよりもインデックスを指定する方が高速だから。
(つまりprepare()でContextを作り、その中でインデックスを保持しておく。operate()ではそのインデックスを使ってアクセスする)


BaseOperationにはFieldsを引数にとるコンストラクターがある。
これを指定すると、出力する項目名を指定できる。
出力するデータ(Tupleの各項目)は、それに合致するようにしなければならない。

public class MyFunction extends BaseOperation<Object> implements Function<Object> {
	private static final long serialVersionUID = 1L;

	/** コンストラクター */
	public MyFunction() {
		//出力する項目名(と項目の個数)を指定
		super(new Fields("out1", "out2"));
	}

	@Override
	public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {
		TupleEntry entry = functionCall.getArguments(); //入力データ

		//出力データの作成
		if (true) {
			// Tupleを使う例
			Tuple tuple = Tuple.size(fieldDeclaration.size());
			tuple.set(0, entry.get("in1")); //out1
			tuple.set(1, entry.get("in2")); //out2

			functionCall.getOutputCollector().add(tuple); //出力
		} else {
			// TupleEntryを使う例
			TupleEntry outEntry = new TupleEntry(fieldDeclaration, Tuple.size(fieldDeclaration.size()));
			outEntry.set("out1", entry.get("in1"));
			outEntry.set("out2", entry.get("in2"));

			functionCall.getOutputCollector().add(outEntry); //出力
		}
	}
}

コンストラクターでFieldsを指定すると、super.fieldDeclarationで使用できる。
fieldDeclaration.size()は、項目の個数を取得するもの。

なお、TupleEntryは単なる“Tupleのラッパー”でしかないので、毎回インスタンスを作るのは効率が悪い。
また、名前で指定できるのは便利だが、内部的にはインデックスを算出してアクセスするので、Tupleにインデックス番号を直接指定する方が効率が良い。

コンストラクターのフィールド名は、基本的には関数の外側から渡すようにした方が良い。使う側の応用がしやすいから。[2010-05-01]

	/** コンストラクター */
	public MyFunction(Fields fieldDeclaration) {
		//出力する項目名(と項目の個数)を指定
		super(fieldDeclaration);
	}

Functionの種類

Cascading1.0.18のFunctionの一部。

クラス名 説明
cascading.operation.
Function
関数のインターフェース。    
cascading.operation.regex.
RegexSplitGenerator
指定された正規表現(例えばスペースとかタブ)で区切って個々の単語1個ずつ出力する。
(単語の個数分出力される)
WordCountサンプル
pipe = new Each(pipe,
 new RegexSplitGenerator(",")
);
0   0
"ab,cd,ef" "ab"
"cd"
"ef"
cascading.operation.regex.
RegexSplitter
指定された正規表現(例えばタブ)で区切って、個々の単語各項目に割り当てたTupleを出力する。 pipe = new Each(pipe,
 new RegexSplitter(",")
);
0   0 1 2
"ab,cd,ef" "ab" "cd" "ef"
cascading.operation.text.
FieldJoiner
指定されたデリミター(区切り文字)を使ってTuple内の全項目をつなげて1つの項目として出力する。 pipe = new Each(pipe,
 new FieldJoiner(":")
);
0 1 2   0
"ab" "cd" "ef" "ab:cd:ef"
cascading.operation.text.
FieldFormatter
フォーマット(String.format())を指定して、それに応じた編集を行う。 pipe = new Each(pipe,
 new FieldFormatter("%s=[%s]")
);
0 1   0
"abc" "def" "abc=[def]"
cascading.operation.
Identity
入力データをそのまま出力する。
ただ項目を絞り込みたい(あるいは列の並び順を変えたい)だけの場合に使える。
pipe = new Each(pipe,
 new Fields("col2"),
 new Identity()
);
0
col1
1
col2
  0
col2
"abc" "def" "def"
pipe = new Each(pipe,
 new Fields(2, 1),
 new Identity()
);
0 1 2   0 1
"ab" "cd" "ef" "ef" "cd"
クラスを指定すると、各項目のデータをそのクラスに応じて変換する。 pipe = new Each(pipe,
 new Identity(
  int.class, int.class
 )
);
0 1   0 1
"123" "456" 123 456
cascading.operation.
Insert
固定値を挿入(追加)する。[2010-04-22]
SQLのinsertとは異なり、レコードを追加するのではなく、
全レコードに特定項目・同一値を追加するもの。
全データを対象とした集計を行いたい場合に使える)
Eachのデフォルトの出力項目は関数の出力結果のみなので、
Fields.ALLを指定しないと、固定値のみのデータが出力されてしまう(苦笑)
Insertの注意点
pipe = new Each(pipe,
 new Insert(
  new Fields("newcol"),
  "定数値"
 ),
 Fields.ALL
);
0
 
  0
 
1
newcol
"abc" "abc" "定数値"
"def" "def" "定数値"
"ghi" "ghi" "定数値"

Insertに指定する定数について

Insert関数では、挿入する固定値を指定する。[2010-04-29]
これはCascading1.0ではComparable、Cascading1.1ではObjectなのだが、実際はシリアライズ可能なクラス(Serializableを実装したクラス)でなければならない。
例えば0(Integer)や文字列(String)なら大丈夫だが、NullWritableやIntWritableは駄目。
シリアライズ可能でなければならない理由


Filter

Eachに指定し、条件に応じてレコード(タプル)をフィルタリング(除去・削除)するのがFilter。[2010-04-16]

	pipe = new Each(pipe, new MyFilter());

フィルターでは、出力項目を絞ることが出来ず、入力タプルの全項目が出力される。


Filterのコーディング例

自分でFilterを作るには、Filterインターフェースを実装すればよい。
Functionと同じく、BaseOperationを親クラスにするのが楽。
Filter#isRemove()メソッドで条件を判定し、不要なデータの場合はtrueを返すと、タプルストリームからレコード(データ)が除去される。

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Filter;
import cascading.operation.FilterCall;
/**
 * 指定された文字列で始まるタプルを除去するフィルター
 */
public class StartsFilter extends BaseOperation<Object> implements Filter<Object> {
	private static final long serialVersionUID = 1L;

	protected String prefix;

	/**
	 * コンストラクター
	 *
	 * @param prefix
	 *            先頭文字列
	 */
	public StartsFilter(String prefix) {
		this.prefix = prefix;
	}

	@Override
	public boolean isRemove(FlowProcess flowProcess, FilterCall<Object> filterCall) {
		Tuple tuple = filterCall.getArguments().getTuple(); //入力データ

		String s = tuple.getString(0); //先頭項目を取得
		if (s.startsWith(prefix)) {
			return true; //指定された文字から始まる場合はデータを除去する
		}

		return false;
	}
}

Filterを使う側の例

		//「#」で始まる行を除去するフィルター
		Filter<?> filter = new StartsFilter("#");

		pipe = new Each(pipe, new Fields("line"), filter); //判定に使用する項目は「line」

実際のところ、フィルターと同等の処理は、Functionを使っても実現できる。
しかしFilterでは、isRemove()だけ実装すればよい(outputCollectorへの出力を記述する必要が無い)のと、And・Or・Notといった論理演算が使える(論理演算用のフィルターが用意されている)のがFunctionと異なる。

		//「#」または「;」で始まる行を除去するフィルター
		Filter<?> filter = new Or(new StartsFilter("#"), new StartsFilter(";"));

なお、これくらいの条件であれば、Cascadingに用意されているフィルタークラスを使えば自作する必要は無い。

		//「#」または「;」で始まる行を除去するフィルター
		Filter<?> filter = ExpressionFilter("$0.startsWith(\"#\") || $0.startsWith(\";\")", String.class);
		//「#」または「;」で始まる行を除去するフィルター(正規表現を使用)
		Filter<?> filter = new Not(new RegexFilter("^[;#]"));

Filterの種類

Cascading1.0.18のFilterの一部。[2010-04-16]

クラス名 説明 備考
cascading.operation.
Filter
フィルターのインターフェース。  
cascading.operation.filter.
FilterNull
タプル内のいずれかの項目がnullだったら除去する。  
cascading.operation.filter.
FilterNotNull
タプル内のいずれかの項目がnull以外だったら除去する。
(全てがnullの項目だけ出力される)
 
cascading.operation.filter.
Limit
指定されたレコード数だけ出力する。
(必ずしもぴったりになるわけではない)
フィルター(Each)はHadoopのMapperやReducerとして実行される為、
複数のノードに分割して処理される可能性がある。
Limitでは、指定されたレコード数をノード数で割って、各ノードの上限数を決めている。
割り切れない場合は各ノードの上限数が+1されるので、
全体としてちょっと多くなる可能性がある。
また、あるノードに割り当てられた処理対象レコード数が少なければ、
処理対象全体のレコード数合計は指定レコード数より多かったとしても、
抽出されたレコード数合計は指定レコード数より少なくなる事もありそう。
cascading.operation.filter.
Sample
サンプリング。全体の何%かを出力する。
レコード数が指定された割合になるように
適当に抽出される。
乱数(java.util.Random)を使っているので、
いつも同じデータが抽出されるとは限らない。
(処理対象全体のレコード数が同一であっても、毎回同じ件数になるとも限らない)
cascading.operation.regex.
RegexFilter
指定された正規表現を満たすデータだけ出力する。  
cascading.operation.expression.
ExpressionFilter
Javaの式を書いて条件を指定する。
条件を満たすデータを除去する。
new ExpressionFilter("line.startWith(\"#\")", String.class)
変数名に当たる部分は、フィールド名(項目名)を指定する。
new ExpressionFilter("$0.startWith(\"#\")", String.class)
「$0」のようにしてTupleのインデックス番号を指定することも出来る。
第2引数には、Tupleから取得するデータの型を指定する。
ExpressionFilterを使う為にはクラスパスに
CASCADING_HOME/lib/janino-*.jar
が含まれている必要がある。
cascading.operation.filter.
AndOrNotXor
他のFilterを指定して、論理演算を行う。 Andの場合、指定された全フィルターの条件を満たす場合にデータを除去する。
Orの場合、指定されたいずれかのフィルターの条件が満たされた場合にデータを除去する。
cascading.operation.
Debug
デバッグ出力を行う。[2010-04-20]
つまり、データをログ(コンソール)に出力(表示)する。
後続パイプへは全データがそのまま無条件に出力される。
(あくまでもEachで指定できるフィルターなので、
GroupByEveryの間で使う事は出来ない)
説明
new Each(pipe, new Debug()) 前パイプの全データをログ出力する。
new Each(pipe, new Debug(true)) 項目名と全データをログ出力する。
new Each(pipe, new Fields("zzz"), new Debug()) 指定された項目のデータだけログ出力する。

Cascadingへ戻る / Hadoopへ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま