S-JIS[2010-02-26/2010-03-19] 変更履歴

Hadoopサンプル

HadoopJava APIを使ったサンプル。


行毎の合算を行うサンプル

以下のような成績表(試験の点数のCSVファイル)から、各学生ごとの合計点数を算出する例。

#番号,名前,     数学,国語,理科,社会,英語
  101,天才,      100, 100, 100, 100, 100
  201,無気力,      5,  30,  10,  20,  15
  102,ひしだま,   90,  85,  80,  50,  10
  202,hoge,       10,  30,  25,  45,  20
  103,foo,        60,  60,  60,  60,  25
  204,xxx,        80,  77,  90,  40,  44
  205,yyy,        65,  90,  55,  80,  65
  104,zzz,        40,  60,  60,  70,  20

出力イメージ:

101,天才	500
102,ひしだま	315
103,foo	265
104,zzz	250
201,無気力	80
202,hoge	130
204,xxx	331
205,yyy	355

ソース:

package jp.hishidama.hadoop.sum;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class RowSum {
	public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
		private IntWritable intw = new IntWritable();
		private Text word = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String line = value.toString();
			if (line.startsWith("#") || line.trim().isEmpty()) {
				return; //コメントや空行は集計対象外
			}

			String no = "";
			String name = "";

			StringTokenizer tokenizer = new StringTokenizer(line, ",");
			for (int i = 0; tokenizer.hasMoreTokens(); i++) {
				String s = tokenizer.nextToken();
				switch (i) {
				case 0: //学生番号
					no = s.trim();
					break;
				case 1: //名前
					name = s.trim();
					word.set(no + "," + name);
					break;
				default: //各教科の点数
					intw.set(Integer.parseInt(s.trim()));
					context.write(word, intw);
					break;
				}
			}
		}
	}

↑このMapperでは、カンマで分解して3項目目以降(各教科の点数)を出力している。
また、入力となるCSVファイルの1項目目(学生番号)と2項目目(名前)を集計キーとしている。
別に番号だけでもいいんだけど、最終的な出力には名前も入っている方が分かりやすいから。

	public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

		private IntWritable intw = new IntWritable();

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable value : values) {
				sum += value.get();
			}
			intw.set(sum);
			context.write(key, intw);
		}
	}

↑このReducerは、よく出てくる、何の変哲もない集計。(→IntSumReducer

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf, "rowsum");
		job.setJarByClass(RowSum.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setMapperClass(Map.class);
		job.setCombinerClass(Reduce.class);
		job.setReducerClass (Reduce.class);

		job.setInputFormatClass (TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat .setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean success = job.waitForCompletion(true);
		System.out.println(success);
	}
}

単独環境実行用のbuild.xml:

Eclipseのプロジェクトでclassesディレクトリーにコンパイルしたファイルを置き、binディレクトリーの下にこのbuild.xmlを置く想定。

<?xml version="1.0" encoding="Windows-31J"?>
<project name="hadoop rowsum" basedir="." default="all">

	<property name="cygwin"      location="C:\cygwin" />
	<property name="hadoop.home" location="${cygwin}/usr/local/hadoop-0.20.1" />

	<property name="target.home"   location="${cygwin}/home/hadoop/rowsum" />
	<property name="target.input"  location="${target.home}/input" />
	<property name="target.output" location="${target.home}/output" />

	<property name="main.class" value="jp.hishidama.hadoop.sum.RowSum" />

	<path id="class.path">
		<pathelement location="../classes" />
		<fileset dir="${target.home}">
			<include name="hadoop-*-core.jar" />
			<include name="lib/**/*.jar" />
		</fileset>
	</path>

	<target name="0.init" description="ディレクトリーや入力ファイルを準備する">
		<mkdir dir="${target.input}" />
		<property name="filename" value="${target.input}/file01" />
		<echo file="${filename}">
#番号,名前,     数学,国語,理科,社会,英語
  101,天才,      100, 100, 100, 100, 100
  201,無気力,      5,  30,  10,  20,  15
  102,ひしだま,   90,  85,  80,  50,  10
  202,hoge,       10,  30,  25,  45,  20
  103,foo,        60,  60,  60,  60,  25
  204,xxx,        80,  77,  90,  40,  44
  205,yyy,        65,  90,  55,  80,  65
  104,zzz,        40,  60,  60,  70,  20
		</echo>
		<copy file="${filename}" encoding="Windows-31J" tofile="${filename}.utf8.txt" outputencoding="UTF-8" />
		<delete file="${filename}" />
	</target>

	<target name="1.rm_output" description="outputディレクトリーを削除する">
		<delete dir="${target.output}" failonerror="true" />
	</target>

	<target name="2.execute" description="hadoopを実行する">
		<property environment="env" />
		<java classname="${main.class}" classpathref="class.path" fork="true" maxmemory="1000m">
			<arg value="${target.input}" />
			<arg value="${target.output}" />
			<sysproperty key="hadoop.log.dir" path="${target.home}/logs" />
			<sysproperty key="hadoop.log.file" value="hadoop.log" />
			<sysproperty key="hadoop.home.dir" path="${target.home}/" />
			<sysproperty key="hadoop.id.str" value="${env.COMPUTERNAME}" />
			<sysproperty key="hadoop.root.logger" value="INFO,console" />
			<sysproperty key="hadoop.policy.file" value="hadoop-policy.xml" />
			<env key="PATH" path="${cygwin}/bin" />
		</java>
	</target>

	<target name="3.output_encode" description="outputの文字コードをSJISに変換する">
		<copy todir="${target.output}" encoding="UTF-8" outputencoding="Windows-31J">
			<fileset dir="${target.output}">
				<include name="part*" />
				<exclude name="*.sjis.txt" />
			</fileset>
			<mapper type="glob" from="*" to="*.sjis.txt" />
		</copy>
		<delete>
			<fileset dir="${target.output}">
				<include name="part*" />
				<exclude name="*.sjis.txt" />
			</fileset>
		</delete>
	</target>

	<target name="4.cat_output" description="outputの内容を表示する">
		<exec executable="${cygwin}/bin/cat.exe" dir="${target.output}">
			<arg value="*" />
		</exec>
	</target>

	<target name="all" depends="1.rm_output,2.execute,3.output_encode,4.cat_output" description="実行して結果を表示する" />
</project>

このbuild.xmlはシフトJIS(Windows-31J)で書かれている前提で、initターゲットで入力ファイルを作成している。
Hadoopではファイルの入出力はUTF-8で行うので、入力ファイルはUTF-8に変換している。
(Antのmoveタスクでは文字コードの変換が出来ないようなので、copy&deleteにしている)

同様に、output_encodeターゲットで出力ファイルをシフトJISに変換している。
そうでないとCygwin以外から表示した際に文字化けするので。

Hadoopを実行するexecuteターゲットでは、hadoopシェルを使わずにjavaタスクで直接実行するようにしてみた。
チュートリアル用のbuild.xmlではCygwinのbashを起動してhadoopシェルを実行していた)


カスタマイズ:IntSumReducerを使用

このサンプルのReducerは、Textとintを受け取ってText毎に合算し、またTextとintを出力している。
このソースはチュートリアルのWordCountとも全く同じ(context.write()に使うインスタンスを一度しか生成しないことを除けば)だし、集計としてはよく使われるパターンだと思われる。

Hadoopでは、これを専用に行うIntSumReducerというクラスが用意されている。
(longで集計を行う場合はLongSumReducer

import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass (IntSumReducer.class);

カスタマイズ:出力をカンマ区切りに変更

テキスト出力フォーマットであるTextOutputFormatは、デフォルトではタブ区切りになっている。 (キーと値の間がタブで区切られる)

Configurationのmapred.textoutputformat.separatorで区切り文字を指定することが出来る。[2010-03-12]

		Configuration conf = new Configuration();
		conf.set("mapred.textoutputformat.separator", ",");
		Job job = new Job(conf, "rowsum");

※confへの値の設定は、Jobインスタンスを作る前に行うこと。
(Jobのコンストラクターの中でconfが複製されて別インスタンスとなる為)


もしくは、内部で使用しているライターのコンストラクターで区切り文字を指定できるので、独自クラスにすれば区切り文字を変えられる。

	public static class CommaTextOutputFormat extends TextOutputFormat<Text, IntWritable> {

		@Override
		public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
			Configuration conf = job.getConfiguration();
			String extension = ""; //拡張子
			Path file = getDefaultWorkFile(job, extension);
			FileSystem fs = file.getFileSystem(conf);
			FSDataOutputStream fileOut = fs.create(file, false);
			return new LineRecordWriter<Text, IntWritable>(fileOut, ",");
		}
	}
		job.setInputFormatClass (TextInputFormat.class);
		job.setOutputFormatClass(CommaTextOutputFormat.class);

なお、extensionに文字列(例:".txt")を指定すれば、最終的に出力されるファイル 名に拡張子を付けることが出来る。

また、出力を整形したい場合は、LineRecordWriterのwirte()メソッドをオーバーライドする。

			return new LineRecordWriter<Text, IntWritable>(fileOut) {
				protected final byte[] separator = ", ".getBytes();
				protected final byte[] newline = "\n".getBytes();

				@Override
				public synchronized void write(Text key, IntWritable value) throws IOException {
					boolean nullKey = (key == null);
					boolean nullValue = (value == null);
					if (nullKey && nullValue) {
						return;
					}
					if (!nullKey) {
						out.write(key.getBytes(), 0, key.getLength());
					}
					if (!(nullKey || nullValue)) {
						out.write(separator); //カンマ区切り
					}
					if (!nullValue) {
						String s = String.format("%3d", value.get()); //3桁数値
						out.write(s.getBytes());
					}
					out.write(newline);
				}
			};

ついでにMapperでの出力キーに入力の全行を出力してやれば、各行の末尾に合計を追加した形で出力できる。

	public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
		private IntWritable intw = new IntWritable();

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String line = value.toString();
			if (line.startsWith("#") || line.trim().isEmpty()) {
				return; //コメントや空行は集計対象外
			}

			StringTokenizer tokenizer = new StringTokenizer(line, ",");
			for (int i = 0; tokenizer.hasMoreTokens(); i++) {
				String s = tokenizer.nextToken();
				switch (i) {
				case 0: //学生番号
				case 1: //名前
					break;
				default: //各教科の点数
					intw.set(Integer.parseInt(s.trim()));
					context.write(value, intw);
					break;
				}
			}
		}
	}

出力イメージ:

  101,天才,      100, 100, 100, 100, 100, 500
  102,ひしだま,   90,  85,  80,  50,  10, 315
  103,foo,        60,  60,  60,  60,  25, 265
  104,zzz,        40,  60,  60,  70,  20, 250
  201,無気力,      5,  30,  10,  20,  15,  80
  202,hoge,       10,  30,  25,  45,  20, 130
  204,xxx,        80,  77,  90,  40,  44, 331
  205,yyy,        65,  90,  55,  80,  65, 355

行毎の合算を行うサンプル(Reducerを使わない例)

上記の行毎の集計サンプルではきちんとReducerを使って集計しているが、こういった例では列数なんてたかが知れているので、わざわざReducerを使うまでもない気がする。

public class RowSum2 {
	public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
		private IntWritable intw = new IntWritable();

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String line = value.toString();
			if (line.startsWith("#") || line.trim().isEmpty()) {
				return;
			}

			int sum = 0;
			StringTokenizer tokenizer = new StringTokenizer(line, ",");
			for (int i = 0; tokenizer.hasMoreTokens(); i++) {
				String s = tokenizer.nextToken();
				switch (i) {
				case 0: //学生番号
				case 1: //名前
					break;
				default: //各教科の点数
					sum += Integer.parseInt(s.trim());
					break;
				}
			}

			intw.set(sum);
			context.write(value, intw);
		}
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf, "rowsum");
		job.setJarByClass(RowSum2.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setMapperClass(Map.class);
		job.setNumReduceTasks(0);		// Reducerを使わない

		job.setInputFormatClass (TextInputFormat.class);
		job.setOutputFormatClass(CommaTextOutputFormat.class);

		FileInputFormat .setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean success = job.waitForCompletion(true);
		System.out.println(success);
	}
}

Job#setNumReduceTasks()でReducerの個数を指定できる 。
0を指定すると、Reducerを実行しない。

出力イメージ:

  101,天才,      100, 100, 100, 100, 100, 500
  201,無気力,      5,  30,  10,  20,  15,  80
  102,ひしだま,   90,  85,  80,  50,  10, 315
  202,hoge,       10,  30,  25,  45,  20, 130
  103,foo,        60,  60,  60,  60,  25, 265
  204,xxx,        80,  77,  90,  40,  44, 331
  205,yyy,        65,  90,  55,  80,  65, 355
  104,zzz,        40,  60,  60,  70,  20, 250

↑ちゃんと合計は計算されたが、ソートはされてない…まぁReducerが呼ばれないんだから、その前処理であるソートが行われないのも当然か。

ちなみに実行速度はやはりこちら(Reducerを使わないバージョン)の方が速かった。


各列の合計を求めるサンプル

成績表(各行の合計を求めた出力結果)から、各列の合計と人数(要するに平均点を算出するための情報)を出力する例。

public class ColSum {
	public static class Map extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
		private IntWritable intk = new IntWritable();
		private IntWritable intw = new IntWritable();

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String line = value.toString();
			if (line.startsWith("#") || line.trim().isEmpty()) {
				return;
			}

			StringTokenizer tokenizer = new StringTokenizer(line, ",");
			for (int i = 0; tokenizer.hasMoreTokens(); i++) {
				String s = tokenizer.nextToken();
				switch (i) {
				case 0: // 学生番号
				case 1: // 名前
					break;
				default: // 各教科の点数
					intk.set(i);
					intw.set(Integer.parseInt(s.trim())); // 点数を合算する
					context.write(intk, intw);
					break;
				}
			}

			// 人数をカウントする
			intk.set(0); //キー
			intw.set(1);
			context.write(intk, intw);
		}
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf, "colsum");
		job.setJarByClass(ColSum.class);

		job.setOutputKeyClass  (IntWritable.class);
		job.setOutputValueClass(IntWritable.class);

		job.setMapperClass(Map.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass (IntSumReducer.class);

		job.setInputFormatClass (TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat .setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean success = job.waitForCompletion(true);
		System.out.println(success);
	}
}

出力イメージ:

0	8
2	450
3	532
4	480
5	465
6	299
7	2226

左側の数字は入力ファイルの列番号(0は人数、2は数学、…、7は合計)。
平均点も計算して出したいところだけど、ReducerにIntSumReducerを使ってしまったから合計しか出ない(苦笑)


そもそもReducerではキー毎にしかreduce()メソッドは呼ばれないから、他のキーの値を参照することは出来ない気がする。
(今回はキー0番に人数を入れているから、各キーから0番の値を参照する必要がある)
Reducerには ソートされて順番にデータが渡されるから、フィールドに保持して使っていいんだろうか。

ただ、Reducerが複数使われたら0番はどこか1箇所でしか呼ばれないから、Reducerを1個に限定する必要がある。
そこでJob#setNumReduceTasks()で1を指定すると、使われるReducerは1個のみになる。

	public static class Reduce extends Reducer<IntWritable, IntWritable, Text, Text> {

		protected static final String[] ns = { "数学", "国語", "理科", "社会", "英語", "合計" };

		protected int count;

		@Override
		protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable value : values) {
				sum += value.get();
			}

			if (key.get() == 0) {
				count = sum;
			} else {
				String n = ns[key.get() - 2];
				String s = String.format("%5d\t%7.2f", sum, (double) sum / count);
				context.write(new Text(n), new Text(s));
			}
		}
	}
		job.setMapOutputKeyClass  (IntWritable.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass  (Text.class);
		job.setOutputValueClass(Text.class);

		job.setMapperClass  (Map.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass (Reduce.class);
		job.setNumReduceTasks(1);

出力結果:

数学	  450	  56.25
国語	  532	  66.50
理科	  480	  60.00
社会	  465	  58.13
英語	  299	  37.38
合計	 2226	 278.25

カスタマイズ:独自Writableの使用

点数の集計と人数のカウントを同時に行う為に、双方のデータを保持するクラスを用意する例。[2010-03-19]

先述した人数に0番を使う例だと、順序に依存するしキーによって値の意味(種類)が異なるので、設計上は好ましくない。
種類別(人数と点数)の値を保持するクラスを用意する方が良い。

	public static class Map extends Mapper<LongWritable, Text, IntWritable, Sum> {
		private IntWritable intw = new IntWritable();
		private Sum sum = new Sum(); //独自Writable

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String line = value.toString();
			if (line.startsWith("#") || line.trim().isEmpty()) {
				return;
			}

			StringTokenizer tokenizer = new StringTokenizer(line, ",");
			for (int i = 0; tokenizer.hasMoreTokens(); i++) {
				String s = tokenizer.nextToken();
				switch (i) {
				case 0: // 学生番号
				case 1: // 名前
					break;
				default: // 各教科の点数
					intw.set(i);
					sum.set(Integer.parseInt(s.trim()), 1);	// 点数と人数をセットする
					context.write(intw, sum);
					break;
				}
			}
		}
	}
	public static class Combine extends Reducer<IntWritable, Sum, IntWritable, Sum> {
		private Sum sum = new Sum(); //独自Writable

		@Override
		protected void reduce(IntWritable key, Iterable<Sum> values, Context context) throws IOException, InterruptedException {
			int ten = 0;
			int cnt = 0;
			for (Sum value : values) {
				ten += value.getTen();
				cnt += value.getCount();
			}
			sum.set(ten, cnt);
			context.write(key, sum);
		}
	}
	public static class Reduce extends Reducer<IntWritable, Sum, Text, Text> {

		protected static final String[] ns = { "数学", "国語", "理科", "社会", "英語", "合計" };

		@Override
		protected void reduce(IntWritable key, Iterable<Sum> values, Context context) throws IOException, InterruptedException {
			int ten = 0;
			int cnt = 0;
			for (Sum value : values) {
				ten += value.getTen();
				cnt += value.getCount();
			}

			String n = ns[key.get() - 2];
			String s = String.format("%5d\t%7.2f", ten, (double) ten / cnt);
			context.write(new Text(n), new Text(s));
		}
	}
		job.setMapOutputKeyClass  (IntWritable.class);
		job.setMapOutputValueClass(Sum.class);
		job.setOutputKeyClass  (Text.class);
		job.setOutputValueClass(Text.class);

		job.setMapperClass  (Map.class);
		job.setCombinerClass(Combine.class);
		job.setReducerClass (Reduce.class);
		job.setNumReduceTasks(1);

Mapperの出力値の型に、独自WritableであるSumクラスを指定している。
するとCombinerにIntSumReducerを使えないので、専用のCombinerを用意している。


Hadoop目次へ戻る / Hadoop APIへ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま