S-JIS[2010-04-24] 変更履歴

Cascadingサンプル:偏差値算出

Cascadingのサンプルとして、偏差値を算出する処理を作ってみる。


概要

Hadoopのサンプルとして、成績表(試験の点数のCSVファイル)から、各学生ごとの合計点数や平均点を算出するプログラムを作ってみた。
それと同様に、Cascadingで学生の偏差値を算出するプログラムを作ってみる。

本当はHaoopの方でも偏差値算出までやってみたかったのだが、平均点を出した後に各学生の点数を結合させる部分が面倒だったので(苦笑)、そこまでは作らなかった。(ひとつのプログラムでは出来ないし)
Cascadingは結合の仕組みがあるので、こういう処理にはまさに向いている。

なお、偏差値の出し方は、以下のような計算式になる。

  1. 平均点を算出する。全学生の点数を合算・人数をカウント。(Every
  2. 標準偏差σを算出する。(各学生の点数 - 平均点)2を全学生分全て合算して人数で割った値。の平方根。(Every
  3. 偏差値を算出する。(各学生の点数 - 平均点)×10÷標準偏差+50。(Each

成績表のサンプル

#番号,名前,     数学,国語,理科,社会,英語
  101,天才,      100, 100, 100, 100, 100
  201,無気力,      5,  30,  10,  20,  15
  102,ひしだま,   90,  85,  80,  50,  10
  202,hoge,       10,  30,  25,  45,  20
  103,foo,        60,  60,  60,  60,  25
  204,xxx,        80,  77,  90,  40,  44
  205,yyy,        65,  90,  55,  80,  65
  104,zzz,        40,  60,  60,  70,  20

出力イメージ(学生番号、名前、科目名、点数、平均点、偏差値)

101, 天才, 理科, 100, 60.0, 63.87375951238592
101, 天才, 数学, 100, 56.25, 63.23821580326409
101, 天才, 合計, 500, 278.25, 67.99729272586669
101, 天才, 社会, 100, 58.125, 67.86190412715338
101, 天才, 国語, 100, 66.5, 63.56931586558015
101, 天才, 英語, 100, 37.375, 71.58220049611475
102, ひしだま, 理科, 80, 60.0, 56.93687975619296
102, ひしだま, 国語, 85, 66.5, 57.49350279143978
102, ひしだま, 合計, 315, 278.25, 52.982640395380386
102, ひしだま, 社会, 50, 58.125, 46.534257408164265
102, ひしだま, 英語, 10, 37.375, 40.565864453794156
102, ひしだま, 数学, 90, 56.25, 60.21233790537516
〜

なお、科目名ごとに偏差値を算出している理由は、Everyを使う際にはキーとなる項目が必要になる為。
都合がいいので科目名を集計キーとした。


ソース

import java.util.StringTokenizer;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cascading.flow.*;
import cascading.operation.*;
import cascading.operation.aggregator.Average;
import cascading.operation.text.FieldJoiner;
import cascading.pipe.*;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryCollector;

import jp.hishidama.hadoop.cascading.conf.CascadingConfigured;
public class Student0 extends CascadingConfigured implements Tool {

	public static final String F_LINE = "line";

	public static final String F_SID   = "student-id";		//学生番号
	public static final String F_NAME  = "student-name";		//学生名
	public static final String F_KID   = "kamoku-id";		//科目
	public static final String F_TEN   = "ten";			//点数
	public static final String F_AVE   = "average";		//平均点
	public static final String F_SIGMA = "standard-deviation";	//標準偏差
	public static final String F_SCORE = "standard-score";		//偏差値

CascadingConfiguredは、HadoopのConfiguredにCascading用の便利メソッドを加えた自作クラス。
ToolRunnerを使用するWordCountサンプル

F_XXXは、フィールド名として使用する定数群。


	/**
	 * 学生名を抽出するFunction
	 */
	protected static class NameFunction extends BaseOperation<Object> implements Function<Object> {
		private static final long serialVersionUID = 1L;

		public NameFunction(Fields fieldDeclaration) {
			super(1, fieldDeclaration);
		}

		@Override
		public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {

			String line = functionCall.getArguments().getTuple().getString(0);
			if (line.isEmpty() || line.startsWith("#")) {
				return;
			}
			TupleEntryCollector collector = functionCall.getOutputCollector();

			String sid = null;
			String name = null;

			StringTokenizer tokenizer = new StringTokenizer(line, ",");
			loop: for (int i = 0; tokenizer.hasMoreTokens(); i++) {
				String token = tokenizer.nextToken().trim();
				switch (i) {
				case 0: // 学生番号
					sid = token;
					break;
				case 1: // 学生名
					name = token;
					collector.add(new Tuple(sid, name));
					break loop;
				}
			}
		}
	}

NameFunctionは、成績表のCSVファイルから学生番号と学生名だけ抽出する関数。

入力項目 F_LINE
出力項目 F_SID F_NAME

偏差値を出した後に学生名を付加する為に使用する。


	static final String[] KAMOKU_NAME = { "数学", "国語", "理科", "社会", "英語", "合計" };
	/**
	 * 成績(点数)を抽出するFunction
	 */
	protected static class TenFunction extends BaseOperation<Object> implements Function<Object> {
		private static final long serialVersionUID = 1L;

		public TenFunction(Fields fieldDeclaration) {
			super(1, fieldDeclaration);
		}

		@Override
		public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {

			Tuple value = functionCall.getArguments().getTuple();
			String line = value.getString(0);
			if (line.isEmpty() || line.startsWith("#")) {
				return;
			}

			TupleEntryCollector collector = functionCall.getOutputCollector();

			String sid = null;
			int sum = 0;

			StringTokenizer tokenizer = new StringTokenizer(line, ",");
			for (int i = 0; tokenizer.hasMoreTokens(); i++) {
				String token = tokenizer.nextToken().trim();
				switch (i) {
				case 0: // 学生番号
					sid = token;
					break;
				case 1: // 学生名
					break;
				default: // 各科目の点数
					int ten = Integer.parseInt(token);
					Tuple t = Tuple.size(fieldDeclaration.size());
					t.set(0, sid);
					t.set(1, KAMOKU_NAME[i - 2]);
					t.set(2, ten);
					collector.add(t);

					sum += ten;
					break;
				}
			}

			//合計点数
			Tuple t = Tuple.size(fieldDeclaration.size());
			t.set(0, sid);
			t.set(1, KAMOKU_NAME[5]);
			t.set(2, sum);
			collector.add(t);
		}
	}

TenFunctionは、成績表のCSVファイルから学生・科目毎の点数を抽出する関数。

入力項目 F_LINE
出力項目 F_SID F_KID F_TEN

NameFunctionと入力ファイルが同じなので一括して処理したい気もしたが、ひとつの関数から二種類の出力を行うことは出来ないので、別々の関数とした。


	/**
	 * 標準偏差を求めるAggregator
	 */
	protected static class StandardDeviation extends BaseOperation<StandardDeviation.Context> implements Aggregator<StandardDeviation.Context> {
		private static final long serialVersionUID = 1L;

		protected static class Context {
			public double sum;
			public int count;

			public void reset() {
				sum = 0;
				count = 0;
			}
		}

		public StandardDeviation(Fields fieldDeclaration) {
			super(2, fieldDeclaration);
			// args0:点数
			// args1:平均点
		}

		@Override
		public void start(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
			Context ctx = aggregatorCall.getContext();
			if (ctx == null) {
				aggregatorCall.setContext(new Context());
			} else {
				ctx.reset();
			}
		}

		@Override
		public void aggregate(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
			Tuple value = aggregatorCall.getArguments().getTuple();
			double ten     = value.getDouble(0);
			double average = value.getDouble(1);

			Context ctx = aggregatorCall.getContext();
			ctx.sum += Math.pow(ten - average, 2);
			ctx.count++;
		}

		@Override
		public void complete(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
			Context ctx = aggregatorCall.getContext();
			aggregatorCall.getOutputCollector().add(new Tuple(Math.sqrt(ctx.sum / ctx.count)));
		}
	}

StandardDeviationは、標準偏差σを算出する集約関数。
σ = √( (点数 - 平均点)2 ÷人数 )

入力項目 F_TEN F_AVE
出力項目 F_SIGMA

	/**
	 * 偏差値を求めるFunction
	 */
	protected static class StandardScore extends BaseOperation<Object> implements Function<Object> {
		private static final long serialVersionUID = 1L;

		public StandardScore(Fields fields) {
			super(3, fields);
			// args0:点数
			// args1:平均点
			// args2:標準偏差
		}

		@Override
		public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {
			Tuple value = functionCall.getArguments().getTuple();
			double ten     = value.getInteger(0);
			double average = value.getDouble(1);
			double sigma   = value.getDouble(2);

			double score = (ten - average) * 10 / sigma + 50;
			functionCall.getOutputCollector().add(new Tuple(score));
		}
	}

StandardScoreは、偏差値を算出する関数。

入力項目 F_TEN F_AVE F_SIGMA
出力項目 F_SCORE

	@Override
	public int run(String[] args) throws Exception {
		Tap source = new Hfs(new TextLine(new Fields(F_LINE)), makeQualifiedPath(args[0]));
		Tap sink   = new Hfs(new TextLine(),                   makeQualifiedPath(args[1]), SinkMode.REPLACE);

		Pipe pipe = new Pipe("pipe");

		// 成績
		Pipe pipe1 = new Pipe("pipe1", pipe);
		pipe1 = new Each(pipe1, new TenFunction(new Fields(F_SID, F_KID, F_TEN)));

		// 科目毎の平均点を算出
		Pipe pipe11 = new GroupBy("pipeAverage", pipe1, new Fields(F_KID));
		pipe11 = new Every(pipe11, new Fields(F_TEN), new Average(new Fields(F_AVE)));

		// 科目毎に標準偏差を求める
		Pipe pipe12 = new CoGroup("pipeSDV",
			pipe1,  new Fields(F_KID),
			pipe11, new Fields(F_KID),
			new Fields(F_SID, F_KID, F_TEN, F_KID + ".2", F_AVE));

		// Cascadingでは、CoGroupの後でEveryを使う場合、出力項目名が0になってしまう。
		// その結果、後続でkamoku-idが見つからなくてエラーになる。
		// GroupByでソートして項目名を付ければ大丈夫。
		pipe12 = new GroupBy(pipe12, new Fields(F_KID));

		pipe12 = new Every(pipe12, new Fields(F_TEN, F_AVE), new StandardDeviation(new Fields(F_SIGMA)),
			new Fields(F_KID, F_SIGMA));

		// 学生毎に偏差値を求める
		Pipe pipe13 = new CoGroup("pipeScore",
			Pipe.pipes(pipe1, pipe11, pipe12),
			Fields.fields(new Fields(F_KID), new Fields(F_KID), new Fields(F_KID)),
			new Fields(F_SID, F_KID, F_TEN, F_KID + ".2", F_AVE, F_KID + ".3", F_SIGMA));

		pipe13 = new Each(pipe13, new Fields(F_TEN, F_AVE, F_SIGMA),
			new StandardScore(new Fields(F_SCORE)), Fields.ALL);

		// 学生名
		Pipe pipe2 = new Pipe("pipe2", pipe);
		pipe2 = new Each(pipe2, new NameFunction(new Fields(F_SID, F_NAME)));

		// 学生名と成績を結合
		Pipe pipe3 = new CoGroup("pipeResult",
			pipe13, new Fields(F_SID),
			pipe2,  new Fields(F_SID),
			new Fields(F_SID, F_KID, F_TEN, F_KID + ".2", F_AVE, F_KID + ".3", F_SIGMA, F_SCORE, F_SID + ".2", F_NAME));

		// 必要項目を抽出・並べ替え・カンマ区切り
		pipe3 = new Each(pipe3, new Fields(F_SID, F_NAME, F_KID, F_TEN, F_AVE, F_SCORE), new FieldJoiner(", "));
		pipe3 = new GroupBy(pipe3, Fields.ALL);

		FlowConnector flowConnector = new FlowConnector(getProperties());
		Flow flow = flowConnector.connect("student", source, sink, pipe3);
		flow.complete(); //実行

		return 0;
	}

	public static void main(String[] args) throws Exception {
		int r = ToolRunner.run(new Student0(), args);
		System.exit(r);
	}
}

標準偏差を求める処理で、CoGroupとEveryの間にGroupByを入れている。
グルーピングのキーが変わっているわけではないので、本来であればGroupByは不要。
では何の為に入っているかと言うと、グルーピングキーに項目名を付ける為。
CoGroupはグループキーの項目名が無くなってしまう(「0」になってしまう)ので、後続で項目名F_KIDを使って指定することが出来なくなってしまう。
その為、無駄な処理ではあるが、GroupByを入れて項目名を付与している。

したがって、GroupByを入れずに、後続のF_KIDに相当する項目全てに「0」を指定するのもひとつの手ではある…。
		// 科目毎に標準偏差を求める
		Pipe pipe12 = new CoGroup("pipeSDV",
			pipe1,  new Fields(F_KID),
			pipe11, new Fields(F_KID),
			new Fields(F_SID, F_KID, F_TEN, F_KID + ".2", F_AVE));
//		pipe12 = new GroupBy(pipe12, new Fields(F_KID));
		pipe12 = new Every(pipe12, new Fields(F_TEN, F_AVE), new StandardDeviation(new Fields(F_SIGMA)),
			new Fields(0, F_SIGMA));

		// 学生毎に偏差値を求める
		Pipe pipe13 = new CoGroup("pipeScore",
			Pipe.pipes(pipe1, pipe11, pipe12),
			Fields.fields(new Fields(F_KID), new Fields(F_KID), new Fields(0)),
			new Fields(F_SID, F_KID, F_TEN, F_KID + ".2", F_AVE, F_KID + ".3", F_SIGMA));
Cascading1.1では、CoGroupでグループキー項目名を指定できるようになった。
したがって、それ(resultGroupFields)を使えば余計なGroupByを入れずに項目名を指定できる。
		// 科目毎に標準偏差を求める
		Pipe pipe12 = new CoGroup("pipeSDV",
			pipe1,  new Fields(F_KID),
			pipe11, new Fields(F_KID),
			new Fields(F_SID, F_KID, F_TEN, F_KID + ".2", F_AVE),
			new Fields(F_KID));
//		pipe12 = new GroupBy(pipe12, new Fields(F_KID));
		pipe12 = new Every(pipe12, new Fields(F_TEN, F_AVE), new StandardDeviation(new Fields(F_SIGMA)),
			new Fields(F_KID, F_SIGMA));

		// 学生毎に偏差値を求める
		Pipe pipe13 = new CoGroup("pipeScore",
			Pipe.pipes(pipe1, pipe11, pipe12),
			Fields.fields(new Fields(F_KID), new Fields(F_KID), new Fields(F_KID)),
			new Fields(F_SID, F_KID, F_TEN, F_KID + ".2", F_AVE, F_KID + ".3", F_SIGMA));
ところで、Cascading1.1では、CoGroupに指定した全パイプのグループキー項目名が全て一致していれば、
その項目名が後続パイプで使えるようになる。
したがって実はこの例では、特に出力用のグループキー項目名(resultGroupFields)を指定する必要は無い。

カスタマイズ:CoGroupExの使用

上記のサンプルでCoGroupを使っている箇所では、「F_KID + ".2"」の様に、出力項目名にダミーの名前を付けている。
これだと項目の増減やCoGroupの引数のパイプの順序変更をした際にいちいち指定し直さなければならないので、とても大変。
そこで、自作のCoGroupExに置き換えて出力項目名を省略してしまえば楽になる。

import jp.hishidama.hadoop.cascading.pipe.CoGroupEx;
		// 科目毎に標準偏差を求める
		Pipe pipe12 = new CoGroupEx("pipeSDV",
			pipe1,  new Fields(F_KID),
			pipe11, new Fields(F_KID));
		// 学生毎に偏差値を求める
		Pipe pipe13 = new CoGroupEx("pipeScore",
			Pipe.pipes(pipe1, pipe11, pipe12),
			Fields.fields(new Fields(F_KID), new Fields(F_KID), new Fields(F_KID)));
		// 学生名と成績を結合
		Pipe pipe3 = new CoGroupEx("pipeResult",
			pipe13, new Fields(F_SID),
			pipe2,  new Fields(F_SID));

※Cascading1.1ではCoGroupに指定したグループキー項目名が全て一致していれば後続パイプでその名前が使える為、resultGroupFieldsも特に指定する必要は無い。


カスタマイズ:SubAssemblyの使用

偏差値の算出というのは それだけでひとつの処理なので、サブアセンブリー化してみる。
偏差値算出サブアセンブリー(StandardScore)

import jp.hishidama.hadoop.cascading.conf.CascadingConfigured;
import jp.hishidama.hadoop.cascading.pipe.assembly.StandardScore;
public class Student2 extends CascadingConfigured implements Tool {
〜
	@Override
	public int run(String[] args) throws Exception {
		Tap source = new Hfs(new TextLine(new Fields(F_LINE)), makeQualifiedPath(args[0]));
		Tap sink   = new Hfs(new TextLine(),                   makeQualifiedPath(args[1]), SinkMode.REPLACE);

		Pipe pipe = new Pipe("pipe");

		// 成績
		Pipe pipe1 = new Pipe("pipe1", pipe);
		pipe1 = new Each(pipe1, new TenFunction(new Fields(F_SID, F_KID, F_TEN)));

		// 偏差値を算出
		pipe1 = new StandardScore(pipe1,
			new Fields(F_KID),					//グルーピング項目
			new Fields(F_TEN),					//点数項目
			new Fields(F_AVE, F_SIGMA, F_SCORE),			//平均・標準偏差・偏差値に使う項目名
			new Fields(F_SID, F_KID, F_TEN, F_AVE, F_SCORE)	//出力項目
		);

		// 学生名
		Pipe pipe2 = new Pipe("pipe2", pipe);
		pipe2 = new Each(pipe2, new NameFunction(new Fields(F_SID, F_NAME)));

		// 学生名と成績を結合
		Pipe pipe3 = new CoGroupEx("pipeResult", pipe1, new Fields(F_SID), pipe2, new Fields(F_SID));

		// 必要項目を抽出・並べ替え・カンマ区切り
		pipe3 = new Each(pipe3, new Fields(F_SID, F_NAME, F_KID, F_TEN, F_AVE, F_SCORE), new FieldJoiner(", "));
		pipe3 = new GroupBy(pipe3, Fields.ALL);

		FlowConnector flowConnector = new FlowConnector(getProperties());
		Flow flow = flowConnector.connect("student", source, sink, pipe3);
		flow.complete(); //実行

		return 0;
	}

	public static void main(String[] args) throws Exception {
		int r = ToolRunner.run(new Student2(), args);
		System.exit(r);
	}
}

Cascadingへ戻る / Hadoopへ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま