Cascadingのサンプルとして、HadoopチュートリアルのWordCountと同等の処理を作ってみる。
|
|
CascadingでHadoopチュートリアルのWordCountと同様の処理を行うプログラム。
package jp.hishidama.hadoop.cascading.wordcount;
import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import cascading.flow.Flow; import cascading.flow.FlowConnector; import cascading.flow.FlowProcess; import cascading.operation.Aggregator; import cascading.operation.AggregatorCall; import cascading.operation.BaseOperation; import cascading.operation.Function; import cascading.operation.FunctionCall; import cascading.pipe.Each; import cascading.pipe.Every; import cascading.pipe.GroupBy; import cascading.pipe.Pipe; import cascading.scheme.TextLine; import cascading.tap.Hfs; import cascading.tap.SinkMode; import cascading.tap.Tap; import cascading.tuple.Fields; import cascading.tuple.Tuple; import cascading.tuple.TupleEntry; import cascading.tuple.TupleEntryCollector;
public class WordCount {
// フィールド名の定義 public static final String F_LINE = "line"; public static final String F_WORD = "word"; public static final String F_COUNT = "count";
/**
* 行を分割する関数.
* チュートリアルWordCountのMapクラスに相当
*/
public static class SplitFunction extends BaseOperation<Object> implements Function<Object> {
private static final long serialVersionUID = 1L;
/** コンストラクター */
public SplitFunction() {
// 出力するフィールド名を指定
super(new Fields(F_WORD));
}
@Override
public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {
TupleEntryCollector collector = functionCall.getOutputCollector();
TupleEntry args = functionCall.getArguments();
String line = args.getString(F_LINE);
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
Tuple tuple = new Tuple(tokenizer.nextToken());
collector.add(tuple);
}
}
}
/**
* 件数をカウントする関数.
* チュートリアルWordCountのReduceクラスに相当
*/
public static class SumAggregator extends BaseOperation<SumAggregator.Context> implements Aggregator<SumAggregator.Context> {
private static final long serialVersionUID = 1L;
/** コンストラクター */
public SumAggregator() {
// 出力するフィールド名を指定
super(new Fields(F_COUNT));
}
/** メソッド間にまたがってデータを保持する為のBeanクラス */
protected static class Context {
public int count;
}
@Override
public void start(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
Context context = aggregatorCall.getContext();
if (context == null) {
context = new Context();
aggregatorCall.setContext(context);
}
context.count = 0;
}
@Override
public void aggregate(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
Context context = aggregatorCall.getContext();
context.count++;
}
@Override
public void complete(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
TupleEntryCollector collector = aggregatorCall.getOutputCollector();
Context context = aggregatorCall.getContext();
collector.add(new Tuple(context.count));
}
}
public static void main(String[] args) throws Exception {
// 入出力ディレクトリーの指定
Tap source = new Hfs(new TextLine(new Fields(F_LINE)), new Path(args[0]).toUri().toString());
Tap sink = new Hfs(new TextLine(), new Path(args[1]).toUri().toString(), SinkMode.REPLACE);
// Pipeの初期化
Pipe pipe = new Pipe("wordcount-pipe");
// 行を単語に分割する。チュートリアルWordCountのMapクラスに相当
pipe = new Each(pipe, new SplitFunction());
// 単語毎にカウントする。チュートリアルWordCountのReduceクラスに相当
pipe = new GroupBy(pipe, new Fields(F_WORD));
pipe = new Every(pipe, new SumAggregator());
// 実行
FlowConnector flowConnector = new FlowConnector();
Flow flow = flowConnector.connect("wordcount", source, sink, pipe);
flow.complete();
}
}
WordCountの処理の流れ(Pipeの定義)は以下のようになる。
| 処理 | コーディング | 説明 | データ遷移のイメージ | |||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 入力ファイルの指定 | Tap source = new Hfs( |
Cascadingでは入力はsourceと呼び、場所はTapというクラスで指定する。 この例では、入力データに「line」という項目名を付けている。 →Pathを使っている理由 |
ファイルの一行が1データとなる。
|
|||||||||||||||||||||||||||
| 出力ディレクトリーの指定 | Tap sink = new Hfs( |
Cascadingでは出力先はsinkと呼び、場所はTapというクラスで指定する。 →Pathを使っている理由 |
||||||||||||||||||||||||||||
| Pipeインスタンスの初期化 | Pipe pipe = new Pipe("wordcount-pipe"); |
|||||||||||||||||||||||||||||
| 単語分割処理 | pipe = new Each(pipe, new SplitFunction()); |
Mapperに相当する処理。 Eachは、入力データ1つにつき、関数を1回呼び出す処理。 この例では、入力を「line」項目から受け取り(operate()内のgetString())、 「word」という項目に単語を出力する。 |
|
|||||||||||||||||||||||||||
| 単語カウント処理 | pipe = new Every(new GroupBy(pipe, new Fields("word")), new SumAggregator()); |
Reducerに相当する処理。 Every-GroupByは、指定された項目毎の処理を行う。 この例では、「word」毎に件数をカウントし、「count」項目にその件数を出力する。 |
GroupByはソートを行う。
|
|||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
| 実行 | FlowConnector flowConnector = new FlowConnector(); |
処理を実行する(Map/Reduceが作られ、実行される)。 Pipeの最終処理データが項目毎にタブ区切りでファイルに出力される。 |
上記のデータ遷移は、実際には complete()が呼ばれてから処理開始。 |
|||||||||||||||||||||||||||
Cascadingのプログラムはmain()から始まる普通のJavaアプリケーションなので、普通に実行できる。
ただし実行にはHadoopのMap/Reduceが使われる為、実行時のクラスパスにHadoopのjarファイルが必要。
また、Hadoopの設定ファイルの場所(HADOOP_HOME/conf)を指定しなかった場合、デフォルト状態(=単独環境)で実行される。
つまり、bin/hadoopシェルを経由せずに直接実行すればHadoopの単独環境で実行され、
hadoopシェル経由で実行すれば 設定ファイルの内容に従った環境で実行される。
なお、出力TapにSinkMode.REPLACEを指定した場合、出力先ディレクトリーが既に存在していたら自動的に削除されて実行される。
(したがって、本来のHadoopプログラムと違って、実行前に自分でoutputディレクトリーを削除する必要は無い。便利♪)
<?xml version="1.0" encoding="Windows-31J"?>
<project name="hadoop cascading" basedir=".">
<property environment="env" />
<property name="cygwin" location="C:\cygwin" />
<property name="hadoop.home" location="${cygwin}/usr/local/hadoop-0.20.2" />
<property name="cascading.home" location="${cygwin}/usr/local/cascading-1.0.18-hadoop-0.19.0+" />
<property name="target.home" location="${cygwin}/home/hadoop/tutorial" />
<property name="target.input" location="${target.home}/input" />
<property name="target.output" location="${target.home}/output" />
<property name="main.class" value="jp.hishidama.hadoop.cascading.wordcount.WordCount" />
<path id="class.path">
<pathelement location="../classes" />
<fileset dir="${hadoop.home}">
<include name="hadoop-*-core.jar" />
<include name="lib/**/*.jar" />
</fileset>
<fileset dir="${cascading.home}">
<include name="*.jar" />
<exclude name="cascading-*-*.jar" />
<include name="lib/*.jar" />
</fileset>
</path>
<target name="1.1.execute" description="実行">
<java classname="${main.class}" fork="true" maxmemory="1024m">
<arg value="${target.input}" />
<arg value="${target.output}" />
<classpath refid="class.path" />
<sysproperty key="hadoop.log.dir" path="${hadoop.home}/logs" />
<sysproperty key="hadoop.log.file" value="hadoop.log" />
<sysproperty key="hadoop.home.dir" path="${hadoop.home}/" />
<sysproperty key="hadoop.id.str" value="${env.COMPUTERNAME}" />
<sysproperty key="hadoop.root.logger" value="INFO,console" />
<sysproperty key="hadoop.policy.file" value="hadoop-policy.xml" />
<env key="PATH" path="${cygwin}/bin" />
</java>
</target>
<target name="1.2.cat_output" description="outputの内容を表示する">
<exec executable="${cygwin}/bin/cat.exe" dir="${target.output}">
<arg value="*" />
</exec>
</target>
</project>
Cascadingのjarファイルをクラスパスに指定している事を除けば、通常のHadoopプログラムを実行する方法と全く同じ。
HADOOP_HOME/bin/hadoopシェルによってCascadingのプログラムを実行させる為には、HADOOP_HOME/conf/hadoop-env.shにCascadingのjarファイルを追加しておく必要がある。
# Extra Java CLASSPATH elements. Optional. # export HADOOP_CLASSPATH= CASCADING_HOME=/usr/local/cascading-1.0.18-hadoop-0.19.0+ HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$CASCADING_HOME/cascading-1.0.18.jar:$CASCADING_HOME/lib/jgrapht-jdk1.6.jar export HADOOP_CLASSPATH
$ cd /home/hadoop/tutorial $ hadoop fs -put input input $ hadoop jar wordcount.jar jp.hishidama.hadoop.cascading.wordcount.WordCount input output 2010/04/04 20:09:18 INFO util.Util: resolving application jar from found main method on: jp.hishidama.hadoop.cascading.wordcount.WordCount 2010/04/04 20:09:18 INFO flow.MultiMapReducePlanner: using application jar: /C:/cygwin/home/hadoop/tutorial/wordcount.jar 2010/04/04 20:09:19 INFO flow.Flow: [wordcount-flow] starting 2010/04/04 20:09:19 INFO flow.Flow: [wordcount-flow] source: Hfs["TextLine[['line']->[ALL]]"]["input"]"] 2010/04/04 20:09:19 INFO flow.Flow: [wordcount-flow] sink: Hfs["TextLine[['offset', 'line']->[ALL]]"]["output"]"] 2010/04/04 20:09:20 INFO flow.Flow: [wordcount-flow] parallel execution is enabled: true 2010/04/04 20:09:20 INFO flow.Flow: [wordcount-flow] starting jobs: 1 2010/04/04 20:09:20 INFO flow.Flow: [wordcount-flow] allocating threads: 1 2010/04/04 20:09:20 INFO flow.FlowStep: [wordcount-flow] starting step: (1/1) Hfs["TextLine[['offset', 'line']->[ALL]]"]["output"]"] 2010/04/04 20:09:20 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 2010/04/04 20:09:20 INFO mapred.FileInputFormat: Total input paths to process : 2 $ hadoop fs -ls output Found 2 items drwxr-xr-x - hishidama supergroup 0 2010-04-04 20:09 /user/hishidama/output/_logs -rw-r--r-- 1 hishidama supergroup 41 2010-04-04 20:09 /user/hishidama/output/part-00000 $ hadoop fs -cat 'output/*' Bye 1 Goodbye 1 Hadoop 2 Hello 2 World 2 cat: Source must be a file.
最初のサンプルでは、HadoopのMap/Reduceと対比させる為にわざわざSplitFunctionやSumAggregatorというクラスを作ってみた。
しかしテキストの行の分割やカウントといった処理はCascadingにデフォルトで用意されているので、そちらを使う方がいいかな。
import cascading.operation.aggregator.Count; import cascading.operation.regex.RegexSplitGenerator;
public class WordCount2 {
// フィールド名の定義 public static final String F_LINE = "line"; public static final String F_WORD = "word"; public static final String F_COUNT = "count";
public static void main(String[] args) throws Exception {
// 入出力ディレクトリーの指定
Tap source = new Hfs(new TextLine(new Fields(F_LINE)), new Path(args[0]).toUri().toString());
Tap sink = new Hfs(new TextLine(), new Path(args[1]).toUri().toString(), SinkMode.REPLACE);
// Pipeの初期化
Pipe pipe = new Pipe("wordcount-pipe");
// 行を単語に分割する。チュートリアルWordCountのMapクラスに相当
pipe = new Each(pipe, new RegexSplitGenerator(new Fields(F_WORD), "[ \t\n\r\f]+"));
// 単語毎にカウントする。チュートリアルWordCountのReduceクラスに相当
pipe = new GroupBy(pipe, new Fields(F_WORD));
pipe = new Every(pipe, new Count(new Fields(F_COUNT)));
// 実行
FlowConnector flowConnector = new FlowConnector();
Flow flow = flowConnector.connect("wordcount", source, sink, pipe);
flow.complete();
}
}
HadoopのTool・ToolRunnerを使用する例。[2010-04-15]
前述のWordCountのHfs生成では、パスを指定するのに「new
Path(args[0]).toUri().toString()」などという、回りくどい事をしている。
素直に「args[0]」とした場合、常にURI形式で指定しなければならない。(相対パスを解釈してくれない)
HadoopのPathクラスを使って「new Path(args[0]).toString()」とした場合も、CascadingはWindowsのローカルパスを解釈してくれない。
そこで、間にURIクラスを入れている。
しかし本来のHadoopのプログラムでは、Hadoopのコンフィグファイルの内容に応じて自動的に相対パスを解釈してくれる。
それを行っているのがFileInputFormat#addInputPath()やFileOutputFormat#setOutputPath()なので、それと同様の処理をしてやればよい。
ただし、その為にはHadoopのコンフィグファイルを読み込む必要がある(ファイルを指定する方法が必要になるし、コンフィグオブジェクトを生成・保持・取得する必要がある)。
そこで、HadoopのToolRunnerを使用する。
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import cascading.flow.Flow; import cascading.flow.FlowConnector; import cascading.flow.MultiMapReducePlanner;
public class WordCount3 extends Configured implements Tool {
// フィールド名の定義 public static final String F_LINE = "line"; public static final String F_WORD = "word"; public static final String F_COUNT = "count";
// パスの解釈
public String makeQualifiedPath(String path) throws IOException {
FileSystem fs = FileSystem.get(super.getConf());
return new Path(path).makeQualified(fs).toString();
}
@Override
public int run(String[] args) throws Exception {
// 入出力ディレクトリーの指定
Tap source = new Hfs(new TextLine(new Fields(F_LINE)), makeQualifiedPath(args[0]));
Tap sink = new Hfs(new TextLine(), makeQualifiedPath(args[1]), SinkMode.REPLACE);
// Pipeの初期化
Pipe pipe = new Pipe("wordcount-pipe");
// 行を単語に分割する。チュートリアルWordCountのMapクラスに相当
pipe = new Each(pipe, new RegexSplitGenerator(new Fields(F_WORD), "[ \t\n\r\f]+"));
// 単語毎にカウントする。チュートリアルWordCountのReduceクラスに相当
pipe = new GroupBy(pipe, new Fields(F_WORD));
pipe = new Every(pipe, new Count(new Fields(F_COUNT)));
// 実行
{
Map<Object, Object> properties = new HashMap<Object, Object>();
MultiMapReducePlanner.setJobConf(properties, new JobConf(super.getConf()));
FlowConnector.setApplicationJarClass(properties, getClass());
FlowConnector flowConnector = new FlowConnector(properties);
Flow flow = flowConnector.connect("wordcount", source, sink, pipe);
flow.complete();
}
return 0;
}
public static void main(String[] args) throws Exception {
int r = ToolRunner.run(new WordCount3(), args);
System.exit(r);
}
}
ついでに、FlowConnectorにプロパティーを渡すようにしてみた。
FlowConnector#connect()の中ではMultiMapReducePlannerを使っており、そこにJobConf(Hadoopの設定)を指定することが出来るようだ。
(Hadoop0.20ではJobConfは非推奨クラスなのだが、Cascading1.0はHadoop0.19対応なので、JobConfを使っている)
Cascading1.2では件数カウント用のサブアセンブリーが追加された。[2011-12-25]
// 単語毎にカウントする。チュートリアルWordCountのReduceクラスに相当 pipe = new CountBy(pipe, new Fields(F_WORD), new Fields(F_COUNT));