S-JIS[2010-04-24/2011-12-25] 変更履歴

Cascading SubAssembly

CascadingPipeのサブクラスであるSubAssemblyクラスについて。


SubAssemblyの概要

Cascadingにおいて、複数の処理をまとめてひとつの処理のようにして扱うのがSubAssembly
要するにサブルーチンを作り出すようなもの。

import cascading.pipe.SubAssembly;

サブアセンブリーは、コンストラクターの中で他のパイプを使った処理を記述する。
こうすることで、サブアセンブリーを使う側は他のPipe系クラスと同様にコーディングすることが出来る。

public class SampleAssembly extends SubAssembly {
	private static final long serialVersionUID = 1L;

	public SampleAssembly(Pipe previous, 〜) {
		Pipe pipe = new Xxxx(previouse, 〜);
〜
		setTails(pipe);
	}
}
//使う側の例
		pipe = new Yyyy(pipe, 〜);
		pipe = new SampleAssembly(pipe, 〜);
		pipe = new Zzzz(pipe, 〜);

コンストラクターの最後でsetTails()を使って、最後のパイプ(次の処理へ受け渡すパイプ)を指定する。
複数のパイプを渡すことも出来る。その場合は、使う側で別々に取り出す必要があるが。

public class SampleAssembly2 extends SubAssembly {
	private static final long serialVersionUID = 1L;

	public SampleAssembly2(Pipe previous, 〜) {
		Pipe pipe1 = new Pipe("pipe1", previouse);
		Pipe pipe2 = new Pipe("pipe2", previouse);
〜
		setTails(pipe1, pipe2);
	}
}
//使う側の例
		Pipe[] pipes = new SampleAssembly2(pipe, 〜).getTails();
		Pipe pipe1 = pipes[0];
		Pipe pipe2 = pipes[1];

ユーザーマニュアルには、「SubAssemblyを使え、factoryメソッドを使うな」と書いてある(ような気がする)。
SubAssemblies, not Factories

factoryメソッドを使うパターンって、こんな感じ?↓

//使う側の例
		pipe = new Yyyy(pipe, 〜);
		pipe = SampleFactory.getInstance(pipe, 〜);
		pipe = new Zzzz(pipe, 〜);

確かに通常のPipe系クラスを使う方式と違うので、ちょっとだけ違和感は無くもないけど。

一番の問題は、getInstance()の中身になると思う。
つまり、中でインスタンスを1つだけ作って毎回それを返すような構造(シングルトンパターン)だと、同じ処理を何度も使いたい場合に問題が出てくるかもしれない。
(毎回SubAssemblyをnewで作っていれば、そんな事は気にしなくていい)


SubAssemblyの種類

Cascading1.1には、SubAssemblyを利用したクラスがいくつか存在する。
(Cascading1.0には公開されているものは特に無い)

クラス名 ver 説明 備考(実装)
cascading.pipe.
SubAssembly
1.1 サブアセンブリーのインターフェース。    
cascading.pipe.assembly.
Shape
1.1 項目を絞る。 pipe = new Shape(pipe,
 new Fields("col2")
);
new Each(
 previous,
 fields,
 new Identity()
)
cascading.pipe.assembly.
Coerce
1.1 型を変換する。 pipe = new Coerce(pipe,
 int.class
);
new Each(
 previous,
 new Identity(types)
)
cascading.pipe.assembly.
Rename
1.1 項目名を変更する。 pipe = new Rename(pipe,
 new Fields("col2"),
 new Fields("new")
);
new Each(
 previous,
 fromFields,
 new Identity( toFields ),
 Fields.SWAP
)
cascading.pipe.assembly.
AverageBy
1.2 平均値を算出する。[2011-12-25] pipe = new AverageBy(pipe,
 new Fields("group"),
 new Fields("value"),
 new Fields("name")
);
Average
cascading.pipe.assembly.
CountBy
1.2 件数をカウントする。[2011-12-25] pipe = new CountBy(pipe,
 new Fields("group"),
 new Fields("name")
);
Count
cascading.pipe.assembly.
SumBy
1.2 合計を算出する。[2011-12-25] pipe = new SumBy(pipe,
 new Fields("group"),
 new Fields("value"),
 new Fields("name"),
 long.class
);
Sum
cascading.pipe.assembly.
Unique
1.2 一意にする(重複を排除する)。[2011-12-25] pipe = new Unique(pipe,
 new Fields("group")
);
 
cascading.pipe.assembly.
AggregateBy
1.2 複数の集計をまとめて行う。[2011-12-25] AggregateBy  

AggregateBy

Cascading1.2では、集計用のサブアセンブリーが追加された。[2011-12-25]

例えばCountByGroupByEvery・Countの組み合わせと同等。

	pipe = new GroupBy(pipe, new Fields("group"));
	pipe = new Every(pipe, new Count(new Fields("count")));
↓
	pipe = new CountBy(pipe, new Fields("group"), new Fields("count"));

さらに、AggregateByを使って複数の集計をまとめて実施することが出来る。

	pipe = new AggregateBy(pipe, new Fields("group"),
		new SumBy  (new Fields("sumSrc"), new Fields("sumDst"), long.class),
		new countBy(                      new Fields("cntDst")            )
	);

偏差値を算出するSubAssemblyの例

偏差値を算出するには何手順か必要になるので、サブアセンブリーの例としてはちょうどいい。
サブアセンブリーとせずに偏差値を算出している例
サブアセンブリーを使用して偏差値を算出する例

コンストラクターの引数の例
入力項目groupFields key         new Fields("key")
入力項目argumentSelector   ten       new Fields("ten")
中間項目fieldDeclaration     ave sigma score new Fields("ave", "sigma", "score")
出力項目outputSelector key ten ave   score new Fields("key", "ten", "ave", "score")
/**
 * 偏差値算出アセンブリー
 */
public class StandardScore extends SubAssembly {
	private static final long serialVersionUID = 1L;

	/**
	 * コンストラクター.
	 *
	 * @param previous   	前パイプ
	 * @param groupFields	グループキー項目
	 * @param argumentSelector	点数項目
	 * @param fieldDeclaration	新規項目名(0:平均、1:標準偏差、2:偏差値)
	 * @param outputSelector	出力項目
	 */
	public StandardScore(Pipe previous, Fields groupFields, Fields argumentSelector, Fields fieldDeclaration, Fields outputSelector) {
		String pipeName = previous.getName();

		Comparable<?> ten     = argumentSelector.get(0);
		String ave   = (String) fieldDeclaration.get(0);
		String sigma = (String) fieldDeclaration.get(1);
		String score = (String) fieldDeclaration.get(2);

		// 平均を算出する
		Pipe pipeAve = new Pipe(pipeName + ".average", previous);
		pipeAve = new GroupBy(pipeAve, groupFields);
		pipeAve = new Every(pipeAve, argumentSelector, new Average(new Fields(ave)));

		// 点数と平均を結合し、標準偏差を求める
		Pipe pipeSigma = new CoGroupEx(pipeName + ".sigma", previous, groupFields, pipeAve, groupFields);
		pipeSigma = new Every(pipeSigma, new Fields(ten, ave), new Deviation(new Fields(sigma)));

		// 点数と標準偏差を結合し、偏差値を求める
		Pipe pipe = new CoGroupEx(pipeName,
			Pipe.pipes(previous, pipeAve, pipeSigma),
			Fields.fields(groupFields, groupFields, Fields.size(groupFields.size()))
		);
		pipe = new Each(pipe, new Fields(ten, ave, sigma), new Score(new Fields(score)), outputSelector);

		setTails(pipe);
	}
	/**
	 * 標準偏差を求めるAggregator
	 */
	protected static class Deviation extends BaseOperation<Deviation.Context> implements Aggregator<Deviation.Context> {
		private static final long serialVersionUID = 1L;

		protected static class Context {
			public double sum;
			public int count;

			public void reset() {
				sum = 0;
				count = 0;
			}
		}

		public Deviation(Fields fieldDeclaration) {
			super(2, fieldDeclaration);
			// args0:点数
			// args1:平均点
		}

		@Override
		public void start(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
			Context ctx = aggregatorCall.getContext();
			if (ctx == null) {
				aggregatorCall.setContext(new Context());
			} else {
				ctx.reset();
			}
		}

		@Override
		public void aggregate(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
			Tuple value = aggregatorCall.getArguments().getTuple();
			double ten     = value.getDouble(0);
			double average = value.getDouble(1);

			Context ctx = aggregatorCall.getContext();
			ctx.sum += Math.pow(ten - average, 2);
			ctx.count++;
		}

		@Override
		public void complete(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
			Context ctx = aggregatorCall.getContext();
			aggregatorCall.getOutputCollector().add(new Tuple(Math.sqrt(ctx.sum / ctx.count)));
		}
	}
	/**
	 * 偏差値を求めるFunction
	 */
	protected static class Score extends BaseOperation<Object> implements Function<Object> {
		private static final long serialVersionUID = 1L;

		public Score(Fields fieldDeclaration) {
			super(3, fieldDeclaration);
			// args0:点数
			// args1:平均点
			// args2:標準偏差
		}

		@Override
		public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {
			Tuple value = functionCall.getArguments().getTuple();
			double ten     = value.getInteger(0);
			double average = value.getDouble(1);
			double sigma   = value.getDouble(2);

			double score = (ten - average) * 10 / sigma + 50;
			functionCall.getOutputCollector().add(new Tuple(score));
		}
	}
}

StandardScore(偏差値を求めるSubAssembly)の全ソース


Cascadingへ戻る / Hadoopへ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま