S-JIS[2010-04-010] 変更履歴

Cascading Pipe

CascadingのPipeクラスについて。

  • Cascading APIのPipe

Pipeの概要

Cascadingでは処理単位のPipeを用意し、PipeとPipeをつなぐことによって処理全体を作り上げる。[2010-04-05]
つまり、前パイプの処理結果(タプル)が後パイプに渡される。
一番最初のPipeの入力はsource(ファイル等からの入力)であり、一番最後のPipeの出力先がsink(ファイル等への出力)となる。

import cascading.pipe.Pipe;

PipeのサブクラスであるEachがHadoop本来のMapperEveryReducerに相当するが、
HadoopではMap→Reduceしか処理できなかったのに対し、Cascadingでは(ほぼ)自由に順番を変えられる。
また、複数のパイプに分岐したり、複数のパイプをひとつに統合(GroupByCoGroup)したりすることが出来る。


Cascadingでは、入出力データに項目名を付けてデータを区別している。(RDBのテーブルで項目名を付けるようなもの)
各Pipeでは、入力データの中から自分が処理したい項目名を指定してデータを取得し、処理結果用の項目に出力する。

MapReduceにおいてMapperの出力がReducerの入力になるからデータ型を合わせる必要があるのと同様に、
Pipeをつなぐ際には、前パイプの出力項目名後パイプの入力項目名が一致していないといけない。(パイプの中に存在している項目しか取得できない)
ただし、項目名を使わず、Tuple(Fields)内のインデックス番号を用いて項目を指定する事も出来る。


Pipeの種類

Pipeクラスは、実際の処理に応じて何種類か存在する。[2010-04-05]

クラス名 説明 使用例 類似
cascading.pipe.Pipe 処理単位を表す基本クラス。 Pipe pipe = new Pipe("name");  
cascading.pipe.Each データ1行に対し1回ずつ関数(operate())が呼ばれる。
データを加工したり分割したりする。
Each・Functionの詳細
pipe = new Each(前パイプ, 関数オブジェクト); MapReduceのMapper
関数がMapper#map()。
cascading.pipe.GroupBy データを指定されたキーでソートする。
複数のパイプを指定することも出来る。
GroupByの詳細
pipe = new GroupBy(前パイプ, new Fields("ソートキー項目名")); SQLのorder by(あるいはgroup by)。
MapReduceのシャッフル処理。
cascading.pipe.Every 同一キーのデータを処理する(集計する)。
新しいキーになった時にstart()が呼ばれ、
データ毎にaggregate()が呼ばれる。そして
次のキーになる直前にcomplete()が呼ばれる。
Everyを使う際には、直前にGroupByでソートしておく必要がある。
Every・Aggregatorの詳細
pipe = new Every(前パイプ, 集約関数オブジェクト); SQLのsumやcount。
MapReduceのReducer
集約関数がReducer#reduce()。
cascading.pipe.CoGroup 複数のパイプを指定されたキーで結合する。
(キーでソートされる)
CoGroupの詳細
pipe = new CoGroup(
 pipe1, new Fields("pipe1のキー項目名"),
 pipe2, new Fields("pipe2のキー項目名")
);
SQLのjoin。
cascading.pipe.SubAssembly 共通処理となるパイプを作る。
SubAssemblyの詳細
  サブルーチン

パイプ名

Pipeクラスそのもの(各サブクラスでなく)は、先頭のパイプを用意する(パイプ名を付ける)のに使われるようだ。

	Pipe pipe = new Pipe("パイプ名");

	// 後続処理にPipeインスタンスを受け渡していく
	pipe = new Each(pipe, new RegexSplitGenerator(new Fields("word"), "[ \t\n\r\f]+"));

パイプ名は、パイプの使用方法が間違っている場合の例外メッセージや、sourceやsinkとパイプを紐付けるのに使われる。
CoGroupでは、キー項目名などをパイプ毎に管理する為のMapのキーとしてもパイプ名を使用している。
(中間ワークのディレクトリー名にも使われるようなので(?)、日本語は使わない方がいいかも。)

大抵のPipeサブクラスではパイプ名を指定できるコンストラクターが用意されているので、必ずしもPipeクラスそのものを使用しなければならないわけではない。

	Pipe pipe = new Each("パイプ名", new RegexSplitGenerator(new Fields("word"), "[ \t\n\r\f]+"));

名前を付けなかった場合は、前パイプの名前がそのまま使われる。
グルーピング系のパイプ(GroupByCoGroup)で複数の前パイプを指定していた場合は、それらの名前を合成した名前になる。ちゃんと自分で名前を付けておく方が無難だろう。

MapReduceのジョブ名にはフロー名(FlowConnector#connect()で指定する名前)が使われるようだ。
フロー名にnullを指定すると、パイプ名から生成される。


Pipeとsource・sinkの紐付け

パイプの先頭がどのsourceからデータを受け取るのかについては、先頭のパイプのパイプ名とsourceを紐付けることによって指定する。
同じく、パイプ(の末尾)からどのsinkへデータを出力するのかを、(末尾の)パイプのパイプ名とsinkを紐付けることによって指定する。

	// 各パイプ(head)の作成
	Pipe pipeH1 = new Pipe("先頭パイプ名1");
	Pipe pipeH2 = new Pipe("先頭パイプ名2");
〜
	// 最終パイプ(tail)
	Pipe pipeT1 = new Xxx("末尾パイプ名1", 前パイプ, 〜);
	Pipe pipeT2 = new Zzz("末尾パイプ名2", 前パイプ, 〜);

	// パイプとsourceを結び付ける
	Map<String, Tap> sources = new HashMap<String, Tap>();
	sources.put("先頭パイプ名1", source1);
	sources.put("先頭パイプ名2", source2);

	// パイプとsinkを結び付ける
	Map<String, Tap> sinks = new HashMap<String, Tap>();
//	sinks.put("末尾パイプ名1", sink1);
//	sinks.put("末尾パイプ名2", sink2);
	sinks.put(pipeT1.getName(), sink1);
	sinks.put(pipeT2.getName(), sink2);

	// フローの作成
	FlowConnector flowConnector = new FlowConnector();
	Flow flow = flowConnector.connect("フロー名", sources, sinks, pipeT1, pipeT2);
	flow.complete(); //実行

しかし大抵のプログラムではパイプはひとつなので、それ用のメソッドが用意されている。

	FlowConnector flowConnector = new FlowConnector();
	Flow flow = flowConnector.connect("フロー名", source1, sink1, pipe1);
	flow.complete();

sourceのみ複数、あるいはsinkのみ複数といったメソッドも用意されている。


sinkに紐付けるパイプは、必ずしも最終パイプ(フローの末尾)でなくてもよいようだ。
途中のパイプ(の名前)を指定しても、ちゃんと出力される。
sinksにputしておけば、FlowConnector#connect()でそのパイプを指定しなくても出力される)


パイプの分岐

Cascadingでは、同一データを使用して処理を分岐させる(複数の処理で同一データを使用する)ことが出来る。
各Pipeクラスにおいて、前パイプに同じPipeインスタンスを指定すればよい。
(なお、パイプの合流(統合・結合)はGroupByCoGroupで行う)

public class ForkSample {

	public static void main(String[] args) {
		Tap source = new Hfs(new TextLine(new Fields("line")), new Path(args[0]).toUri().toString());
		Tap sink1 = new Hfs(new TextLine(), new Path(args[1]).toUri().toString(), SinkMode.REPLACE);
		Tap sink2 = new Hfs(new TextLine(), new Path(args[2]).toUri().toString(), SinkMode.REPLACE);

		Pipe pipe = new Pipe("最初のパイプ");
		pipe = new Each(pipe, new RegexSplitter(new Fields("key1", "key2", "data"), ",")); //入力行をカンマ区切りで分割

		Pipe pipe1 = new GroupBy("集計1", pipe, new Fields("key1"));
		pipe1 = new Every(pipe1, new Fields("data"), new Sum());

		Pipe pipe2 = new GroupBy("集計2", pipe, new Fields("key2"));
		pipe2 = new Every(pipe2, new Fields("data"), new Sum());

		// sinkの紐付け
		Map<String, Tap> sinks = new HashMap<String, Tap>();
		sinks.put(pipe1.getName(), sink1);
		sinks.put(pipe2.getName(), sink2);

		FlowConnector flowConnector = new FlowConnector();
		Flow flow = flowConnector.connect("fork-sample", source, sinks, pipe1, pipe2);
		flow.complete();
	}
}

ちなみに、GroupByでソートしてから2つのEveryに分岐することは出来ないようだ。
(Everyにはパイプ名を指定するコンストラクターも無いし、ちょっと特殊なのかも)

	Pipe pipe2 = new GroupBy("集計2", pipe, new Fields("key2"));

	Pipe pipe21 = new Every(pipe2, new Fields("data"), new Sum());
	pipe21 = new Pipe("SUM2", pipe21);

	Pipe pipe22 = new Every(pipe2, new Count());
	pipe22 = new Pipe("CNT2", pipe22);
Exception in thread "main" cascading.flow.PlannerException: could not build flow from assembly:
 [[集計2][jp.hishidama.hadoop.cascading.fork.ForkSample.main(ForkSample.java:51)] could not resolve argument selector in:
 Every(集計2)[Count[decl:'count']]]
	at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:225)
	at cascading.flow.FlowConnector.connect(FlowConnector.java:452)
	at cascading.flow.FlowConnector.connect(FlowConnector.java:434)
	at cascading.flow.FlowConnector.connect(FlowConnector.java:406)
	at jp.hishidama.hadoop.cascading.fork.ForkSample.main(ForkSample.java:63)
Caused by: cascading.pipe.OperatorException: [集計2][jp.hishidama.hadoop.cascading.fork.ForkSample.main(ForkSample.java:51)] could not resolve argument selector in:
 Every(集計2)[Count[decl:'count']]
	at cascading.pipe.Operator.resolveArgumentSelector(Operator.java:325)
	at cascading.pipe.Every.outgoingScopeFor(Every.java:243)
	at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:509)
	at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:491)
	at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:186)
	... 4 more
Caused by: java.lang.IllegalStateException: Every cannot follow a Tap or an Each
	at cascading.pipe.Every.resolveIncomingOperationFields(Every.java:223)
	at cascading.pipe.Operator.resolveArgumentSelector(Operator.java:313)
	... 8 more

	Pipe pipe21 = new GroupBy("SUM2", pipe, new Fields("key2"));
	pipe21 = new Every(pipe21, new Fields("data"), new Sum());

	Pipe pipe22 = new GroupBy("CNT2", pipe, new Fields("key2"));
	pipe22 = new Every(pipe22, new Count());

たぶんEveryReducerGroupByがシャッフル処理だと思われる点を考慮すると、
HadoopのMapReduceはシャッフルした結果を別々のReducerに渡せないから、という事なのかな?
(シャッフルはReduceタスクの前処理なので、外部から制御できない)


Cascadingへ戻る / Hadoopへ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま