S-JIS[2010-04-05/2011-12-25] 変更履歴

Cascadingサンプル:WordCount

Cascadingのサンプルとして、HadoopチュートリアルのWordCountと同等の処理を作ってみる。


WordCountサンプル

CascadingHadoopチュートリアルのWordCountと同様の処理を行うプログラム。

package jp.hishidama.hadoop.cascading.wordcount;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
public class WordCount {
	// フィールド名の定義
	public static final String F_LINE  = "line";
	public static final String F_WORD  = "word";
	public static final String F_COUNT = "count";
	/**
	 * 行を分割する関数.
	 * チュートリアルWordCountのMapクラスに相当
	 */
	public static class SplitFunction extends BaseOperation<Object> implements Function<Object> {
		private static final long serialVersionUID = 1L;

		/** コンストラクター */
		public SplitFunction() {
			// 出力するフィールド名を指定
			super(new Fields(F_WORD));
		}

		@Override
		public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {
			TupleEntryCollector collector = functionCall.getOutputCollector();

			TupleEntry args = functionCall.getArguments();
			String line = args.getString(F_LINE);

			StringTokenizer tokenizer = new StringTokenizer(line);
			while (tokenizer.hasMoreTokens()) {
				Tuple tuple = new Tuple(tokenizer.nextToken());
				collector.add(tuple);
			}
		}
	}
	/**
	 * 件数をカウントする関数.
	 * チュートリアルWordCountのReduceクラスに相当
	 */
	public static class SumAggregator extends BaseOperation<SumAggregator.Context> implements Aggregator<SumAggregator.Context> {
		private static final long serialVersionUID = 1L;

		/** コンストラクター */
		public SumAggregator() {
			// 出力するフィールド名を指定
			super(new Fields(F_COUNT));
		}

		/** メソッド間にまたがってデータを保持する為のBeanクラス */
		protected static class Context {
			public int count;
		}

		@Override
		public void start(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
			Context context = aggregatorCall.getContext();
			if (context == null) {
				context = new Context();
				aggregatorCall.setContext(context);
			}
			context.count = 0;
		}

		@Override
		public void aggregate(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
			Context context = aggregatorCall.getContext();
			context.count++;
		}

		@Override
		public void complete(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
			TupleEntryCollector collector = aggregatorCall.getOutputCollector();

			Context context = aggregatorCall.getContext();
			collector.add(new Tuple(context.count));
		}
	}
	public static void main(String[] args) throws Exception {
		// 入出力ディレクトリーの指定
		Tap source = new Hfs(new TextLine(new Fields(F_LINE)), new Path(args[0]).toUri().toString());
		Tap sink   = new Hfs(new TextLine(),                   new Path(args[1]).toUri().toString(), SinkMode.REPLACE);

		// Pipeの初期化
		Pipe pipe = new Pipe("wordcount-pipe");

		// 行を単語に分割する。チュートリアルWordCountのMapクラスに相当
		pipe = new Each(pipe, new SplitFunction());

		// 単語毎にカウントする。チュートリアルWordCountのReduceクラスに相当
		pipe = new GroupBy(pipe, new Fields(F_WORD));
		pipe = new Every(pipe, new SumAggregator());

		// 実行
		FlowConnector flowConnector = new FlowConnector();
		Flow flow = flowConnector.connect("wordcount", source, sink, pipe);
		flow.complete();
	}
}

WordCountの処理の流れ(Pipeの定義)は以下のようになる。

処理 コーディング 説明 データ遷移のイメージ
入力ファイルの指定 Tap source = new Hfs(
 new TextLine(new Fields("line")),
 new Path(args[0]).toUri().toString()
);
Cascadingでは入力はsourceと呼び、場所はTapというクラスで指定する。
この例では、入力データに「line」という項目名を付けている。
Pathを使っている理由
ファイルの一行が1データとなる。
出力データ
line
Hello World Bye World
Hello Hadoop Goodbye Hadoop
出力ディレクトリーの指定 Tap sink = new Hfs(
 new TextLine(),
 new Path(args[1]).toUri().toString(),
 SinkMode.REPLACE
);
Cascadingでは出力先はsinkと呼び、場所はTapというクラスで指定する。
Pathを使っている理由
 
Pipeインスタンスの初期化 Pipe pipe = new Pipe("wordcount-pipe");    
単語分割処理 pipe = new Each(pipe, new SplitFunction()); Mapperに相当する処理。
Eachは、入力データ1つにつき、関数を1回呼び出す処理。
この例では、入力を「line」項目から受け取り(operate()内のgetString())、
「word」という項目に単語を出力する。
Each+SplitFunction
入力データ   出力
line word
Hello World Bye World Hello
World
Bye
World
Hello Hadoop Goodbye Hadoop Hello
Hadoop
Goodbye
Hadoop
単語カウント処理 pipe = new Every(new GroupBy(pipe, new Fields("word")), new SumAggregator()); Reducerに相当する処理。
Every-GroupByは、指定された項目毎の処理を行う。
この例では、「word」毎に件数をカウントし、「count」項目にその件数を出力する。
GroupByはソートを行う。
GroupBy
入力   出力
word word
Hello
World
Bye
World
Hello
Hadoop
Goodbye
Hadoop
Bye
Goodbye
Hadoop
Hadoop
Hello
Hello
World
World
Every+SumAggregator
入力   出力データ
word word count
Bye Bye 1
Goodbye Goodbye 1
Hadoop Hadoop 2
Hadoop
Hello Hello 2
Hello
World World 2
World
実行 FlowConnector flowConnector = new FlowConnector();
Flow flow = flowConnector.connect("wordcount", source, sink, pipe);
flow.complete();
処理を実行する(Map/Reduceが作られ、実行される)。
Pipeの最終処理データが項目毎にタブ区切りでファイルに出力される。
上記のデータ遷移は、実際には
complete()が呼ばれてから処理開始。

実行

Cascadingのプログラムはmain()から始まる普通のJavaアプリケーションなので、普通に実行できる。

ただし実行にはHadoopのMap/Reduceが使われる為、実行時のクラスパスにHadoopのjarファイルが必要。
また、Hadoopの設定ファイルの場所(HADOOP_HOME/conf)を指定しなかった場合、デフォルト状態(=単独環境)で実行される。

つまり、bin/hadoopシェルを経由せずに直接実行すればHadoopの単独環境で実行され、
hadoopシェル経由で実行すれば 設定ファイルの内容に従った環境で実行される。

なお、出力TapSinkMode.REPLACEを指定した場合、出力先ディレクトリーが既に存在していたら自動的に削除されて実行される。
(したがって、本来のHadoopプログラムと違って、実行前に自分でoutputディレクトリーを削除する必要は無い。便利♪

build.xml(単独環境で実行):

<?xml version="1.0" encoding="Windows-31J"?>
<project name="hadoop cascading" basedir=".">
	<property environment="env" />

	<property name="cygwin"         location="C:\cygwin" />
	<property name="hadoop.home"    location="${cygwin}/usr/local/hadoop-0.20.2" />
	<property name="cascading.home" location="${cygwin}/usr/local/cascading-1.0.18-hadoop-0.19.0+" />

	<property name="target.home"   location="${cygwin}/home/hadoop/tutorial" />
	<property name="target.input"  location="${target.home}/input" />
	<property name="target.output" location="${target.home}/output" />

	<property name="main.class" value="jp.hishidama.hadoop.cascading.wordcount.WordCount" />

	<path id="class.path">
		<pathelement location="../classes" />
		<fileset dir="${hadoop.home}">
			<include name="hadoop-*-core.jar" />
			<include name="lib/**/*.jar" />
		</fileset>
		<fileset dir="${cascading.home}">
			<include name="*.jar" />
			<exclude name="cascading-*-*.jar" />
			<include name="lib/*.jar" />
		</fileset>
	</path>

	<target name="1.1.execute" description="実行">
		<java classname="${main.class}" fork="true" maxmemory="1024m">
			<arg value="${target.input}" />
			<arg value="${target.output}" />
			<classpath refid="class.path" />
			<sysproperty key="hadoop.log.dir" path="${hadoop.home}/logs" />
			<sysproperty key="hadoop.log.file" value="hadoop.log" />
			<sysproperty key="hadoop.home.dir" path="${hadoop.home}/" />
			<sysproperty key="hadoop.id.str" value="${env.COMPUTERNAME}" />
			<sysproperty key="hadoop.root.logger" value="INFO,console" />
			<sysproperty key="hadoop.policy.file" value="hadoop-policy.xml" />
			<env key="PATH" path="${cygwin}/bin" />
		</java>
	</target>

	<target name="1.2.cat_output" description="outputの内容を表示する">
		<exec executable="${cygwin}/bin/cat.exe" dir="${target.output}">
			<arg value="*" />
		</exec>
	</target>
</project>

Cascadingのjarファイルをクラスパスに指定している事を除けば、通常のHadoopプログラムを実行する方法と全く同じ。


hadoopシェルによる実行

HADOOP_HOME/bin/hadoopシェルによってCascadingのプログラムを実行させる為には、HADOOP_HOME/conf/hadoop-env.shにCascadingのjarファイルを追加しておく必要がある。

HADOOP_HOME/conf/hadoop-env.sh:

# Extra Java CLASSPATH elements. Optional.
# export HADOOP_CLASSPATH=
CASCADING_HOME=/usr/local/cascading-1.0.18-hadoop-0.19.0+
HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$CASCADING_HOME/cascading-1.0.18.jar:$CASCADING_HOME/lib/jgrapht-jdk1.6.jar
export HADOOP_CLASSPATH

擬似分散環境での実行例:

$ cd /home/hadoop/tutorial

$ hadoop fs -put input input

$ hadoop jar wordcount.jar jp.hishidama.hadoop.cascading.wordcount.WordCount input output
2010/04/04 20:09:18 INFO util.Util: resolving application jar from found main method on: jp.hishidama.hadoop.cascading.wordcount.WordCount
2010/04/04 20:09:18 INFO flow.MultiMapReducePlanner: using application jar: /C:/cygwin/home/hadoop/tutorial/wordcount.jar
2010/04/04 20:09:19 INFO flow.Flow: [wordcount-flow] starting
2010/04/04 20:09:19 INFO flow.Flow: [wordcount-flow] source: Hfs["TextLine[['line']->[ALL]]"]["input"]"]
2010/04/04 20:09:19 INFO flow.Flow: [wordcount-flow] sink: Hfs["TextLine[['offset', 'line']->[ALL]]"]["output"]"]
2010/04/04 20:09:20 INFO flow.Flow: [wordcount-flow] parallel execution is enabled: true
2010/04/04 20:09:20 INFO flow.Flow: [wordcount-flow] starting jobs: 1
2010/04/04 20:09:20 INFO flow.Flow: [wordcount-flow] allocating threads: 1
2010/04/04 20:09:20 INFO flow.FlowStep: [wordcount-flow] starting step: (1/1) Hfs["TextLine[['offset', 'line']->[ALL]]"]["output"]"]
2010/04/04 20:09:20 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2010/04/04 20:09:20 INFO mapred.FileInputFormat: Total input paths to process : 2

$ hadoop fs -ls output
Found 2 items
drwxr-xr-x   - hishidama supergroup          0 2010-04-04 20:09 /user/hishidama/output/_logs
-rw-r--r--   1 hishidama supergroup         41 2010-04-04 20:09 /user/hishidama/output/part-00000

$ hadoop fs -cat 'output/*'
Bye	1
Goodbye	1
Hadoop	2
Hello	2
World	2
cat: Source must be a file.

WordCountのカスタマイズ:用意されているクラスの使用

最初のサンプルでは、HadoopのMap/Reduceと対比させる為にわざわざSplitFunctionSumAggregatorというクラスを作ってみた。

しかしテキストの行の分割やカウントといった処理はCascadingにデフォルトで用意されているので、そちらを使う方がいいかな。

import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexSplitGenerator;
public class WordCount2 {
	// フィールド名の定義
	public static final String F_LINE = "line";
	public static final String F_WORD = "word";
	public static final String F_COUNT = "count";
	public static void main(String[] args) throws Exception {
		// 入出力ディレクトリーの指定
		Tap source = new Hfs(new TextLine(new Fields(F_LINE)), new Path(args[0]).toUri().toString());
		Tap sink   = new Hfs(new TextLine(),                   new Path(args[1]).toUri().toString(), SinkMode.REPLACE);

		// Pipeの初期化
		Pipe pipe = new Pipe("wordcount-pipe");

		// 行を単語に分割する。チュートリアルWordCountのMapクラスに相当
		pipe = new Each(pipe, new RegexSplitGenerator(new Fields(F_WORD), "[ \t\n\r\f]+"));

		// 単語毎にカウントする。チュートリアルWordCountのReduceクラスに相当
		pipe = new GroupBy(pipe, new Fields(F_WORD));
		pipe = new Every(pipe, new Count(new Fields(F_COUNT)));

		// 実行
		FlowConnector flowConnector = new FlowConnector();
		Flow flow = flowConnector.connect("wordcount", source, sink, pipe);
		flow.complete();
	}
}

ToolRunnerの使用

HadoopのTool・ToolRunnerを使用する例。[2010-04-15]

前述のWordCountのHfs生成では、パスを指定するのに「new Path(args[0]).toUri().toString()」などという、回りくどい事をしている。
素直に「args[0]」とした場合、常にURI形式で指定しなければならない。(相対パスを解釈してくれない)
HadoopのPathクラスを使って「new Path(args[0]).toString()」とした場合も、CascadingはWindowsのローカルパスを解釈してくれない。
そこで、間にURIクラスを入れている。

しかし本来のHadoopのプログラムでは、Hadoopのコンフィグファイルの内容に応じて自動的に相対パスを解釈してくれる。
それを行っているのがFileInputFormat#addInputPath()やFileOutputFormat#setOutputPath()なので、それと同様の処理をしてやればよい。
ただし、その為にはHadoopのコンフィグファイルを読み込む必要がある(ファイルを指定する方法が必要になるし、コンフィグオブジェクトを生成・保持・取得する必要がある)。
そこで、HadoopのToolRunnerを使用する。

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.MultiMapReducePlanner;
public class WordCount3 extends Configured implements Tool {
	// フィールド名の定義
	public static final String F_LINE = "line";
	public static final String F_WORD = "word";
	public static final String F_COUNT = "count";
	// パスの解釈
	public String makeQualifiedPath(String path) throws IOException {
		FileSystem fs = FileSystem.get(super.getConf());
		return new Path(path).makeQualified(fs).toString();
	}
	@Override
	public int run(String[] args) throws Exception {
		// 入出力ディレクトリーの指定
		Tap source = new Hfs(new TextLine(new Fields(F_LINE)), makeQualifiedPath(args[0]));
		Tap sink   = new Hfs(new TextLine(),                   makeQualifiedPath(args[1]), SinkMode.REPLACE);

		// Pipeの初期化
		Pipe pipe = new Pipe("wordcount-pipe");

		// 行を単語に分割する。チュートリアルWordCountのMapクラスに相当
		pipe = new Each(pipe, new RegexSplitGenerator(new Fields(F_WORD), "[ \t\n\r\f]+"));

		// 単語毎にカウントする。チュートリアルWordCountのReduceクラスに相当
		pipe = new GroupBy(pipe, new Fields(F_WORD));
		pipe = new Every(pipe, new Count(new Fields(F_COUNT)));

		// 実行
		{
			Map<Object, Object> properties = new HashMap<Object, Object>();
			MultiMapReducePlanner.setJobConf(properties, new JobConf(super.getConf()));
			FlowConnector.setApplicationJarClass(properties, getClass());

			FlowConnector flowConnector = new FlowConnector(properties);
			Flow flow = flowConnector.connect("wordcount", source, sink, pipe);
			flow.complete();
		}
		return 0;
	}

	public static void main(String[] args) throws Exception {
		int r = ToolRunner.run(new WordCount3(), args);
		System.exit(r);
	}
}

ついでに、FlowConnectorプロパティーを渡すようにしてみた。
FlowConnector#connect()の中ではMultiMapReducePlannerを使っており、そこにJobConf(Hadoopの設定)を指定することが出来るようだ。
(Hadoop0.20ではJobConfは非推奨クラスなのだが、Cascading1.0はHadoop0.19対応なので、JobConfを使っている)

パス名の解釈やプロパティーの初期化を行う自作ライブラリー


Cascading1.2

Cascading1.2では件数カウント用のサブアセンブリーが追加された。[2011-12-25]

		// 単語毎にカウントする。チュートリアルWordCountのReduceクラスに相当
		pipe = new CountBy(pipe, new Fields(F_WORD), new Fields(F_COUNT));

Cascadingへ戻る / Hadoopへ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま