CascadingのTuple・TupleEntryクラスおよびFieldsクラスについて。
|
|
パイプ内で使用されるデータは、Tupleクラス(タプル)で受け渡しされる。[2010-04-05]
Tap(source)から読み込まれたデータはTupleとしてPipeに渡され、処理した結果もTupleとして出力される。
Tap(sink)もTupleを受け取ってファイルへ出力する。
つまりTupleは、Mapper#map()の引数valueやReducer#reduce()の引数values、出力時のcontext.write()の引数valueに相当する。(というか、実際に渡される)
Tupleは位置づけ的にはJavaBeanのようなもので、Stringやintのフィールドを持つ代わりに個々のデータ(Comparableのサブクラス、具体的にはStringやInteger)をリストを使って保持している。
Tupleは項目名を保持しておらず、List内の位置(インデックス・番号)でしかデータを判別できない。
項目名を扱う場合はTupleEntryが使われる。TupleEntry(厳密にはFields)が項目名とTuple内の位置関係を保持する。
TupleやTupleEntryが使用される場所(引数として渡される場合など)によっては、Tuple内のデータを更新できない(set()やadd()不可)こともある。(更新しようとするとUnsupportedOperationExceptionが発生する)
import cascading.tuple.Tuple; import cascading.tuple.TupleEntry;
例 | 説明 |
---|---|
Tuple tuple = new Tuple(); |
要素が0個のTupleインスタンスを生成する。 |
Tuple tuple = Tuple.size(n); |
要素がn個のTupleインスタンスを生成する。 |
Tuple tuple = new Tuple(値, …); |
データで初期化したTupleインスタンスを生成する。 引数の型はComparable。つまりStringやInteger等。 intやlong等のプリミティブ型はオートボクシングによりIntegerやLongに変換される前提。 |
tuple.add(値); tuple.addAll(値, …); |
tupleにデータを追加する。 |
tuple.set(i, 値); |
指定されたインデックスの位置にデータをセットする。 インデックスが要素数の範囲外だと例外発生。 |
int size = tuple.size(); |
tupleの要素数(データ数)を取得する。 |
Comparable obj = tuple.get(i); String value = tuple.getString(i); int value = tuple.getInteger(i); |
インデックスを指定してtupleから値を取得する。 getString()は、nullならnull、 obj.toString() が返る。getInteger()は、nullなら0、Numberなら intValue() 、それ以外はInteger.parseInt(obj.toString()) の値が返る。 |
TupleEntry entry = new TupleEntry( new Fields("col1", "col2"), tuple ); TupleEntry entry = new TupleEntry( new Fields("col1", "col2") ); entry.setTuple(tuple); Fields fields = new Fields("col1", "col2"); TupleEntry entry = new TupleEntry( fields, Tuple.size(fields.size()) ); |
TupleEntryインスタンスを生成する。 項目名を指定してget()やset()を行う場合、 Fields内部で項目名と位置のマップが作られてキャッシュされるので、 TupleEntryインスタンスは(Fieldsインスタンスも)なるべく使い回す方がよさそう。 (tupleの中身は色々変わるが、Fieldsが使用範囲内で変わるような使い方はしないはず) |
Comparable obj = entry.get(項目名); String value = entry.getString(項目名); int value = entry.getInteger(項目名); |
項目名を指定してentry内のtupleから値を取得する。 |
Comparable obj = entry.get(i); String value = entry.getString(i); int value = entry.getInteger(i); |
インデックスを指定してentry内のtupleから値を取得する。 実のところ、TupleEntryのget系のメソッドは、引数の型がComparableのものしか無い。 引数がNumberであればインデックスとして扱い、Stringであれば項目名としてインデックスを探索する。 |
entry.set(i, 値); entry.set(項目名, 値); |
enrty内のtupleに値をセットする。 第1引数はComparableなので、get()と同様の条件判断が行われる。 |
Fields fields = entry.getFields(); |
項目名一覧を取得する。 |
int pos = entry.getFields().getPos(項目名); |
項目に該当するインデックスを取得する。 |
Tuple tuple = entry.getTuple(); |
entry内のtupleを取得する。 |
Tuple tuple = entry.selectTuple( new Fields("col1") ); |
項目名で絞り込んだ(あるいは項目の並び順を変えた)新しいTupleを返す。 |
Cascadingでは(SQLの様に)項目に名前を付けてそれで指定することが出来るわけだが、名前指定だと毎回インデックスの探索処理が入ってしまう。
高速化を目指すなら、インデックスの取得がなるべく少なくなるようにコーディングすべき。
また、Tuple#equals()は実装されているが、TupleEntry#equals()は実装されていない。
したがって、同名項目で同じ値のつもりであっても、項目の並び順が異なるTuple同士は等しいとは見なされないので注意。
Fieldsは、項目名の並びを保持するクラス。
import cascading.tuple.Fields;
例 | 説明 |
---|---|
new Fields("項目名",…) |
項目名をStringで指定する。 重複した名前を指定することは出来ない。 |
new Fields(番号,…) |
項目のインデックス(番号)を指定する。 入力となるFieldsの並び順を変えたい場合などに使用する。 0から始まる正の数が先頭項目からのインデックスで、 -1から始まる負の数が末尾項目からの指定になる。 (-1が一番末尾、-2がその次、…) →インデックス番号の使用例 |
Fields.size(個数) |
名前を指定せず、項目の個数を指定してFieldsインスタンスを生成する。 |
Fields.join(フィールド,…) |
複数のFieldsを結合した一つのFieldsを返す。 |
fields.rename( |
フィールド内の一部のフィールド名を変更した新しいFieldsを生成する。 Cascading1.1以降。[2010-05-01] |
Fields.ALL |
全項目を表す。 |
Fields.UNKNOWN |
不確定(未定義)を表す。 |
Fields.GROUP |
直前のグルーピングのキー項目を表す。 |
Fields.VALUES |
直前のグルーピングの値項目を表す。 |
Fields.ARGS |
関数の引数の全項目を表す。 |
Fields.RESULTS |
関数の出力項目(出力結果全体)を表す。 |
Fields.FIRST |
new Fields(0) と同じ。つまり先頭項目のみの指定。 |
Fields.LAST |
new Fields(-1) と同じ。つまり末尾項目のみの指定。 |
Fieldsは、使われる場所によって意味合いが異なる。
使用箇所 | 例 | 説明 | |
---|---|---|---|
Pipe | Each Every |
引数argumentSelectorpipe = new Each(pipe, new Fields("項目名",…), new 関数()); pipe = new Each(pipe, Fields.ALL, new 関数()); |
関数へ渡す項目を絞り込む指定。 前パイプのTupleのうち、指定された項目だけを渡す。 Fields.ALLを指定すると、入力Tupleの全項目を渡す。 Fields.GROUPを指定すると、直前のグルーピングのキー項目を渡す。 (直前がGroupByやCoGroupでなかった場合は全項目を渡す) Fields.VALUESを指定すると、直前のグルーピングの値項目を渡す。 省略時はFields.ALL。 |
引数outputSelectorpipe = new Each(pipe, new 関数(), new Fields("出力項目名",…)); pipe = new Each(pipe, new 関数(), Fields.RESULTS); pipe = new Each(pipe, new 関数(), Fields.ALL); |
関数から出力された項目を絞り込む指定。 入力Tupleと関数からの出力結果のうち、指定された項目だけを返す。 Fields.RESULTSを指定すると、関数の出力結果をそのまま返す。 Fields.ALLを指定すると、入力Tupleの末尾に関数の出力結果をくっつけて返す。 省略時はEachはFields.RESULTS、EveryはFields.ALL。 |
||
GroupBy | 引数groupFieldspipe = new GroupBy(pipe, new Fields("項目名",…)); |
グルーピング(ソート)するキー項目の指定。 | |
引数sortFieldspipe = new GroupBy(pipe, new Fields("項目名",…), new Fields("ソート項目名",…) ); |
ソート項目の指定。(セカンダリーソート) グルーピングされたデータ毎にソートしたい場合に指定する。 省略時はnull。 |
||
CoGroup | 引数groupFields・declaredFieldspipe = new CoGroup( pipe1, new Fields("項目名",…), pipe2, new Fields("項目名",…), new Fields("出力項目名",…) ); |
groupFields(引数pipeの直後)は、マッチングキー項目の指定。 省略時はFields.FIRST(先頭項目)。 |
|
declaredFieldsは、出力する際の項目名の指定。 (EachやEveryと異なり、出力する項目の絞り込みではない) |
|||
Function Aggregator |
引数fieldDeclarationpipe = new Each (pipe, new RegexSplitter(new Fields("出力項目名")) ); pipe = new Every(pipe, new Sum(new Fields("出力項目名")) ); |
関数の出力結果の項目名の指定。 省略時のデフォルトの項目名は、関数によって異なる。 |
|
TextLine(source) | 引数sourceFieldsnew TextLine(new Fields("オフセット項目名", "文字列項目名")) new TextLine(TextLine.DEFAULT_SOURCE_FIELDS) new TextLine() new TextLine(new Fields("文字列項目名")) |
sourceに使用するTextLineのsourceFieldsでは、入力項目名を指定する。 項目数が2個のときはオフセットと入力行の項目名。 項目数が1個のときは入力行の項目名のみ。(オフセットが出力されなくなる) TextLineではsourceFieldsを省略した場合のデフォルトはTextLine.DEFAULT_SOURCE_FIELDS(new Fields("offset","line"))だが、 TextLineの親クラスであるSchemeではFields.UNKNOWNがデフォルト。 |
|
TextLine(sink) | 引数sinkFieldsnew TextLine( TextLine.DEFAULT_SOURCE_FIELDS, new Fields("文字列項目名") ) |
sinkに使用するTextLineでは、sinkFieldsで出力対象項目名を指定できる
(出力項目を絞り込む指定)。 Fields.ALLの場合、渡された全項目を出力する。 省略時はFields.ALL。 なお、sinkFieldsを指定する際はsourceFieldsにも何らかの値を指定しなければならない。 使わない項目なのでFields.UNKNOWN等を指定したいところだが、 TextLineでは「sourceFieldsの項目数が1または2」というチェックを行っているので、UNKNOWNは指定できない。 |
Fieldsのコンストラクターでは、項目名を文字列で指定する他に、インデックス番号を指定する方法がある。
項目数や項目名が不定・不明の場合に項目を並び替えたり絞り込んだりすることが出来る。
例 | 解説 | データ例 | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
new Each(pipe, new Fields( 1, 3, 0), new Identity()); new Each(pipe, new Fields( 1, -1, 0), new Identity()); new Each(pipe, new Fields(-3, -1, -4), new Identity()); |
前パイプの出力から項目を絞り込んで順序を変更し、 関数Identityに渡している。 |
|
||||||||||||||||||||
new Each(pipe, new Identity(), new Fields( 1, 3, 0)); new Each(pipe, new Identity(), new Fields( 1, -1, 0)); new Each(pipe, new Identity(), new Fields(-3, -1, -4)); |
関数Identityからの出力項目に対して 項目を絞り込んで順序を変更して出力する。 |