CascadingのPipeのサブクラスであるEachクラスについて。
|
|
Cascadingにおいて、入力データ(Tuple)1個につき1回ずつ処理を行うのがEach。
import cascading.pipe.Each;
Eachには、実際の処理を行うFunctionやFilterを指定する。
Hadoop本来のMapper#map()に相当する。
例 | 説明 |
---|---|
pipe = new Each(pipe,
関数オブジェクト); |
前パイプのデータ(Tuple)を関数に渡す。 関数に渡されるのは前パイプの出力の全項目(Fields.ALL)で、 Eachとしての出力は関数の出力結果の項目のみ(Fields.RESULTS)。 |
pipe = new Each(pipe, new Fields(項目名,…), 関数オブジェクト); |
指定された項目(argumentSelector)だけを前パイプから抽出して(新しいTupleを作り) 関数に渡す。 |
pipe = new Each(pipe, 関数オブジェクト, new Fields(項目名,…)); |
前パイプのデータ(Tuple)を関数に渡し、 指定された項目(outputSelector)を出力する。 Fields.ALLを指定すると、入力データと関数の出力結果を結合したTupleを出力する。 |
pipe = new Each(pipe, |
argumentSelectorとoutputSelectorの両方を同時に指定できる。 |
Eachに指定し、1データずつの実際の処理を記述するのがFunction(関数)。
pipe = new Each(pipe, new MyFunction());
自分でFunctionを作るには、Functionインターフェースを実装すればよい。
さらにBaseOperationを親クラスにすれば、実際に実装するのは1メソッドだけで済む。
Function#operate()メソッドが
Hadoop本来のMapper#map()に相当し、1データずつの処理を記述する。
なお、BaseOperationはjava.io.Serializableをimplementsしている。[2010-04-29]
つまり、自作の関数はシリアライズ可能である必要がある。
何故シリアライズ可能である必要があるかと言うと、Cascadingは処理を別マシンで実行するためにPipeをシリアライズして転送するから。
すると、当然Pipeの一種であるEachやそこに指定している関数もシリアライズ可能である必要がある。
import cascading.flow.FlowProcess; import cascading.operation.BaseOperation; import cascading.operation.Function; import cascading.operation.FunctionCall;
public class MyFunction extends BaseOperation<MyFunction.Context> implements Function<MyFunction.Context> { private static final long serialVersionUID = 1L; static class Context { } @Override public void operate(FlowProcess flowProcess, FunctionCall<Context> functionCall) { TupleEntry entry = functionCall.getArguments(); //入力データ Tuple tuple = 〜; //出力データの作成 functionCall.getOutputCollector().add(tuple); //出力 } }
BaseOperationやFunctionの型引数は、複数のoperate()メソッド呼び出しの間で保持して受け渡したいデータがある場合に指定する。
しかしCascading1.0の
Functionの場合はあまり使い道が無いと思う。
→Aggregatorでは必須
→Cascading1.1での使い道
operate()の引数のfunctionCallから、使用する情報を取得する。
メソッド | 説明 | |
---|---|---|
TupleEntry |
functionCall.getArguments() |
入力データとなるタプルを取得する。 |
Fields |
aggregatorCall.getArgumentFields() |
入力データのフィールド名を取得する。Cascading1.1以降。[2010-05-01] |
TupleEntryCollector |
functionCall.getOutputCollector() |
データの出力先を取得する。 |
Context |
functionCall.getContext() |
Functionの型引数で指定したContextを取得・設定する。 Contextインスタンスを生成するのはプログラマーの責任。 (Functionではたぶん使用しない。Aggregatorでは必須) |
void |
functionCall.setContext(Context) |
Cascading1.0.18のcollector.add()は、Mapper#map()のoutput.collect()(Hadoop0.20のcontext.write()相当)を呼び出す。
context.write()では引数のオブジェクトはシリアライズ(あるいはコピー)されて保持されるので、引数のオブジェクトは使い回すことが出来る。
しかしcollector.add()では、渡したTupleはunmodifiable(変更不可)にされるので、使い回すことは出来ない。
Cascading1.1で、getArgumentFields()メソッドが追加された。[2010-05-01]
何が嬉しいかと言うと、関数(Operation)内の前処理であるprepare()メソッドで引数のフィールド名を取得できること。
getArguments()は入ってきたデータを取得するものなので、prepare()内では使えない。
事前にフィールド名を取得できれば、Fields内のインデックスに変換しておくことが出来る。
Tupleからデータを取得する際は、フィールド名を指定するよりもインデックスを指定する方が高速だから。
(つまりprepare()でContextを作り、その中でインデックスを保持しておく。operate()ではそのインデックスを使ってアクセスする)
BaseOperationにはFieldsを引数にとるコンストラクターがある。
これを指定すると、出力する項目名を指定できる。
出力するデータ(Tupleの各項目)は、それに合致するようにしなければならない。
public class MyFunction extends BaseOperation<Object> implements Function<Object> { private static final long serialVersionUID = 1L; /** コンストラクター */ public MyFunction() { //出力する項目名(と項目の個数)を指定 super(new Fields("out1", "out2")); } @Override public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) { TupleEntry entry = functionCall.getArguments(); //入力データ //出力データの作成 if (true) { // Tupleを使う例 Tuple tuple = Tuple.size(fieldDeclaration.size()); tuple.set(0, entry.get("in1")); //out1 tuple.set(1, entry.get("in2")); //out2 functionCall.getOutputCollector().add(tuple); //出力 } else { // TupleEntryを使う例 TupleEntry outEntry = new TupleEntry(fieldDeclaration, Tuple.size(fieldDeclaration.size())); outEntry.set("out1", entry.get("in1")); outEntry.set("out2", entry.get("in2")); functionCall.getOutputCollector().add(outEntry); //出力 } } }
コンストラクターでFieldsを指定すると、super.fieldDeclaration
で使用できる。
fieldDeclaration.size()
は、項目の個数を取得するもの。
なお、TupleEntryは単なる“Tupleのラッパー”でしかないので、毎回インスタンスを作るのは効率が悪い。
また、名前で指定できるのは便利だが、内部的にはインデックスを算出してアクセスするので、Tupleにインデックス番号を直接指定する方が効率が良い。
コンストラクターのフィールド名は、基本的には関数の外側から渡すようにした方が良い。使う側の応用がしやすいから。[2010-05-01]
/** コンストラクター */ public MyFunction(FieldsfieldDeclaration
) { //出力する項目名(と項目の個数)を指定 super(fieldDeclaration
); }
Cascading1.0.18のFunctionの一部。
クラス名 | 説明 | 例 | |||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
cascading.operation. Function |
関数のインターフェース。 | ||||||||||||||||||
cascading.operation.regex. RegexSplitGenerator |
指定された正規表現(例えばスペースとかタブ)で区切って個々の単語を1個ずつ出力する。 (単語の個数分出力される) →WordCountサンプル |
pipe = new Each(pipe, |
|
||||||||||||||||
cascading.operation.regex. RegexSplitter |
指定された正規表現(例えばタブ)で区切って、個々の単語を各項目に割り当てたTupleを出力する。 | pipe = new Each(pipe, |
|
||||||||||||||||
cascading.operation.text. FieldJoiner |
指定されたデリミター(区切り文字)を使ってTuple内の全項目をつなげて1つの項目として出力する。 | pipe = new Each(pipe, |
|
||||||||||||||||
cascading.operation.text. FieldFormatter |
フォーマット(String. format())を指定して、それに応じた編集を行う。 |
pipe = new Each(pipe, |
|
||||||||||||||||
cascading.operation. Identity |
入力データをそのまま出力する。 ただ項目を絞り込みたい(あるいは列の並び順を変えたい)だけの場合に使える。 |
pipe = new Each(pipe, |
|
||||||||||||||||
pipe = new Each(pipe, |
|
||||||||||||||||||
クラスを指定すると、各項目のデータをそのクラスに応じて変換する。 | pipe = new Each(pipe, |
|
|||||||||||||||||
cascading.operation. Insert |
固定値を挿入(追加)する。[2010-04-22] SQLのinsertとは異なり、レコードを追加するのではなく、 全レコードに特定項目・同一値を追加するもの。 (全データを対象とした集計を行いたい場合に使える) Eachのデフォルトの出力項目は関数の出力結果のみなので、 Fields.ALLを指定しないと、固定値のみのデータが出力されてしまう(苦笑) →Insertの注意点 |
pipe = new Each(pipe, |
|
Insert関数では、挿入する固定値を指定する。[2010-04-29]
これはCascading1.0ではComparable、Cascading1.1ではObjectなのだが、実際はシリアライズ可能なクラス(Serializableを実装したクラス)でなければならない。
例えば0(Integer)や文字列(String)なら大丈夫だが、NullWritableやIntWritableは駄目。
→シリアライズ可能でなければならない理由
Eachに指定し、条件に応じてレコード(タプル)をフィルタリング(除去・削除)するのがFilter。[2010-04-16]
pipe = new Each(pipe, new MyFilter());
フィルターでは、出力項目を絞ることが出来ず、入力タプルの全項目が出力される。
自分でFilterを作るには、Filterインターフェースを実装すればよい。
Functionと同じく、BaseOperationを親クラスにするのが楽。
Filter#isRemove()メソッドで条件を判定し、不要なデータの場合はtrueを返すと、タプルストリームからレコード(データ)が除去される。
import cascading.flow.FlowProcess; import cascading.operation.BaseOperation; import cascading.operation.Filter; import cascading.operation.FilterCall;
/** * 指定された文字列で始まるタプルを除去するフィルター */ public class StartsFilter extends BaseOperation<Object> implements Filter<Object> { private static final long serialVersionUID = 1L; protected String prefix; /** * コンストラクター * * @param prefix * 先頭文字列 */ public StartsFilter(String prefix) { this.prefix = prefix; } @Override public boolean isRemove(FlowProcess flowProcess, FilterCall<Object> filterCall) { Tuple tuple = filterCall.getArguments().getTuple(); //入力データ String s = tuple.getString(0); //先頭項目を取得 if (s.startsWith(prefix)) { return true; //指定された文字から始まる場合はデータを除去する } return false; } }
//「#」で始まる行を除去するフィルター Filter<?> filter = new StartsFilter("#"); pipe = new Each(pipe, new Fields("line"), filter); //判定に使用する項目は「line」
実際のところ、フィルターと同等の処理は、Functionを使っても実現できる。
しかしFilterでは、isRemove()だけ実装すればよい(outputCollectorへの出力を記述する必要が無い)のと、And・Or・Notといった論理演算が使える(論理演算用のフィルターが用意されている)のがFunctionと異なる。
//「#」または「;」で始まる行を除去するフィルター Filter<?> filter = new Or(new StartsFilter("#"), new StartsFilter(";"));
なお、これくらいの条件であれば、Cascadingに用意されているフィルタークラスを使えば自作する必要は無い。
//「#」または「;」で始まる行を除去するフィルター Filter<?> filter = ExpressionFilter("$0.startsWith(\"#\") || $0.startsWith(\";\")", String.class);
//「#」または「;」で始まる行を除去するフィルター(正規表現を使用) Filter<?> filter = new Not(new RegexFilter("^[;#]"));
Cascading1.0.18のFilterの一部。[2010-04-16]
クラス名 | 説明 | 備考 | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
cascading.operation. Filter |
フィルターのインターフェース。 | |||||||||
cascading.operation.filter. FilterNull |
タプル内のいずれかの項目がnullだったら除去する。 | |||||||||
cascading.operation.filter. FilterNotNull |
タプル内のいずれかの項目がnull以外だったら除去する。 (全てがnullの項目だけ出力される) |
|||||||||
cascading.operation.filter. Limit |
指定されたレコード数だけ出力する。 (必ずしもぴったりになるわけではない) |
フィルター(Each)はHadoopのMapperやReducerとして実行される為、 複数のノードに分割して処理される可能性がある。 Limitでは、指定されたレコード数をノード数で割って、各ノードの上限数を決めている。 割り切れない場合は各ノードの上限数が+1されるので、 全体としてちょっと多くなる可能性がある。 また、あるノードに割り当てられた処理対象レコード数が少なければ、 処理対象全体のレコード数合計は指定レコード数より多かったとしても、 抽出されたレコード数合計は指定レコード数より少なくなる事もありそう。 |
||||||||
cascading.operation.filter. Sample |
サンプリング。全体の何%かを出力する。 レコード数が指定された割合になるように 適当に抽出される。 |
乱数(java.util.Random)を使っているので、 いつも同じデータが抽出されるとは限らない。 (処理対象全体のレコード数が同一であっても、毎回同じ件数になるとも限らない) |
||||||||
cascading.operation.regex. RegexFilter |
指定された正規表現を満たすデータだけ出力する。 | |||||||||
cascading.operation.expression. ExpressionFilter |
Javaの式を書いて条件を指定する。 条件を満たすデータを除去する。 |
ExpressionFilterを使う為にはクラスパスに CASCADING_HOME/lib/janino-*.jar が含まれている必要がある。 |
||||||||
cascading.operation.filter. And・Or・Not・Xor |
他のFilterを指定して、論理演算を行う。 | Andの場合、指定された全フィルターの条件を満たす場合にデータを除去する。 Orの場合、指定されたいずれかのフィルターの条件が満たされた場合にデータを除去する。 |
||||||||
cascading.operation. Debug |
デバッグ出力を行う。[2010-04-20] つまり、データをログ(コンソール)に出力(表示)する。 後続パイプへは全データがそのまま無条件に出力される。 (あくまでもEachで指定できるフィルターなので、 GroupByとEveryの間で使う事は出来ない) |
|