CascadingのPipeのサブクラスであるSubAssemblyクラスについて。
|
|
Cascadingにおいて、複数の処理をまとめてひとつの処理のようにして扱うのがSubAssembly。
要するにサブルーチンを作り出すようなもの。
import cascading.pipe.SubAssembly;
サブアセンブリーは、コンストラクターの中で他のパイプを使った処理を記述する。
こうすることで、サブアセンブリーを使う側は他のPipe系クラスと同様にコーディングすることが出来る。
public class SampleAssembly extends SubAssembly { private static final long serialVersionUID = 1L; public SampleAssembly(Pipe previous, 〜) { Pipe pipe = new Xxxx(previouse, 〜); 〜 setTails(pipe); } }
//使う側の例 pipe = new Yyyy(pipe, 〜); pipe = new SampleAssembly(pipe, 〜); pipe = new Zzzz(pipe, 〜);
コンストラクターの最後でsetTails()を使って、最後のパイプ(次の処理へ受け渡すパイプ)を指定する。
複数のパイプを渡すことも出来る。その場合は、使う側で別々に取り出す必要があるが。
public class SampleAssembly2 extends SubAssembly { private static final long serialVersionUID = 1L; public SampleAssembly2(Pipe previous, 〜) { Pipe pipe1 = new Pipe("pipe1", previouse); Pipe pipe2 = new Pipe("pipe2", previouse); 〜 setTails(pipe1, pipe2); } }
//使う側の例 Pipe[] pipes = new SampleAssembly2(pipe, 〜).getTails(); Pipe pipe1 = pipes[0]; Pipe pipe2 = pipes[1];
ユーザーマニュアルには、「SubAssemblyを使え、factoryメソッドを使うな」と書いてある(ような気がする)。
→SubAssemblies,
not Factories
factoryメソッドを使うパターンって、こんな感じ?↓
//使う側の例 pipe = new Yyyy(pipe, 〜); pipe = SampleFactory.getInstance(pipe, 〜); pipe = new Zzzz(pipe, 〜);
確かに通常のPipe系クラスを使う方式と違うので、ちょっとだけ違和感は無くもないけど。
一番の問題は、getInstance()の中身になると思う。
つまり、中でインスタンスを1つだけ作って毎回それを返すような構造(シングルトンパターン)だと、同じ処理を何度も使いたい場合に問題が出てくるかもしれない。
(毎回SubAssemblyをnewで作っていれば、そんな事は気にしなくていい)
Cascading1.1には、SubAssemblyを利用したクラスがいくつか存在する。
(Cascading1.0には公開されているものは特に無い)
クラス名 | ver | 説明 | 例 | 備考(実装) |
---|---|---|---|---|
cascading.pipe. SubAssembly |
1.1 | サブアセンブリーのインターフェース。 | ||
cascading.pipe.assembly. Shape |
1.1 | 項目を絞る。 | pipe = new Shape(pipe, |
new Each( |
cascading.pipe.assembly. Coerce |
1.1 | 型を変換する。 | pipe = new Coerce(pipe, |
new Each( |
cascading.pipe.assembly. Rename |
1.1 | 項目名を変更する。 | pipe = new Rename(pipe, |
new Each( |
cascading.pipe.assembly. AverageBy |
1.2 | 平均値を算出する。[2011-12-25] | pipe = new AverageBy(pipe, |
→Average |
cascading.pipe.assembly. CountBy |
1.2 | 件数をカウントする。[2011-12-25] | pipe = new CountBy(pipe, |
→Count |
cascading.pipe.assembly. SumBy |
1.2 | 合計を算出する。[2011-12-25] | pipe = new SumBy(pipe, |
→Sum |
cascading.pipe.assembly. Unique |
1.2 | 一意にする(重複を排除する)。[2011-12-25] | pipe = new Unique(pipe, |
|
cascading.pipe.assembly. AggregateBy |
1.2 | 複数の集計をまとめて行う。[2011-12-25] | →AggregateBy |
Cascading1.2では、集計用のサブアセンブリーが追加された。[2011-12-25]
例えばCountByはGroupByとEvery・Countの組み合わせと同等。
pipe = new GroupBy(pipe, new Fields("group")); pipe = new Every(pipe, new Count(new Fields("count"))); ↓ pipe = new CountBy(pipe, new Fields("group"), new Fields("count"));
さらに、AggregateByを使って複数の集計をまとめて実施することが出来る。
pipe = new AggregateBy(pipe, new Fields("group"), new SumBy (new Fields("sumSrc"), new Fields("sumDst"), long.class), new countBy( new Fields("cntDst") ) );
偏差値を算出するには何手順か必要になるので、サブアセンブリーの例としてはちょうどいい。
→サブアセンブリーとせずに偏差値を算出している例
→サブアセンブリーを使用して偏差値を算出する例
入力項目groupFields | key | new Fields("key") |
||||
---|---|---|---|---|---|---|
入力項目argumentSelector | ten | new Fields("ten") |
||||
中間項目fieldDeclaration | ave | sigma | score | new Fields("ave", "sigma", "score") |
||
出力項目outputSelector | key | ten | ave | score | new Fields("key", "ten", "ave", "score") |
/** * 偏差値算出アセンブリー */ public class StandardScore extends SubAssembly { private static final long serialVersionUID = 1L; /** * コンストラクター. * * @param previous 前パイプ * @param groupFields グループキー項目 * @param argumentSelector 点数項目 * @param fieldDeclaration 新規項目名(0:平均、1:標準偏差、2:偏差値) * @param outputSelector 出力項目 */ public StandardScore(Pipe previous, Fields groupFields, Fields argumentSelector, Fields fieldDeclaration, Fields outputSelector) { String pipeName = previous.getName(); Comparable<?> ten = argumentSelector.get(0); String ave = (String) fieldDeclaration.get(0); String sigma = (String) fieldDeclaration.get(1); String score = (String) fieldDeclaration.get(2); // 平均を算出する Pipe pipeAve = new Pipe(pipeName + ".average", previous); pipeAve = new GroupBy(pipeAve, groupFields); pipeAve = new Every(pipeAve, argumentSelector, new Average(new Fields(ave))); // 点数と平均を結合し、標準偏差を求める Pipe pipeSigma = new CoGroupEx(pipeName + ".sigma", previous, groupFields, pipeAve, groupFields); pipeSigma = new Every(pipeSigma, new Fields(ten, ave), new Deviation(new Fields(sigma))); // 点数と標準偏差を結合し、偏差値を求める Pipe pipe = new CoGroupEx(pipeName, Pipe.pipes(previous, pipeAve, pipeSigma), Fields.fields(groupFields, groupFields, Fields.size(groupFields.size())) ); pipe = new Each(pipe, new Fields(ten, ave, sigma), new Score(new Fields(score)), outputSelector); setTails(pipe); }
/** * 標準偏差を求めるAggregator */ protected static class Deviation extends BaseOperation<Deviation.Context> implements Aggregator<Deviation.Context> { private static final long serialVersionUID = 1L; protected static class Context { public double sum; public int count; public void reset() { sum = 0; count = 0; } } public Deviation(Fields fieldDeclaration) { super(2, fieldDeclaration); // args0:点数 // args1:平均点 } @Override public void start(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) { Context ctx = aggregatorCall.getContext(); if (ctx == null) { aggregatorCall.setContext(new Context()); } else { ctx.reset(); } } @Override public void aggregate(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) { Tuple value = aggregatorCall.getArguments().getTuple(); double ten = value.getDouble(0); double average = value.getDouble(1); Context ctx = aggregatorCall.getContext(); ctx.sum += Math.pow(ten - average, 2); ctx.count++; } @Override public void complete(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) { Context ctx = aggregatorCall.getContext(); aggregatorCall.getOutputCollector().add(new Tuple(Math.sqrt(ctx.sum / ctx.count))); } }
/** * 偏差値を求めるFunction */ protected static class Score extends BaseOperation<Object> implements Function<Object> { private static final long serialVersionUID = 1L; public Score(Fields fieldDeclaration) { super(3, fieldDeclaration); // args0:点数 // args1:平均点 // args2:標準偏差 } @Override public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) { Tuple value = functionCall.getArguments().getTuple(); double ten = value.getInteger(0); double average = value.getDouble(1); double sigma = value.getDouble(2); double score = (ten - average) * 10 / sigma + 50; functionCall.getOutputCollector().add(new Tuple(score)); } } }
→StandardScore(偏差値を求めるSubAssembly)の全ソース