S-JIS[2010-04-08/2010-04-23] 変更履歴

Cascading CoGroup

CascadingPipeのサブクラスであるCoGroupクラスについて。


CoGroupの概要

Cascadingにおいて、複数パイプの結合を行うのがCoGroup

指定されたキーでマッチングし、全項目を出力する。
片方のパイプにしかデータが存在しない場合に出力する/しない という制御を行う為に、結合方法(InnerJoinやLeftJoin等)が指定できる。

import cascading.pipe.CoGroup;

CoGroupはSQLのjoin(結合)に相当する。
(が、SQLのjoinの場合はselectで指定した項目しか出力されないのに対し、CoGroupでは入力タプル内の全項目が出力されるところが大きく異なる)

説明
pipe = new CoGroup(
 pipe1, new Fields(項目名,…),
 pipe2, new Fields(項目名,…),
 new Fields(出力項目名,…),
 結合方法
);
指定された項目同士で2つのパイプをマッチングする。

出力項目名入力と異なる項目名を指定する為に使うものであり、
出力項目を絞る(項目数を減らす)為のものではない。
出力項目を省略した場合は入力の項目名でそのまま出力される。
出力項目名の中に重複した名前があるとエラーになる

結合方法を省略した場合はInnerJoin扱い。
pipe = new CoGroup(
 Pipe.pipes(pipe1, …),
 Fields.fields(new Fields(項目名,…), …),
 new Fields(出力項目名,…),
 結合方法
);
2つ以上のパイプを指定できる。
pipe = new CoGroup(pipe1, …); 2つ以上のパイプを指定する。
マッチングキーは各パイプのタプルの第1項目。
出力項目名は入力項目名そのまま。
結合方法はInnerJoin

第1引数に新パイプ名を入れるコンストラクターもある。
新パイプ名を省略すると、各パイプの名前を「*」でつないだ名前になる。(例:"pipe1"と"pipe2"なら、"pipe1*pipe2"になる)

CoGroupは自前でソートを行うので、(Everyと違って)事前にGroupByでソートしておく必要は無い。
(CoGroupばGroupByの仲間なので)


出力項目名の注意

CoGroupでは、マッチした全データ(入力タプルの全項目)が出力される。
SQLならselectで出力する項目を指定できる(絞り込める)し、EachEveryoutputSelectorで絞り込めるが、CoGroupでは出来ない。

結合したいパイプの中で同名項目があった場合、そのままではエラーになる。

「word」という名前の項目が重複している場合の例:

	Pipe pipe = new CoGroup(
		pipe1, new Fields("id1"),	//pipe1のid1という項目と
		pipe2, new Fields("id2"),	//pipe2のid2という項目でマッチングする
		new InnerJoin()
	);
Exception in thread "main" cascading.flow.PlannerException: could not build flow from assembly:
 [[pipe1*pipe2][jp.hishidama.hadoop.cascading.join.JoinSample.main(JoinSample.java:64)]
 found duplicate field names in cogrouped tuple stream: ['id1', 'word']['id2', 'word']]
	at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:225)
	at cascading.flow.FlowConnector.connect(FlowConnector.java:452)
	at cascading.flow.FlowConnector.connect(FlowConnector.java:434)
	at cascading.flow.FlowConnector.connect(FlowConnector.java:315)
	at jp.hishidama.hadoop.cascading.join.JoinSample.main(JoinSample.java:70)
Caused by: cascading.pipe.OperatorException: [pipe1*pipe2][jp.hishidama.hadoop.cascading.join.JoinSample.main(JoinSample.java:64)]
 found duplicate field names in cogrouped tuple stream: ['id1', 'word']['id2', 'word']
	at cascading.pipe.Group.resolveDeclared(Group.java:1017)
	at cascading.pipe.Group.outgoingScopeFor(Group.java:849)
	at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:509)
	at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:491)
	at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:186)
	... 4 more
Caused by: cascading.tuple.TupleException: field name already exists: word
	at cascading.tuple.Fields.copy(Fields.java:844)
	at cascading.tuple.Fields.append(Fields.java:824)
	at cascading.pipe.Group.resolveDeclared(Group.java:1008)
	... 8 more

