S-JIS[2010-03-17/2012-03-27] 変更履歴

Hadoop Counter

HadoopのCounterクラスについて。

  • Hadoop APIドキュメントのCounter

Counterの概要

Counterは、Map/Reduceの処理件数等をカウントする(数える)もの。

CounterはMapperReducerの入出力件数・入出力バイト数といった、Hadoopの処理件数をカウントするのに使われている。

自分用のカウンターを作ることも出来るが、あくまで最終的な処理件数(全部で何件処理したか)を数えるものなので、カウントした値をロジックで使用する(MapperやReducerで読み込む)ことは出来ない。
つまりMapperでカウントしてReducerで使う、といった事は出来ない。→ダメな理由
条件に応じた処理件数(異常データの個数とか)を数えたい場合にカウンターを自作する。

カウンターはHadoop内では まずグループ名が付いており、そのグループ内でいくつかのカウンターを個別に管理しカウントされる。

各タスクでカウンターに加算してゆくと、最終的に合算されてジョブのカウンターとして取得できる。[2012-03-27]
途中経過の状態でもジョブに合算されるので、ブラウザー(JobTracker)からカウンター値を確認することが出来る。
もしタスクがkillされたりすると、その分の値はジョブのカウンターから減算される。
余分なタスク(投機的実行)が動いていても、カウンター値が重複する(余分に加算される)ことは無い。


Counterの種類

Hadoop0.20.2には以下のようなカウンターがある。

グループ カウンター 備考
名前 表示名 名前 表示名
FileSystemCounters FileSystemCounters FILE_BYTES_READ FILE_BYTES_READ ファイルから読み込んだバイト数
FILE_BYTES_WRITTEN FILE_BYTES_WRITTEN ファイルへ書き込んだバイト数
org.apache.hadoop.mapred.
Task$Counter
Map-Reduce Framework COMBINE_INPUT_RECORDS Combine input records Combinerの入力件数
COMBINE_OUTPUT_RECORDS Combine output records Combinerの出力件数
MAP_INPUT_RECORDS Map input records Mapperの入力件数
MAP_OUTPUT_BYTES Map output bytes Mapperの出力バイト数
MAP_OUTPUT_RECORDS Map output records Mapperの出力件数
REDUCE_INPUT_GROUPS Reduce input groups Reducerの入力グループ数(キーの件数)
REDUCE_INPUT_RECORDS Reduce input records Reducerの入力件数(値の件数)
REDUCE_OUTPUT_RECORDS Reduce output records Reducerの出力件数
REDUCE_SHUFFLE_BYTES Reduce shuffle bytes Reducerのシャッフルのバイト数
SPILLED_RECORDS Spilled Records こぼれたレコード数
列挙型クラス名(FQCN 列挙型クラス名(FQCN 列挙子名 列挙子名 独自カウンターで列挙子を指定した場合

カウンターを取得する例。[2012-03-27]

import org.apache.hadoop.mapred.Task;
〜
	Counters counters = job.getCounters();
	Counter counter = counters.findCounter(Task.Counter.MAP_OUTPUT_RECORDS);
	long c = counter.getValue();

独自カウンターの使用例

カウンターオブジェクトは、MapperあるいはReducerCombiner)のcontext、もしくはJobから取得することが出来る。

カウンターの種類を区別する為に列挙子(enum)を用意するのが一般的なようだ。

public enum CounterEnum {
	MY_COUNT
}
import org.apache.hadoop.mapreduce.Counter;
〜
public class Map extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
〜
		// 独自カウンターに1加算
		Counter counter = context.getCounter(CounterEnum.MY_COUNT);
		counter.increment(1);
	}
}

カウンターインスタンスが存在していなかった場合は自動的に(値が0の)カウンターインスタンスが生成されるので、getCounter()がnullを返す事は無い。


グループ名とカウンター名を明示的に指定することも出来る。

		// 独自カウンターに1加算
		Counter counter = context.getCounter("自作グループ", "独自カウンター");
		counter.increment(1);

列挙子を指定した場合も、内部では列挙型のクラス名と列挙子の名前を使用してグループ名とカウンター名を生成している。
したがって、列挙子MY_COUNTを使用した方式と以下の指定方法は等価。

		Counter counter = context.getCounter(CounterEnum.class.getName(), CounterEnum.MY_COUNT.toString());

カウントされた結果は、Jobクラスから取得することが出来る。

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
		Configuration conf = new Configuration();
		Job job = new Job(conf, "wordcount");
〜
		// 実行開始
		job.submit();
		while (!job.isComplete()) { //完了待ち
			Counters counters = job.getCounters();

			Counter c1 = counters.findCounter(CounterEnum.MY_COUNT);
//			Counter c1 = counters.findCounter("自作グループ", "独自カウンター");
			System.out.printf("job: %d\n", c1.getValue());

			Thread.sleep(100);
		}

Jobの処理中も終了後(isComplete()がtrueになった後や、job.waitForCompletion()が終わった後)もカウンターを取得することが出来る。

処理中に途中経過のカウント数を取得したい場合は、毎回job.getCounters()から行うこと。
(getCounters()を呼ぶと、毎回Countersインスタンスが新しく作られる為。すなわち呼び出した時点の件数が返ってくるので、その後増えたカウントは反映されない)
ただし途中経過と言っても、個々のMapTaskやReduceTaskが終わった後にしか更新されない模様。

全カウンターを表示する例

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
		Counters counters = job.getCounters();
		for (CounterGroup group : counters) {
			System.out.printf("group: %s\t%s%n", group.getName(), group.getDisplayName());

			for (Counter c : group) {
				System.out.printf("\t%s\t%s\t%d%n", c.getName(), c.getDisplayName(), c.getValue());
			}
		}

カウンターの駄目な使用例

カウンターは件数のカウントだが、MapperでカウントしてReducerへ渡す、という使い方は出来ない。
例えば試験の点数の平均値を求める為に、Mapperの出力は点数にして、人数はカウンターを用いて集計し、Reducerで点数を合計して平均値を算出…というような事は出来ない。

	public static enum CounterEnum {
		PERSONS	//人数
	}
	public static class Map extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
		private IntWritable intk = new IntWritable();
		private IntWritable intw = new IntWritable();

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String line = value.toString();
			if (line.startsWith("#") || line.trim().isEmpty()) {
				return;
			}

			StringTokenizer tokenizer = new StringTokenizer(line, ",");
			for (int i = 0; tokenizer.hasMoreTokens(); i++) {
				String s = tokenizer.nextToken();
				switch (i) {
				case 0: // 学生番号
				case 1: // 名前
					break;
				default: // 各教科の点数
					intk.set(i);
					intw.set(Integer.parseInt(s.trim())); // 点数を合算する
					context.write(intk, intw);
					break;
				}
			}

			// 人数をカウントする(つもり)
			Counter counter = context.getCounter(CounterEnum.PERSONS);
			counter.increment(1);
			System.out.printf("Mapper[%h].人数=%d%n", this, counter.getValue());
		}
	}
	public static class Reduce extends Reducer<IntWritable, IntWritable, Text, Text> {

		@Override
		protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

			// 各教科の点数を合計
			int sum = 0;
			for (IntWritable value : values) {
				sum += value.get();
			}

			// カウントした人数を取得(できるつもり)
			Counter counter = context.getCounter(CounterEnum.PERSONS);
			long count = counter.getValue();
			System.out.printf("Reducer[%h].人数=%d%n", this, counter.getValue());

			// 平均点を出力
			String n = key.toString();
			String s = String.format("%5d\t%7.2f", sum, (double) sum / count);
			context.write(new Text(n), new Text(s));
		}
	}

カウントされたカウンターの値は、MapperやReducer(Combiner)からJobへ渡されるだけで、MapperからReducerへは渡されない。
したがって、ReducerでMapperと同名のカウンターを取得したとしても、新しいカウンターとして扱われる。
上記の例ではReducerで取得したカウンターの値は常に0(Reducerの中ではインクリメントしていないから)であり、平均点の計算には使えない。
(ちなみにdouble型は0で割ると「Infinity(無限)」という値になるので、この例ではたまたま異常終了はしない)

仮にReducerでMapperと同名のカウンターに加算したとすると、
Mapperのカウンターは0から始まり、Reducerのカウンターも0から始まって別々にカウントされる。
そしてJobでは両方のカウント値が合算されるので、全体として正しいカウント値になる。

そもそもMapperの個数もReducerの個数も不定(1個以上ありうる)で、しかも別ノード(別サーバー・別JavaVM)で動く。
Reducerは各Mapperの自分用のデータが揃えば動き始める為、全Mapperのカウント数が取れるはずもない。
だからMapTask・ReduceTaskのインスタンスそれぞれで個別にカウントし、Jobで合算される仕組みになっているのだろう。
(Mapper-Reducer間も、Mapper同士・Reducer同士も、お互いのカウンター値を知る仕組みは無い)

ただしMapper-Combiner間は同一のMapTask内なので、Mapperでカウントした値はCombinerから取得できる模様。(自分以外のMapTaskの値は当然取得不可)
かと言って(Combinerは常に実行されるとは限らないし)、それに依存するコーディングはしない方がいいと思うが…。


Hadoop APIへ戻る / Hadoop目次へ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま