S-JIS[2010-04-23/2010-04-27] 変更履歴

Cascading Flow

CascadingのFlowクラスについて。


Flowの概要

Cascadingにおいて、入力Tap(source)出力Tap(sink)パイプを結び付け、実際にMap/Reduceの実行を行うのがFlow

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
	FlowConnector flowConnector = new FlowConnector();
	Flow flow = flowConnector.connect("フロー名", source, sink, pipe);
	flow.complete();	//実行開始

複数フローの並列実行

Flowを1つのプログラムの中で同時に複数指定する事が出来る。[2010-04-27]

パイプ分岐によって並列な処理を記述できるが、実際の実行は並列でなく、適当な順番で直列(シーケンシャル)に実行される(と思われる)。
フローの場合は本当に並列で実行される(らしい)。

例えばFlowAがtap1へ出力してFlowBがtap1を入力としていると、FlowAが実行されてからFlowBが実行される。
FlowAとFlowBの入出力に関係が無ければ、並列で実行される。
また、出力が新しくならない場合はフロー実行されないこともあるらしい(?)。

import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;

import cascading.scheme.SequenceFile;
import cascading.scheme.TextLine;
	// pipe1からpipe11,pipe12へ分岐
	Pipe pipe1 = new Pipe("pipe1");
	Pipe pipe11 = new Pipe("pipe11", pipe1);
	〜
	Pipe pipe12 = new Pipe("pipe12", pipe1);
	〜

	// pipe11の結果をpipe21で処理
	Pipe pipe21 = new Pipe("pipe21");
	〜

	// pipe12の結果をpipe22で処理
	Pipe pipe22 = new Pipe("pipe22");
	〜

	// sourceとsinkの設定
	Tap source1 = new Hfs(new TextLine(new Fields("line")), args[0]);
	Tap tap11   = new Hfs(new SequenceFile(new Fields(保存する項目)), "temp1");
	Tap tap12   = new Hfs(new SequenceFile(new Fields(保存する項目)), "temp2");
	Tap sink21  = new Hfs(new TextLine(), args[1], SinkMode.REPLACE);
	Tap sink22  = new Hfs(new TextLine(), args[2], SinkMode.REPLACE);

	// フローの構築
	Map<String, Tap> taps = new HashMap<String, Tap>();
	taps.put(pipe11.getName(), tap11);
	taps.put(pipe12.getName(), tap12);

	FlowConnector flowConnector = new FlowConnector();
	Flow flow1  = flowConnector.connect("flow1", source1, taps, pipe11, pipe12);
	Flow flow21 = flowConnector.connect("flow21", tap11, sink21, pipe21);
	Flow flow22 = flowConnector.connect("flow22", tap12, sink22, pipe22);
	Cascade cascade = new CascadeConnector().connect(flow1, flow21, flow22);
	cascade.complete(); //実行
source1 flow1
pipe1
pipe11 pipe12
tap11 tap12  
flow21,flow22
pipe21 pipe22
sink21 sink22

あるパイプから出力し、別のパイプの入力とするTap(上記のtap11,tap12)は、TextLineでなくSequenceFileを使うようだ。
SequenceFileのfieldsには、パイプから出力する項目(次パイプの入力となる項目)を指定する。

フロー(FlowConnector#connect())自体は、通常のフローの作り方と同じ。
それを実行する為にCascadeというクラスに一旦まとめる。
CascadeConnector#connect()には、フローを順不同で指定する。実行順序は内部で勝手に決めてくれる。


有向グラフの描画

Cascadingの内部では、パイプ(とsourcesink)を実行順序で並び替えた、有向グラフ(向きの指定のあるグラフ(ノードと矢印))を作っている。
(この為にjgrapht-*.jarが使われている模様)
Flowでは、その有向グラフのデータであるdotファイルを生成することが出来る。
そしてこのdotファイルから、有向グラフの実際の図を生成することが出来る。(→Graphvizというツールを使用)

例えばWordCountサンプルの有向グラフを出力するには、以下のようにする。
(complete()を呼んでMap/Reduceを実行する代わりに、writeDOT()でdotファイルを出力する)

	FlowConnector flowConnector = new FlowConnector();
	Flow flow = flowConnector.connect("wordcount", source, sink, pipe);
//	flow.complete();
	flow.writeDOT("C:/temp/graphviz/wordcount.dot");

↓(wordcount.dot)

digraph G {
  1 [label = "Every('wordcount')[Count[decl:'count']]"];
  2 [label = "Hfs['TextLine[['offset', 'line']->[ALL]]']['file:/C:/cygwin/home/hadoop/tutorial/output']']"];
  3 [label = "GroupBy('wordcount')[by:['word']]"];
  4 [label = "Each('wordcount')[RegexSplitGenerator[decl:'word'][args:1]]"];
  5 [label = "Hfs['TextLine[['line']->[ALL]]']['file:/C:/cygwin/home/hadoop/tutorial/input']']"];
  6 [label = "[head]"];
  7 [label = "[tail]"];
  1 -> 2 [label = "[{2}:'word', 'count']\n[{1}:'word']"];
  4 -> 3 [label = "[{1}:'word']\n[{1}:'word']"];
  3 -> 1 [label = "wordcount[{1}:'word']\n[{1}:'word']"];
  6 -> 5 [label = ""];
  2 -> 7 [label = "[{?}:ALL]\n[{?}:ALL]"];
  5 -> 4 [label = "[{1}:'line']\n[{1}:'line']"];
}

↓(Gveditによる画像化)

丸で囲まれているのが処理(主にパイプ)。
sourcesinkも出ている。(中間ファイル出力がある場合はTempHfsも出てくる)

矢印の脇に出ているのが、出力項目名。各項目名の左側のとげ括弧で囲まれている数字は、項目数。
上段がグループキー項目名(たぶんFields.GROUP(もしかするとFields.ALL)で取得できる項目)、下段が値項目名(たぶんFields.VALUESで取得できる項目)。
EachではFields.GROUPもFields.VALUESも同じになる為、同じ項目名が二重に出ている。

WordCountくらいだと一本道なのでこういう図のありがたみはあまり無いかもしれないが、パイプが分岐したり結合したりしてくると、処理順序を確認するのに便利かもしれない。


Cascadingへ戻る / Hadoopへ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま