S-JIS[2011-08-20/2011-08-23] 変更履歴

AfwHS α0.001

Asakusa FrameworkScalaで記述してみる為の第一歩。
α版のバージョン0.001(笑)


概要

まずは(AsakusaFWに変換できることを念頭に置くが、しかしAsakusaFWを一切使わないで)Scalaだけで実行できるものを作ってみた。

基本構想としては、モデルはScalaのケースクラスで実現する。
AsakusaFWのフローDSLではSourceというクラスでデータの集まりを表したり、オペレーターDSLでデータの出力先をResultというクラスにしていたりするので、その辺りは真似る。

afwhs_a0.001.zip (8.98kB)[/2011-08-23]


WordCountの例

WordCountのモデルクラス

まず、入出力のデータモデルをケースクラスで記述する。

import jp.hishidama.afwhs.Model
case class LineModel(
  var text: String
) extends Model("line_model")
case class WordCountModel(
  var word: String,
  var count: Int
) extends Model("word_count_model")

モデル名(word_count_modelとか)を取得できる必要があるだろうと思ったので、モデルである事を表すクラスを作り、そこにモデル名を渡すようにした。
(今回は使っていない)


WordCountの本体

import jp.hishidama.afwhs._
object Sample1WordCount {
  def run(in: Source[LineModel]) = {

    val splitResult = in.cogroup("split", "", Result[WordCountModel]("out")) { (s1, r1) =>
      s1.foreach {
        _.text.split("[ \t]+").foreach{ w => r1.add(WordCountModel(w, 1)) }
      }
    }
    println(splitResult)

    val sumResult = splitResult.cogroup("sum", "word", Result[WordCountModel]("out")) { (s1, r1) =>
      r1.add(WordCountModel(s1(0).word, s1.map(_.count).sum))
    }

    sumResult //runの戻り値
  }
  def main(args: Array[String]) {
    val r = run(Source(LineModel("Hello World"), LineModel("Hello Asakusa"), LineModel("Hello Hadoop")))
    println(r)
  }
}

runというメソッドを作り、そこに処理を記述している。
イメージとしてはフローDSLのdescribeメソッドに相当する。

AsakusaFWでは処理本体(演算子)とフローを分けているが、AfwHSではScalaでのコレクション操作の記述方法に倣って、同じ場所に記述する。
フローの表現も、AsakusaFWでは演算子群を表すoperatorという変数にSourceを渡すような形式だが、
AfwHSではSourceのメソッドとして演算を呼び出す形になっている。

AfwHS(Scala) AsakusaFW(Java) 備考
val splitResult = in.cogroup(
  "split",
  "",
  Result[WordCountModel]("out")
) {
  (lineList, r) =>
    lineList.foreach { text =>
      〜
      r.add(wc)
    }
}
Source<WordCountModel> splitResult = operator.split(in).out; AfwHSでは演算メソッドの第1引数で演算子名を指定する。
(特に使ってはいないが)

Resultにも変数名を渡している。
(特に使ってはいないが)
@CoGroup
public void split(
  @Key(group = {}) List<LineModel> lineList,
  Result<WordCountModel> out
) {
  for (text: lineList) {
    〜
    out.add(wc);
  }
}
println(splitResult)   AfwHSではSourceは実際のデータのListを保持しているので、
単純にprintlnで表示することが出来る。
val sumResult = splitResult.cogroup(
  "sum",
  "word",
  Result[WordCountModel]("out")
) {
  (wordCountList, r) =>
    〜
    r.add(wc)
}
Source<WordCountModel> sumResult = operator.sum(splitResult).out;  
@CoGroup
public void sum(
  @Key(group = { "word" }) List<WordCountModel> wordCountList,
  Result<WordCountModel> out
) {
    〜
    out.add(wc);
}

なお、cogroupの第2引数の型はKeyというクラスにしてある。
この例では呼び出すときに文字列を渡しているが、暗黙変換によってKeyクラスの値に変換されている。


実行結果

Source++List(WordCountModel(Hello,1), WordCountModel(World,1), WordCountModel(Hello,1), WordCountModel(Asakusa,1), WordCountModel(Hello,1), WordCountModel(Hadoop,1))
Source++List(WordCountModel(World,1), WordCountModel(Asakusa,1), WordCountModel(Hello,3), WordCountModel(Hadoop,1))

1行目は中間のsplitResultの値で、2行目が最終的な処理結果。


偏差値算出の例

AsakusaFWで作った偏差値算出のサンプルでは、CoGroupの他にSummarizeやConvertを使っている。


Summarize

AsakusaFWでは、Summarizeで各項目がどのような演算(集計・カウント)を行うかはDMDL上に記述する。
しかしデータクラスは何かを保持する単純な入れ物であって、どのような処理をするかは演算子内に書いた方が統一がとれるんじゃないか
と思って、今回はあえて演算子側で集計内容を指定するようにしてみた。

データクラスを見たときにどういう集計を行うか分かるのも便利であり、その考え方だとデータクラスの各フィールドにアノテーションで集計方法を記述する感じになるだろう。

case class KamokuTotal(
  var kid: Int,
  var ten: Long,
  var count: Long
) extends Model("kamoku_total")
val total = split.summarize[KamokuTotal]("total", SAny("kid"), SSum("ten"), SCount("sid" -> "count"))

えーと、今回のAfwHSでは、SAnyが指定されたら常にキー扱いにしている(汗)
SAnyやSSumで指定しているのは項目名の文字列なので、実際の処理時はリフレクションを使って頑張ってる(苦笑)

AsakusaFWのSummarizeの記述:

summarized kamoku_total = kamoku_model => {
  any   kid -> kid;
  sum   ten -> ten;
  count sid -> count;
} % kid;
@Summarize
public abstract KamokuTotal total(KamokuModel kamoku);
Source<KamokuTotal> kamokuTotal = operator.total(kamoku).out;

Convert

Convertは、データ(クラス)の変換(値の移し替え)を行う為の演算子。

AsakusaFWのConvert

AsakusaFWのConvertは、フローDSLではoutとoriginalの2つが出力される。
originalは変換前のデータであり、あまり使用しないのではないかと思う。
使用しないデータはstop演算子に渡して「使わない」ことを明示する。

Average a = operator.average(kamokuTotal);
Source<AverageModel> average = a.out;
core.stop(a.original);

AfwHSのconvert

Scalaでは2つ(複数)の戻り値がある場合はタプルが使えるので、下記の様になる。

val (average, _) = total.convert("average") { 処理本体 }

↑ここでの「_」は、後続で使用しない変数を意味する。

ちなみにこのconvertの書き方と意味合い(処理内容)は、(演算子名を渡す部分を除けば、)Scalaのmap関数と全く同じ。


ちなみに処理本体もScalaの方がすっきり(笑)

AfwHS(Scala) AsakusaFW(Java)
total.convert("average") {
  s =>
    AverageModel(
      s.kid,
      s.ten,
      s.count,
      s.ten.toDouble / s.count
    )
}
@Convert
public AverageModel average(KamokuTotal s) {
  AverageModel r = new AverageModel();
  r.setKid(s.getKid());
  r.setTen(s.getTen());
  r.setCount(s.getCount());
  r.setAverage((double) s.getTen() / s.getCount());
  return r;
}

語彙化

このWordCount偏差値算出では、Sourceのcogroupやsummarizeといったメソッドを呼び出している。[2011-08-22]
これだと「どういう種類の演算を行うか」は分かるものの、「業務的(仕様的)にどんな処理を行っているか」は分からない。

AsakusaFWでは、データモデルや演算子・フロー等のことを「vocabulary(語彙)」と呼ぶ。(vocabularyというパッケージの下にそれらを扱うクラスがある)
業務で使われる言葉(語彙)がプログラム上に現れるという意図が込められていると思う。

そこで、AfwHSでも業務(処理内容)を表す単語に変えてみる。


WordCountの例

Scalaでは既存クラスに自分用のメソッドを追加する(追加したかのように見せかける)ことが出来る。(「Pimp My Library」という、Scalaでは有名な方法)
これを使って、Sourceに処理内容が分かるメソッドを追加してみる。

  implicit def implicitLineModel(in: Source[LineModel]) = new {
    def splitText() = in.cogroup("split", "", Result[WordCountModel]("out")) { (s1, r1) =>
      s1.foreach {
        _.text.split("[ \t]+").foreach{ w => r1.add(WordCountModel(w, 1)) }
      }
    }
  }

  implicit def implicitWordCountModel(in: Source[WordCountModel]) = new {
    def sumByWord() = in.cogroup("sum", "word", Result[WordCountModel]("out")) { (s1, r1) =>
      r1.add(WordCountModel(s1(0).word, s1.map(_.count).sum))
    }
  }
  def run(in: Source[LineModel]) = {
    in.splitText().sumByWord()
  }

runメソッドの中がむちゃくちゃすっきりした(笑)


偏差値算出の例

  def run(個人点数: Source[TenModel]) = {
    val 科目別個人点数   = 個人点数.splitByKamoku()
    val 科目別平均点     = 科目別個人点数.calcTotal().calcAverage()
    val 科目別標準偏差   = 科目別個人点数.calcSigma(科目別平均点)
    val 科目別個人偏差値 = 科目別個人点数.calcScore(科目別標準偏差)
    科目別個人偏差値
  }

Scalaでは(Javaでも)変数名(やメソッド名等の識別子)に日本語を使えるので、変数名を日本語にしてみた。
ある意味これはコメントと同じなので、分かりやすいと思う。


もう一歩進んで、cogroupに渡している関数も切り出して独立したメソッドにしてやると、その部分のみの単体テストが出来るようになる。[2011-08-23]

WordCountの例

object Sample1WordCount3 {
  implicit def implicitLineModel(in: Source[LineModel]) = new {
    def splitText() = in.cogroup("split", "", Result[WordCountModel]("out"))(Sample1WordCount3.splitText)
  }

  def splitText(s1: Seq[LineModel], r1: Result[WordCountModel]): Unit = {
    s1.foreach {
      _.text.split("[ \t]+").foreach { w => r1.add(WordCountModel(w, 1)) }
    }
  }
  implicit def implicitWordCountModel(in: Source[WordCountModel]) = new {
    def sumByWord() = in.cogroup("sum", "word", Result[WordCountModel]("out"))(Sample1WordCount3.sumByWord)
  }

  def sumByWord(s1: Seq[WordCountModel], r1: Result[WordCountModel]): Unit = {
    r1.add(WordCountModel(s1(0).word, s1.map(_.count).sum))
  }

AfwHS目次へ戻る / AsakusaFWへ戻る / 技術メモへ戻る
メールの送信先:ひしだま