S-JIS[2011-08-06/2012-12-19] 変更履歴

Asakusa Frameworkサンプル:偏差値算出

Asakusa Framework0.2.1(batchapp)で偏差値算出処理を作ってみる。[2011-08-06]
※batchappは古い。→DirectIO版


前提

Cascadingで作った偏差値算出サンプルと同様の処理をAsakusaFWで作ってみる。

偏差値は以下のような計算式で算出できる。

  1. 平均点を算出する。全学生の点数を合算し、人数をカウント。
  2. 標準偏差σを算出する。(各学生の点数 - 平均点)2を全学生分全て合算して人数で割った値。の平方根。
  3. 偏差値を算出する。(各学生の点数 - 平均点)×10÷標準偏差+50。
入力データのイメージ
  A B C D E F G H
1 sid sname suugaku kokugo rika shakai eigo  
2 101 天才 100 100 100 100 100  
3 201 無気力 5 30 10 20 15  
4 102 ひしだま 90 85 80 50 10  
5 202 hoge 10 30 25 45 20  
6 103 foo 60 60 60 60 25  
7 204 xxx 80 77 90 40 44  
8 205 yyy 65 90 55 80 65  
9 104 zzz 40 60 60 70 20  
10                

プロジェクトの作成

AsakusaFW用プロジェクトの作成方法はWordCountの例と同じ。

AsakusaFWのversion 0.2.1
groupId afw-score
artifactId afw-score
プロジェクトのversion 1.0
package sample

モデルの作成

最初にモデルを作成する。
偏差値算出で使うデータの流れに沿ってデータモデルを準備していく。

afw-score/src/main/dmdl/score.dmdl
コード 説明 備考
"成績表"
ten_model = {
  "学生番号" sid : TEXT;
  "学生名" sname : TEXT;
  suugaku : INT;
  kokugo  : INT;
  rika    : INT;
  shakai  : INT;
  eigo    : INT;
};
入力データのモデル。  
"学生・科目毎の成績"
kamoku_model = {
  "学生番号" sid : TEXT;
  "学生名" sname : TEXT;
  "科目" kid : INT;
  "点数" ten : INT;
};
入力データを科目毎に分解したモデル。
科目名でなく科目IDを使うことにする。
0:数学、1:国語、2:理科、3:社会、4:英語、5:合計
行う演算はどの科目も同じなので、科目毎に分解せず同時に処理していくロジックも考えられるが、
今までの(AsakusaFW以外の)偏差値算出サンプルでもこのように処理してきたので、
今回もそれを踏襲する。
"科目毎の点数の合計"
summarized kamoku_total = kamoku_model => {
  any kid -> kid;
  sum ten -> ten;
  count sid -> count;
} % kid;
科目毎に平均点・人数を出す為のモデル。
集計モデルsummarizedを使用。
 
"科目毎の平均点"
average_model = kamoku_total + {
  average : DOUBLE;
};
科目毎の平均点を保持するモデル。
kamoku_totalに平均点項目を追加する形式。
集計モデルのDSLにはsum・count・max・minはあるのだが、
平均は無いので自分で計算する必要がある。
"科目毎の標準偏差"
sigma_model = {
  kid : INT;
  average : DOUBLE;
  sigma   : DOUBLE;
};
科目毎の標準偏差を保持するモデル。 このモデルを使って偏差値を計算する都合上、
平均点も一緒に持つことにする。
"学生・科目毎の偏差値"
score_model = kamoku_model + {
  "科目名" kname : TEXT;
  "平均点" average : DOUBLE;
  "偏差値" score   : DOUBLE;
};
最終的な偏差値を保持するモデル。  

オペレーターの作成

次にオペレーター(演算子)を作成する。

afw-score/src/main/java/sample/operator/ScoreOperator.java:

package sample.operator;

import java.util.List;

import sample.modelgen.dmdl.model.AverageModel;
import sample.modelgen.dmdl.model.ScoreModel;
import sample.modelgen.dmdl.model.SigmaModel;
import sample.modelgen.dmdl.model.KamokuModel;
import sample.modelgen.dmdl.model.KamokuTotal;
import sample.modelgen.dmdl.model.TenModel;

import com.asakusafw.runtime.core.Result;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.CoGroup;
import com.asakusafw.vocabulary.operator.Convert;
import com.asakusafw.vocabulary.operator.Summarize;
public abstract class ScoreOperator {
コード 説明 備考
/**
 * 科目毎の成績に分割
 */
@CoGroup
public void split(@Key(group = {}) List<TenModel> list,
	Result<KamokuModel> out) {
  for (TenModel ten : list) {
    out.add(createKamoku(ten, 0, ten.getSuugaku()));
    out.add(createKamoku(ten, 1, ten.getKokugo()));
    out.add(createKamoku(ten, 2, ten.getRika()));
    out.add(createKamoku(ten, 3, ten.getShakai()));
    out.add(createKamoku(ten, 4, ten.getEigo()));
    out.add(createKamoku(ten, 5, ten.getSuugaku() +
      ten.getKokugo() + ten.getRika() + 
      ten.getShakai() + ten.getEigo()));
  }
}
KamokuModel createKamoku(TenModel tm, int kid, int ten) {
  KamokuModel t = new KamokuModel();
  t.setSid(tm.getSid());
  t.setSname(tm.getSname());
  t.setKid(kid);
  t.setTen(ten);
  return t;
}
入力データを科目毎に分割する処理。
5教科合計もここで作成する。
 
/**
 * 科目毎の合計点を算出
 */
@Summarize
public abstract KamokuTotal sumKamoku(KamokuModel kamoku);
点数の合計と人数をカウントする処理。 どの項目を合算し、どの項目をカウントするのかは
DMDLの方で記述してある。
/**
 * 科目毎の平均点を算出
 */
@Convert
public AverageModel average(KamokuTotal kamoku) {
  AverageModel r = new AverageModel();
  r.setKid(kamoku.getKid());
  r.setTen(kamoku.getTen());
  r.setCount(kamoku.getCount());
  r.setAverage((double) kamoku.getTen() / kamoku.getCount());
  return r;
}
平均点を計算する処理。
引数として合計点数と人数を受け取っている。
平均点以外の項目は移送しているだけだが
DMDL上はkamoku_totalの項目をaverage_modelが継承しているので
データ移送のメソッドが用意されたりすると便利そうなんだが。
/**
 * 科目毎の標準偏差を算出
 */
@CoGroup
public void sigma(@Key(group = "kid") List<KamokuModel> list,
	@Key(group = "kid") List<AverageModel> aveList,
	Result<SigmaModel> out) {
  AverageModel am = aveList.get(0);
  double ave = am.getAverage();

  double sum = 0;
  for (KamokuModel tm : list) {
    sum += Math.pow(tm.getTen() - ave, 2);
  }

  SigmaModel sm = new SigmaModel();
  sm.setKid(am.getKid());
  sm.setAverage(am.getAverage());
  sm.setSigma(Math.sqrt(sum / am.getCount()));
  out.add(sm);
}
標準偏差を算出する処理。 2つのListを入力としている。
どちらも集約キーを科目IDにしており、
同一の科目だけでメソッドが呼ばれることになる。
static final String 科目名[] = {
	"数学", "国語", "理科", "社会", "英語", "合計"
};
/**
 * 学生・科目毎の偏差値を算出
 */
@CoGroup
public void score(@Key(group = "kid") List<KamokuModel> list,
	@Key(group = "kid") List<SigmaModel> sigmaList,
	Result<ScoreModel> out) {
  SigmaModel sm = sigmaList.get(0);
  double ave = sm.getAverage();
  double sigma = sm.getSigma();

  for (KamokuModel tm : list) {
    ScoreModel s = new ScoreModel();
    s.setSid(tm.getSid());
    s.setSname(tm.getSname());
    s.setKid(tm.getKid());
    s.setKnameAsString(科目名[s.getKid()]);
    s.setTen(tm.getTen());
    s.setAverage(ave);
    s.setScore((tm.getTen() - ave) * 10 / sigma + 50);
    out.add(s);
  }
}
偏差値を算出する処理。
科目IDから科目名への変換もここで行っている。
 

2つの入力データから同じキーのものだけ持ってくるCoGroupは便利だ(笑)
ただ、平均点や標準偏差はレコード数が少ないので、もっと効率良いやり方はありそうな気がする。

少なくとも、CoGroupは最適化の妨げになるので、あまり使うべきではない。[2012-12-19]
(このサンプルを作ったのはまだAsakusaFWを知って間もない頃だったので、とりあえずCoGroupを使っていた)
Direct I/O版偏差値算出サンプルではCoGroup以外の演算子で作ってみた。


ジョブフローの作成

オペレーターを組み合わせてジョブと入出力ファイルの指定を作成する。

入力ファイル 出力ファイル
afw-score/src/main/java/sample/jobflow/gateway/
TenFromFile.java
afw-score/src/main/java/sample/jobflow/gateway/
ScoreToFile.java
package sample.jobflow.gateway;
import java.util.HashSet;
import java.util.Set;

import sample.modelgen.dmdl.model.TenModel;

import com.asakusafw.vocabulary.external.FileImporterDescription;
package sample.jobflow.gateway;
import sample.modelgen.dmdl.model.ScoreModel;

import com.asakusafw.vocabulary.external.FileExporterDescription;
public class TenFromFile extends FileImporterDescription {

	@Override
	public Class<?> getModelType() {
		return TenModel.class;
	}

	@Override
	public Set<String> getPaths() {
		Set<String> set = new HashSet<String>();
		set.add("score/input/file01");
		return set;
	}
}
public class ScoreToFile extends FileExporterDescription {

	@Override
	public Class<?> getModelType() {
		return ScoreModel.class;
	}

	@Override
	public String getPathPrefix() {
		return "score/output/score-*";
	}
}

afw-score/src/main/java/sample/jobflow/ScoreJob.java:

package sample.jobflow;

import sample.jobflow.gateway.ScoreToFile;
import sample.jobflow.gateway.TenFromFile;
import sample.modelgen.dmdl.model.AverageModel;
import sample.modelgen.dmdl.model.ScoreModel;
import sample.modelgen.dmdl.model.SigmaModel;
import sample.modelgen.dmdl.model.KamokuModel;
import sample.modelgen.dmdl.model.KamokuTotal;
import sample.modelgen.dmdl.model.TenModel;
import sample.operator.ScoreOperatorFactory;
import sample.operator.ScoreOperatorFactory.Average;

import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.asakusafw.vocabulary.flow.Source;
import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory;
@JobFlow(name = "ScoreJob")
public class ScoreJob extends FlowDescription {
	In<TenModel> in;
	Out<ScoreModel> out;

	public ScoreJob(@Import(name = "in", description = TenFromFile.class) In<TenModel> in,
		       @Export(name = "out", description = ScoreToFile.class) Out<ScoreModel> out) {
		this.in = in;
		this.out = out;
	}
	ScoreOperatorFactory operator = new ScoreOperatorFactory();
	CoreOperatorFactory core = new CoreOperatorFactory();

	@Override
	protected void describe() {
		Source<KamokuModel> kamoku = operator.split(in).out;
		Source<KamokuTotal> kamokuTotal = operator.sumKamoku(kamoku).out;

		Average a = operator.average(kamokuTotal);
		Source<AverageModel> average = a.out;
		core.stop(a.original);

		Source<SigmaModel> sigma = operator.sigma(kamoku, average).out;
		Source<ScoreModel> score = operator.score(kamoku, sigma).out;

		out.add(score);
	}
}

ほぼ一本道で進んでいるように見えるが、最初に作ったkamokuだけ何度も使用している。

ジョブフローのグラフ化(可視化)


ジョブフローのテスト

オペレーターの単体テストとかはちょっと省略して、ジョブフローのテストを書いてみた。

afw-score/src/test/java/sample/jobflow/ScoreJobTest.java:

package sample.jobflow;

import org.junit.Test;

import sample.modelgen.dmdl.model.ScoreModel;
import sample.modelgen.dmdl.model.TenModel;

import com.asakusafw.testdriver.JobFlowTester;
public class ScoreJobTest {

	@Test
	public void testDescribe() {
		String inDataSheet = "ten_model.xls";
		String outDataSheet = "score_model.xls";

		JobFlowTester tester = new JobFlowTester(getClass());

		tester.input("in", TenModel.class).prepare(inDataSheet + "#input");
		tester.output("out", ScoreModel.class).verify(outDataSheet + "#output", outDataSheet + "#rule");

		tester.runTest(ScoreJob.class);
	}
}

※実行する際は、ホームディレクトリーの指定を忘れずに!

afw-score/src/test/resources/sample/jobflow/ten_model.xlsのinputシート

  A B C D E F G H
1 sid sname suugaku kokugo rika shakai eigo  
2 101 天才 100 100 100 100 100  
3 201 無気力 5 30 10 20 15  
4 102 ひしだま 90 85 80 50 10  
5 202 hoge 10 30 25 45 20  
6 103 foo 60 60 60 60 25  
7 204 xxx 80 77 90 40 44  
8 205 yyy 65 90 55 80 65  
9 104 zzz 40 60 60 70 20  
10                

日本語データ(B列)が文字化けしたりしないかちょっと心配したが、大丈夫だった。

一番左のA列に注目。sid(学生番号)は数値だが、DMDL上はTEXTで定義している。
この場合、セルに数値のまま入れておくと、テスト実行時に以下のような例外が発生する。(2, 1)というのは、row=2・col=1のことだと思われる。

java.lang.RuntimeException: java.io.IOException: (file:/C:/workspace34/afw-score/target/test-classes/sample/jobflow/ten_model.xls#input, 2, 1)の形式を判別できませんでした。先頭に を付けて文字列を表すようにしてください
	at com.asakusafw.testdriver.JobFlowTester.runTest(JobFlowTester.java:95)
〜
Caused by: java.io.IOException: (file:/C:/workspace34/afw-score/target/test-classes/sample/jobflow/ten_model.xls#input, 2, 1)の形式を判別できませんでした。先頭に を付けて文字列を表すようにしてください
	at com.asakusafw.testdriver.excel.ExcelDataDriver$Engine.stringProperty(ExcelDataDriver.java:240)
〜

「先頭に を付けて」というのが空白になっていて分からないが(苦笑)、文字列項目に数値が入っているのがエラー原因だとすれば、文字列扱いにすればよい。
Excelで文字列扱いさせるにはセル内のデータの先頭にアポストロフィー「'」を付ける。 案の定、付けたら通るようになった。
(文字列扱いになったので、シート上も左寄せになる。数値項目(例えば点数は数値)は右寄せで表示される)
むしろ、アポストロフィーを付けないと駄目。シート上でセルの書式を文字列にしただけでは駄目だった。
(DMDLファイル上TEXTで指定された項目をExcelデータでは文字列にしないといけないのは、仕様で決まっている)

afw-score/src/test/resources/sample/jobflow/score_model.xlsのouputシート:

  A B C D E F G H
1 sid sname kid kname ten average score  
2 101 天才 2 理科 100 60 63.87375951238592  
3 101 天才 0 数学 100 56.25 63.23821580326409  
4 101 天才 5 合計 500 278.25 67.99729272586669  
5 101 天才 3 社会 100 58.125 67.86190412715338  
6 101 天才 1 国語 100 66.5 63.56931586558015  
7 101 天才 4 英語 100 37.375 71.58220049611475  
8 102 ひしだま 2 理科 80 60 56.93687975619296  
9 102 ひしだま 1 国語 85 66.5 57.49350279143978  
10 102 ひしだま 5 合計 315 278.25 52.982640395380386  
11 102 ひしだま 3 社会 50 58.125 46.534257408164265  
12 102 ひしだま 4 英語 10 37.375 40.565864453794156  
13 102 ひしだま 0 数学 90 56.25 60.21233790537516  
14                

double型の比較は大丈夫かちょっと心配だったが、基本は大丈夫。
はまったのは、有効桁数がExcelとJavaで異なること。Excelの方が1桁少ない。
G列のscore(偏差値)の値は、正しい値はCascadingのサンプルに出ているものなので そのままExcelに貼り付けたら、末尾1桁が削れてしまって検証結果が一致しない。
ここでもExcel上のデータを文字列にしてやったら全桁入れられた。(アポストロフィーを付けず、セルの書式を文字列にするだけでOKだった)

あと、データは一番上の行の項目名で判定しているようなので、列ごと入れ替えても問題なかった。
(上記の例ではknameとtenが入れ替えてある)

afw-score/src/test/resources/sample/jobflow/score_model.xlsのruleシート

  A B C D E
1 Format EVR-1.0.0      
2 全体の比較 余計なデータを無視 [Expect]      
3 プロパティ 値の比較 NULLの比較 コメント  
4 sid 検査キー [Key] 通常比較 [-] 学生番号  
5 sname 完全一致 [=] 通常比較 [-] 学生名  
6 kid 検査キー [Key] 通常比較 [-] 科目  
7 ten 完全一致 [=] 通常比較 [-] 点数  
8 kname 完全一致 [=] 通常比較 [-] 科目名  
9 average 完全一致 [=] 通常比較 [-] 平均点  
10 score 完全一致 [=] 通常比較 [-] 偏差値  
11          

今回は学生番号と科目IDがキーなので、2項目を検査キーに指定している。
また、outputシートには全出力データは書いていない。そこで、「全体の比較」に「余計なデータを無視」を指定してある。つくづくよく準備されてるw


こんな機能があれば…

やはり実際に使ってみると疑問点や願望などが出てくるので、ちょっとメモしておく。

分類 表題 説明
モデル モデルの項目名に日本語 モデルの項目名には日本語は使えない。
Javaでは変数名に日本語を使えるから、出来てもいいかなと思ったんだけど。
まぁ英数字しか使えないのも普通と言えば普通だし。
型に別名を付けたい 今回の科目ID(kid)は何度も出てきているが、型は毎回INTと書いている。
後から科目名でもいいじゃんと思ったが一箇所ずつTEXTに変更するのは面倒だったのでやめた。
型を別名で定義できると、こういうとき一箇所の変更で済むから嬉しい。
まぁこれもScalaには出来るがJavaには出来ない事だしな…w
「+」でつなぐ際の自動命名 別のモデルを拡張(別のモデルに項目を追加)して新しいモデルを作ることが出来て、とても便利。
このとき、モデルの項目名を自動的に変更する機能があると便利そう。
例えば「"点数" t = { ten : INT; }; "平均点" a = { ten : DOUBLE; };」というモデルがあって、
tとaを組み合わせて「s = t + a;」を作ると、項目名tenがかぶる。
片方に接頭辞を与えて、自動的にそれを付けてくれたりすると便利かなーと。
「+」でつないだ際に項目を減らす 今回の最終結果を出力する為のモデル(score_model)には科目ID(kid)が入っているが、これは別に要らない。
score_modelはkamoku_modelを拡張したのでkidが入ってしまったが、不要な項目は減らせると
嬉しい時があるかもしれない。
オペレーター Summarizeの自動定義 単純集計演算子Summarizeは、どの項目をどう演算するかについては全てDMDLで記述し、
Operatorクラスに定義したメソッドにはプログラマーが改めて指定しないといけないようなものは(メソッド名以外は)無い。
したがって、このようなメソッドは自動生成できるんじゃないかと思う。
(メソッド名は決めないといけないが、これもほぼ規則的に付けられると思う)
Convertのoriginal 変換演算子Convertでは、変換後の値を出力するoutの他に、変換前の値を出力するoriginalがある。
このoriginalって、必要なんだろうか?
使わない場合はoriginalはstop演算子に渡す必要があって、フローが綺麗に書けなくなる…。
変換をしているんだから変換前データを使うことは無い気がするし、
変換前データを再度使いたいときはジョブフロー上で変換前のSourceを使えばいいだけのような?
ジョブフロー デバッグ用演算子 自分がプログラムを作るときは、プログラムの先頭の処理から書いていき、
途中まで出来たところで実行して予想通りの出力になっているか試しながら処理を追加していく。
AsakusaFWのジョブフローの場合、出力ファイルクラスや出力モデルをきっちり書かないといけないので、
こういったやり方に向いていない。
途中でstop演算子に出力してしまえばその辺りは不要になるが、データ内容は確認したいよね。
一応、デバッグ出力演算子も作ろうとしてみたんだけど
@CoGroup
public <T extends DataModel<?>> void debugDump(@Key(group = {}) List<T> list, Result<T> out) {
	for (T model : list) {
		System.out.println(model);
		out.add(model);
	}
}
//ジョブフローで使う際は、簡単にコメントアウトしたり復活させたり出来ると便利
	Source<Hoge> hoge = operator.hoge(in).out;
//	core.stop(operator.debugDump(hoge).out);
	Source<Foo> foo = operator.foo(hoge).out;
	core.stop(foo); //ここから先は未作成

さすがにモデル部分を型パラメーターにしたらAshigelコンパイラーが認識できないようでエラーになった^^;
(そこが解決できても、CoGroupではResultを必ず指定しないといけないので、フロー上で使うにはちと不便)
モデルクラスはtoString()が実装されていてデバッグ出力も問題なさそうなんだけどなー。

というか、ログ出力をする演算子はあった(汗)→ロギング演算子 [2011-12-06]

入力ファイル 複数の入力ファイル指定 偏差値算出サンプルには関係ないけど、入力にディレクトリーを指定することは出来るのだろうか。
通常のHadoopはディレクトリーを指定するとその中にあるファイルが全て処理対象になるが。
AsakusaFWの場合、ジョブフローの単体テストで入力ファイルを自動生成するので、
ディレクトリー指定のような方法には対応していないかな?
例えば複数のサーバーから来たファイルを一箇所のディレクトリーに入れておいて、そこを対象に処理する
…なんて事はありそうな気がする。
Excelデータ Excelファイルチェックタイミング ジョブフローやバッチのテストを実行するとExcelファイルを読み込むが、
そこにAsakusaFWとして許容できないエラーがあると、テストが失敗する。
入力データ用については最初に読み込むのですぐ検知されるが、
出力データの検証にExcelファイルを読み込むのは処理全体の最後なので、そこでエラーがあると修正確認が大変。
Excelファイルの内容が正しいかどうかだけを別途チェックする方法が無いかなぁ。
データの型表示 Excelファイルのシート内にモデルの項目名が表示されているが、
モデル上のデータ型によって入れられる値に制約があったりするので、型も表示されているといいかも。
ついでにコメントの日本語名も。

AsakusaFW目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま