Asakusa FrameworkをScalaで記述するAfwHS α版のCoGroupについて。
|
CoGroupは、複数種類のデータを(一致するキーごとに)受け取って、複数種類のデータを出力する。
また、入力データ1件につき複数件(0件も可)のデータを出力することも出来る。
はっきり言って、CoGroupだけあれば何でも出来ちゃう(笑)
ということはシステムによる最適化はしづらい訳で、CoGroupだけで作ったようなプログラムは実行効率(速度やメモリー使用量)は悪い可能性がある。
他の演算で出来る処理なら、そちらを使うべきだろう。
s0.cogroup(名前, キー, 他の入力データ…, 出力データ…) { 処理 }
↓AsakusaFW
@CoGroup public void 名前(キー s0, 他の入力データ…, 出力データ…) { 処理 }
AfwHS | 備考 | |
---|---|---|
s0 | Source | 入力ソースとなるデータ。 |
名前 | String | 演算子の名前。 |
キー | Key | s0の集約キー。 |
入力データ | (Key, Source) | その他の入力データ。キーとのペア。 |
出力データ | Result | 出力データ。 |
処理 | (Source…, Result…)=>Unit | 処理内容を記述した関数。 |
AsakusaFWではプログラマーが自由に入力データと出力データの個数を(いくらでも)指定することが出来るが、
AfwHSではAfwHSが対応している個数までしか扱えない。
この“処理”(関数)は、AsakusaFW(Hadoop)で実際にMapReduceが動く際に呼ばれる。
import scala.collection.JavaConverters._ import jp.hishidama.afwhs._ //モデルクラスとそのラッパークラスと暗黙変換をインポート import afws.wordcount.modelgen.dmdl.model.LineModel import afws.wordcount.modelgen.dmdl.model.LineModelWrapper import afws.wordcount.modelgen.dmdl.model.LineModelImplicit._ import afws.wordcount.modelgen.dmdl.model.WordCountModel import afws.wordcount.modelgen.dmdl.model.WordCountModelWrapper import afws.wordcount.modelgen.dmdl.model.WordCountModelImplicit._
例 | 備考 |
---|---|
val words = in.cogroup("split", "", Result[WordCountModel]("out")) { (s, r) => s.asScala.foreach { model => model.getTextAsString().split("[ \t]+").foreach { word => val wc = WordCountModelWrapper(word, 1) r.add(wc) } } } ↓等価なAsakusaFW @CoGroup public void split(@Key(group={}) List<LineModel> in, Result<WordCountModel> out) { for (LineModel model : in) { for (String word : model.getTextAsString().split("[ \t]+")) { WordCountModel wc = new WordCountModel(); wc.setWordAsString(word); wc.setCount(1); out.add(wc); } } } |
WordCountで単語を分割する例。 1種類のデータを受け取り、1種類(1レコードにつき複数件)のデータを出力する。 inはSource[LineModel]。LineModelのリストが入っているイメージ。 AsakusaFWでのResultの変数名「out」はジョブフローでも使われることになるので、 AfwHSでもResultの引数として渡すようにしている。 |
val count = words.cogroup("sum", "word", Result[WordCountModel]("out")) { (s, r) => val wc = WordCountModelWrapper( s.get(0).getWordAsString, s.asScala.map(_.getCount).sum) r.add(wc) } ↓等価なAsakusaFW @CoGroup public void sum(@Key(group={"word"}) List<WordCountModel> s, Result<WordCountModel> out) { for (WordCountModel model : s) { 〜 } } |
WordCountで単語数を集計する例。 1種類のデータを受け取り、1種類(複数レコードにつき1件)のデータを出力する。 集約キーとして「word」を指定している。 AfwHSでは表面上文字列を渡しているだけだが、これは暗黙変換でKeyというクラスに変換される。 |
val score = split.cogroup("score", "kid", "kid" -> sigma, Result[ScoreModel]("out")) { (list, sigma, r) => 〜 } ↓等価なAsakusaFW @CoGroup public void score( @Key(group={"kid"}) List<WordCountModel> list, @Key(group={"kid"}) List<SigmaModel> sigma, Result<WordCountModel> out) { 〜 } |
偏差値算出サンプルで偏差値を計算する例。 2種類のデータを受け取り、1種類のデータを出力する。 キーがマッチしたデータが渡される。 集約キーの指定方法は、 AsakusaFWではデータの注釈として@Keyを指定しているが、 AfwHSではKeyとデータのペア(タプル)で表現する。 |