Asakusa FrameworkをScalaで記述するAfwHS α版でWordCountを書いてみる。
インストール手順に従って、空プロジェクト(AsakusaFWプロジェクトとScalaプロジェクト)を作成し、依存関係(ビルドパス)を設定しておく。
プロジェクト | プロジェクト名の例 | パッケージ名の例 | 備考 |
---|---|---|---|
AsakusaFWプロジェクト | afwhs-wordcount | sample.wordcount |
AsakusaFW 0.2.1 |
Scalaプロジェクト | afwhs-wordcount-scala | sample.wordcount |
Scala2.9.1 |
AfwHS用ラッパークラスを作成する場所は以下の様に設定する。
<java classname="com.asakusafw.dmdl.java.Main" classpath="${compile_classpath}" fork="true" failonerror="true"> 〜 <arg value="-plugin" /> <arg value="C:/workspace/afwhs-wordcount/lib/afwhs-driver.jar" /> <jvmarg value="-Dafwhs.model.wrapper.dir=C:/workspace/afwhs-wordcount-scala/src/sample/wordcount/modelgen/dmdl/model" /> </java>
一番最初にするのは、通常のAsakusaFWでのプログラミングと同じく、DMDL(データモデル)を定義すること。
line_model = { text : TEXT; }; word_count_model = { word : TEXT; count : INT; };
↑これは、普通のAsakusaFWでのWordCountと全く同じ。
DMDLが書けたら、コンパイルする。
> mvn generate-resources
これで、通常のモデルクラスの他に、Scalaプロジェクトの下にラッパークラス(のScalaソース)が作られる。
> dir/b C:\workspace\afwhs-wordcount-scala\src\sample\wordcount\modelgen\dmdl\model LineModelWrapper.scala WordCountModelWrapper.scala
生成されたソースは以下の様な感じ。
package afws.wordcount.modelgen.dmdl.model
class LineModelWrapper(model$: LineModel) { def text = model$.getTextAsString() def text_=(v: String) = model$.setTextAsString(v) def asJava = model$ }
object LineModelWrapper { def apply(text: String) = { 〜 } def unapply(a: Any) = a match { 〜 } }
trait LineModelImplicit { class AsScala(model: LineModel) { def asScala = new LineModelWrapper(model) } implicit def asScalaLineModel(model: LineModel) = new AsScala(model) implicit def wrapper2LineModel(wrapper: LineModelWrapper) = wrapper.asJava }
object LineModelImplicit extends LineModelImplicit { }
クラス/オブジェクト/トレイト | 説明 |
---|---|
class LineModelWrapper | データモデルのラッパークラス本体。 本来のデータモデルに対し、プロパティー名でセット・ゲットできるような定義にしてある。 また、「asJava」メソッドでデータモデル本体を返す。 |
object LineModelWrapper | ラッパークラスのコンパニオンオブジェクト。 apply()やunapply()が定義される。 Summarizeの場合はこの中に集計用関数が定義される。 |
trait LineModelImplicit |
暗黙変換を定義する為のトレイト。 このトレイトをミックスインすれば、暗黙変換が使えるようになる。 「asScala」メソッドはここで定義されている。 |
object LineModelImplicit | 暗黙変換用トレイトをミックスインしたオブジェクト。 このオブジェクトをインポートすれば、暗黙変換が使えるようになる。 |
暗黙変換用にトレイトとオブジェクトの2種類があるのは、
暗黙変換を取り込む方法として、ミックスインする方法とインポートする方法の2通りの方法が使えるということ。
通常であれば変換オブジェクトの方をインポートすればいいが、モデルの個数だけ記述する必要が出てくる。
例えばmodelパッケージのパッケージオブジェクトを作ってそこに変換トレイトをミックスインしておけば、modelパッケージをインポートするだけで暗黙変換が使えるようになる。
使用例1 (変換オブジェクトを直接インポート) |
import sample.wordcount.modelgen.dmdl.model.LineModel import sample.wordcount.modelgen.dmdl.model.LineModelImplicit._ object WordCount { def run() = { 〜 val wrapper = model.asScala 〜 } } |
使用例2 (変換トレイトをミックスイン) |
import sample.wordcount.modelgen.dmdl.model.LineModel import sample.wordcount.modelgen.dmdl.model.LineModelImplicit object WordCount extends LineModelImplicit { def run() = { 〜 val wrapper = model.asScala 〜 } } |
使用例3 (パッケージオブジェクトに 変換トレイトをミックスイン) |
package sample.wordcount.modelgen.dmdl package object model extends LineModelImplicit with WordCountModelImplicit { } |
import sample.wordcount.modelgen.dmdl.model._ object WordCount { def run() = { 〜 val wrapper = model.asScala 〜 } } |
次に、処理本体を記述する。
main()から実行する前提で、オブジェクトの中に書く。
package sample.wordcount
import scala.collection.JavaConverters._ import jp.hishidama.afwhs._ import sample.wordcount.modelgen.dmdl.model.LineModel import sample.wordcount.modelgen.dmdl.model.LineModelWrapper import sample.wordcount.modelgen.dmdl.model.LineModelImplicit._ import sample.wordcount.modelgen.dmdl.model.WordCountModel import sample.wordcount.modelgen.dmdl.model.WordCountModelWrapper import sample.wordcount.modelgen.dmdl.model.WordCountModelImplicit._
object WordCount {
def run(in: Source[LineModel]) = { val words = in.cogroup("split", "", Result[WordCountModel]("out")) { (s, r) => s.asScala.foreach { _.getTextAsString().split("[ \t]+").foreach { word => r.add(WordCountModelWrapper(word, 1)) } } val count = words.cogroup("sum", "word", Result[WordCountModel]("out")) { (s, r) => r.add(WordCountModelWrapper(s.get(0).getWordAsString, s.asScala.map(_.getCount).sum)) } count }
run()の中が処理本体。
AsakusaFWに似せた、AfwHSライブラリーのメソッド(cogroup等)を呼び出す。
cogroupを使ったWordCountの処理方法は、AsakusaFWで@CoGroupを使った方法と同等。
AsakusaFWではCoGroup等の処理をOperatorクラスに書き、それをどの順序で実行するかはFlowクラスに書く。
が、AfwHSでは通常のScalaのコレクションライブラリーを使用して処理をつないで書くのと同じ様な感じで一箇所に記述する。
以下の様に一直線につなげて書くことも出来る。
def run(in: Source[LineModel]) = { in.cogroup("split", "", Result[WordCountModel]("out")) { (s, r) => s.asScala.foreach { _.getTextAsString().split("[ \t]+").foreach { word => r.add(WordCountModelWrapper(word, 1)) } }.cogroup("sum", "word", Result[WordCountModel]("out")) { (s, r) => r.add(WordCountModelWrapper(s.get(0).getWordAsString, s.asScala.map(_.getCount).sum)) } }
SourceやResultはAsakusaFWの入力データ・結果出力データクラスを表す、AfwHS独自のクラス。
関数の引数であるsやrは、AsakusaFWのCoGroupで使用するjava.util.List・com.asakusa.〜.Resultそのもの。
(なので、関数内では「asScala」を使ってjava.util.ListをScalaのListに変換して処理している)
r.add()も、AsakusaFWでの「out.add()」と全く同じ。(暗黙変換があるので、ラッパークラスからモデルクラスへは自動的に変換される)
run()を呼び出す部分をmain()に記述する。
def main(args: Array[String]) { val slist = List("Hello World", "Hello Asakusa", "Hello Hadoop") val mlist = slist.map { s => LineModelWrapper(s).asJava } val r = run(Source(mlist)) println(r) //実行結果の表示 } }
runの引数には入力データを表すSourceを渡す。
Sourceオブジェクトは、データモデルのリストを受け取って入力データとする。
このプログラムは、Scalaのプログラムとしてそのまま実行することが出来る。
※この実行方法では、(AsakusaFWのモデルクラスをそのまま使っている以外は)AsakusaFWやHadoopは全く関与しない。
(AfwHSライブラリーが、AsakusaFWの動作を真似てScalaのプログラムとして実行する為)
Source++List({class=word_count_model, word=World, count=1}, {class=word_count_model, word=Asakusa, count=1}, {class=word_count_model, word=Hello, count=3}, {class=word_count_model, word=Hadoop, count=1})
(実際は一行で表示されるが、見易さの為に改行してある)
こうして作ったプログラムを、Asakusa DSLに変換することが出来る。
runメソッドの中身はそのままにして、呼び出し方を変えるだけでいい。
run()の引数はSourceだが、Asakusa DSL生成用のSourceを渡すと、処理を行う代わりに生成情報を構築する。
def main(args: Array[String]) { val gen = new Generator("C:/workspace", "afwhs-wordcount", "sample.wordcount", "WordCount") val r = run(gen.newSource("in")) gen.out(r) gen.prepareGenerate() //println(gen) gen.generateOperator(true) gen.generateImporter(false) gen.generateExporter(false) gen.generateFlow(true) gen.generateFlowTest(true) } }
Generatorの引数は、生成場所を表す。
ワークスペースのディレクトリー、AsakusaFWプロジェクトの名前(ディレクトリー)、パッケージ、生成するのに使うクラス名。
newSource()で生成情報構築用のSourceを作成する。
ジョブフローで使われる名前とFileImporterで返すファイル名を引数で指定できる。
run()の戻り値はout()に渡す。
そしてprepareGenerate()で前準備を行った後で、各ソースを生成するメソッド(generateOperator等)を呼び出す。
引数をtrueにしておくと、常に生成する(ファイルが存在していたら上書きする)。falseだと、ファイルが存在する場合は何もしない。
Operator・JobFlow(Importer・Exporter)・FlowTestまで、AsakusaFWでの実行に必要なものは一通り生成できる。
ソース | ソースの内容(イメージ) | 備考 |
---|---|---|
Operator |
public abstract class WordCountOperator { @CoGroup public void split( @Key(group={}) List<LineModel> s0, Result<WordCountModel> out) { split$function.apply(s0, out); } private CoGroupScalaFunction split$function = new CoGroupScalaFunction("afwhs.sample.Sample1WordCount$$anonfun$1", 1, 1); @CoGroup public void sum( @Key(group={"word"}) List<WordCountModel> s0, Result<WordCountModel> out) { sum$function.apply(s0, out); } private CoGroupScalaFunction sum$function = new CoGroupScalaFunction("afwhs.sample.Sample1WordCount$$anonfun$2", 1, 1); } |
CoGroupだと、Scala側で作った関数を リフレクションで呼び出すような形になる。 |
Flow |
@JobFlow(name = "WordCountJob") public class WordCountJob extends FlowDescription { In<LineModel> in; Out<WordCountModel> out; public WordCountJob( @Import(name = "in", description = LineModelFileImporter.class) In<LineModel> intext, @Export(name = "out", description = WordCountModelFileExporter.class) Out<WordCountModel> out) { this.in = in; this.out = out; } WordCountOperatorFactory operator = new WordCountOperatorFactory(); @Override public void describe() { WordCountOperatorFactory.Split split = operator.split(this.in); WordCountOperatorFactory.Sum sum = operator.sum(split.out); this.out.add(sum.out); } } |
コンストラクター周りは通常のAsakusa DSLと同じだが describeの中身はけっこう機械的になっている^^; |
もうひとつ、AsakusaFWで実行する為に、Scalaで作ったプログラムをjarファイル化しておく必要がある。
CoGroup等ではScalaで作った関数をそのまま呼び出して実行する為。
jarファイルの生成先は、ASAKUSA_HOME/ext/libにする。(ここに入れておくと、AsakusaFWがHadoop上で実行される際に取り込まれる)
<?xml version="1.0" encoding="UTF-8"?> <project name="afwhs-wordcount-scala" default="jar" basedir="."> <property environment="env" /> <property name="asakusa.home" location="${env.ASAKUSA_HOME}" /> <target name="jar"> <jar jarfile="${asakusa.home}/ext/lib/afwhs-wordcount-scala.jar"> <fileset dir="../classes" includes="**/*" /> </jar> </target> </project>
あとは通常のAsakusaFWのジョブフローのテストと同様にExcelファイルでテストデータと検証データを用意して、実行できる。
集計で(cogroupを使うのではなく)summarizeを使う例。(→AsakusaFWのWordCountで@Summarizeを使う例)
summarizeでは、DMDLで集計項目を指定する。
〜 summarized word_count_total = word_count_model => { any word -> word; count count -> count; } % word;
そしてモデル生成を行うと、ラッパークラス内に集計用の関数が生成される。
object WordCountTotalWrapper { 〜 def summarize(d: WordCountTotal, s: WordCountModel) { d.setWordAsString(s.getWordAsString()) if (d.getCountOption().isNull) d.setCount(1) else d.getCountOption().add(1) } }
この関数は、Scalaプログラムとして実行する際に使用する。(この関数を使うよう、自分でコーディングする)
(この関数は、Asakusa DSLへの変換やAsakusaFW(Hadoop)での実行には使われない)
object Sample1WordCount { def run(in: Source[LineModel]) = { 〜 val total = words.summarize("sum2")(WordCountTotalWrapper.summarize) total } 〜 }
AsakusaFWのWordCountで集計方法をSummarizeに改造したら、フロー全体の戻り値が別のモデルクラスになってしまい、ジョブフローも修正する必要があった。
そこでConvertによって元のモデルクラスに変換するなんてことをやった。
AfwHSの場合はジョブフローまで生成し直せるので、せいぜいテスト用のExcelデータを用意するくらいしかする事は無い。
が、せっかくなのでconvertも作ってみた。
object WordCount {
def run(in: Source[LineModel]) = {
〜
val total = words.summarize("sum2")(WordCountTotalWrapper.summarize)
val (conv, _) = total.convert("convert") { s =>
WordCountModelWrapper(
s.getWordAsString(),
s.getCount().toInt //元がLongなので、Intへ変換
).asJava
}
conv
}
〜
}
@Convertでは戻り値が2種類(変換データと変換元データが)あるので、convertではタプルで戻すようになっている。
このうち、使わない方は変数名を「_」に出来る。(Scalaの機能)
ただし、AsakusaFWではどこにも出力先が無いデータ(結線されないデータ)があるとエラーになるので、
AfwHSでは、こういったデータをAsakusa DSLに変換する際にはstop演算子へ出力するようにしてある。
(これは「人間のミスかもしれないのでエラーにする」というAsakusaFWでのポリシーとは異なる。…stop演算子のような処理をAfwHSで作るのが面倒だった為、仕方ない(爆)
というか、EclipseでJavaを扱う場合、未使用変数は警告になる。ScalaでもIDEが賢くなればそういう警告が出せるはずなので、人間のミスならその警告で気付くだろうという考え)
集計でfoldを使う例。(→AsakusaFWのWordCountで@Foldを使う例)
object WordCount { def run(in: Source[LineModel]) = { 〜 words.fold("sum3", "word") { (left, right) => left.setCount(left.getCount() + right.getCount()) } } 〜 }
left・rightは畳み込み対象となるモデルクラス(ここではWordCountModel)。
Scala用のラッパークラスを使うと「+=」演算子で合算できるので、ちょっと格好いい(笑)
words.fold("sum3", "word") { (left, right) => val wl = left.asScala val wr = right.asScala wl.count += wr.count }