Asakusa Framework0.2.6(Direct I/O)でバイナリーファイルを入出力する方法のメモ。
|
|
Asakusa Framework(というかHadoop)は、基本的にテキストファイルを対象としている。
(バイナリーデータを扱う場合は、SequenceFileを使うのが一番楽かな?)
しかしAsakusaFW 0.2.6にはBinaryStreamFormatというクラスがあるので、これを継承した独自クラスを作ればバイナリーファイルを扱うことが出来る。
実装手順は以下の通り。
アーキタイプはWindGateかDirect I/Oとする。
(Direct I/Oはいわば中核機能なので、WindGateはDirect
I/Oも含んでいる。したがって、WindGateアーキタイプを選んでおけば、Direct I/Oの機能も使うことが出来る)
データモデルは普通に作成する。
アノテーションは特に付けない。(アノテーションを付けると、それに応じたフォーマットクラス類を自動生成してくれる。今回はフォーマットクラスを自分で作るので、アノテーションは必要ない。逆にアノテーションを付けていても、使わないフォーマットクラス類が生成されるだけで、特に害は無い)
※DMDLでは正式には「アノテーション」という名前ではないらしいが、「@」付きのあれ(属性)のこと!
sample_info = { item_code : TEXT; registered_date : DATETIME; unit_selling_price : INT; };
今回のサンプルでは、固定長バイナリーデータとする。
item_codeが4バイト(半角4文字)、DATETIMEは実体がlongなので8バイト、INTは4バイト。合計して、1レコード16バイト。
フォーマットクラスとして、BinaryStreamFormatを継承したクラスを用意する。
package com.example.jobflow.desc; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import com.asakusafw.runtime.directio.BinaryStreamFormat; import com.asakusafw.runtime.io.ModelInput; import com.asakusafw.runtime.io.ModelOutput; import com.example.modelgen.dmdl.model.SampleInfo;
public class SampleInfoBinaryFormat extends BinaryStreamFormat<SampleInfo> { public static final int RECORD_LENGTH = 4 + 8 + 4;
@Override public Class<SampleInfo> getSupportedType() { return SampleInfo.class; } @Override public long getPreferredFragmentSize() throws IOException, InterruptedException { return 64 * 1024 * 1024 / RECORD_LENGTH * RECORD_LENGTH; } @Override public long getMinimumFragmentSize() throws IOException, InterruptedException { return RECORD_LENGTH; }
@Override public ModelInput<SampleInfo> createInput(Class<? extends SampleInfo> dataType, String path, InputStream stream, long offset, long fragmentSize) throws IOException, InterruptedException { long skip = offset % RECORD_LENGTH; if (skip > 0) { skip = RECORD_LENGTH - skip; stream.skip(skip); } long pos = offset + skip; long end = (fragmentSize < 0) ? -1 : offset + fragmentSize; return new SampleInfoModelInput(stream, pos, end); } @Override public ModelOutput<SampleInfo> createOutput(Class<? extends SampleInfo> dataType, String path, OutputStream stream) throws IOException, InterruptedException { return new SampleInfoModelOutput(stream); } }
getPreferredFragmentSize()は、フラグメント(ブロック/スプリット)の推奨サイズを返す。
getMinimumFragmentSize()は、たぶんフラグメントの最小サイズを返す。
(createInput()には、概ねgetPreferredFragmentSize()よりちょっと大きいサイズで入ってくる)
createInput()・createOutput()で、ModelInputおよびModelOutputを返す。
引数としてInputStreamやOutputStreamが渡されるので、これを使って入出力する。
createInput()の引数にはoffsetとfragmentSizeがあるので、これに応じた範囲を読み込む。
各タスクでファイル読込処理を行う訳だが、当然ながら、offset+fragmentSizeの範囲がかぶる事は無い。
streamはオフセット分すでにスキップされた状態になっている。[2012-07-10]
が、offsetはレコードサイズの倍数になっているとは限らない。(ファイルのローカリティーが考慮される(ファイルの実体があるノードで処理される)ため)
なので、ずれている分だけスキップしてレコードの頭から読み出せるようにする必要がある。
streamはfragmentSizeを超えて読み込むことが出来るので、フラグメントがレコード境界でぴったり終わらない場合は、超えたレコードだけは読み込むようにする。(これはSampleInfoModelInputで処理する)
(fragmentSizeは-1が来る可能性があるので、注意。-1が来たらファイル末尾まで処理する必要がある。(例えばファイルが分割されない場合に-1になるような想定らしい))
ファイルの1レコード分のデータをデータモデルに移送する為にModelInputを用意する。
package com.example.jobflow.desc; import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; import com.asakusafw.runtime.io.ModelInput; import com.asakusafw.runtime.value.DateTimeOption; import com.asakusafw.runtime.value.IntOption; import com.asakusafw.runtime.value.StringOption; import com.example.modelgen.dmdl.model.SampleInfo;
public class SampleInfoModelInput implements ModelInput<SampleInfo> { private InputStream stream; private long pos; private long end; private byte[] buf = new byte[SampleInfoBinaryFormat.RECORD_LENGTH]; public SampleInfoModelInput(InputStream stream, long pos, long end) { this.stream = stream; this.pos = pos; this.end = end; }
@Override public boolean readTo(SampleInfo model) throws IOException { if (end >= 0 && pos >= end) { return false; } int len = stream.read(buf); if (len <= 0) { return false; } if (len != SampleInfoBinaryFormat.RECORD_LENGTH) { throw new IllegalStateException("len=" + len); } fill(model.getItemCodeOption(), buf, 0, 4); fill(model.getRegisteredDateOption(), buf, 4, 8); fill(model.getUnitSellingPriceOption(), buf, 12, 4); pos += len; return true; }
@SuppressWarnings("deprecation") void fill(StringOption opt, byte[] buf, int pos, int len) throws UnsupportedEncodingException { opt.modify(new String(buf, pos, len, "UTF-8")); } @SuppressWarnings("deprecation") void fill(IntOption opt, byte[] buf, int pos, int len) { opt.modify((int) readLong(buf, pos, len)); } @SuppressWarnings("deprecation") void fill(DateTimeOption opt, byte[] buf, int pos, int len) { opt.modify(readLong(buf, pos, len)); } long readLong(byte[] buf, int pos, int len) { long n = 0; for (int i = 0; i < len; i++) { n <<= 8; n |= buf[pos + i] & 0xff; } return n; }
@Override public void close() throws IOException { stream.close(); } }
readTo()メソッドを実装し、バイト列をデータモデルに移送する。
フラグメントサイズを超えたら(それ以上は別タスクが処理するはずなので)処理をやめるようにする。[2012-07-10]
フラグメントサイズぴったりに収まらない場合、最後に超過するレコードだけは処理する。(次のオフセットを処理するタスクでは、半端な先頭レコードはスキップする)
なお、コンストラクターで受け取るようにしているendは負の数(ファイル末尾まで処理)になる可能性があるので注意。
バイナリーデータなので、intに当たるものでもビッグエンディアン・リトルエンディアンとかバイト数とか色々考えられる。
変換メソッドは自分の仕様に合わせて用意する。
データモデル1個分のデータをファイルに出力する為にModelOutputを用意する。
package com.example.jobflow.desc; import java.io.IOException; import java.io.OutputStream; import java.io.UnsupportedEncodingException; import com.asakusafw.runtime.io.ModelOutput; import com.asakusafw.runtime.value.DateTimeOption; import com.asakusafw.runtime.value.IntOption; import com.asakusafw.runtime.value.StringOption; import com.example.modelgen.dmdl.model.SampleInfo;
public class SampleInfoModelOutput implements ModelOutput<SampleInfo> { private OutputStream stream; private byte[] buf = new byte[SampleInfoBinaryFormat.RECORD_LENGTH]; public SampleInfoModelOutput(OutputStream stream) { this.stream = stream; }
@Override public void write(SampleInfo model) throws IOException { emit(model.getItemCodeOption(), buf, 0, 4); emit(model.getRegisteredDateOption(), buf, 4, 8); emit(model.getUnitSellingPriceOption(), buf, 12, 4); stream.write(buf); }
void emit(StringOption opt, byte[] buf, int pos, int len) throws UnsupportedEncodingException { byte[] s = opt.getAsString().getBytes("UTF-8"); if (s.length != len) { throw new IllegalArgumentException("s=" + opt.getAsString()); } System.arraycopy(s, 0, buf, pos, len); } void emit(IntOption opt, byte[] buf, int pos, int len) { writeLong(opt.get(), buf, pos, len); } void emit(DateTimeOption opt, byte[] buf, int pos, int len) { writeLong(opt.get().getElapsedSeconds(), buf, pos, len); } void writeLong(long n, byte[] buf, int pos, int len) { for (int i = len - 1; i >= 0; i--) { buf[pos + i] = (byte) n; n >>>= 8; } }
@Override public void close() throws IOException { stream.close(); } }
write()メソッドを実装し、データモデルをバイト列に変換してファイルに出力する。
データモデルの値がnullの場合はget()を使うとNullPointerExceptionが起きるので、それをどうするかは各自の仕様として決めなければならない。
(当サンプルではnullがありえない前提として、特に処理していない)
ファイル名(パス)とフォーマットを結びつけるInputDescriptionやOutputDescriptionを用意する。
これらはDirect I/Oでバイナリーファイルを扱う、すなわちHDFS上のバイナリーファイルを読み書きする為のクラスとなる。
// 入力用 package com.example.jobflow.desc; import com.asakusafw.runtime.directio.DataFormat; import com.asakusafw.vocabulary.directio.DirectFileInputDescription; import com.example.modelgen.dmdl.model.SampleInfo; public class SampleInfoFromFile extends DirectFileInputDescription { @Override public Class<?> getModelType() { return SampleInfo.class; } @Override public Class<? extends DataFormat<?>> getFormat() { return SampleInfoBinaryFormat.class; } @Override public String getBasePath() { return "master"; } @Override public String getResourcePattern() { return "sample.dat"; } }
// 出力用 package com.example.jobflow.desc; import java.util.Arrays; import java.util.List; import com.asakusafw.runtime.directio.DataFormat; import com.asakusafw.vocabulary.directio.DirectFileOutputDescription; import com.example.modelgen.dmdl.model.SampleInfo; public class SampleInfoToFile extends DirectFileOutputDescription { @Override public Class<?> getModelType() { return SampleInfo.class; } @Override public Class<? extends DataFormat<?>> getFormat() { return SampleInfoBinaryFormat.class; } @Override public String getBasePath() { return "result/sample"; } @Override public String getResourcePattern() { return "output.dat"; } @Override public List<String> getOrder() { return Arrays.asList("item_code"); } }
getOrder()でソート項目を返すようにすれば、ちゃんとそれに応じてソートされるようだ。
ジョブフローやテストドライバーやテストデータ(xlsファイル)は通常(CSVファイル)と同じ。
テストデータ用xlsファイルに書いた入力データは、データモデルに変換されてModelOutput#write()を呼び出し、ファイルとして出力される。