Asakusa FrameworkをScalaで記述してみるAfwHS α版のバージョン0.002。
次に、Scalaで記述したコードからAsakusaFWのソースを生成する。
| データモデルのケースクラス | → | DMDL(dmdlファイル) Importer・Exporterのソース |
| 処理本体 | → | 演算子(Operator)のソース ジョブフロー(Flow)のソース |
バージョン0.001でも、データモデルのケースクラスの親クラス(Model)にモデル名を渡しているから、モデル名は取得できる。
フィールド(DMDLでのプロパティー)はリフレクションで取得できる。
演算子やジョブフローは、Scalaで記述した処理を実行して構造を取得する。
バージョン0.001での実行は、入力となるデータ一覧(リスト)を渡して実際に処理を行ったが、
データの代わりに構造を探索するオブジェクトを渡す。
どちらも表面上はSourceクラスだが、中身を変化させて対応する。(かなり癖のあるオブジェクトの使い方^^;)
afwhs_a0.002.zip (24.9kB) [/2011-08-22]
※ソース生成は出来たけど、実行は出来ない(途中で例外が発生する)
バージョン0.001ではrunメソッドで実際の処理を行った。
同じrunメソッドで、ソース生成用のオブジェクトを渡すと構造を探索する。
(runメソッドの呼び出し方以外は全く何も変えていないのがポイント)
def main(args: Array[String]) {
val gen = new Generator(
"C:/workspace", //生成先のワークスペース
"afws-wordcount", //生成先のプロジェクト名
"afws.wordcount", //生成先のパッケージ名
"WordCount" //生成先の主たるクラス名
)
val r = run(gen.newSource())
gen.out(r)
println(gen)
gen.generateDmdl("afwhs_wordcount.dmdl")
gen.generateOperator()
gen.generateImporter()
gen.generateExporter()
gen.generateFlow()
}
先に空のAsakusaFWプロジェクトを作っておく必要がある。
gen.newSource()で探索用のSourceを作っている。
runから戻ってきたSourceをout()に渡すことにしている。
これでフロー全体の入出力データが分かる。
DMDLは単純なので、割愛(笑)
(コメントが無いことを除けば、手で作ったものと区別つかない)
package afws.wordcount.operator;
import java.util.List;
import com.asakusafw.runtime.core.Result;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.*;
import jp.hishidama.afwhs.java.CoGroupScalaFunction;
import afws.wordcount.modelgen.dmdl.model.*;
public abstract class WordCountOperator {
@CoGroup
public void split(
@Key(group={}) List<LineModel> s0,
Result<WordCountModel> out) {
split$function.apply(s0, out);
}
private CoGroupScalaFunction split$function = new CoGroupScalaFunction("afwhs.sample.Sample1WordCount$$anonfun$1", 1, 1);
@CoGroup
public void sum(
@Key(group={"word"}) List<WordCountModel> s0,
Result<WordCountModel> out) {
sum$function.apply(s0, out);
}
private CoGroupScalaFunction sum$function = new CoGroupScalaFunction("afwhs.sample.Sample1WordCount$$anonfun$2", 1, 1);
}
インポート文は「*」を使って手抜きw
split$functionとかCoGroupScalaFunctionとか、謎の変数・クラスが出てきているが。
これは、Scalaの関数を呼び出す為に作ったもの。
AfwHS(Scala)のcogroupは以下のような形になっている。
val splitResult = in.cogroup("split", "", Result[WordCountModel]("out")) { (s1, r1) =>
s1.foreach {
_.text.split("[ \t]+").foreach{ w => r1.add(WordCountModel(w, 1)) }
}
}
この太字の部分は、Scalaの関数。
s1がSource、すなわちAsakusaFWの第1引数のListを受け取る部分で、
r1がResult、すなわちAsakusaFWの第2引数のResultを受け取る。
cogroupメソッドは、この関数を受け取って、内部で呼び出して処理を実行している。
Scalaの関数は実体としてはJavaのクラス(インスタンス)になるので、リフレクションを使って(けっこう無理矢理な気もするけど)呼び出せる。
つまり、処理本体はソース生成せず、Scalaで書かれてコンパイルされたものをそのまま呼び出す。
@Override
public void describe() {
WordCountOperatorFactory.Split split = operator.split(this.in);
WordCountOperatorFactory.Sum sum = operator.sum(split.out);
this.out.add(sum.out);
}
コンストラクター等は特に代わり映えしないので割愛。
フロー本体は機械的にFactoryクラス内の内部クラスの変数を常に定義し、後続でそれを使うようにしている。
なお、接続されていないデータソースをstop演算子に渡すような高度なことはやっていない。
というかそもそもCoGroup(引数2つのみ)とConvertとSummarizeしか対応してないし…。
生成されたソースをコンパイルする為に、AsakusaFWのプロジェクトのビルドパスにいくつか追加する必要がある。
| runtime用jarファイル | アーカイブのlibに入っている | |
| 関数 | アーカイブのソースをコンパイルしてjarファイル化したもの もしくはアーカイブを格納(コンパイル)したEclipseプロジェクトそのもの |
|
| Scalaのライブラリー | Scalaライブラリー(EclipseのScalaプラグイン) (C:\eclipse\configuration\org.eclipse.osgi\bundles\327\1\.cp\lib\scala-library.jar) |
Scala2.9.0.1 |
生成されたソースをコンパイルした後、途中までは(ジョブフローのテストのJUnitで)実行することが出来る。
なお、実行するためには以下のjarファイルをHadoopのクラスパスに加える必要がある。
(HADOOP_HOME/libに入れるのが一番楽)
| runtime用jarファイル | アーカイブのlibに入っている | |
| 関数のjarファイル | アーカイブのソースをコンパイルしてjarファイル化したもの | |
| Scalaのライブラリー | C:\eclipse\configuration\org.eclipse.osgi\bundles\327\1\.cp\lib\scala-library.jar | Scala2.9.0.1 |
で、実行すると途中で例外が発生するのだが。
〜 Caused by: java.lang.ClassCastException: afws.wordcount.modelgen.dmdl.model.LineModel cannot be cast to afwhs.sample.LineModel at afwhs.sample.Sample1WordCount$$anonfun$1$$anonfun$apply$1.apply(Sample1WordCount.scala:10) 〜
モデルクラスが、片方はScalaで作ったケースクラス、もう片方はAsakusaFWでDMDLから生成したデータモデル。
ここをどうやって整合性を取ればいいか、悩み中…。
AsakusaFWで生成したモデルクラスをScalaからそのまま使うという手も考えられる。
ただ、そうするとSummarizeの定義をフロー上に書けなくなるし、ということは実行も出来なくなる(どの項目がどういう集計をするかの情報が無いから)。