CascadingのPipeのサブクラスであるCoGroupクラスについて。
Cascadingにおいて、複数パイプの結合を行うのがCoGroup。
指定されたキーでマッチングし、全項目を出力する。
片方のパイプにしかデータが存在しない場合に出力する/しない という制御を行う為に、結合方法(InnerJoinやLeftJoin等)が指定できる。
import cascading.pipe.CoGroup;
CoGroupはSQLのjoin(結合)に相当する。
(が、SQLのjoinの場合はselectで指定した項目しか出力されないのに対し、CoGroupでは入力タプル内の全項目が出力されるところが大きく異なる)
例 | 説明 |
---|---|
pipe = new CoGroup( |
指定された項目同士で2つのパイプをマッチングする。 出力項目名は入力と異なる項目名を指定する為に使うものであり、 出力項目を絞る(項目数を減らす)為のものではない。 出力項目を省略した場合は入力の項目名でそのまま出力される。 出力項目名の中に重複した名前があるとエラーになる。 結合方法を省略した場合はInnerJoin扱い。 |
pipe = new CoGroup( |
2つ以上のパイプを指定できる。 |
pipe = new CoGroup(pipe1, …); |
2つ以上のパイプを指定する。 マッチングキーは各パイプのタプルの第1項目。 出力項目名は入力項目名そのまま。 結合方法はInnerJoin。 |
第1引数に新パイプ名を入れるコンストラクターもある。
新パイプ名を省略すると、各パイプの名前を「*」でつないだ名前になる。(例:"pipe1"と"pipe2"なら、"pipe1*pipe2"になる)
CoGroupは自前でソートを行うので、(Everyと違って)事前にGroupByでソートしておく必要は無い。
(CoGroupばGroupByの仲間なので)
CoGroupでは、マッチした全データ(入力タプルの全項目)が出力される。
SQLならselectで出力する項目を指定できる(絞り込める)し、EachやEveryもoutputSelectorで絞り込めるが、CoGroupでは出来ない。
結合したいパイプの中で同名項目があった場合、そのままではエラーになる。
Pipe pipe = new CoGroup( pipe1, new Fields("id1"), //pipe1のid1という項目と pipe2, new Fields("id2"), //pipe2のid2という項目でマッチングする new InnerJoin() );
Exception in thread "main" cascading.flow.PlannerException: could not build flow from assembly: [[pipe1*pipe2][jp.hishidama.hadoop.cascading.join.JoinSample.main(JoinSample.java:64)] found duplicate field names in cogrouped tuple stream: ['id1', 'word']['id2', 'word']] at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:225) at cascading.flow.FlowConnector.connect(FlowConnector.java:452) at cascading.flow.FlowConnector.connect(FlowConnector.java:434) at cascading.flow.FlowConnector.connect(FlowConnector.java:315) at jp.hishidama.hadoop.cascading.join.JoinSample.main(JoinSample.java:70) Caused by: cascading.pipe.OperatorException: [pipe1*pipe2][jp.hishidama.hadoop.cascading.join.JoinSample.main(JoinSample.java:64)] found duplicate field names in cogrouped tuple stream: ['id1', 'word']['id2', 'word'] at cascading.pipe.Group.resolveDeclared(Group.java:1017) at cascading.pipe.Group.outgoingScopeFor(Group.java:849) at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:509) at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:491) at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:186) ... 4 more Caused by: cascading.tuple.TupleException: field name already exists: word at cascading.tuple.Fields.copy(Fields.java:844) at cascading.tuple.Fields.append(Fields.java:824) at cascading.pipe.Group.resolveDeclared(Group.java:1008) ... 8 more
この場合、CoGroupのコンストラクターの引数declaredFieldsで出力用の項目名を指定すると大丈夫。
Pipe pipe = new CoGroup( pipe1, new Fields("id1"), pipe2, new Fields("id2"), new Fields("id", "word1", "dummy-id", "word2"), new InnerJoin()); // さらに項目を絞り込む例 pipe = new Each(pipe, new Fields("id", "word1", "word2"), new Identity());
しかしこのやり方だと、pipe1やpipe2の項目を増減させた場合に、CoGroupで指定する項目もそれに合わせて変更する必要があり、とてもとってもとーっても不便。
結合する予定のパイプでは、最初から重複しないような項目名を付ける方が良いと思う。
→自動的に重複しない名前をつけるCoGroupEx
CoGroupには、出力項目を絞る(選択する)フィールド指定(EachやEveryのoutputSelector)は無い。[2010-04-13]
(CoGroupのコンストラクターの引数declaredFieldsは出力項目の名前を指定するものであって、項目数を減らすことは出来ない)
しかしCoGroupはイテレーターを出力するだけであって、ソートされたデータそのものを出力している訳ではないようだ。
つまり後続のEach等の処理で初めて(CoGroupで作成されたイテレーターを使って)ソートされたデータの読み取りを行う為、絞り込みが必要なら、そちらの指定で行えばよいことになる。
CoGroupでFunctionやAggregatorといった関数が指定できないのも、実際のデータ加工を行うのは後続処理だからだろう。
(つまりCoGroupは本当にGroupByと同じくソートをしているだけ。GroupByも関数の指定なんかしないよなー)
Cascading1.0のCoGroupでは、出力されるグループキーの項目名が定義されない。[2010-04-20]
つまりグループキーの項目数に応じて0,1,…という番号が付くだけで、項目名が付かない。
したがって、後続のパイプでグループキーを項目名で指定することが出来ない。
(後続パイプでFields.ALL(EachやGroupByのデフォルト)を指定した場合は、CoGroupのdeclaredFieldsで指定した項目だけが対象となるので、グループキーを扱うことが無いので関係ない。
Everyでは前パイプのグループキーが暗黙に使われるので、実行は出来るが、出力項目名は0,1,…のままになる。
つまり後続パイプでFields.GROUPを使うと、項目名が0,1,…になっている)
Pipe pipe1 = 〜; //出力項目名がkey1,data1 Pipe pipe2 = 〜; //出力項目名がkey1,data2 Pipe pipe = new CoGroup("join", pipe1, new Fields("key1"), pipe2, new Fields("key1"), new Fields("key1", "data1", "key1-dummy", "data2") //出力項目名(declaredFields) ); // pipe = new Each(pipe, Fields.GROUP, new Debug(true)); // pipe = new Each(pipe, new Fields("key1"), new Debug(true)); pipe = new Every(pipe, new Count("count"), new Fields("key1", "count")); //エラーが出る
Exception in thread "main" cascading.flow.PlannerException: could not build flow from assembly: [unable to resolve selector using incoming: [{1}:0] declared: [{1}:'count']] at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:225) at cascading.flow.FlowConnector.connect(FlowConnector.java:452) at cascading.flow.FlowConnector.connect(FlowConnector.java:434) at cascading.flow.FlowConnector.connect(FlowConnector.java:315) at cascading.flow.FlowConnector.connect(FlowConnector.java:260) at jp.hishidama.hadoop.cascading.student.GroupSample.run(GroupSample.java:293) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79) at jp.hishidama.hadoop.cascading.student.GroupSample.main(GroupSample.java:300) Caused by: cascading.pipe.OperatorException: unable to resolve selector using incoming: [{1}:0] declared: [{1}:'count'] at cascading.pipe.Operator.resolveOutgoingSelector(Operator.java:301) at cascading.pipe.Every.resolveOutgoingGroupingSelector(Every.java:270) at cascading.pipe.Every.outgoingScopeFor(Every.java:257) at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:509) at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:491) at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:186) ... 8 more Caused by: cascading.tuple.TupleException: selector did not find fields: [key1] in [0, count] at cascading.tuple.Fields.resolve(Fields.java:269) at cascading.pipe.Operator.resolveOutgoingSelector(Operator.java:297) ... 13 more
上記のCoGroupで出力項目名にkey1とつけているので、後続のEveryでその名前が指定できるような気がする。
しかしEveryでは、前パイプのキー項目(グループ項目)は出力項目から除外されて別に扱われるので、グループ項目に該当しているkey1は指定できない。
(Eachでは、CoGroupのdeclaredFieldsで指定した項目名key1がそのまま使える)
ただし、Cascading1.1では、CoGroupで指定したグループキー項目名が全パイプで一致していれば、その名前が使われるようになる。
つまりこの状態(pipe1もpipe2もグループキー項目名が同じkey1)なら後続のEveryでkey1という名前を使うことが出来る。
1つでも一致していなければCascading1.0と同様に番号だけの項目になってしまうが、
Cascading1.1では、CoGroupのコンストラクターの引数resultGroupFieldsでグループ項目名を指定することも出来るようになった。
Cascading1.0なら、CoGroupとEveryの間に(無駄なソートになってしまうが)GroupByを入れるか、
全ての後続処理でnew Fields(0)
というように項目名を使わず番号を指定すれば、一応OK。
//間にGroupByを入れる方法 Pipe pipe = new CoGroup("join", pipe1, new Fields("key1"), pipe2, new Fields("key1"), new Fields("key1", "data1", "key1-dummy", "data2") //出力項目名(declaredFields) ); pipe = new GroupBy(pipe, new Fields("key1")); //無駄なソートではあるが… pipe = new Every(pipe, new Count("count"), new Fields("key1", "count")); //OK
//インデックス番号を使う方法 Pipe pipe = new CoGroup("join", pipe1, new Fields("key1"), pipe2, new Fields("key1"), new Fields("key1", "data1", "key1-dummy", "data2") //出力項目名(declaredFields) ); pipe = new Every(pipe, new Count("count"), new Fields(0, "count")); //OK
グループ項目名が等しいパターン | グループ項目名が異なるパターン | |
---|---|---|
pipe1の項目名 | new Fields("key1", "key2", "data1") |
new Fields("key11", "key12", "data1") |
pipe2の項目名 | new Fields("key1", "key2", "data2") |
new Fields("key21", "key22", "data2") |
CoGroupのpipe1のキー | new Fields("key1", "key2") |
new Fields("key11", "key12") |
CoGroupのpipe2のキー | new Fields("key1", "key2") |
new Fields("key21", "key22") |
CoGroupのdeclaredFields指定 (出力後の全項目) |
new Fields( |
new Fields( |
出力後のグループキー (Cascading1.0) |
new Fields(0, 1) |
new Fields(0, 1) |
出力後のグループキー (Cascading1.1) |
new Fields("key1", "key2") |
CoGroupにおける結合方法をJoinerで指定する。
Joinerはインターフェースであり、CoGroupにはJoinerの具象クラスのインスタンスを指定する。
pipe = new CoGroup(〜, new InnerJoin());
クラス名 | 説明 | コーディング例 | 出力サンプル | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
cascading.pipe.cogroup. InnerJoin |
内部結合。 双方のパイプに存在しているデータのみを出力する。 |
new InnerJoin() |
|
||||||||||||||||||||||||
cascading.pipe.cogroup. LeftJoin |
左結合。 左側のパイプのデータを全て出力し、右側のパイプからはそれにマッチするデータだけ出力する。 右側のパイプに存在しないキーでは、nullが出力される。 |
new LeftJoin() |
|
||||||||||||||||||||||||
cascading.pipe.cogroup. RightJoin |
右結合。 右側のパイプのデータを全て出力し、左側のパイプからはそれにマッチするデータだけ出力する。 左側のパイプに存在しないキーでは、nullが出力される。 |
new RightJoin() |
|
||||||||||||||||||||||||
cascading.pipe.cogroup. OuterJoin |
外部結合。 双方のパイプのデータを全て出力する。 存在しないキーでは、nullが出力される。 |
new OuterJoin() |
|
||||||||||||||||||||||||
cascading.pipe.cogroup. MixedJoin |
3つ以上のパイプを使う際に、それぞれの結合方法をInnerJoinとするかOuterJoinとするかを指定できる。 コンストラクターの引数がtrueだとinner、falseだとouter。 (MixedJoin内に定数フィールドが用意されている) |
new MixedJoin( |
input/file01.txt | input/file02.txt |
key1,data11 key3,data13 key4,data14 |
key2,data22 key3,data23 key5,data25 |
public class JoinSample { public static final String F_ID1 = "id1"; public static final String F_WORD1 = "word1"; public static final String F_ID2 = "id2"; public static final String F_WORD2 = "word2"; public static void main(String[] args) { // 入力ファイルの指定 Tap source1 = new Hfs(new TextLine(new Fields("line")), "file:///C:/cygwin/home/hadoop/join/input/file01.txt"); Tap source2 = new Hfs(new TextLine(new Fields("line")), "file:///C:/cygwin/home/hadoop/join/input/file02.txt"); // 出力ディレクトリーの指定 Tap sink = new Hfs(new TextLine(), "file:///C:/cygwin/home/hadoop/join/output/", SinkMode.REPLACE); // パイプの作成:入力行をカンマ区切りで分割 Pipe pipe1 = new Pipe("pipe1"); pipe1 = new Each(pipe1, new RegexSplitter(new Fields(F_ID1, F_WORD1), ",")); Pipe pipe2 = new Pipe("pipe2"); pipe2 = new Each(pipe2, new RegexSplitter(new Fields(F_ID2, F_WORD2), ",")); Joiner join = new InnerJoin(); // Joiner join = new LeftJoin(); // Joiner join = new RightJoin(); // Joiner join = new OuterJoin(); // id1とid2で結合(pipe1が左でpipe2が右、つまりLeftJoinではpipe1が主たるデータとなる) Pipe pipe = new CoGroup(pipe1, new Fields(F_ID1), pipe2, new Fields(F_ID2), join); // パイプに入力ファイルを紐付ける Map<String, Tap> sources = new HashMap<String, Tap>(); sources.put(pipe1.getName(), source1); sources.put(pipe2.getName(), source2); FlowConnector flowConnector = new FlowConnector(); Flow flow = flowConnector.connect("join-sample", sources, sink, pipe); flow.complete(); //実行 } }
前述の通り、CoGroupでは、出力項目名が重複していてはいけない。[2010-04-23]
しかし1つのパイプを分岐させてから結合したい場合など、項目名は簡単に重複してしまう。
(少なくとも結合用のキー項目名はよく重複する。と言うより、結合に使う項目は同一の名前の方が分かりやすい)
declaredFieldsで出力項目名を指定できるが、入力項目数に合わせて全項目名を書かなければならないので、生産性が非常に悪い。
前パイプに項目を追加すると、それについてもdeclaredFieldsに追加しなければならないのは、かなり面倒。
(Cascadingのプログラムで一番大変なのは、「前パイプの出力項目が何で、自パイプの入力項目をどれにして出力項目が何になるか」を把握すること)
という訳で、出力項目名が重複していたら自動的にダミーの名前に置き換えるCoGroupを作ってみた。→ソース
前提として、名前がかぶっているからには値は同一であり、どちらの値が有効(どちらの名前がダミー)になっても構わない、という考えをしている。
つまり、左側の項目名を正とし、後から出てきた重複名を変更している。
public class CoGroupEx extends CoGroup { 〜コンストラクター〜
/** * 出力項目名(declaredFields)がnullの場合、項目名を定義してから、スーパークラスのメソッドを呼び出す。 */ @Override public Scope outgoingScopeFor(Set<Scope> incomingScopes) { if (getDeclaredFields() == null) { super.declaredFields = createDeclaredFields(incomingScopes); } return super.outgoingScopeFor(incomingScopes); } /** * 出力フィールドを生成する。 * * @param incomingScopes 入力スコープ * @return フィールド名 * @see Group#resolveDeclared(Set<Scope>) */ protected Fields createDeclaredFields(Set<Scope> incomingScopes) { Map<String, Scope> scopesMap = new HashMap<String, Scope>(); for (Scope incomingScope : incomingScopes) { scopesMap.put(incomingScope.getName(), incomingScope); } Set<String> nameSet = new HashSet<String>(); Fields appendedFields = new Fields(); Pipe[] pipes = getPrevious(); for (int i = 0; i < pipes.length; i++) { Pipe pipe = pipes[i]; Fields appendableField = resolveFields(scopesMap.get(pipe.getName())); int sz = appendableField.size(); Comparable<?>[] names = new Comparable[sz]; for (int j = 0; j < sz; j++) { Comparable<?> name = appendableField.get(j); if (name instanceof String) { if (nameSet.contains(name)) { name = convertFieldName(i, pipe, (String) name); } nameSet.add((String) name); } names[j] = name; } appendedFields = appendedFields.append(new Fields(names)); } return appendedFields; } /** * 重複した名前を別の名前に変換する。 * * @param i パイプの番号 * @param pipe 対象フィールドの属しているパイプ * @param fieldName フィールド名 * @return 変換されたフィールド名 */ protected String convertFieldName(int i, Pipe pipe, String fieldName) { return pipe.getName() + "." + fieldName; } }
Cascadingでは、Pipe#outgoingScopeFor()というメソッドで出力項目を決定しているらしい。
そこで このメソッドをオーバーライドし、declaredFieldsが未定義(null)だったらdeclaredFieldsを定義するようにしている。
Cascadingでは、実際にFlowが実行されるまで、どんな項目名が入ってくるか分からない。
言い換えると、Pipe(CoGroup)のコンストラクター時点では、項目名を決定できない。
なぜかと言うと、EachやEveryではFields.ALLを使って前パイプの項目名をそのまま後ろに渡す事があるので、パイプの一番先頭の項目名が分かる必要がある。 しかし一番先頭の項目名はTap(source)によって決まるのであり、どのパイプがどのsourceと結び付けられるのかは、FlowConnector#connect()で指定するから。
したがってコンストラクター内で項目名を判断してdeclaredFieldsを書き換えるという事は出来ない。ので、outgoingScopeFor()をオーバーライドする方法を採ってみた。outgoingScopeFor()の引数に使われているScopeは、前パイプの項目名情報を保持しているクラス。
パイプ間の項目名情報の受け渡しに使用される。
createDeclaredFields()は、単純に項目名のSetを作り、重複していないかどうかチェックしているだけ。
convertFieldName()で実際の項目名の変換を行っている。
見て分かるとおり、パイプ名を接頭辞として付けているだけ。CoGroupでは指定された複数のパイプの名前が重複することは無い(重複しているとエラーになる)ので、項目名が同じなら、パイプ名を付加すれば必ず異なる項目名になる。
(ただし、変換後の名前とさらに重複する項目名があったら、CoGroupExではそこまでチェックしてないので、エラーになってしまう)
public static final String F_ID = "id"; public static final String F_WORD1 = "word1"; public static final String F_WORD2 = "word2";
// パイプの作成:入力行をカンマ区切りで分割
Pipe pipe1 = new Pipe("pipe1");
pipe1 = new Each(pipe1, new RegexSplitter(new Fields(F_ID, F_WORD1), ","));
Pipe pipe2 = new Pipe("pipe2");
pipe2 = new Each(pipe2, new RegexSplitter(new Fields(F_ID, F_WORD2), ","));
// idとidで結合
// Pipe pipe = new CoGroup (pipe1, new Fields(F_ID), pipe2, new Fields(F_ID), new Fields(F_ID, F_WORD1, "dummy", F_WORD2));
Pipe pipe = new CoGroupEx(pipe1, new Fields(F_ID), pipe2, new Fields(F_ID));
CoGroup | CoGroupEx | |||
---|---|---|---|---|
pipe1からの項目 | "id", "word1" |
"id", "word1" |
||
pipe2からの項目 | "id", "word2" |
"id", "word2" |
||
出力項目 | "id", "word1", "dummy", "word2" |
"id", "word1", "pipe2.id", "word2" |