Asakusa Framework0.4(Direct I/O)でSequenceFileを入出力する方法のメモ。
|
|
Asakusa Framework0.2.6でSequenceFileFormatというクラスが用意され、HadoopのSequenceFileを入出力に使えるようになった。
(これにより、例えばSqoopのSequenceFileを使った連携が出来る)
また、0.4で、AsakusaFWが使用しているシーケンスファイルを入出力できるようなInputDescription・OutputDescriptionが自動生成できるようになった。
アーキタイプはWindGateかDirect I/O、バージョンは0.2.6以降とする。
(Direct I/Oはいわば中核機能なので、WindGateはDirect
I/Oも含んでいる。したがって、WindGateアーキタイプを選んでおけば、Direct I/Oの機能も使うことが出来る)
サンプルとして、「キーがText・値もText」というシーケンスファイルをstore_info(AsakusaFWのアーキタイプからプロジェクトを作成したときのサンプルで使われているデータモデル)にセットする例を作ってみる。
フォーマットクラスとして、SequenceFileFormatを継承したクラスを用意する。
package com.example.jobflow.desc; import java.io.IOException; import org.apache.hadoop.io.Text; import com.asakusafw.runtime.directio.hadoop.SequenceFileFormat; import com.example.modelgen.dmdl.model.StoreInfo;
public class StoreInfoSeqFileFormat extends SequenceFileFormat<Text, Text, StoreInfo>{ @Override public Class<StoreInfo> getSupportedType() { return StoreInfo.class; } @Override protected Text createKeyObject() { return new Text(); } @Override protected Text createValueObject() { return new Text(); }
@Override protected void copyToModel(Text key, Text value, StoreInfo model) throws IOException { model.setStoreCode(key); model.setStoreName(value); } @Override protected void copyFromModel(StoreInfo model, Text key, Text value) throws IOException { key.set(model.getStoreCode()); value.set(model.getStoreName()); } }
SequenceFileFormatは、ジェネリクスで「シーケンスファイルのKey」「シーケンスファイルのValue」「データモデル」のクラスを指定する。
getSupportedType()でデータモデルのクラス、createKeyObject()・createValueObject()でシーケンスファイルのキーと値を代入する為のインスタンスを返すようにする。
copyToModel()でシーケンスファイルからデータモデルへ、
copyFromModel()でデータモデルからシーケンスファイルへの
データ移送をコーディングする。
特にデータモデルからシーケンスファイルへ移送する方は、データモデルの値がnullかどうかを考慮する必要がある。
(データモデルクラスは、値がnullだったら、単純にget()するとNullPointerExceptionが発生するようになっている為)
AsakusaFW 0.4から、SequenceFileFormatでコーデック(圧縮クラス)を指定できるようになった。
「Hadoopクラスター全体として圧縮が指定されているが、あるシーケンスファイルだけは無圧縮で作りたい」というような場合に便利。
import org.apache.hadoop.io.compress.SnappyCodec;
@Override public CompressionCodec getCompressionCodec(Path path) throws IOException, InterruptedException { return new SnappyCodec(); }
getCompressionCodec()をオーバーライドしなかった場合は、デフォルトのコーデックが使われる。
nullを返すと無圧縮になる。
コーデックを指定した場合は、当然ながら、稼動するHadoopクラスターでその圧縮形式に対応している必要がある。
ファイル名(パス)とフォーマットを結びつけるInputDescriptionやOutputDescriptionを用意する。
これにより、HDFS上のシーケンスファイルを読み書きできる。
// 入力用 package com.example.jobflow.desc; import com.asakusafw.runtime.directio.DataFormat; import com.asakusafw.vocabulary.directio.DirectFileInputDescription; import com.example.modelgen.dmdl.model.StoreInfo; public class StoreInfoFromSeq extends DirectFileInputDescription { @Override public Class<?> getModelType() { return StoreInfo.class; } @Override public Class<? extends DataFormat<?>> getFormat() { return StoreInfoSeqFileFormat.class; } @Override public String getBasePath() { return "master"; } @Override public String getResourcePattern() { return "store_info.seq"; } @Override public DataSize getDataSize() { // 店舗マスタは小さい前提 return DataSize.TINY; } }
// 出力用 package com.example.jobflow.desc; import com.asakusafw.runtime.directio.DataFormat; import com.asakusafw.vocabulary.directio.DirectFileOutputDescription; import com.example.modelgen.dmdl.model.StoreInfo; public class StoreInfoToSeq extends DirectFileOutputDescription { @Override public Class<?> getModelType() { return StoreInfo.class; } @Override public Class<? extends DataFormat<?>> getFormat() { return StoreInfoSeqFileFormat.class; } @Override public String getBasePath() { return "result/master"; } @Override public String getResourcePattern() { return "store_info.seq"; } }
ジョブフローやテストドライバーやテストデータ(xlsファイル)は通常(CSVファイル)と同じ。
(xlsファイルのテストデータはデータモデルのレイアウトで記述する。
テストドライバーは、xlsファイルから読み込んで生成したデータモデルをcopyFromModel()でシーケンスファイルのキー・値に変換して入力用ファイルを作成する)
Sqoopは、Hadoop(HDFS)とRDBMS(RDBのテーブル)とのデータ移送を行うツール。
テーブルから読み込んだデータをHDFSに置いたり、HDFSのファイルをテーブルにINSERTしたりする。
HDFS上のファイルとしては、CSVファイルやSequenceFileを扱うことが出来る。
そこで、SqoopでRDBからデータを読み込んでHDFSへファイルを格納し、Asakusaアプリでそのファイルを読み込んで処理する(あるいはその逆)という連携が出来る。
CSVファイルで連携するなら特別な事は無いが、ここではシーケンスファイルで連携する方法を考えてみる。
Sqoopはcodegenでシーケンスファイルに使用する値クラス(Writable)を生成することが出来る。
したがって、AsakusaFWとの連携としては、Sqoopのシーケンスファイルとデータモデルとの移送をコーディングすることになる。
連携に使用するデータモデルには、Sqoop用の記述は特に無い。
全然関係ないアノテーションを付けていても問題ない。アノテーションが付いていると、それに応じたフォーマットクラス等を自動生成してくれるだけだから(自動生成されたクラスが無駄になるだけ^^;)
※DMDLでは正式には「アノテーション」という名前ではないらしいが、「@」付きのあれのこと!
Sqoopのcodegen(あるいはexportやimport)を実行して、値クラス(Writable)のjavaソースファイルを生成する。
sqoop codegen \ --connect jdbc:mysql://localhost/test1 --username hishidama --password hishidamac \ --table store_info \ --class-name com.example.sqoop.store_info \ --outdir src
デフォルトでは、クラス名は(通常のJavaとは違って)スネークケースになる。
(RDBMSのテーブル名の命名規則がスネークケースだから)
クラス名を指定することは出来るので通常のJavaの様にキャメルケースにすることも出来るが、
中のフィールド名やセッター・ゲッターメソッドはスネークケースになるので、クラス名も素直にスネークケースにしておくと、Sqoopだという事が分かり易くなる。
生成したクラスは、別途コンパイルしてjarファイル化してAsakusaアプリのクラスパスに含めてもよいが、
どうせAsakusaアプリもコンパイルするので、ソースごとAsakusaアプリに含めてしまうのが楽。
ただし、SqoopのクラスはSqoopRecordというSqoopのクラスを継承している。
public class store_info extends SqoopRecord implements DBWritable, Writable { 〜 }
したがって、Sqoopのライブラリーが必要となる。
Eclipse上はビルドパスに$SQOOP_HOME/sqoop-1.3.0-cdh3u4.jarを含めればよいが、Asakusaアプリのアーカイブを作る(mvnでビルドする)際にはEclipseのビルドパスは使われない。
したがって、pom.xmlにSqoopを含めるのが吉。
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.cloudera.sqoop</groupId>
<artifactId>sqoop</artifactId>
<version>1.3.0-cdh3u4</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.asakusafw</groupId>
<artifactId>asakusa-dsl-vocabulary</artifactId>
<version>${asakusafw.version}</version>
</dependency>
exclusionsが無いと、HBase等の依存ライブラリーが色々とダウンロードされ、しかもslf4jが既存のものとバージョン違いでエラーになってしまう(苦笑)
(SqoopはHBaseにも対応しているからHBaseのライブラリーも依存関係に入っているのだろうが、今回は使わないので除外する)
Eclipseを使っている場合は、pom.xmlを修正した後でEclipseの設定を作り直す。
$ mvn eclipse:eclipse
SqoopのWritableとデータモデルの変換を行う処理をコーディングする。
package com.example.jobflow.desc; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import com.asakusafw.runtime.directio.hadoop.SequenceFileFormat; import com.example.modelgen.dmdl.model.StoreInfo; import com.example.sqoop.store_info;
public class StoreInfoSeqFileFormat extends SequenceFileFormat<LongWritable, store_info, StoreInfo>{ @Override public Class<StoreInfo> getSupportedType() { return StoreInfo.class; } @Override protected LongWritable createKeyObject() { return new LongWritable(); } @Override protected store_info createValueObject() { return new store_info(); }
@Override protected void copyToModel(LongWritable key, store_info value, StoreInfo model) throws IOException { model.setStoreCodeAsString(value.get_store_code()); model.setStoreNameAsString(value.get_store_name()); } @Override protected void copyFromModel(StoreInfo model, LongWritable key, store_info value) throws IOException { value.set_store_code(model.getStoreCodeAsString()); value.set_store_name(model.getStoreNameAsString()); } }
SqoopのシーケンスファイルはキーがLongWritable、値がcodegenで生成されたクラスになる。
(キーは特に使わないので、無視してよい)
今回の例では出てきていないが、java.sql.DateやTimestampとAsakusaFWのDateやDateTimeとの変換はちょっと面倒。
今のところは、DateUtil等を参考にしながらプログラミングするしかない。
InputDescription・OutputDescriptionの作り方は前述のサンプルと同様。
ただ、ファイル出力に関しては少し考慮すべき点がある。
以下のようにコーディングすると、出力するファイルは1つにまとめられる。
@Override public String getResourcePattern() { return "store_info.seq"; }
Sqoopの場合、Sqoopへの入力ファイルが分割されていても問題ないので、Direct I/Oで出力したファイルをSqoopで読み込む場合は、Direct I/Oで1ファイルにまとめる処理は冗長となる。
ファイル名のパターンに「ランダムな値」を指定したりデータ値で分割したりすれば複数ファイルになるので、1ファイルにまとめるよりは速いかもしれない。
@Override public String getResourcePattern() { // return "store_info{store_code}.seq"; //store_codeでファイル分割 return "store_info[0..19].seq"; //ランダムな値でファイル分割 }
AsakusaFW 0.4では、HadoopのMapReduceによって分割されたファイルをそのまま出力するパターンが追加された。
@Override public String getResourcePattern() { return "store_info_*.seq"; }
ジョブフローやテストクラス・テストデータの作り方は前述のサンプルと同じ。
Asakusaアプリのプログラミングが終わったら、パッケージング(jarファイルの生成)を行う。
cd $ASAKUSA_DEVELOP_HOME/workspace/プロジェクト mvn package
cd $ASAKUSA_HOME/batchapps jar xf $ASAKUSA_DEVELOP_HOME/workspace/プロジェクト/tareget/*batchapps*.jar rm -rf META-INF find . -name "*.sh" | xargs chmod +x
これにより、「$ASAKUSA_HOME/batchapps/バッチID/lib」の下に実行用のjarファイルが配置される。
Sqoopでimportやexportを実行する際にSqoop用Writableクラスの入ったjarファイルを指定する必要があるが、Asakusaアプリの実行用jarファイルが使える。
sqoop import \ --connect jdbc:mysql://localhost/test1 --username hishidama --password hishidamac \ --table store_info \ --jar-file $ASAKUSA_HOME/batchapps/バッチID/lib/ジョブフロー.jar \ --class-name com.example.sqoop.store_info \ --as-sequencefile \ --target-dir master/store_info
このAsakusaアプリを実行するに当たってSqoop用Writableクラスを使うので、Sqoopのライブラリーが必要になる。
cp -p $SQOOP_HOME/sqoop-1.3.0-cdh3u4.jar $ASAKUSA_HOME/ext/lib/
AsakusaFW 0.4では、AsakusaFW内部で使われているデータ形式のシーケンスファイルを入出力に使えるようになった。
この形式でファイルを作っておけば、Asakusaアプリで読み込む際にパース(データの構文解析)を行う手間が省ける。
DMDLファイルでデータモデル名に「@directio.sequence_file」を付ける。
@directio.sequence_file store_info = { "店舗コード" store_code : TEXT; "店舗名称" store_name : TEXT; };
cd $ASAKUSA_DEVELOP_HOME/workspace/プロジェクト mvn generate-resources
これにより、SequenceFileFormat・DirectFileInputDescription・DirectFileOutputDescriptionを継承したクラス、すなわち今回の例だと
StoreInfoSequenceFileFormat・AbstractStoreInfoSequenceFileInputDescription・AbstractStoreInfoSequenceFileOutputDescription
が生成される。
ファイルのパス・ファイル名を示すクラスだけは自作する必要がある。(CSVファイルの時と同様)
package com.example.jobflow.desc; import com.example.modelgen.dmdl.sequencefile.AbstractStoreInfoSequenceFileInputDescription; public class StoreInfoFromSeq extends AbstractStoreInfoSequenceFileInputDescription { @Override public String getBasePath() { return "master"; } @Override public String getResourcePattern() { return "store_info.seq"; } }
package com.example.jobflow.desc; import com.example.modelgen.dmdl.sequencefile.AbstractStoreInfoSequenceFileOutputDescription; public class StoreInfoToSeq extends AbstractStoreInfoSequenceFileOutputDescription { @Override public String getBasePath() { return "result/master"; } @Override public String getResourcePattern() { return "store_info.seq"; } }