S-JIS[2010-02-25/2011-12-25] 変更履歴

Hadoop Combiner

HadoopのCombinerクラス(集約関数)について。
(combineは「コンバイン」と読むが、combinerは「コンバイナー」でいいのかな)


Combinerの概要

CombinerはMapperの出力結果を(Reducerに渡す前に)中間集計するもの。

Combinerの目的は、“Mapperの出力レコードが多くなってバッファーに入り切らなくなった際に、(Reducerと同様の)集計を行ってレコード数を減らす”というもの。
したがって、(MapperやReducerには全データが必ず渡されるが、)Combinerには全てのレコードが渡されるとは限らない。
(Combinerが常に呼ばれるとも限らないし、同一キーであってもMapper出力の一部分しか渡ってこないこともある(というか中間集計なので、一部分しか来ないのが普通と考えるべき?))

Mapper出力がメモリー上のバッファーに入り切らなくなると一時ファイルに書き出される為、ファイル出力が減ると効率がよい(ファイル出力はメモリーアクセスに比べれば遅いので、少ない方がよい)。
Combinerによってバッファー内のレコード数が減れば、ファイル出力の量が減る。
また、このCombiner処理はReducerへデータが送られる前に行われる為、Reducerが別サーバーにあるとネットワーク上の転送量が減るという意味でも効率が良くなる。
さらには、Shuffleフェーズにおけるソート量が減るという意味でも効率が良い。[2011-12-25]


HadoopのAPIとしてはCombinerというクラスは無く、Reducerを継承する。
そしてJob#setCombinerClass()でジョブにCombinerクラスを登録する。

CombinerはReducerクラスを継承すると言っても、その型はReducerと違って自由には決められない。
Mapperの出力を受け取りReducerへ渡すという仕様なので、Combinerの入力型はMapperの出力型と等しく、Combinerの出力型はReducerの入力型と等しくなければならない。
一方、Mapperの出力は(Reducerへ直接渡されることもあるので)、Reducerの入力型と等しい。
この三段論法により、Combinerの入力型とCombinerの出力型は同じでなければならない。
(Combinerの入力型=Mapperの出力型=Reducerの入力型=Combinerの出力型)
Map/Reduceの入力型と出力型の相関関係

なので、Combinerの入出力処理の内容(および入出力型)とReducerの入出力処理(入出力型)が等しい場合は
Combinerの具体的なクラスとして、Reducerが兼ねることが出来る。
(そういう意味があって、CombinerはReducerを継承するようになっていて専用クラスが無いのだろう)


チュートリアルのWordCountCombinerにReducerと同じクラスを指定している。

WordCountでは Mapperの出力型は<単語, 個数>であり、Reducerの処理は「単語ごとに個数を合算する」というもの。
“合算”という処理は、適当なところで分割して別々に合算したものを最終的に足し合わせても同じ結果になる。
( a+b+c+d+e=(a+b)+(c+d)+e
 → ReducerのみであればReducerが「1+1+1+1+1」を行うが、Combinerがある場合はCombinerが「1+1」と「1+1」を行い、Reducerが最終的に「2+2+1」を行う)
また、WordCountのReducerは入力型も出力型も<単語, 個数>で、等しい。
したがって、WordCountではReducerとCombinerは同じ処理でよいので、CombinerにReducerをそのまま指定することが出来る。


@ITのMapReduceのJava実装Apache Hadoopを使ってみた (2/3)では「Combinerには、普通はReducerと同じクラスを登録すればいい」と書かれているが、これは「合算するような処理では」という前提が付いているように思える。
まぁ、Reducerは“普通は”合算処理しかしないという意味なのかもしれないけど…。

例えばHBaseのテーブルのデータを集計してテーブルに書き込む(更新する)という処理の場合、
Mapperの入力は<ImmutableBytesWritable, Result>、出力は<集計キー, 値>となり、
Reducerの入力は(Mapperの出力と同じく)<集計キー, 値>で出力は<NullWritable, Put>になる。
この場合、(Reducerの入出力の型が異なるので)CombinerにReducerを使うことは出来ない。


Combinerの実験

チュートリアルのWordCountにCombinerクラスを入れてみた。
と言ってもReducerとの違いはログ出力だけだけど…。

public class WordCount {
	public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String line = value.toString();
			System.out.println("map: " + key + "/" + line);

			StringTokenizer tokenizer = new StringTokenizer(line);
			while (tokenizer.hasMoreTokens()) {
				word.set(tokenizer.nextToken());
				// 実験の為、データを増殖させる
				for (int i = 0; i < 10 * 10000; i++) {
					context.write(word, one);
				}
			}
		}
	}
	public static class Combine extends Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable value : values) {
				sum += value.get();
			}
			System.out.println("combine: " + key + "=" + sum);
			context.write(key, new IntWritable(sum));
		}
	}
	public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable value : values) {
				int n = value.get();
				System.out.println("reduce: " + key + "=" + n);
				sum += n;
			}
			context.write(key, new IntWritable(sum));
		}
	}
	public static void main(String[] args) throws Exception {
〜
		job.setMapperClass(Map.class);
		job.setCombinerClass(Combine.class);
		job.setReducerClass(Reduce.class);
〜
	}
}

実行結果:

map: 0/Hello World Bye World
combine: Bye=62144
combine: Hello=100000
combine: World=100000
combine: Bye=37856
combine: World=100000
map: 0/Hello Hadoop Goodbye Hadoop
combine: Goodbye=62144
combine: Hadoop=100000
combine: Hello=100000
combine: Goodbye=37856
combine: Hadoop=100000
reduce: Bye=62144
reduce: Bye=37856
reduce: Goodbye=62144
reduce: Goodbye=37856
reduce: Hadoop=100000
reduce: Hadoop=100000
reduce: Hello=100000
reduce: Hello=100000
reduce: World=100000
reduce: World=100000

ByeとGoodbyeの中間集計の件数がぴったり同じなところを見ると、メモリーサイズによってけっこうきっちり分割しているのかもね。

ちなみに自分のPCの単独環境で上記プログラム(System.out.println()はコメントアウトした)のCombiner有無による実行時間を計ってみたところ、
Combiner有り: 5700ミリ秒前後
Combiner無し:11650ミリ秒前後
で、およそ2倍も違った。


Hadoop APIへ戻る / Hadoop目次へ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま