S-JIS[2010-02-23/2012-12-15] 変更履歴

Hadoop Java API

HadoopのJava APIについて。


ToolRunner

チュートリアルのソースでは使っていないが、@ITのサンプルではToolRunnerというクラスを使って実行している。

ToolRunnerを使うと、Configurationの初期化や汎用(一般)オプション(generic option)の解釈を行ってくれる。
したがって、Configurationの初期化を自前で行い、汎用オプションも使わないのであれば、ToolRunnerを使う必要は無い。

ToolRunnerを使わない例

import org.apache.hadoop.conf.Configuration;
public class WordCount {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf, "wordcount");
		job.setJarByClass(WordCount.class);
〜
		boolean success = job.waitForCompletion(true);
		System.exit(success ? 0 : 1);
	}

ToolRunnerを使う例

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordCount extends Configured implements Tool {
	public static void main(String[] args) throws Exception {
		int r = ToolRunner.run(new WordCount(), args);
		System.exit(r);
	}
	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Job job = new Job(conf, "wordcount");
		job.setJarByClass(getClass());
〜
		boolean success = job.waitForCompletion(true);
		return success ? 0 : 1;
	}

ToolRunner#run()にはToolインターフェースの実装クラスを渡す必要があるので、クラス本体(上の例ではWordCount)はToolを実装する必要がある。

ToolインターフェースはConfigurableというインターフェースを継承している。
これにはgetConf()等が宣言されており、これをConfiguredクラスが実装している為、クラス本体はConfiguredを親クラスとしている。

Toolインターフェースのメソッドの中で Configuredクラスで実装されていないのはrun()のみであり、これをクラス本体が実装する必要がある。
ToolRunnerはConfigurationの初期化を行った後でTool#run()を呼び出す。


ToolRunnerが解釈してくれる汎用オプション(汎用コマンドライン・一般オプション)の内容は、ToolRunner.printGenericCommandUsage()を実行すると表示される。

		ToolRunner.printGenericCommandUsage(System.out);
Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|jobtracker:port>    specify a job tracker
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
オプション 内容 備考
-conf 設定ファイル 設定ファイルを指定する。
設定ファイルの内容はHADOOP_HOME/conf/の下にあるxmlファイルと同形式。
すなわち
<property>
  <name>hbase.mapred.outputtable</name>
  <value>TEST_TYPE</value>
  <description>output table name</description>
</property>

の様に書く。
そうすると、Tool#run()が呼ばれるときにはconfには設定ファイル内の値がセットされており、
上記の例ではconf.get("hbase.mapred.outputtable")"TEST_TYPE"を取得することが出来る。
独自設定ファイル
を指定する例
-D プロパティー名=値 confにプロパティーを設定する。[2010-03-12]
設定ファイルのプロパティーより優先される。(設定ファイルで指定されたプロパティーに上書きされる)
javaコマンドの-Dオプションと違って、「-D」と「プロパティー名」の間にはスペースが必要なので注意。
Reducerの個数
を指定する例
-libjars jarファイル,… 独自に使用するjarファイルをカンマ区切りで列挙する。
(通常、クラスパス等でjarファイルを列挙する際の区切り文字はWindowsだとセミコロン、UNIXだとコロンだが、
 そういう機種依存をしない為にカンマ区切りになっているのだと思う)

HADOOP_HOME/bin/hadoopシェルはHADOOPのライブラリーしかクラスパスに入れてくれないので、
その他のライブラリーを使う際はここでjarファイルを指定できる。
というものだと思うのだが、ToolRunner内で新しいクラスローダーを作成することになるので
最初にロードされたクラスからライブラリー内のクラスは参照できないような気が…。(単独環境だけの問題か?)
「hadoop jar zzz.jar」でHADOOPを実行する場合、zzz.jar内のルートにlibディレクトリーがあったら
そのlib直下のjarファイルもクラスパスに含めてくれるようなので、そちらの方が確実かも。
(「bin/hadoop jar」はjarファイルの展開なんぞをやっているので、ちょっと無駄な気もする。
 が、libディレクトリー内のjarファイルをクラスパスに含める為には、解凍が必須なのかも)
 

初期化メソッド

最初に、どのクラスを使ってMap/Reduceの処理を行うかを指定する。[2010-02-24/2010-03-19]

初期化メソッド 指定する値 説明 省略時(デフォルト)
Job setMapOutputKeyClass() Mapperの出力キーのクラス Mapperの出力型(すなわちReducerの入力型・Combinerの型)を指定する。
ここで指定した型が実際のMapperの出力と異なっていたら例外が発生する。
setOutputKeyClass()で指定した型
setMapOutputValueClass() Mapperの出力値のクラス setOutputValueClass()で指定した型
setOutputKeyClass() Reducerの出力キーのクラス Reducerの出力型(すなわちOutputFormatの入力型)を指定する。 LongWritable.class
setOutputValueClass() Reducerの出力値のクラス Text.class
setMapperClass() Mapperクラス Mapperの具象クラスを指定する。 Mapper.class
(入力をそのまま出力する)
setCombinerClass() Combinerクラス Combinerの具象クラスを指定する。 null
(中間集計を行わない)
setReducerClass() Reducerクラス Reducerの具象クラスを指定する。 Reducer.class
(入力をそのまま出力する)
setInputFormatClass() InputFormatクラス データ読み込みを行いMapperへデータを受け渡すInputFormatの具象クラスを指定する。 TextInputFormat.class
setOutputFormatClass() OutputFormatクラス Reducerからデータを受け取り最終出力を行うOutputFormatの具象クラスを指定する。 TextOutputFormat.class
setSortComparatorClass() RawComparatorクラス Mapper出力後(Combiner処理前)のソート(の為のキーの比較)を行うコンパレーターを指定する。 setMapOutputKeyClass()で指定したクラス
のWritableComparator
setGroupingComparatorClass() RawComparatorクラス Reducer処理前のソート・マージ(の為のキーの比較)を行うコンパレーターを指定する。 setSortComparatorClass()で指定したクラス
setNumReduceTasks() 個数 使用するReducerの個数を指定する。 1
setPartitionerClass() Partitionerクラス Partitioner(Mapperの出力をどのReducerへ渡すか決めるクラス)を指定する。 HashPartitioner.class

Map/Reduceのコーディングでは、入力の型出力の型に相関関係がある。
つまり、それぞれのステップの前後における入力のキー・データの型出力のキー・データの型が一致していなければならない。

データの流れから考えて、その関係は以下のようになる。

前ステップの出力   後ステップの入力 備考・例
入力フォーマットの出力型 Mapperの入力型 例えばTextInputFormatを使う場合、Mapperの入力の型は<LongWritable, Text>になる。
Mapperの出力型 Combinerの入力型 結論として、Combinerの入力型と出力型は同じでなければならない。
Combinerの出力型 Reducerの入力型
Mapperの出力型 Reducerの入力型  
Reducerの出力型 出力フォーマットの入力型 例えばTableOutputFormatを使う場合、Reducerの出力の型は<任意, Writable>になる。
public class MyInputFormat extends InputFormat<K, V> {
	〜
}
public class MyMapper   extends Mapper <KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

	@Override
	protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
		KEYOUT   outKey = 〜;
		VALUEOUT outVal = 〜;
		context.write(outKey, outVal);
	}
}
public class MyCombiner extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

	@Override
	protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
		KEYOUT   outKey = 〜;
		VALUEOUT outVal = 〜;
		context.write(outKey, outVal);
	}
}
public class MyReducer  extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

	@Override
	protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
		KEYOUT   outKey = 〜;
		VALUEOUT outVal = 〜;
		context.write(outKey, outVal);
	}
}
public class MyOutputFormat extends OutputFormat<K, V> {
	〜
}
	public static void main(String[] args) {
		Configuration conf = new Configuration();
		Job job = new Job(conf, "〜");
		job.setJarByClass(〜.class);

		job.setMapOutputKeyClass  (MyMapperのKEYOUT);
		job.setMapOutputValueClass(MyMapperのVALUEOUT);
		job.setOutputKeyClass  (MyReducerのKEYOUT);
		job.setOutputValueClass(MyReducerのVALUEOUT);

		job.setMapperClass  (MyMapper.class);
		job.setCombinerClass(MyCombiner.class);
		job.setReducerClass (MyReducer.class);

		job.setInputFormatClass (MyInputFormat.class);
		job.setOutputFormatClass(MyOutputFormat.class);
〜
	}

Hadoop目次へ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま