S-JIS[2010-04-11/2010-05-01] 変更履歴

Cascading Tuple

CascadingのTuple・TupleEntryクラスおよびFieldsクラスについて。


Tupleの概要

パイプ内で使用されるデータは、Tupleクラス(タプル)で受け渡しされる。[2010-04-05]
Tap(source)から読み込まれたデータはTupleとしてPipeに渡され、処理した結果もTupleとして出力される。
Tap(sink)もTupleを受け取ってファイルへ出力する。
つまりTupleは、Mapper#map()の引数valueやReducer#reduce()の引数values、出力時のcontext.write()の引数valueに相当する。(というか、実際に渡される)

Tupleは位置づけ的にはJavaBeanのようなもので、Stringやintのフィールドを持つ代わりに個々のデータ(Comparableのサブクラス、具体的にはStringやInteger)をリストを使って保持している。

Tupleは項目名を保持しておらず、List内の位置(インデックス・番号)でしかデータを判別できない。
項目名を扱う場合はTupleEntryが使われる。TupleEntry(厳密にはFields)が項目名とTuple内の位置関係を保持する。

TupleやTupleEntryが使用される場所(引数として渡される場合など)によっては、Tuple内のデータを更新できない(set()やadd()不可)こともある。(更新しようとするとUnsupportedOperationExceptionが発生する)


Tuple・TupleEntryの使用例

import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
説明
Tuple tuple = new Tuple();
要素が0個のTupleインスタンスを生成する。
Tuple tuple = Tuple.size(n);
要素がn個のTupleインスタンスを生成する。
Tuple tuple = new Tuple(値, …);
データで初期化したTupleインスタンスを生成する。
引数の型はComparable。つまりStringやInteger等。
intやlong等のプリミティブ型はオートボクシングによりIntegerやLongに変換される前提。
tuple.add(値);
tuple.addAll(値, …);
tupleにデータを追加する。
tuple.set(i, 値);
指定されたインデックスの位置にデータをセットする。
インデックスが要素数の範囲外だと例外発生。
int size = tuple.size();
tupleの要素数(データ数)を取得する。
Comparable obj = tuple.get(i);
String   value = tuple.getString(i);
int      value = tuple.getInteger(i);
インデックスを指定してtupleから値を取得する。
getString()は、nullならnull、obj.toString()が返る。
getInteger()は、nullなら0、NumberならintValue()、それ以外はInteger.parseInt(obj.toString())の値が返る。
TupleEntry entry = new TupleEntry(
  new Fields("col1", "col2"),
  tuple
);
TupleEntry entry = new TupleEntry(
  new Fields("col1", "col2")
);
entry.setTuple(tuple);
Fields fields = new Fields("col1", "col2");
TupleEntry entry = new TupleEntry(
  fields,
  Tuple.size(fields.size())
);
TupleEntryインスタンスを生成する。

項目名を指定してget()やset()を行う場合、
Fields内部で項目名と位置のマップが作られてキャッシュされるので、
TupleEntryインスタンスは(Fieldsインスタンスも)なるべく使い回す方がよさそう。
(tupleの中身は色々変わるが、Fieldsが使用範囲内で変わるような使い方はしないはず)
Comparable obj = entry.get(項目名);
String   value = entry.getString(項目名);
int      value = entry.getInteger(項目名);
項目名を指定してentry内のtupleから値を取得する。
Comparable obj = entry.get(i);
String   value = entry.getString(i);
int      value = entry.getInteger(i);
インデックスを指定してentry内のtupleから値を取得する。
実のところ、TupleEntryのget系のメソッドは、引数の型がComparableのものしか無い。
引数がNumberであればインデックスとして扱い、Stringであれば項目名としてインデックスを探索する。
entry.set(i, 値);
entry.set(項目名, 値);
enrty内のtupleに値をセットする。
第1引数はComparableなので、get()と同様の条件判断が行われる。
Fields fields = entry.getFields();
項目名一覧を取得する。
int pos = entry.getFields().getPos(項目名);
項目に該当するインデックスを取得する。
Tuple tuple = entry.getTuple();
entry内のtupleを取得する。
Tuple tuple = entry.selectTuple(
  new Fields("col1")
);
項目名で絞り込んだ(あるいは項目の並び順を変えた)新しいTupleを返す。

Cascadingでは(SQLの様に)項目に名前を付けてそれで指定することが出来るわけだが、名前指定だと毎回インデックスの探索処理が入ってしまう。
高速化を目指すなら、インデックスの取得がなるべく少なくなるようにコーディングすべき。

また、Tuple#equals()は実装されているが、TupleEntry#equals()は実装されていない。
したがって、同名項目で同じ値のつもりであっても、項目の並び順が異なるTuple同士は等しいとは見なされないので注意。


Fieldsの概要

Fieldsは、項目名の並びを保持するクラス。

import cascading.tuple.Fields;
Fieldsインスタンスの取得方法
説明
new Fields("項目名",…) 項目名をStringで指定する。
重複した名前を指定することは出来ない。
new Fields(番号,…) 項目のインデックス(番号)を指定する。
入力となるFieldsの並び順を変えたい場合などに使用する。
0から始まる正の数が先頭項目からのインデックスで、
-1から始まる負の数が末尾項目からの指定になる。
(-1が一番末尾、-2がその次、…)
インデックス番号の使用例
Fields.size(個数) 名前を指定せず、項目の個数を指定してFieldsインスタンスを生成する。
Fields.join(フィールド,…) 複数のFieldsを結合した一つのFieldsを返す。
fields.rename(
 new Fields("旧名1",…),
 new Fields("新名1",…)
)
フィールド内の一部のフィールド名を変更した新しいFieldsを生成する。
Cascading1.1以降。[2010-05-01]
Fields.ALL 全項目を表す。
Fields.UNKNOWN 不確定(未定義)を表す。
Fields.GROUP 直前のグルーピングのキー項目を表す。
Fields.VALUES 直前のグルーピングの値項目を表す。
Fields.ARGS 関数の引数の全項目を表す。
Fields.RESULTS 関数の出力項目(出力結果全体)を表す。
Fields.FIRST new Fields(0)と同じ。つまり先頭項目のみの指定。
Fields.LAST new Fields(-1)と同じ。つまり末尾項目のみの指定。

Fieldsの使用例

Fieldsは、使われる場所によって意味合いが異なる。

使用箇所 説明
Pipe Each
Every
引数argumentSelector
pipe = new Each(pipe, new Fields("項目名",…), new 関数());
pipe = new Each(pipe, Fields.ALL, new 関数());
関数へ渡す項目を絞り込む指定。
前パイプのTupleのうち、指定された項目だけを渡す。
Fields.ALLを指定すると、入力Tupleの全項目を渡す。
Fields.GROUPを指定すると、直前のグルーピングのキー項目を渡す。
(直前がGroupByCoGroupでなかった場合は全項目を渡す)
Fields.VALUESを指定すると、直前のグルーピングの値項目を渡す。
省略時はFields.ALL
引数outputSelector
pipe = new Each(pipe, new 関数(), new Fields("出力項目名",…));
pipe = new Each(pipe, new 関数(), Fields.RESULTS);
pipe = new Each(pipe, new 関数(), Fields.ALL);
関数から出力された項目を絞り込む指定。
入力Tupleと関数からの出力結果のうち、指定された項目だけを返す。
Fields.RESULTSを指定すると、関数の出力結果をそのまま返す。
Fields.ALLを指定すると、入力Tupleの末尾に関数の出力結果をくっつけて返す。
省略時はEachFields.RESULTSEveryFields.ALL
GroupBy 引数groupFields
pipe = new GroupBy(pipe, new Fields("項目名",…));
グルーピング(ソート)するキー項目の指定。
引数sortFields
pipe = new GroupBy(pipe,
 new Fields("項目名",…),
 new Fields("ソート項目名",…)
);
ソート項目の指定。(セカンダリーソート)
グルーピングされたデータ毎にソートしたい場合に指定する。
省略時はnull。
CoGroup 引数groupFields・declaredFields
pipe = new CoGroup(
 pipe1, new Fields("項目名",…),
 pipe2, new Fields("項目名",…),
 new Fields("出力項目名",…)
);
groupFields(引数pipeの直後)は、マッチングキー項目の指定。
省略時はFields.FIRST(先頭項目)。
declaredFieldsは、出力する際の項目名の指定。
EachEveryと異なり、出力する項目の絞り込みではない)
Function
Aggregator
引数fieldDeclaration
pipe = new Each (pipe,
 new RegexSplitter(new Fields("出力項目名"))
);
pipe = new Every(pipe,
 new Sum(new Fields("出力項目名"))
);
関数の出力結果の項目名の指定。
省略時のデフォルトの項目名は、関数によって異なる。
TextLine(source 引数sourceFields
new TextLine(new Fields("オフセット項目名", "文字列項目名"))
new TextLine(TextLine.DEFAULT_SOURCE_FIELDS)
new TextLine()
new TextLine(new Fields("文字列項目名"))
sourceに使用するTextLineのsourceFieldsでは、入力項目名を指定する。
項目数が2個のときはオフセットと入力行の項目名。
項目数が1個のときは入力行の項目名のみ。(オフセットが出力されなくなる)
TextLineではsourceFieldsを省略した場合のデフォルトはTextLine.DEFAULT_SOURCE_FIELDS(new Fields("offset","line"))だが、
TextLineの親クラスであるSchemeではFields.UNKNOWNがデフォルト。
TextLine(sink 引数sinkFields
new TextLine(
 TextLine.DEFAULT_SOURCE_FIELDS,
 new Fields("文字列項目名")
)
sinkに使用するTextLineでは、sinkFieldsで出力対象項目名を指定できる (出力項目を絞り込む指定)。
Fields.ALLの場合、渡された全項目を出力する。
省略時はFields.ALL
なお、sinkFieldsを指定する際はsourceFieldsにも何らかの値を指定しなければならない。
使わない項目なのでFields.UNKNOWN等を指定したいところだが、
TextLineでは「sourceFieldsの項目数が1または2」というチェックを行っているので、UNKNOWNは指定できない。

Fieldsのインデックス指定の例

Fieldsのコンストラクターでは、項目名を文字列で指定する他に、インデックス番号を指定する方法がある。
項目数や項目名が不定・不明の場合に項目を並び替えたり絞り込んだりすることが出来る。

解説 データ例
new Each(pipe, new Fields( 1,  3,  0), new Identity());
new Each(pipe, new Fields( 1, -1,  0), new Identity());
new Each(pipe, new Fields(-3, -1, -4), new Identity());
前パイプの出力から項目を絞り込んで順序を変更し、
関数Identityに渡している。
0
-4
1
-3
2
-2
3
-1
a b c d
1
-3
3
-1
0
-4
 
b d a  
new Each(pipe, new Identity(), new Fields( 1,  3,  0));
new Each(pipe, new Identity(), new Fields( 1, -1,  0));
new Each(pipe, new Identity(), new Fields(-3, -1, -4));
関数Identityからの出力項目に対して
項目を絞り込んで順序を変更して出力する。

Cascadingへ戻る / Hadoopへ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま