HadoopのJava APIについて。
|
|
チュートリアルのソースでは使っていないが、@ITのサンプルではToolRunnerというクラスを使って実行している。
ToolRunnerを使うと、Configurationの初期化や汎用(一般)オプション(generic option)の解釈を行ってくれる。
したがって、Configurationの初期化を自前で行い、汎用オプションも使わないのであれば、ToolRunnerを使う必要は無い。
import org.apache.hadoop.conf.Configuration;
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");
job.setJarByClass(WordCount.class);
〜
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;
public class WordCount extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int r = ToolRunner.run(new WordCount(), args);
System.exit(r);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf, "wordcount");
job.setJarByClass(getClass());
〜
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
ToolRunner#run()にはToolインターフェースの実装クラスを渡す必要があるので、クラス本体(上の例ではWordCount)はToolを実装する必要がある。
ToolインターフェースはConfigurableというインターフェースを継承している。
これにはgetConf()等が宣言されており、これをConfiguredクラスが実装している為、クラス本体はConfiguredを親クラスとしている。
Toolインターフェースのメソッドの中で Configuredクラスで実装されていないのはrun()のみであり、これをクラス本体が実装する必要がある。
ToolRunnerはConfigurationの初期化を行った後でTool#run()を呼び出す。
ToolRunnerが解釈してくれる汎用オプション(汎用コマンドライン・一般オプション)の内容は、ToolRunner.printGenericCommandUsage()を実行すると表示される。
ToolRunner.printGenericCommandUsage(System.out);
Generic options supported are -conf <configuration file> specify an application configuration file -D <property=value> use value for given property -fs <local|namenode:port> specify a namenode -jt <local|jobtracker:port> specify a job tracker -files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster -libjars <comma separated list of jars> specify comma separated jar files to include in the classpath. -archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines. The general command line syntax is bin/hadoop command [genericOptions] [commandOptions]
| オプション | 内容 | 備考 |
|---|---|---|
| -conf 設定ファイル | 設定ファイルを指定する。 設定ファイルの内容はHADOOP_HOME/conf/の下にあるxmlファイルと同形式。 すなわち <property>の様に書く。 そうすると、Tool#run()が呼ばれるときにはconfには設定ファイル内の値がセットされており、 上記の例では conf.get("hbase.mapred.outputtable")で"TEST_TYPE"を取得することが出来る。 |
→独自設定ファイル を指定する例 |
| -D プロパティー名=値 | confにプロパティーを設定する。[2010-03-12] 設定ファイルのプロパティーより優先される。(設定ファイルで指定されたプロパティーに上書きされる) javaコマンドの-Dオプションと違って、「-D」と「プロパティー名」の間にはスペースが必要なので注意。 |
→Reducerの個数 を指定する例 |
| -libjars jarファイル,… | 独自に使用するjarファイルをカンマ区切りで列挙する。 (通常、クラスパス等でjarファイルを列挙する際の区切り文字はWindowsだとセミコロン、UNIXだとコロンだが、 そういう機種依存をしない為にカンマ区切りになっているのだと思う) HADOOP_HOME/bin/hadoopシェルはHADOOPのライブラリーしかクラスパスに入れてくれないので、 その他のライブラリーを使う際はここでjarファイルを指定できる。 というものだと思うのだが、ToolRunner内で新しいクラスローダーを作成することになるので 最初にロードされたクラスからライブラリー内のクラスは参照できないような気が…。(単独環境だけの問題か?) 「hadoop jar zzz.jar」でHADOOPを実行する場合、zzz.jar内のルートにlibディレクトリーがあったら そのlib直下のjarファイルもクラスパスに含めてくれるようなので、そちらの方が確実かも。 (「bin/hadoop jar」はjarファイルの展開なんぞをやっているので、ちょっと無駄な気もする。 が、libディレクトリー内のjarファイルをクラスパスに含める為には、解凍が必須なのかも) |
最初に、どのクラスを使ってMap/Reduceの処理を行うかを指定する。[2010-02-24/2010-03-19]
| 初期化メソッド | 指定する値 | 説明 | 省略時(デフォルト) | |
|---|---|---|---|---|
| Job | setMapOutputKeyClass() | Mapperの出力キーのクラス | Mapperの出力型(すなわちReducerの入力型・Combinerの型)を指定する。 ここで指定した型が実際のMapperの出力と異なっていたら例外が発生する。 |
setOutputKeyClass()で指定した型 |
| setMapOutputValueClass() | Mapperの出力値のクラス | setOutputValueClass()で指定した型 | ||
| setOutputKeyClass() | Reducerの出力キーのクラス | Reducerの出力型(すなわちOutputFormatの入力型)を指定する。 | LongWritable.class | |
| setOutputValueClass() | Reducerの出力値のクラス | Text.class | ||
| setMapperClass() | Mapperクラス | Mapperの具象クラスを指定する。 | Mapper.class (入力をそのまま出力する) |
|
| setCombinerClass() | Combinerクラス | Combinerの具象クラスを指定する。 | null (中間集計を行わない) |
|
| setReducerClass() | Reducerクラス | Reducerの具象クラスを指定する。 | Reducer.class (入力をそのまま出力する) |
|
| setInputFormatClass() | InputFormatクラス | データ読み込みを行いMapperへデータを受け渡すInputFormatの具象クラスを指定する。 | TextInputFormat.class | |
| setOutputFormatClass() | OutputFormatクラス | Reducerからデータを受け取り最終出力を行うOutputFormatの具象クラスを指定する。 | TextOutputFormat.class | |
| setSortComparatorClass() | RawComparatorクラス | Mapper出力後(Combiner処理前)のソート(の為のキーの比較)を行うコンパレーターを指定する。 | setMapOutputKeyClass()で指定したクラス のWritableComparator |
|
| setGroupingComparatorClass() | RawComparatorクラス | Reducer処理前のソート・マージ(の為のキーの比較)を行うコンパレーターを指定する。 | setSortComparatorClass()で指定したクラス | |
| setNumReduceTasks() | 個数 | 使用するReducerの個数を指定する。 | 1 | |
| setPartitionerClass() | Partitionerクラス | Partitioner(Mapperの出力をどのReducerへ渡すか決めるクラス)を指定する。 | HashPartitioner.class | |
Map/Reduceのコーディングでは、入力の型と出力の型に相関関係がある。
つまり、それぞれのステップの前後における入力のキー・データの型と出力のキー・データの型が一致していなければならない。
データの流れから考えて、その関係は以下のようになる。
| 前ステップの出力 | 後ステップの入力 | 備考・例 | |
|---|---|---|---|
| 入力フォーマットの出力型 | = | Mapperの入力型 | 例えばTextInputFormatを使う場合、Mapperの入力の型は<LongWritable, Text>になる。 |
| Mapperの出力型 | = | Combinerの入力型 | 結論として、Combinerの入力型と出力型は同じでなければならない。 |
| Combinerの出力型 | = | Reducerの入力型 | |
| Mapperの出力型 | = | Reducerの入力型 | |
| Reducerの出力型 | = | 出力フォーマットの入力型 | 例えばTableOutputFormatを使う場合、Reducerの出力の型は<任意, Writable>になる。 |
public class MyInputFormat extends InputFormat<K, V> {
〜
}
public class MyMapper extends Mapper <KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
@Override
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
KEYOUT outKey = 〜;
VALUEOUT outVal = 〜;
context.write(outKey, outVal);
}
}
public class MyCombiner extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
@Override
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
KEYOUT outKey = 〜;
VALUEOUT outVal = 〜;
context.write(outKey, outVal);
}
}
public class MyReducer extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
@Override
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
KEYOUT outKey = 〜;
VALUEOUT outVal = 〜;
context.write(outKey, outVal);
}
}
public class MyOutputFormat extends OutputFormat<K, V> {
〜
}
public static void main(String[] args) {
Configuration conf = new Configuration();
Job job = new Job(conf, "〜");
job.setJarByClass(〜.class);
job.setMapOutputKeyClass (MyMapperのKEYOUT);
job.setMapOutputValueClass(MyMapperのVALUEOUT);
job.setOutputKeyClass (MyReducerのKEYOUT);
job.setOutputValueClass(MyReducerのVALUEOUT);
job.setMapperClass (MyMapper.class);
job.setCombinerClass(MyCombiner.class);
job.setReducerClass (MyReducer.class);
job.setInputFormatClass (MyInputFormat.class);
job.setOutputFormatClass(MyOutputFormat.class);
〜
}