Asakusa FrameworkをScalaで記述してみる為の第一歩。
α版のバージョン0.001(笑)
|
まずは(AsakusaFWに変換できることを念頭に置くが、しかしAsakusaFWを一切使わないで)Scalaだけで実行できるものを作ってみた。
基本構想としては、モデルはScalaのケースクラスで実現する。
AsakusaFWのフローDSLではSourceというクラスでデータの集まりを表したり、オペレーターDSLでデータの出力先をResultというクラスにしていたりするので、その辺りは真似る。
afwhs_a0.001.zip (8.98kB)[/2011-08-23]
まず、入出力のデータモデルをケースクラスで記述する。
import jp.hishidama.afwhs.Model
case class LineModel( var text: String ) extends Model("line_model")
case class WordCountModel( var word: String, var count: Int ) extends Model("word_count_model")
モデル名(word_count_modelとか)を取得できる必要があるだろうと思ったので、モデルである事を表すクラスを作り、そこにモデル名を渡すようにした。
(今回は使っていない)
import jp.hishidama.afwhs._
object Sample1WordCount {
def run(in: Source[LineModel]) = { val splitResult = in.cogroup("split", "", Result[WordCountModel]("out")) { (s1, r1) => s1.foreach { _.text.split("[ \t]+").foreach{ w => r1.add(WordCountModel(w, 1)) } } } println(splitResult) val sumResult = splitResult.cogroup("sum", "word", Result[WordCountModel]("out")) { (s1, r1) => r1.add(WordCountModel(s1(0).word, s1.map(_.count).sum)) } sumResult //runの戻り値 }
def main(args: Array[String]) { val r = run(Source(LineModel("Hello World"), LineModel("Hello Asakusa"), LineModel("Hello Hadoop"))) println(r) } }
runというメソッドを作り、そこに処理を記述している。
イメージとしてはフローDSLのdescribeメソッドに相当する。
AsakusaFWでは処理本体(演算子)とフローを分けているが、AfwHSではScalaでのコレクション操作の記述方法に倣って、同じ場所に記述する。
フローの表現も、AsakusaFWでは演算子群を表すoperatorという変数にSourceを渡すような形式だが、
AfwHSではSourceのメソッドとして演算を呼び出す形になっている。
AfwHS(Scala) | AsakusaFW(Java) | 備考 |
---|---|---|
val splitResult = in.cogroup( |
Source<WordCountModel> splitResult = operator.split(in).out; |
AfwHSでは演算メソッドの第1引数で演算子名を指定する。 (特に使ってはいないが) Resultにも変数名を渡している。 (特に使ってはいないが) |
@CoGroup |
||
println(splitResult) |
AfwHSではSourceは実際のデータのListを保持しているので、 単純にprintlnで表示することが出来る。 |
|
val sumResult = splitResult.cogroup( |
Source<WordCountModel> sumResult =
operator.sum(splitResult).out; |
|
@CoGroup |
なお、cogroupの第2引数の型はKeyというクラスにしてある。
この例では呼び出すときに文字列を渡しているが、暗黙変換によってKeyクラスの値に変換されている。
Source++List(WordCountModel(Hello,1), WordCountModel(World,1), WordCountModel(Hello,1), WordCountModel(Asakusa,1), WordCountModel(Hello,1), WordCountModel(Hadoop,1)) Source++List(WordCountModel(World,1), WordCountModel(Asakusa,1), WordCountModel(Hello,3), WordCountModel(Hadoop,1))
1行目は中間のsplitResultの値で、2行目が最終的な処理結果。
AsakusaFWで作った偏差値算出のサンプルでは、CoGroupの他にSummarizeやConvertを使っている。
AsakusaFWでは、Summarizeで各項目がどのような演算(集計・カウント)を行うかはDMDL上に記述する。
しかしデータクラスは何かを保持する単純な入れ物であって、どのような処理をするかは演算子内に書いた方が統一がとれるんじゃないか
と思って、今回はあえて演算子側で集計内容を指定するようにしてみた。
データクラスを見たときにどういう集計を行うか分かるのも便利であり、その考え方だとデータクラスの各フィールドにアノテーションで集計方法を記述する感じになるだろう。
case class KamokuTotal( var kid: Int, var ten: Long, var count: Long ) extends Model("kamoku_total")
val total = split.summarize[KamokuTotal]("total", SAny("kid"), SSum("ten"), SCount("sid" -> "count"))
えーと、今回のAfwHSでは、SAnyが指定されたら常にキー扱いにしている(汗)
SAnyやSSumで指定しているのは項目名の文字列なので、実際の処理時はリフレクションを使って頑張ってる(苦笑)
summarized kamoku_total = kamoku_model => { any kid -> kid; sum ten -> ten; count sid -> count; } % kid;
@Summarize public abstract KamokuTotal total(KamokuModel kamoku);
Source<KamokuTotal> kamokuTotal = operator.total(kamoku).out
;
Convertは、データ(クラス)の変換(値の移し替え)を行う為の演算子。
AsakusaFWのConvertは、フローDSLではoutとoriginalの2つが出力される。
originalは変換前のデータであり、あまり使用しないのではないかと思う。
使用しないデータはstop演算子に渡して「使わない」ことを明示する。
Average a = operator.average(kamokuTotal); Source<AverageModel> average = a.out; core.stop(a.original);
Scalaでは2つ(複数)の戻り値がある場合はタプルが使えるので、下記の様になる。
val (average, _) = total.convert("average") { 処理本体 }
↑ここでの「_」は、後続で使用しない変数を意味する。
ちなみにこのconvertの書き方と意味合い(処理内容)は、(演算子名を渡す部分を除けば、)Scalaのmap関数と全く同じ。
ちなみに処理本体もScalaの方がすっきり(笑)
AfwHS(Scala) | AsakusaFW(Java) |
---|---|
total.convert("average") { s => AverageModel( s.kid, s.ten, s.count, s.ten.toDouble / s.count ) } |
@Convert public AverageModel average(KamokuTotal s) { AverageModel r = new AverageModel(); r.setKid(s.getKid()); r.setTen(s.getTen()); r.setCount(s.getCount()); r.setAverage((double) s.getTen() / s.getCount()); return r; } |
このWordCountや偏差値算出では、Sourceのcogroupやsummarizeといったメソッドを呼び出している。[2011-08-22]
これだと「どういう種類の演算を行うか」は分かるものの、「業務的(仕様的)にどんな処理を行っているか」は分からない。
AsakusaFWでは、データモデルや演算子・フロー等のことを「vocabulary(語彙)」と呼ぶ。(vocabularyというパッケージの下にそれらを扱うクラスがある)
業務で使われる言葉(語彙)がプログラム上に現れるという意図が込められていると思う。
そこで、AfwHSでも業務(処理内容)を表す単語に変えてみる。
Scalaでは既存クラスに自分用のメソッドを追加する(追加したかのように見せかける)ことが出来る。(「Pimp My Library」という、Scalaでは有名な方法)
これを使って、Sourceに処理内容が分かるメソッドを追加してみる。
implicit def implicitLineModel(in: Source[LineModel]) = new { def splitText() = in.cogroup("split", "", Result[WordCountModel]("out")) { (s1, r1) => s1.foreach { _.text.split("[ \t]+").foreach{ w => r1.add(WordCountModel(w, 1)) } } } } implicit def implicitWordCountModel(in: Source[WordCountModel]) = new { def sumByWord() = in.cogroup("sum", "word", Result[WordCountModel]("out")) { (s1, r1) => r1.add(WordCountModel(s1(0).word, s1.map(_.count).sum)) } }
def run(in: Source[LineModel]) = { in.splitText().sumByWord() }
runメソッドの中がむちゃくちゃすっきりした(笑)
def run(個人点数: Source[TenModel]) = { val 科目別個人点数 = 個人点数.splitByKamoku() val 科目別平均点 = 科目別個人点数.calcTotal().calcAverage() val 科目別標準偏差 = 科目別個人点数.calcSigma(科目別平均点) val 科目別個人偏差値 = 科目別個人点数.calcScore(科目別標準偏差) 科目別個人偏差値 }
Scalaでは(Javaでも)変数名(やメソッド名等の識別子)に日本語を使えるので、変数名を日本語にしてみた。
ある意味これはコメントと同じなので、分かりやすいと思う。
もう一歩進んで、cogroupに渡している関数も切り出して独立したメソッドにしてやると、その部分のみの単体テストが出来るようになる。[2011-08-23]
object Sample1WordCount3 {
implicit def implicitLineModel(in: Source[LineModel]) = new { def splitText() = in.cogroup("split", "", Result[WordCountModel]("out"))(Sample1WordCount3.splitText) } def splitText(s1: Seq[LineModel], r1: Result[WordCountModel]): Unit = { s1.foreach { _.text.split("[ \t]+").foreach { w => r1.add(WordCountModel(w, 1)) } } }
implicit def implicitWordCountModel(in: Source[WordCountModel]) = new { def sumByWord() = in.cogroup("sum", "word", Result[WordCountModel]("out"))(Sample1WordCount3.sumByWord) } def sumByWord(s1: Seq[WordCountModel], r1: Result[WordCountModel]): Unit = { r1.add(WordCountModel(s1(0).word, s1.map(_.count).sum)) }