HadoopのCounterクラスについて。
|
|
Counterは、Map/Reduceの処理件数等をカウントする(数える)もの。
CounterはMapper・Reducerの入出力件数・入出力バイト数といった、Hadoopの処理件数をカウントするのに使われている。
自分用のカウンターを作ることも出来るが、あくまで最終的な処理件数(全部で何件処理したか)を数えるものなので、カウントした値をロジックで使用する(MapperやReducerで読み込む)ことは出来ない。
つまりMapperでカウントしてReducerで使う、といった事は出来ない。→ダメな理由
条件に応じた処理件数(異常データの個数とか)を数えたい場合にカウンターを自作する。
カウンターはHadoop内では まずグループ名が付いており、そのグループ内でいくつかのカウンターを個別に管理しカウントされる。
各タスクでカウンターに加算してゆくと、最終的に合算されてジョブのカウンターとして取得できる。[2012-03-27]
途中経過の状態でもジョブに合算されるので、ブラウザー(JobTracker)からカウンター値を確認することが出来る。
もしタスクがkillされたりすると、その分の値はジョブのカウンターから減算される。
余分なタスク(投機的実行)が動いていても、カウンター値が重複する(余分に加算される)ことは無い。
Hadoop0.20.2には以下のようなカウンターがある。
グループ | カウンター | 備考 | ||
---|---|---|---|---|
名前 | 表示名 | 名前 | 表示名 | |
FileSystemCounters | FileSystemCounters | FILE_BYTES_READ | FILE_BYTES_READ | ファイルから読み込んだバイト数 |
FILE_BYTES_WRITTEN | FILE_BYTES_WRITTEN | ファイルへ書き込んだバイト数 | ||
org.apache.hadoop.mapred. Task$Counter |
Map-Reduce Framework | COMBINE_INPUT_RECORDS | Combine input records | Combinerの入力件数 |
COMBINE_OUTPUT_RECORDS | Combine output records | Combinerの出力件数 | ||
MAP_INPUT_RECORDS | Map input records | Mapperの入力件数 | ||
MAP_OUTPUT_BYTES | Map output bytes | Mapperの出力バイト数 | ||
MAP_OUTPUT_RECORDS | Map output records | Mapperの出力件数 | ||
REDUCE_INPUT_GROUPS | Reduce input groups | Reducerの入力グループ数(キーの件数) | ||
REDUCE_INPUT_RECORDS | Reduce input records | Reducerの入力件数(値の件数) | ||
REDUCE_OUTPUT_RECORDS | Reduce output records | Reducerの出力件数 | ||
REDUCE_SHUFFLE_BYTES | Reduce shuffle bytes | Reducerのシャッフルのバイト数 | ||
SPILLED_RECORDS | Spilled Records | こぼれたレコード数 | ||
列挙型クラス名(FQCN) | 列挙型クラス名(FQCN) | 列挙子名 | 列挙子名 | 独自カウンターで列挙子を指定した場合 |
カウンターを取得する例。[2012-03-27]
import org.apache.hadoop.mapred.Task; 〜 Counters counters = job.getCounters(); Counter counter = counters.findCounter(Task.Counter.MAP_OUTPUT_RECORDS); long c = counter.getValue();
カウンターオブジェクトは、MapperあるいはReducer(Combiner)のcontext、もしくはJobから取得することが出来る。
カウンターの種類を区別する為に列挙子(enum)を用意するのが一般的なようだ。
public enum CounterEnum { MY_COUNT }
import org.apache.hadoop.mapreduce.Counter; 〜 public class Map extends Mapper<LongWritable, Text, IntWritable, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 〜 // 独自カウンターに1加算 Counter counter = context.getCounter(CounterEnum.MY_COUNT); counter.increment(1); } }
カウンターインスタンスが存在していなかった場合は自動的に(値が0の)カウンターインスタンスが生成されるので、getCounter()がnullを返す事は無い。
グループ名とカウンター名を明示的に指定することも出来る。
// 独自カウンターに1加算 Counter counter = context.getCounter("自作グループ", "独自カウンター"); counter.increment(1);
列挙子を指定した場合も、内部では列挙型のクラス名と列挙子の名前を使用してグループ名とカウンター名を生成している。
したがって、列挙子MY_COUNTを使用した方式と以下の指定方法は等価。
Counter counter = context.getCounter(CounterEnum.class.getName(), CounterEnum.MY_COUNT.toString());
カウントされた結果は、Jobクラスから取得することが出来る。
import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Counters;
Configuration conf = new Configuration(); Job job = new Job(conf, "wordcount"); 〜 // 実行開始 job.submit(); while (!job.isComplete()) { //完了待ち Counters counters = job.getCounters(); Counter c1 = counters.findCounter(CounterEnum.MY_COUNT); // Counter c1 = counters.findCounter("自作グループ", "独自カウンター"); System.out.printf("job: %d\n", c1.getValue()); Thread.sleep(100); }
Jobの処理中も終了後(isComplete()がtrueになった後や、job.waitForCompletion()が終わった後)もカウンターを取得することが出来る。
処理中に途中経過のカウント数を取得したい場合は、毎回job.getCounters()
から行うこと。
(getCounters()を呼ぶと、毎回Countersインスタンスが新しく作られる為。すなわち呼び出した時点の件数が返ってくるので、その後増えたカウントは反映されない)
ただし途中経過と言っても、個々のMapTaskやReduceTaskが終わった後にしか更新されない模様。
import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.CounterGroup; import org.apache.hadoop.mapreduce.Counters;
Counters counters = job.getCounters(); for (CounterGroup group : counters) { System.out.printf("group: %s\t%s%n", group.getName(), group.getDisplayName()); for (Counter c : group) { System.out.printf("\t%s\t%s\t%d%n", c.getName(), c.getDisplayName(), c.getValue()); } }
カウンターは件数のカウントだが、MapperでカウントしてReducerへ渡す、という使い方は出来ない。
例えば試験の点数の平均値を求める為に、Mapperの出力は点数にして、人数はカウンターを用いて集計し、Reducerで点数を合計して平均値を算出…というような事は出来ない。
public static enum CounterEnum { PERSONS //人数 }
public static class Map extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
private IntWritable intk = new IntWritable();
private IntWritable intw = new IntWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
if (line.startsWith("#") || line.trim().isEmpty()) {
return;
}
StringTokenizer tokenizer = new StringTokenizer(line, ",");
for (int i = 0; tokenizer.hasMoreTokens(); i++) {
String s = tokenizer.nextToken();
switch (i) {
case 0: // 学生番号
case 1: // 名前
break;
default: // 各教科の点数
intk.set(i);
intw.set(Integer.parseInt(s.trim())); // 点数を合算する
context.write(intk, intw);
break;
}
}
// 人数をカウントする(つもり)
Counter counter = context.getCounter(CounterEnum.PERSONS);
counter.increment(1);
System.out.printf("Mapper[%h].人数=%d%n", this, counter.getValue());
}
}
public static class Reduce extends Reducer<IntWritable, IntWritable, Text, Text> {
@Override
protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 各教科の点数を合計
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
// カウントした人数を取得(できるつもり)
Counter counter = context.getCounter(CounterEnum.PERSONS);
long count = counter.getValue();
System.out.printf("Reducer[%h].人数=%d%n", this, counter.getValue());
// 平均点を出力
String n = key.toString();
String s = String.format("%5d\t%7.2f", sum, (double) sum / count);
context.write(new Text(n), new Text(s));
}
}
カウントされたカウンターの値は、MapperやReducer(Combiner)からJobへ渡されるだけで、MapperからReducerへは渡されない。
したがって、ReducerでMapperと同名のカウンターを取得したとしても、新しいカウンターとして扱われる。
上記の例ではReducerで取得したカウンターの値は常に0(Reducerの中ではインクリメントしていないから)であり、平均点の計算には使えない。
(ちなみにdouble型は0で割ると「Infinity(無限)」という値になるので、この例ではたまたま異常終了はしない)
仮にReducerでMapperと同名のカウンターに加算したとすると、
Mapperのカウンターは0から始まり、Reducerのカウンターも0から始まって別々にカウントされる。
そしてJobでは両方のカウント値が合算されるので、全体として正しいカウント値になる。
そもそもMapperの個数もReducerの個数も不定(1個以上ありうる)で、しかも別ノード(別サーバー・別JavaVM)で動く。
Reducerは各Mapperの自分用のデータが揃えば動き始める為、全Mapperのカウント数が取れるはずもない。
だからMapTask・ReduceTaskのインスタンスそれぞれで個別にカウントし、Jobで合算される仕組みになっているのだろう。
(Mapper-Reducer間も、Mapper同士・Reducer同士も、お互いのカウンター値を知る仕組みは無い)
ただしMapper-Combiner間は同一のMapTask内なので、Mapperでカウントした値はCombinerから取得できる模様。(自分以外のMapTaskの値は当然取得不可)
かと言って(Combinerは常に実行されるとは限らないし)、それに依存するコーディングはしない方がいいと思うが…。