CascadingのPipeのサブクラスであるSubAssemblyクラスについて。
|
|
Cascadingにおいて、複数の処理をまとめてひとつの処理のようにして扱うのがSubAssembly。
要するにサブルーチンを作り出すようなもの。
import cascading.pipe.SubAssembly;
サブアセンブリーは、コンストラクターの中で他のパイプを使った処理を記述する。
こうすることで、サブアセンブリーを使う側は他のPipe系クラスと同様にコーディングすることが出来る。
public class SampleAssembly extends SubAssembly {
private static final long serialVersionUID = 1L;
public SampleAssembly(Pipe previous, 〜) {
Pipe pipe = new Xxxx(previouse, 〜);
〜
setTails(pipe);
}
}
//使う側の例 pipe = new Yyyy(pipe, 〜); pipe = new SampleAssembly(pipe, 〜); pipe = new Zzzz(pipe, 〜);
コンストラクターの最後でsetTails()を使って、最後のパイプ(次の処理へ受け渡すパイプ)を指定する。
複数のパイプを渡すことも出来る。その場合は、使う側で別々に取り出す必要があるが。
public class SampleAssembly2 extends SubAssembly {
private static final long serialVersionUID = 1L;
public SampleAssembly2(Pipe previous, 〜) {
Pipe pipe1 = new Pipe("pipe1", previouse);
Pipe pipe2 = new Pipe("pipe2", previouse);
〜
setTails(pipe1, pipe2);
}
}
//使う側の例 Pipe[] pipes = new SampleAssembly2(pipe, 〜).getTails(); Pipe pipe1 = pipes[0]; Pipe pipe2 = pipes[1];
ユーザーマニュアルには、「SubAssemblyを使え、factoryメソッドを使うな」と書いてある(ような気がする)。
→SubAssemblies,
not Factories
factoryメソッドを使うパターンって、こんな感じ?↓
//使う側の例 pipe = new Yyyy(pipe, 〜); pipe = SampleFactory.getInstance(pipe, 〜); pipe = new Zzzz(pipe, 〜);
確かに通常のPipe系クラスを使う方式と違うので、ちょっとだけ違和感は無くもないけど。
一番の問題は、getInstance()の中身になると思う。
つまり、中でインスタンスを1つだけ作って毎回それを返すような構造(シングルトンパターン)だと、同じ処理を何度も使いたい場合に問題が出てくるかもしれない。
(毎回SubAssemblyをnewで作っていれば、そんな事は気にしなくていい)
Cascading1.1には、SubAssemblyを利用したクラスがいくつか存在する。
(Cascading1.0には公開されているものは特に無い)
| クラス名 | ver | 説明 | 例 | 備考(実装) |
|---|---|---|---|---|
| cascading.pipe. SubAssembly |
1.1 | サブアセンブリーのインターフェース。 | ||
| cascading.pipe.assembly. Shape |
1.1 | 項目を絞る。 | pipe = new Shape(pipe, |
new Each( |
| cascading.pipe.assembly. Coerce |
1.1 | 型を変換する。 | pipe = new Coerce(pipe, |
new Each( |
| cascading.pipe.assembly. Rename |
1.1 | 項目名を変更する。 | pipe = new Rename(pipe, |
new Each( |
| cascading.pipe.assembly. AverageBy |
1.2 | 平均値を算出する。[2011-12-25] | pipe = new AverageBy(pipe, |
→Average |
| cascading.pipe.assembly. CountBy |
1.2 | 件数をカウントする。[2011-12-25] | pipe = new CountBy(pipe, |
→Count |
| cascading.pipe.assembly. SumBy |
1.2 | 合計を算出する。[2011-12-25] | pipe = new SumBy(pipe, |
→Sum |
| cascading.pipe.assembly. Unique |
1.2 | 一意にする(重複を排除する)。[2011-12-25] | pipe = new Unique(pipe, |
|
| cascading.pipe.assembly. AggregateBy |
1.2 | 複数の集計をまとめて行う。[2011-12-25] | →AggregateBy |
Cascading1.2では、集計用のサブアセンブリーが追加された。[2011-12-25]
例えばCountByはGroupByとEvery・Countの組み合わせと同等。
pipe = new GroupBy(pipe, new Fields("group"));
pipe = new Every(pipe, new Count(new Fields("count")));
↓
pipe = new CountBy(pipe, new Fields("group"), new Fields("count"));
さらに、AggregateByを使って複数の集計をまとめて実施することが出来る。
pipe = new AggregateBy(pipe, new Fields("group"),
new SumBy (new Fields("sumSrc"), new Fields("sumDst"), long.class),
new countBy( new Fields("cntDst") )
);
偏差値を算出するには何手順か必要になるので、サブアセンブリーの例としてはちょうどいい。
→サブアセンブリーとせずに偏差値を算出している例
→サブアセンブリーを使用して偏差値を算出する例
| 入力項目groupFields | key | new Fields("key") |
||||
|---|---|---|---|---|---|---|
| 入力項目argumentSelector | ten | new Fields("ten") |
||||
| 中間項目fieldDeclaration | ave | sigma | score | new Fields("ave", "sigma", "score") |
||
| 出力項目outputSelector | key | ten | ave | score | new Fields("key", "ten", "ave", "score") |
/**
* 偏差値算出アセンブリー
*/
public class StandardScore extends SubAssembly {
private static final long serialVersionUID = 1L;
/**
* コンストラクター.
*
* @param previous 前パイプ
* @param groupFields グループキー項目
* @param argumentSelector 点数項目
* @param fieldDeclaration 新規項目名(0:平均、1:標準偏差、2:偏差値)
* @param outputSelector 出力項目
*/
public StandardScore(Pipe previous, Fields groupFields, Fields argumentSelector, Fields fieldDeclaration, Fields outputSelector) {
String pipeName = previous.getName();
Comparable<?> ten = argumentSelector.get(0);
String ave = (String) fieldDeclaration.get(0);
String sigma = (String) fieldDeclaration.get(1);
String score = (String) fieldDeclaration.get(2);
// 平均を算出する
Pipe pipeAve = new Pipe(pipeName + ".average", previous);
pipeAve = new GroupBy(pipeAve, groupFields);
pipeAve = new Every(pipeAve, argumentSelector, new Average(new Fields(ave)));
// 点数と平均を結合し、標準偏差を求める
Pipe pipeSigma = new CoGroupEx(pipeName + ".sigma", previous, groupFields, pipeAve, groupFields);
pipeSigma = new Every(pipeSigma, new Fields(ten, ave), new Deviation(new Fields(sigma)));
// 点数と標準偏差を結合し、偏差値を求める
Pipe pipe = new CoGroupEx(pipeName,
Pipe.pipes(previous, pipeAve, pipeSigma),
Fields.fields(groupFields, groupFields, Fields.size(groupFields.size()))
);
pipe = new Each(pipe, new Fields(ten, ave, sigma), new Score(new Fields(score)), outputSelector);
setTails(pipe);
}
/**
* 標準偏差を求めるAggregator
*/
protected static class Deviation extends BaseOperation<Deviation.Context> implements Aggregator<Deviation.Context> {
private static final long serialVersionUID = 1L;
protected static class Context {
public double sum;
public int count;
public void reset() {
sum = 0;
count = 0;
}
}
public Deviation(Fields fieldDeclaration) {
super(2, fieldDeclaration);
// args0:点数
// args1:平均点
}
@Override
public void start(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
Context ctx = aggregatorCall.getContext();
if (ctx == null) {
aggregatorCall.setContext(new Context());
} else {
ctx.reset();
}
}
@Override
public void aggregate(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
Tuple value = aggregatorCall.getArguments().getTuple();
double ten = value.getDouble(0);
double average = value.getDouble(1);
Context ctx = aggregatorCall.getContext();
ctx.sum += Math.pow(ten - average, 2);
ctx.count++;
}
@Override
public void complete(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
Context ctx = aggregatorCall.getContext();
aggregatorCall.getOutputCollector().add(new Tuple(Math.sqrt(ctx.sum / ctx.count)));
}
}
/**
* 偏差値を求めるFunction
*/
protected static class Score extends BaseOperation<Object> implements Function<Object> {
private static final long serialVersionUID = 1L;
public Score(Fields fieldDeclaration) {
super(3, fieldDeclaration);
// args0:点数
// args1:平均点
// args2:標準偏差
}
@Override
public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {
Tuple value = functionCall.getArguments().getTuple();
double ten = value.getInteger(0);
double average = value.getDouble(1);
double sigma = value.getDouble(2);
double score = (ten - average) * 10 / sigma + 50;
functionCall.getOutputCollector().add(new Tuple(score));
}
}
}
→StandardScore(偏差値を求めるSubAssembly)の全ソース