Asakusa Framework0.2.6(Direct I/O)でバイナリーファイルを入出力する方法のメモ。
|
|
Asakusa Framework(というかHadoop)は、基本的にテキストファイルを対象としている。
(バイナリーデータを扱う場合は、SequenceFileを使うのが一番楽かな?)
しかしAsakusaFW 0.2.6にはBinaryStreamFormatというクラスがあるので、これを継承した独自クラスを作ればバイナリーファイルを扱うことが出来る。
実装手順は以下の通り。
アーキタイプはWindGateかDirect I/Oとする。
(Direct I/Oはいわば中核機能なので、WindGateはDirect
I/Oも含んでいる。したがって、WindGateアーキタイプを選んでおけば、Direct I/Oの機能も使うことが出来る)
データモデルは普通に作成する。
アノテーションは特に付けない。(アノテーションを付けると、それに応じたフォーマットクラス類を自動生成してくれる。今回はフォーマットクラスを自分で作るので、アノテーションは必要ない。逆にアノテーションを付けていても、使わないフォーマットクラス類が生成されるだけで、特に害は無い)
※DMDLでは正式には「アノテーション」という名前ではないらしいが、「@」付きのあれ(属性)のこと!
sample_info = {
item_code : TEXT;
registered_date : DATETIME;
unit_selling_price : INT;
};
今回のサンプルでは、固定長バイナリーデータとする。
item_codeが4バイト(半角4文字)、DATETIMEは実体がlongなので8バイト、INTは4バイト。合計して、1レコード16バイト。
フォーマットクラスとして、BinaryStreamFormatを継承したクラスを用意する。
package com.example.jobflow.desc; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import com.asakusafw.runtime.directio.BinaryStreamFormat; import com.asakusafw.runtime.io.ModelInput; import com.asakusafw.runtime.io.ModelOutput; import com.example.modelgen.dmdl.model.SampleInfo;
public class SampleInfoBinaryFormat extends BinaryStreamFormat<SampleInfo> {
public static final int RECORD_LENGTH = 4 + 8 + 4;
@Override
public Class<SampleInfo> getSupportedType() {
return SampleInfo.class;
}
@Override
public long getPreferredFragmentSize() throws IOException, InterruptedException {
return 64 * 1024 * 1024 / RECORD_LENGTH * RECORD_LENGTH;
}
@Override
public long getMinimumFragmentSize() throws IOException, InterruptedException {
return RECORD_LENGTH;
}
@Override
public ModelInput<SampleInfo> createInput(Class<? extends SampleInfo> dataType, String path, InputStream stream, long offset, long fragmentSize)
throws IOException, InterruptedException {
long skip = offset % RECORD_LENGTH;
if (skip > 0) {
skip = RECORD_LENGTH - skip;
stream.skip(skip);
}
long pos = offset + skip;
long end = (fragmentSize < 0) ? -1 : offset + fragmentSize;
return new SampleInfoModelInput(stream, pos, end);
}
@Override
public ModelOutput<SampleInfo> createOutput(Class<? extends SampleInfo> dataType, String path, OutputStream stream) throws IOException, InterruptedException {
return new SampleInfoModelOutput(stream);
}
}
getPreferredFragmentSize()は、フラグメント(ブロック/スプリット)の推奨サイズを返す。
getMinimumFragmentSize()は、たぶんフラグメントの最小サイズを返す。
(createInput()には、概ねgetPreferredFragmentSize()よりちょっと大きいサイズで入ってくる)
createInput()・createOutput()で、ModelInputおよびModelOutputを返す。
引数としてInputStreamやOutputStreamが渡されるので、これを使って入出力する。
createInput()の引数にはoffsetとfragmentSizeがあるので、これに応じた範囲を読み込む。
各タスクでファイル読込処理を行う訳だが、当然ながら、offset+fragmentSizeの範囲がかぶる事は無い。
streamはオフセット分すでにスキップされた状態になっている。[2012-07-10]
が、offsetはレコードサイズの倍数になっているとは限らない。(ファイルのローカリティーが考慮される(ファイルの実体があるノードで処理される)ため)
なので、ずれている分だけスキップしてレコードの頭から読み出せるようにする必要がある。
streamはfragmentSizeを超えて読み込むことが出来るので、フラグメントがレコード境界でぴったり終わらない場合は、超えたレコードだけは読み込むようにする。(これはSampleInfoModelInputで処理する)
(fragmentSizeは-1が来る可能性があるので、注意。-1が来たらファイル末尾まで処理する必要がある。(例えばファイルが分割されない場合に-1になるような想定らしい))
ファイルの1レコード分のデータをデータモデルに移送する為にModelInputを用意する。
package com.example.jobflow.desc; import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; import com.asakusafw.runtime.io.ModelInput; import com.asakusafw.runtime.value.DateTimeOption; import com.asakusafw.runtime.value.IntOption; import com.asakusafw.runtime.value.StringOption; import com.example.modelgen.dmdl.model.SampleInfo;
public class SampleInfoModelInput implements ModelInput<SampleInfo> {
private InputStream stream;
private long pos;
private long end;
private byte[] buf = new byte[SampleInfoBinaryFormat.RECORD_LENGTH];
public SampleInfoModelInput(InputStream stream, long pos, long end) {
this.stream = stream;
this.pos = pos;
this.end = end;
}
@Override
public boolean readTo(SampleInfo model) throws IOException {
if (end >= 0 && pos >= end) {
return false;
}
int len = stream.read(buf);
if (len <= 0) {
return false;
}
if (len != SampleInfoBinaryFormat.RECORD_LENGTH) {
throw new IllegalStateException("len=" + len);
}
fill(model.getItemCodeOption(), buf, 0, 4);
fill(model.getRegisteredDateOption(), buf, 4, 8);
fill(model.getUnitSellingPriceOption(), buf, 12, 4);
pos += len;
return true;
}
@SuppressWarnings("deprecation")
void fill(StringOption opt, byte[] buf, int pos, int len) throws UnsupportedEncodingException {
opt.modify(new String(buf, pos, len, "UTF-8"));
}
@SuppressWarnings("deprecation")
void fill(IntOption opt, byte[] buf, int pos, int len) {
opt.modify((int) readLong(buf, pos, len));
}
@SuppressWarnings("deprecation")
void fill(DateTimeOption opt, byte[] buf, int pos, int len) {
opt.modify(readLong(buf, pos, len));
}
long readLong(byte[] buf, int pos, int len) {
long n = 0;
for (int i = 0; i < len; i++) {
n <<= 8;
n |= buf[pos + i] & 0xff;
}
return n;
}
@Override
public void close() throws IOException {
stream.close();
}
}
readTo()メソッドを実装し、バイト列をデータモデルに移送する。
フラグメントサイズを超えたら(それ以上は別タスクが処理するはずなので)処理をやめるようにする。[2012-07-10]
フラグメントサイズぴったりに収まらない場合、最後に超過するレコードだけは処理する。(次のオフセットを処理するタスクでは、半端な先頭レコードはスキップする)
なお、コンストラクターで受け取るようにしているendは負の数(ファイル末尾まで処理)になる可能性があるので注意。
バイナリーデータなので、intに当たるものでもビッグエンディアン・リトルエンディアンとかバイト数とか色々考えられる。
変換メソッドは自分の仕様に合わせて用意する。
データモデル1個分のデータをファイルに出力する為にModelOutputを用意する。
package com.example.jobflow.desc; import java.io.IOException; import java.io.OutputStream; import java.io.UnsupportedEncodingException; import com.asakusafw.runtime.io.ModelOutput; import com.asakusafw.runtime.value.DateTimeOption; import com.asakusafw.runtime.value.IntOption; import com.asakusafw.runtime.value.StringOption; import com.example.modelgen.dmdl.model.SampleInfo;
public class SampleInfoModelOutput implements ModelOutput<SampleInfo> {
private OutputStream stream;
private byte[] buf = new byte[SampleInfoBinaryFormat.RECORD_LENGTH];
public SampleInfoModelOutput(OutputStream stream) {
this.stream = stream;
}
@Override
public void write(SampleInfo model) throws IOException {
emit(model.getItemCodeOption(), buf, 0, 4);
emit(model.getRegisteredDateOption(), buf, 4, 8);
emit(model.getUnitSellingPriceOption(), buf, 12, 4);
stream.write(buf);
}
void emit(StringOption opt, byte[] buf, int pos, int len) throws UnsupportedEncodingException {
byte[] s = opt.getAsString().getBytes("UTF-8");
if (s.length != len) {
throw new IllegalArgumentException("s=" + opt.getAsString());
}
System.arraycopy(s, 0, buf, pos, len);
}
void emit(IntOption opt, byte[] buf, int pos, int len) {
writeLong(opt.get(), buf, pos, len);
}
void emit(DateTimeOption opt, byte[] buf, int pos, int len) {
writeLong(opt.get().getElapsedSeconds(), buf, pos, len);
}
void writeLong(long n, byte[] buf, int pos, int len) {
for (int i = len - 1; i >= 0; i--) {
buf[pos + i] = (byte) n;
n >>>= 8;
}
}
@Override
public void close() throws IOException {
stream.close();
}
}
write()メソッドを実装し、データモデルをバイト列に変換してファイルに出力する。
データモデルの値がnullの場合はget()を使うとNullPointerExceptionが起きるので、それをどうするかは各自の仕様として決めなければならない。
(当サンプルではnullがありえない前提として、特に処理していない)
ファイル名(パス)とフォーマットを結びつけるInputDescriptionやOutputDescriptionを用意する。
これらはDirect I/Oでバイナリーファイルを扱う、すなわちHDFS上のバイナリーファイルを読み書きする為のクラスとなる。
// 入力用
package com.example.jobflow.desc;
import com.asakusafw.runtime.directio.DataFormat;
import com.asakusafw.vocabulary.directio.DirectFileInputDescription;
import com.example.modelgen.dmdl.model.SampleInfo;
public class SampleInfoFromFile extends DirectFileInputDescription {
@Override
public Class<?> getModelType() {
return SampleInfo.class;
}
@Override
public Class<? extends DataFormat<?>> getFormat() {
return SampleInfoBinaryFormat.class;
}
@Override
public String getBasePath() {
return "master";
}
@Override
public String getResourcePattern() {
return "sample.dat";
}
}
// 出力用
package com.example.jobflow.desc;
import java.util.Arrays;
import java.util.List;
import com.asakusafw.runtime.directio.DataFormat;
import com.asakusafw.vocabulary.directio.DirectFileOutputDescription;
import com.example.modelgen.dmdl.model.SampleInfo;
public class SampleInfoToFile extends DirectFileOutputDescription {
@Override
public Class<?> getModelType() {
return SampleInfo.class;
}
@Override
public Class<? extends DataFormat<?>> getFormat() {
return SampleInfoBinaryFormat.class;
}
@Override
public String getBasePath() {
return "result/sample";
}
@Override
public String getResourcePattern() {
return "output.dat";
}
@Override
public List<String> getOrder() {
return Arrays.asList("item_code");
}
}
getOrder()でソート項目を返すようにすれば、ちゃんとそれに応じてソートされるようだ。
ジョブフローやテストドライバーやテストデータ(xlsファイル)は通常(CSVファイル)と同じ。
テストデータ用xlsファイルに書いた入力データは、データモデルに変換されてModelOutput#write()を呼び出し、ファイルとして出力される。