この場合、CoGroupのコンストラクターの引数declaredFieldsで出力用の項目名を指定すると大丈夫。

	Pipe pipe = new CoGroup(
		pipe1, new Fields("id1"),
		pipe2, new Fields("id2"),
		new Fields("id", "word1", "dummy-id", "word2"),
		new InnerJoin());

	// さらに項目を絞り込む例
	pipe = new Each(pipe, new Fields("id", "word1", "word2"), new Identity());

しかしこのやり方だと、pipe1やpipe2の項目を増減させた場合に、CoGroupで指定する項目もそれに合わせて変更する必要があり、とてもとってもとーっても不便。
結合する予定のパイプでは、最初から重複しないような項目名を付ける方が良いと思う。
自動的に重複しない名前をつけるCoGroupEx


出力項目の選択(絞り込み)

CoGroupには、出力項目を絞る(選択する)フィールド指定(EachEveryoutputSelector)は無い。[2010-04-13]
(CoGroupのコンストラクターの引数declaredFieldsは出力項目の名前を指定するものであって、項目数を減らすことは出来ない)

しかしCoGroupはイテレーターを出力するだけであって、ソートされたデータそのものを出力している訳ではないようだ。
つまり後続のEach等の処理で初めて(CoGroupで作成されたイテレーターを使って)ソートされたデータの読み取りを行う為、絞り込みが必要なら、そちらの指定で行えばよいことになる。

CoGroupでFunctionAggregatorといった関数が指定できないのも、実際のデータ加工を行うのは後続処理だからだろう。
(つまりCoGroupは本当にGroupByと同じくソートをしているだけ。GroupByも関数の指定なんかしないよなー)


出力グループ名

Cascading1.0のCoGroupでは、出力されるグループキーの項目名が定義されない。[2010-04-20]
つまりグループキーの項目数に応じて0,1,…という番号が付くだけで、項目名が付かない
したがって、後続のパイプでグループキーを項目名で指定することが出来ない。
(後続パイプでFields.ALLEachGroupByのデフォルト)を指定した場合は、CoGroupのdeclaredFieldsで指定した項目だけが対象となるので、グループキーを扱うことが無いので関係ない。
 Everyでは前パイプのグループキーが暗黙に使われるので、実行は出来るが、出力項目名は0,1,…のままになる。
 つまり後続パイプでFields.GROUPを使うと、項目名が0,1,…になっている)

例:

	Pipe pipe1 = 〜;	//出力項目名がkey1,data1
	Pipe pipe2 = 〜;	//出力項目名がkey1,data2

	Pipe pipe = new CoGroup("join",
		pipe1, new Fields("key1"),
		pipe2, new Fields("key1"),
		new Fields("key1", "data1", "key1-dummy", "data2")	//出力項目名(declaredFields)
	);

//	pipe = new Each(pipe, Fields.GROUP,       new Debug(true));
//	pipe = new Each(pipe, new Fields("key1"), new Debug(true));

	pipe = new Every(pipe, new Count("count"), new Fields("key1", "count"));	//エラーが出る
Exception in thread "main" cascading.flow.PlannerException: could not build flow from assembly: [unable to resolve selector using incoming: [{1}:0] declared: [{1}:'count']]
	at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:225)
	at cascading.flow.FlowConnector.connect(FlowConnector.java:452)
	at cascading.flow.FlowConnector.connect(FlowConnector.java:434)
	at cascading.flow.FlowConnector.connect(FlowConnector.java:315)
	at cascading.flow.FlowConnector.connect(FlowConnector.java:260)
	at jp.hishidama.hadoop.cascading.student.GroupSample.run(GroupSample.java:293)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
	at jp.hishidama.hadoop.cascading.student.GroupSample.main(GroupSample.java:300)
Caused by: cascading.pipe.OperatorException: unable to resolve selector using incoming: [{1}:0] declared: [{1}:'count']
	at cascading.pipe.Operator.resolveOutgoingSelector(Operator.java:301)
	at cascading.pipe.Every.resolveOutgoingGroupingSelector(Every.java:270)
	at cascading.pipe.Every.outgoingScopeFor(Every.java:257)
	at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:509)
	at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:491)
	at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:186)
	... 8 more
Caused by: cascading.tuple.TupleException: selector did not find fields: [key1] in [0, count]
	at cascading.tuple.Fields.resolve(Fields.java:269)
	at cascading.pipe.Operator.resolveOutgoingSelector(Operator.java:297)
	... 13 more

上記のCoGroupで出力項目名にkey1とつけているので、後続のEveryでその名前が指定できるような気がする。
しかしEveryでは、前パイプのキー項目(グループ項目)は出力項目から除外されて別に扱われるので、グループ項目に該当しているkey1は指定できない。
(Eachでは、CoGroupのdeclaredFieldsで指定した項目名key1がそのまま使える)

ただし、Cascading1.1では、CoGroupで指定したグループキー項目名が全パイプで一致していれば、その名前が使われるようになる。
つまりこの状態(pipe1もpipe2もグループキー項目名が同じkey1)なら後続のEveryでkey1という名前を使うことが出来る。
1つでも一致していなければCascading1.0と同様に番号だけの項目になってしまうが、
Cascading1.1では、CoGroupのコンストラクターの引数resultGroupFieldsでグループ項目名を指定することも出来るようになった。

Cascading1.0なら、CoGroupとEveryの間に(無駄なソートになってしまうが)GroupByを入れるか、
全ての後続処理でnew Fields(0)というように項目名を使わず番号を指定すれば、一応OK。

//間にGroupByを入れる方法
	Pipe pipe = new CoGroup("join",
		pipe1, new Fields("key1"),
		pipe2, new Fields("key1"),
		new Fields("key1", "data1", "key1-dummy", "data2")	//出力項目名(declaredFields)
	);

	pipe = new GroupBy(pipe, new Fields("key1")); //無駄なソートではあるが…

	pipe = new Every(pipe, new Count("count"), new Fields("key1", "count"));	//OK
//インデックス番号を使う方法
	Pipe pipe = new CoGroup("join",
		pipe1, new Fields("key1"),
		pipe2, new Fields("key1"),
		new Fields("key1", "data1", "key1-dummy", "data2")	//出力項目名(declaredFields)
	);

	pipe = new Every(pipe, new Count("count"), new Fields(0, "count"));	//OK
項目名のパターンのまとめ
  グループ項目名が等しいパターン グループ項目名が異なるパターン
pipe1の項目名 new Fields("key1", "key2", "data1") new Fields("key11", "key12", "data1")
pipe2の項目名 new Fields("key1", "key2", "data2") new Fields("key21", "key22", "data2")
CoGroupのpipe1のキー new Fields("key1", "key2") new Fields("key11", "key12")
CoGroupのpipe2のキー new Fields("key1", "key2") new Fields("key21", "key22")
CoGroupのdeclaredFields指定
(出力後の全項目)
new Fields(
"key1", "key2", "data1",
"dmy1", "dmy2", "data2")
new Fields(
"key1", "key2", "data1",
"dmy1", "dmy2", "data2")
出力後のグループキー
(Cascading1.0)
new Fields(0, 1)
Fields.size(2)
new Fields(0, 1)
Fields.size(2)
出力後のグループキー
(Cascading1.1)
new Fields("key1", "key2")

結合方法(Joiner)

CoGroupにおける結合方法をJoinerで指定する。

Joinerはインターフェースであり、CoGroupにはJoinerの具象クラスのインスタンスを指定する。

	pipe = new CoGroup(〜, new InnerJoin());
クラス名 説明 コーディング例 出力サンプル
cascading.pipe.cogroup.
InnerJoin
内部結合。
双方のパイプに存在しているデータのみを出力する。
new InnerJoin()
id1 word1 id2 word2
key3 data13 key3 data23
cascading.pipe.cogroup.
LeftJoin
左結合。
左側のパイプのデータを全て出力し、右側のパイプからはそれにマッチするデータだけ出力する。
右側のパイプに存在しないキーでは、nullが出力される。
new LeftJoin()
id1 word1 id2 word2
key1 data11 null null
key3 data13 key3 data23
key4 data14 null null
cascading.pipe.cogroup.
RightJoin
右結合。
右側のパイプのデータを全て出力し、左側のパイプからはそれにマッチするデータだけ出力する。
左側のパイプに存在しないキーでは、nullが出力される。
new RightJoin()
id1 word1 id2 word2
null null key2 data22
key3 data13 key3 data23
null null key5 data25
cascading.pipe.cogroup.
OuterJoin
外部結合。
双方のパイプのデータを全て出力する。
存在しないキーでは、nullが出力される。
new OuterJoin()
id1 word1 id2 word2
key1 data11 null null
null null key2 data22
key3 data13 key3 data23
key4 data14 null null
null null key5 data25
cascading.pipe.cogroup.
MixedJoin
3つ以上のパイプを使う際に、それぞれの結合方法をInnerJoinとするかOuterJoinとするかを指定できる。
コンストラクターの引数がtrueだとinner、falseだとouter。
(MixedJoin内に定数フィールドが用意されている)
new MixedJoin(
 new boolean[]{
  MixedJoin.INNER,
  MixedJoin.OUTER,
  MixedJoin.OUTER
 }
)
 

入力データのサンプル

input/file01.txt input/file02.txt
key1,data11
key3,data13
key4,data14
key2,data22
key3,data23
key5,data25

結合のコーディング例

public class JoinSample {

	public static final String F_ID1   = "id1";
	public static final String F_WORD1 = "word1";
	public static final String F_ID2   = "id2";
	public static final String F_WORD2 = "word2";

	public static void main(String[] args) {
		// 入力ファイルの指定
		Tap source1 = new Hfs(new TextLine(new Fields("line")), "file:///C:/cygwin/home/hadoop/join/input/file01.txt");
		Tap source2 = new Hfs(new TextLine(new Fields("line")), "file:///C:/cygwin/home/hadoop/join/input/file02.txt");

		// 出力ディレクトリーの指定
		Tap sink = new Hfs(new TextLine(), "file:///C:/cygwin/home/hadoop/join/output/", SinkMode.REPLACE);

		// パイプの作成:入力行をカンマ区切りで分割
		Pipe pipe1 = new Pipe("pipe1");
		pipe1 = new Each(pipe1, new RegexSplitter(new Fields(F_ID1, F_WORD1), ","));

		Pipe pipe2 = new Pipe("pipe2");
		pipe2 = new Each(pipe2, new RegexSplitter(new Fields(F_ID2, F_WORD2), ","));

		Joiner join = new InnerJoin();
//		Joiner join = new LeftJoin();
//		Joiner join = new RightJoin();
//		Joiner join = new OuterJoin();

		// id1とid2で結合(pipe1が左でpipe2が右、つまりLeftJoinではpipe1が主たるデータとなる)
		Pipe pipe = new CoGroup(pipe1, new Fields(F_ID1), pipe2, new Fields(F_ID2), join);

		// パイプに入力ファイルを紐付ける
		Map<String, Tap> sources = new HashMap<String, Tap>();
		sources.put(pipe1.getName(), source1);
		sources.put(pipe2.getName(), source2);

		FlowConnector flowConnector = new FlowConnector();
		Flow flow = flowConnector.connect("join-sample", sources, sink, pipe);
		flow.complete(); //実行
	}
}

CoGroupEx

前述の通り、CoGroupでは、出力項目名が重複していてはいけない。[2010-04-23]

しかし1つのパイプを分岐させてから結合したい場合など、項目名は簡単に重複してしまう。
(少なくとも結合用のキー項目名はよく重複する。と言うより、結合に使う項目は同一の名前の方が分かりやすい)
declaredFieldsで出力項目名を指定できるが、入力項目数に合わせて全項目名を書かなければならないので、生産性が非常に悪い。
前パイプに項目を追加すると、それについてもdeclaredFieldsに追加しなければならないのは、かなり面倒。
(Cascadingのプログラムで一番大変なのは、「前パイプの出力項目が何で、自パイプの入力項目をどれにして出力項目が何になるか」を把握すること)

という訳で、出力項目名が重複していたら自動的にダミーの名前に置き換えるCoGroupを作ってみた。→ソース
前提として、名前がかぶっているからには値は同一であり、どちらの値が有効(どちらの名前がダミー)になっても構わない、という考えをしている。
つまり、左側の項目名を正とし、後から出てきた重複名を変更している。

public class CoGroupEx extends CoGroup {

	〜コンストラクター〜
	/**
	 * 出力項目名(declaredFields)がnullの場合、項目名を定義してから、スーパークラスのメソッドを呼び出す。
	 */
	@Override
	public Scope outgoingScopeFor(Set<Scope> incomingScopes) {
		if (getDeclaredFields() == null) {
			super.declaredFields = createDeclaredFields(incomingScopes);
		}
		return super.outgoingScopeFor(incomingScopes);
	}

	/**
	 * 出力フィールドを生成する。
	 *
	 * @param incomingScopes 入力スコープ
	 * @return フィールド名
	 * @see Group#resolveDeclared(Set<Scope>)
	 */
	protected Fields createDeclaredFields(Set<Scope> incomingScopes) {

		Map<String, Scope> scopesMap = new HashMap<String, Scope>();
		for (Scope incomingScope : incomingScopes) {
			scopesMap.put(incomingScope.getName(), incomingScope);
		}

		Set<String> nameSet = new HashSet<String>();
		Fields appendedFields = new Fields();

		Pipe[] pipes = getPrevious();
		for (int i = 0; i < pipes.length; i++) {
			Pipe pipe = pipes[i];
			Fields appendableField = resolveFields(scopesMap.get(pipe.getName()));

			int sz = appendableField.size();
			Comparable<?>[] names = new Comparable[sz];
			for (int j = 0; j < sz; j++) {
				Comparable<?> name = appendableField.get(j);
				if (name instanceof String) {
					if (nameSet.contains(name)) {
						name = convertFieldName(i, pipe, (String) name);
					}
					nameSet.add((String) name);
				}
				names[j] = name;
			}

			appendedFields = appendedFields.append(new Fields(names));
		}

		return appendedFields;
	}

	/**
	 * 重複した名前を別の名前に変換する。
	 *
	 * @param i         パイプの番号
	 * @param pipe      対象フィールドの属しているパイプ
	 * @param fieldName フィールド名
	 * @return 変換されたフィールド名
	 */
	protected String convertFieldName(int i, Pipe pipe, String fieldName) {
		return pipe.getName() + "." + fieldName;
	}
}

Cascadingでは、Pipe#outgoingScopeFor()というメソッドで出力項目を決定しているらしい。
そこで このメソッドをオーバーライドし、declaredFieldsが未定義(null)だったらdeclaredFieldsを定義するようにしている。

Cascadingでは、実際にFlowが実行されるまで、どんな項目名が入ってくるか分からない。
言い換えると、Pipe(CoGroup)のコンストラクター時点では、項目名を決定できない。
なぜかと言うと、EachやEveryではFields.ALLを使って前パイプの項目名をそのまま後ろに渡す事があるので、パイプの一番先頭の項目名が分かる必要がある。 しかし一番先頭の項目名はTap(source)によって決まるのであり、どのパイプがどのsourceと結び付けられるのかは、FlowConnector#connect()で指定するから。
したがってコンストラクター内で項目名を判断してdeclaredFieldsを書き換えるという事は出来ない。ので、outgoingScopeFor()をオーバーライドする方法を採ってみた。

outgoingScopeFor()の引数に使われているScopeは、前パイプの項目名情報を保持しているクラス。
パイプ間の項目名情報の受け渡しに使用される。

createDeclaredFields()は、単純に項目名のSetを作り、重複していないかどうかチェックしているだけ。
convertFieldName()で実際の項目名の変換を行っている。
見て分かるとおり、パイプ名を接頭辞として付けているだけ。CoGroupでは指定された複数のパイプの名前が重複することは無い(重複しているとエラーになる)ので、項目名が同じなら、パイプ名を付加すれば必ず異なる項目名になる。
(ただし、変換後の名前とさらに重複する項目名があったら、CoGroupExではそこまでチェックしてないので、エラーになってしまう)


CoGroupExの使用例

	public static final String F_ID   = "id";
	public static final String F_WORD1 = "word1";
	public static final String F_WORD2 = "word2";
		// パイプの作成:入力行をカンマ区切りで分割
		Pipe pipe1 = new Pipe("pipe1");
		pipe1 = new Each(pipe1, new RegexSplitter(new Fields(F_ID, F_WORD1), ","));

		Pipe pipe2 = new Pipe("pipe2");
		pipe2 = new Each(pipe2, new RegexSplitter(new Fields(F_ID, F_WORD2), ","));

		// idとidで結合
//		Pipe pipe = new CoGroup  (pipe1, new Fields(F_ID), pipe2, new Fields(F_ID), new Fields(F_ID, F_WORD1, "dummy", F_WORD2));
		Pipe pipe = new CoGroupEx(pipe1, new Fields(F_ID), pipe2, new Fields(F_ID));
  CoGroup CoGroupEx
pipe1からの項目 "id", "word1"   "id", "word1"  
pipe2からの項目   "id", "word2"   "id", "word2"
出力項目 "id", "word1", "dummy", "word2" "id", "word1", "pipe2.id", "word2"

Cascadingへ戻る / Hadoopへ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま