HadoopのReducerクラスについて。
|
|
Reducerは、Mapperの出力を演算する。
Mapperからの出力はキー毎にまとめられて、キーの昇順にソートされて入ってくる。
ほとんどのケースでは値は合算(集計)すると思うが、最大値や最小値を見つけるという事にも使える。
Mapperの出力をそのままOutputFormatに渡す場合はReducerは不要。
Reducerの個数を0にするとReducerは実行されない。
Mapperと同様に、reduce()メソッドでcontext.write()する為のインスタンスは使い回しがきく。
public class IntSumReducer<Key> extends Reducer<Key,IntWritable, Key,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Key key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
(Reducerの出力はOutputFormatに渡される。
TextOutputFormatではファイルストリームに出力しているので渡したインスタンスは保持されない。
TableOutputFormatではインスタンスのコピーが作られるので、渡したインスタンス自体は保持されない)
Reducer(Hadoop0.20.1)には以下のようなクラスがある。
自分でReducerを作る際には、ReducerかTableReducer辺りを親クラスに使うのが普通だと思う。
クラス名 | 入力型 | 出力型 | 説明 | ||
---|---|---|---|---|---|
KEYIN | VALIN | KEYOUT | VALOUT | ||
org.apache.hadoop.mapreduce. Reducer |
任意 | 任意 | 任意 | 任意 | 入力のキーと値をそのまま全て出力する。 →チュートリアル・サンプル |
org.apache.hadoop.mapreduce.lib.reduce. IntSumReducer |
任意 | IntWritable | KEYIN | IntWritable | intの値を受け取って全て合算し、入力と同じキーで集計値を出力する。 つまりはチュートリアルWordCountのReduceと全く同じ。 |
org.apache.hadoop.mapreduce.lib.reduce. LongSumReducer |
任意 | LongWritable | KEYIN | LongWritable | IntSumReducerのlong版。 |
実行時にいくつのReducer(ReduceTask)が作られるのかは、Job#setNumReduceTasks()で指定できる。[2010-03-19]
デフォルトは1。
つまり全てのMapperの出力を1つのReducerで処理することになる。
0を指定すると、Reducerの処理が行われない。
この場合、Mapperの出力が直接OutputFormatに送られる。
また、Partitionerは使われない(job.setPartitionerClass()によってPartitionerを指定していても、無視される)。[2010-08-11]
(PartitionerはMapperの出力をReducerに渡す為に分割するものだから)
総合計を算出するReducerなら、1つだけにしないと全データが対象にならないだろう。
キー毎に集計を行うようなReducerなら、複数あった方が並列性がいいだろう。
集計が必要ないジョブでは、Reducerは要らない。
※単独環境では、2より大きい個数を指定しても、Reducerは1個しか作られない。
Reducerの個数は、Hadoopが自動的に算出してくれるわけではない。
Reducerが何の処理を行うのかは人間(プログラマー)にしか分からないので、Reducerがいくつ必要なのかも人間が指定する必要がある。
Reducerの個数は、マシン(ノード)の個数より少し少ないのがいいらしい。
ノードの個数とぴったり同じなら、並列性(同時に実行できる処理数)が最も良くなる。
しかしHadoopでは何かの障害が起きると障害が起きたタスクを別ノードに割り振って再実行させるので、障害が起きた時の為に少しだけノードを空けておく方がいいらしい。
(ノードが空いていないと、どこかの処理が終了して空くまでタスクが待機することになり、その結果、全体の完了時間が遅くなってしまう為)
また、他のジョブも同時に動かすことを考えれば、自分が全ノードを使っていいのかどうかも考慮する必要がある。
自分のジョブの方が優先度が高いのか他ジョブを優先すべきなのかは、人間でないと判断できない。
つまり、環境や優先度等によって、指定すべきReducerの個数が変わる。
したがって、Reducerの個数を0か1にしないといけない場合はプログラム内でJob#setNumReduceTasks()によって個数を指定(固定)してしまえばよいが、
そうでない場合はプログラムの外から(実行時引数で)Reducerの個数を指定できる方が都合が良い。
この為にToolRunnerを使うと便利。
public class WordCount extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); Job job = new Job(conf, "wordcount"); job.setJarByClass(getClass()); System.out.println("reducer.num : " + job.getNumReduceTasks()); 〜 } public static void main(String[] args) throws Exception { int r = ToolRunner.run(new WordCount(), args); System.exit(r); } }
$ /usr/local/hadoop-0.20.2/bin/hadoop jar wordcount.jar jp.hishidama.hadoop.tutorial.WordCount -D mapred.reduce.tasks=2 input output
$ cat wordcount.conf ←適当に付けたファイル名 <?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>mapred.reduce.tasks</name> <value>2</value> </property> </configuration> $ /usr/local/hadoop-0.20.2/bin/hadoop jar wordcount.jar jp.hishidama.hadoop.tutorial.WordCount -conf wordcount.conf input output