Asakusa FrameworkをScalaで記述してみるAfwHS α版のバージョン0.002。
次に、Scalaで記述したコードからAsakusaFWのソースを生成する。
データモデルのケースクラス | → | DMDL(dmdlファイル) Importer・Exporterのソース |
処理本体 | → | 演算子(Operator)のソース ジョブフロー(Flow)のソース |
バージョン0.001でも、データモデルのケースクラスの親クラス(Model)にモデル名を渡しているから、モデル名は取得できる。
フィールド(DMDLでのプロパティー)はリフレクションで取得できる。
演算子やジョブフローは、Scalaで記述した処理を実行して構造を取得する。
バージョン0.001での実行は、入力となるデータ一覧(リスト)を渡して実際に処理を行ったが、
データの代わりに構造を探索するオブジェクトを渡す。
どちらも表面上はSourceクラスだが、中身を変化させて対応する。(かなり癖のあるオブジェクトの使い方^^;)
afwhs_a0.002.zip (24.9kB) [/2011-08-22]
※ソース生成は出来たけど、実行は出来ない(途中で例外が発生する)
バージョン0.001ではrunメソッドで実際の処理を行った。
同じrunメソッドで、ソース生成用のオブジェクトを渡すと構造を探索する。
(runメソッドの呼び出し方以外は全く何も変えていないのがポイント)
def main(args: Array[String]) { val gen = new Generator( "C:/workspace", //生成先のワークスペース "afws-wordcount", //生成先のプロジェクト名 "afws.wordcount", //生成先のパッケージ名 "WordCount" //生成先の主たるクラス名 ) val r = run(gen.newSource()) gen.out(r) println(gen) gen.generateDmdl("afwhs_wordcount.dmdl") gen.generateOperator() gen.generateImporter() gen.generateExporter() gen.generateFlow() }
先に空のAsakusaFWプロジェクトを作っておく必要がある。
gen.newSource()で探索用のSourceを作っている。
runから戻ってきたSourceをout()に渡すことにしている。
これでフロー全体の入出力データが分かる。
DMDLは単純なので、割愛(笑)
(コメントが無いことを除けば、手で作ったものと区別つかない)
package afws.wordcount.operator;
import java.util.List;
import com.asakusafw.runtime.core.Result;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.*;
import jp.hishidama.afwhs.java.CoGroupScalaFunction;
import afws.wordcount.modelgen.dmdl.model.*;
public abstract class WordCountOperator {
@CoGroup public void split( @Key(group={}) List<LineModel> s0, Result<WordCountModel> out) { split$function.apply(s0, out); } private CoGroupScalaFunction split$function = new CoGroupScalaFunction("afwhs.sample.Sample1WordCount$$anonfun$1", 1, 1);
@CoGroup public void sum( @Key(group={"word"}) List<WordCountModel> s0, Result<WordCountModel> out) { sum$function.apply(s0, out); } private CoGroupScalaFunction sum$function = new CoGroupScalaFunction("afwhs.sample.Sample1WordCount$$anonfun$2", 1, 1); }
インポート文は「*」を使って手抜きw
split$functionとかCoGroupScalaFunctionとか、謎の変数・クラスが出てきているが。
これは、Scalaの関数を呼び出す為に作ったもの。
AfwHS(Scala)のcogroupは以下のような形になっている。
val splitResult = in.cogroup("split", "", Result[WordCountModel]("out")) { (s1, r1) => s1.foreach { _.text.split("[ \t]+").foreach{ w => r1.add(WordCountModel(w, 1)) } } }
この太字の部分は、Scalaの関数。
s1がSource、すなわちAsakusaFWの第1引数のListを受け取る部分で、
r1がResult、すなわちAsakusaFWの第2引数のResultを受け取る。
cogroupメソッドは、この関数を受け取って、内部で呼び出して処理を実行している。
Scalaの関数は実体としてはJavaのクラス(インスタンス)になるので、リフレクションを使って(けっこう無理矢理な気もするけど)呼び出せる。
つまり、処理本体はソース生成せず、Scalaで書かれてコンパイルされたものをそのまま呼び出す。
@Override public void describe() { WordCountOperatorFactory.Split split = operator.split(this.in); WordCountOperatorFactory.Sum sum = operator.sum(split.out); this.out.add(sum.out); }
コンストラクター等は特に代わり映えしないので割愛。
フロー本体は機械的にFactoryクラス内の内部クラスの変数を常に定義し、後続でそれを使うようにしている。
なお、接続されていないデータソースをstop演算子に渡すような高度なことはやっていない。
というかそもそもCoGroup(引数2つのみ)とConvertとSummarizeしか対応してないし…。
生成されたソースをコンパイルする為に、AsakusaFWのプロジェクトのビルドパスにいくつか追加する必要がある。
runtime用jarファイル | アーカイブのlibに入っている | |
関数 | アーカイブのソースをコンパイルしてjarファイル化したもの もしくはアーカイブを格納(コンパイル)したEclipseプロジェクトそのもの |
|
Scalaのライブラリー | Scalaライブラリー(EclipseのScalaプラグイン) (C:\eclipse\configuration\org.eclipse.osgi\bundles\327\1\.cp\lib\scala-library.jar) |
Scala2.9.0.1 |
生成されたソースをコンパイルした後、途中までは(ジョブフローのテストのJUnitで)実行することが出来る。
なお、実行するためには以下のjarファイルをHadoopのクラスパスに加える必要がある。
(HADOOP_HOME/libに入れるのが一番楽)
runtime用jarファイル | アーカイブのlibに入っている | |
関数のjarファイル | アーカイブのソースをコンパイルしてjarファイル化したもの | |
Scalaのライブラリー | C:\eclipse\configuration\org.eclipse.osgi\bundles\327\1\.cp\lib\scala-library.jar | Scala2.9.0.1 |
で、実行すると途中で例外が発生するのだが。
〜 Caused by: java.lang.ClassCastException: afws.wordcount.modelgen.dmdl.model.LineModel cannot be cast to afwhs.sample.LineModel at afwhs.sample.Sample1WordCount$$anonfun$1$$anonfun$apply$1.apply(Sample1WordCount.scala:10) 〜
モデルクラスが、片方はScalaで作ったケースクラス、もう片方はAsakusaFWでDMDLから生成したデータモデル。
ここをどうやって整合性を取ればいいか、悩み中…。
AsakusaFWで生成したモデルクラスをScalaからそのまま使うという手も考えられる。
ただ、そうするとSummarizeの定義をフロー上に書けなくなるし、ということは実行も出来なくなる(どの項目がどういう集計をするかの情報が無いから)。