S-JIS[2011-09-04] 変更履歴

AfwHS CoGroup

Asakusa FrameworkScalaで記述するAfwHS α版のCoGroupについて。


概要

CoGroupは、複数種類のデータを(一致するキーごとに)受け取って、複数種類のデータを出力する。
また、入力データ1件につき複数件(0件も可)のデータを出力することも出来る。

はっきり言って、CoGroupだけあれば何でも出来ちゃう(笑)
ということはシステムによる最適化はしづらい訳で、CoGroupだけで作ったようなプログラムは実行効率(速度やメモリー使用量)は悪い可能性がある。
他の演算で出来る処理なら、そちらを使うべきだろう。


CoGroupの使用方法

s0.cogroup(名前, キー, 他の入力データ…, 出力データ…) { 処理 }

↓AsakusaFW

@CoGroup
public void 名前(キー s0, 他の入力データ…, 出力データ…) { 処理 }
AfwHS 備考
s0 Source 入力ソースとなるデータ。
名前 String 演算子の名前。
キー Key s0の集約キー。
入力データ (Key, Source) その他の入力データ。キーとのペア。
出力データ Result 出力データ。
処理 (Source…, Result…)=>Unit 処理内容を記述した関数。

AsakusaFWではプログラマーが自由に入力データと出力データの個数を(いくらでも)指定することが出来るが、
AfwHSではAfwHSが対応している個数までしか扱えない。

この“処理”(関数)は、AsakusaFW(Hadoop)で実際にMapReduceが動く際に呼ばれる。


CoGroupの使用例

import scala.collection.JavaConverters._

import jp.hishidama.afwhs._

//モデルクラスとそのラッパークラスと暗黙変換をインポート
import afws.wordcount.modelgen.dmdl.model.LineModel
import afws.wordcount.modelgen.dmdl.model.LineModelWrapper
import afws.wordcount.modelgen.dmdl.model.LineModelImplicit._
import afws.wordcount.modelgen.dmdl.model.WordCountModel
import afws.wordcount.modelgen.dmdl.model.WordCountModelWrapper
import afws.wordcount.modelgen.dmdl.model.WordCountModelImplicit._
備考
val words = in.cogroup("split", "", Result[WordCountModel]("out")) {
  (s, r) =>
    s.asScala.foreach { model =>
      model.getTextAsString().split("[ \t]+").foreach { word =>
        val wc = WordCountModelWrapper(word, 1)
        r.add(wc)
      }
    }
}

↓等価なAsakusaFW

@CoGroup
public void split(@Key(group={}) List<LineModel> in, Result<WordCountModel> out) {
  for (LineModel model : in) {
    for (String word : model.getTextAsString().split("[ \t]+")) {
      WordCountModel wc = new WordCountModel();
      wc.setWordAsString(word);
      wc.setCount(1);
      out.add(wc);
    }
  }
}
WordCountで単語を分割する例。
1種類のデータを受け取り、1種類(1レコードにつき複数件)のデータを出力する。

inはSource[LineModel]。LineModelのリストが入っているイメージ。
AsakusaFWでのResultの変数名「out」はジョブフローでも使われることになるので、
AfwHSでもResultの引数として渡すようにしている。
val count = words.cogroup("sum", "word", Result[WordCountModel]("out")) {
  (s, r) =>
    val wc = WordCountModelWrapper(
      s.get(0).getWordAsString,
      s.asScala.map(_.getCount).sum)
    r.add(wc)
}

↓等価なAsakusaFW

@CoGroup
public void sum(@Key(group={"word"}) List<WordCountModel> s, Result<WordCountModel> out) {
  for (WordCountModel model : s) {
    〜
  }
}
WordCountで単語数を集計する例。
1種類のデータを受け取り、1種類(複数レコードにつき1件)のデータを出力する。

集約キーとして「word」を指定している。
AfwHSでは表面上文字列を渡しているだけだが、これは暗黙変換でKeyというクラスに変換される。
val score = split.cogroup("score", "kid", "kid" -> sigma, Result[ScoreModel]("out")) {
  (list, sigma, r) =>
    〜
}

↓等価なAsakusaFW

@CoGroup
public void score(
  @Key(group={"kid"}) List<WordCountModel> list,
  @Key(group={"kid"}) List<SigmaModel> sigma,
  Result<WordCountModel> out) {
    〜
}
偏差値算出サンプルで偏差値を計算する例。
2種類のデータを受け取り、1種類のデータを出力する。
キーがマッチしたデータが渡される。

集約キーの指定方法は、
AsakusaFWではデータの注釈として@Keyを指定しているが、
AfwHSではKeyとデータのペア(タプル)で表現する。

使用方法へ戻る / AfwHS目次へ戻る / AsakusaFWへ戻る / 技術メモへ戻る
メールの送信先:ひしだま