S-JIS[2011-08-21/2011-08-22] 変更履歴

AfwHS α0.002

Asakusa FrameworkScalaで記述してみるAfwHS α版のバージョン0.002。


概要

次に、Scalaで記述したコードからAsakusaFWのソースを生成する。

データモデルのケースクラス DMDL(dmdlファイル)
Importer・Exporterのソース
処理本体 演算子(Operator)のソース
ジョブフロー(Flow)のソース

バージョン0.001でも、データモデルのケースクラスの親クラス(Model)にモデル名を渡しているから、モデル名は取得できる。
フィールド(DMDLでのプロパティー)はリフレクションで取得できる。

演算子やジョブフローは、Scalaで記述した処理を実行して構造を取得する。
バージョン0.001での実行は、入力となるデータ一覧(リスト)を渡して実際に処理を行ったが、
データの代わりに構造を探索するオブジェクトを渡す。
どちらも表面上はSourceクラスだが、中身を変化させて対応する。(かなり癖のあるオブジェクトの使い方^^;)

afwhs_a0.002.zip (24.9kB) [/2011-08-22]

※ソース生成は出来たけど、実行は出来ない(途中で例外が発生する)


ソース生成の例

バージョン0.001ではrunメソッドで実際の処理を行った。
同じrunメソッドで、ソース生成用のオブジェクトを渡すと構造を探索する。
(runメソッドの呼び出し方以外は全く何も変えていないのがポイント)

  def main(args: Array[String]) {
    val gen = new Generator(
      "C:/workspace",  	//生成先のワークスペース
      "afws-wordcount",	//生成先のプロジェクト名
      "afws.wordcount",	//生成先のパッケージ名
      "WordCount"      	//生成先の主たるクラス名
    )
    val r = run(gen.newSource())
    gen.out(r)
    println(gen)

    gen.generateDmdl("afwhs_wordcount.dmdl")
    gen.generateOperator()
    gen.generateImporter()
    gen.generateExporter()
    gen.generateFlow()
}

先に空のAsakusaFWプロジェクトを作っておく必要がある。

gen.newSource()で探索用のSourceを作っている。
runから戻ってきたSourceをout()に渡すことにしている。
これでフロー全体の入出力データが分かる。


生成されたDMDL

DMDLは単純なので、割愛(笑)
(コメントが無いことを除けば、手で作ったものと区別つかない)


生成されたOperator

package afws.wordcount.operator;
import java.util.List;
import com.asakusafw.runtime.core.Result;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.*;
import jp.hishidama.afwhs.java.CoGroupScalaFunction;
import afws.wordcount.modelgen.dmdl.model.*;
public abstract class WordCountOperator {
	@CoGroup
	public void split(
			@Key(group={}) List<LineModel> s0,
			Result<WordCountModel> out) {
		split$function.apply(s0, out);
	}

	private CoGroupScalaFunction split$function = new CoGroupScalaFunction("afwhs.sample.Sample1WordCount$$anonfun$1", 1, 1);
	@CoGroup
	public void sum(
			@Key(group={"word"}) List<WordCountModel> s0,
			Result<WordCountModel> out) {
		sum$function.apply(s0, out);
	}

	private CoGroupScalaFunction sum$function = new CoGroupScalaFunction("afwhs.sample.Sample1WordCount$$anonfun$2", 1, 1);
}

インポート文は「*」を使って手抜きw

split$functionとかCoGroupScalaFunctionとか、謎の変数・クラスが出てきているが。
これは、Scalaの関数を呼び出す為に作ったもの。

AfwHS(Scala)のcogroupは以下のような形になっている。

  val splitResult = in.cogroup("split", "", Result[WordCountModel]("out")) { (s1, r1) =>
    s1.foreach {
      _.text.split("[ \t]+").foreach{ w => r1.add(WordCountModel(w, 1)) }
    }
  }

この太字の部分は、Scalaの関数。
s1がSource、すなわちAsakusaFWの第1引数のListを受け取る部分で、
r1がResult、すなわちAsakusaFWの第2引数のResultを受け取る。
cogroupメソッドは、この関数を受け取って、内部で呼び出して処理を実行している。
Scalaの関数は実体としてはJavaのクラス(インスタンス)になるので、リフレクションを使って(けっこう無理矢理な気もするけど)呼び出せる。
つまり、処理本体はソース生成せず、Scalaで書かれてコンパイルされたものをそのまま呼び出す。


生成されたFlow

	@Override
	public void describe() {
		WordCountOperatorFactory.Split split = operator.split(this.in);
		WordCountOperatorFactory.Sum sum = operator.sum(split.out);
		this.out.add(sum.out);
	}

コンストラクター等は特に代わり映えしないので割愛。

フロー本体は機械的にFactoryクラス内の内部クラスの変数を常に定義し、後続でそれを使うようにしている。

なお、接続されていないデータソースをstop演算子に渡すような高度なことはやっていない。
というかそもそもCoGroup(引数2つのみ)とConvertとSummarizeしか対応してないし…。


コンパイルの為の環境

生成されたソースをコンパイルする為に、AsakusaFWのプロジェクトのビルドパスにいくつか追加する必要がある。

runtime用jarファイル アーカイブのlibに入っている  
関数 アーカイブのソースをコンパイルしてjarファイル化したもの
もしくはアーカイブを格納(コンパイル)したEclipseプロジェクトそのもの
 
Scalaのライブラリー Scalaライブラリー(EclipseのScalaプラグイン)
(C:\eclipse\configuration\org.eclipse.osgi\bundles\327\1\.cp\lib\scala-library.jar)
Scala2.9.0.1

実行

生成されたソースコンパイルした後、途中までは(ジョブフローのテストのJUnitで)実行することが出来る。

なお、実行するためには以下のjarファイルをHadoopのクラスパスに加える必要がある。
HADOOP_HOME/libに入れるのが一番楽)

runtime用jarファイル アーカイブのlibに入っている  
関数のjarファイル アーカイブのソースをコンパイルしてjarファイル化したもの  
Scalaのライブラリー C:\eclipse\configuration\org.eclipse.osgi\bundles\327\1\.cp\lib\scala-library.jar Scala2.9.0.1

で、実行すると途中で例外が発生するのだが。

〜
Caused by: java.lang.ClassCastException: afws.wordcount.modelgen.dmdl.model.LineModel cannot be cast to afwhs.sample.LineModel
	at afwhs.sample.Sample1WordCount$$anonfun$1$$anonfun$apply$1.apply(Sample1WordCount.scala:10)
〜

モデルクラスが、片方はScalaで作ったケースクラス、もう片方はAsakusaFWでDMDLから生成したデータモデル。
ここをどうやって整合性を取ればいいか、悩み中…。

AsakusaFWで生成したモデルクラスをScalaからそのまま使うという手も考えられる。
ただ、そうするとSummarizeの定義をフロー上に書けなくなるし、ということは実行も出来なくなる(どの項目がどういう集計をするかの情報が無いから)。


AfwHS目次へ戻る / AsakusaFWへ戻る / 技術メモへ戻る
メールの送信先:ひしだま