S-JIS[2012-07-09/2012-07-10] 変更履歴

Asakusa Framework バイナリーファイル入出力

Asakusa Framework0.2.6(Direct I/O)でバイナリーファイルを入出力する方法のメモ。


概要

Asakusa Framework(というかHadoop)は、基本的にテキストファイルを対象としている。
(バイナリーデータを扱う場合は、SequenceFileを使うのが一番楽かな?)

しかしAsakusaFW 0.2.6にはBinaryStreamFormatというクラスがあるので、これを継承した独自クラスを作ればバイナリーファイルを扱うことが出来る。


実装手順は以下の通り。


サンプル

アーキタイプはWindGateかDirect I/Oとする。
(Direct I/Oはいわば中核機能なので、WindGateはDirect I/Oも含んでいる。したがって、WindGateアーキタイプを選んでおけば、Direct I/Oの機能も使うことが出来る)

データモデル

データモデルは普通に作成する。
アノテーションは特に付けない。(アノテーションを付けると、それに応じたフォーマットクラス類を自動生成してくれる。今回はフォーマットクラスを自分で作るので、アノテーションは必要ない。逆にアノテーションを付けていても、使わないフォーマットクラス類が生成されるだけで、特に害は無い)

※DMDLでは正式には「アノテーション」という名前ではないらしいが、「@」付きのあれ(属性)のこと!

models.dmdl:

sample_info = {
    item_code          : TEXT;
    registered_date    : DATETIME;
    unit_selling_price : INT;
};

今回のサンプルでは、固定長バイナリーデータとする。
item_codeが4バイト(半角4文字)、DATETIMEは実体がlongなので8バイト、INTは4バイト。合計して、1レコード16バイト。


BinaryStreamFormat

フォーマットクラスとして、BinaryStreamFormatを継承したクラスを用意する。

package com.example.jobflow.desc;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import com.asakusafw.runtime.directio.BinaryStreamFormat;
import com.asakusafw.runtime.io.ModelInput;
import com.asakusafw.runtime.io.ModelOutput;
import com.example.modelgen.dmdl.model.SampleInfo;
public class SampleInfoBinaryFormat extends BinaryStreamFormat<SampleInfo> {

	public static final int RECORD_LENGTH = 4 + 8 + 4;
	@Override
	public Class<SampleInfo> getSupportedType() {
		return SampleInfo.class;
	}

	@Override
	public long getPreferredFragmentSize() throws IOException, InterruptedException {
		return 64 * 1024 * 1024 / RECORD_LENGTH * RECORD_LENGTH;
	}

	@Override
	public long getMinimumFragmentSize() throws IOException, InterruptedException {
		return RECORD_LENGTH;
	}
	@Override
	public ModelInput<SampleInfo> createInput(Class<? extends SampleInfo> dataType, String path, InputStream stream, long offset, long fragmentSize)
			throws IOException, InterruptedException {
		long skip = offset % RECORD_LENGTH;
		if (skip > 0) {
			skip = RECORD_LENGTH - skip;
			stream.skip(skip);
		}
		long pos = offset + skip;
		long end = (fragmentSize < 0) ? -1 : offset + fragmentSize;
		return new SampleInfoModelInput(stream, pos, end);
	}

	@Override
	public ModelOutput<SampleInfo> createOutput(Class<? extends SampleInfo> dataType, String path, OutputStream stream) throws IOException, InterruptedException {
		return new SampleInfoModelOutput(stream);
	}
}

getPreferredFragmentSize()は、フラグメント(ブロック/スプリット)の推奨サイズを返す。
getMinimumFragmentSize()は、たぶんフラグメントの最小サイズを返す。
(createInput()には、概ねgetPreferredFragmentSize()よりちょっと大きいサイズで入ってくる)

createInput()・createOutput()で、ModelInputおよびModelOutputを返す。
引数としてInputStreamやOutputStreamが渡されるので、これを使って入出力する。


createInput()の引数にはoffsetとfragmentSizeがあるので、これに応じた範囲を読み込む。
各タスクでファイル読込処理を行う訳だが、当然ながら、offset+fragmentSizeの範囲がかぶる事は無い。

streamはオフセット分すでにスキップされた状態になっている。[2012-07-10]
が、offsetはレコードサイズの倍数になっているとは限らない。(ファイルのローカリティーが考慮される(ファイルの実体があるノードで処理される)ため)
なので、ずれている分だけスキップしてレコードの頭から読み出せるようにする必要がある。

streamはfragmentSizeを超えて読み込むことが出来るので、フラグメントがレコード境界でぴったり終わらない場合は、超えたレコードだけは読み込むようにする。(これはSampleInfoModelInputで処理する)
(fragmentSizeは-1が来る可能性があるので、注意。-1が来たらファイル末尾まで処理する必要がある。(例えばファイルが分割されない場合に-1になるような想定らしい))


ModelInput

ファイルの1レコード分のデータをデータモデルに移送する為にModelInputを用意する。

package com.example.jobflow.desc;

import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;

import com.asakusafw.runtime.io.ModelInput;
import com.asakusafw.runtime.value.DateTimeOption;
import com.asakusafw.runtime.value.IntOption;
import com.asakusafw.runtime.value.StringOption;
import com.example.modelgen.dmdl.model.SampleInfo;
public class SampleInfoModelInput implements ModelInput<SampleInfo> {

	private InputStream stream;
	private long pos;
	private long end;
	private byte[] buf = new byte[SampleInfoBinaryFormat.RECORD_LENGTH];

	public SampleInfoModelInput(InputStream stream, long pos, long end) {
		this.stream = stream;
		this.pos = pos;
		this.end = end;
	}
	@Override
	public boolean readTo(SampleInfo model) throws IOException {
		if (end >= 0 && pos >= end) {
			return false;
		}
		int len = stream.read(buf);
		if (len <= 0) {
			return false;
		}
		if (len != SampleInfoBinaryFormat.RECORD_LENGTH) {
			throw new IllegalStateException("len=" + len);
		}

		fill(model.getItemCodeOption(), buf, 0, 4);
		fill(model.getRegisteredDateOption(), buf, 4, 8);
		fill(model.getUnitSellingPriceOption(), buf, 12, 4);

		pos += len;
		return true;
	}
	@SuppressWarnings("deprecation")
	void fill(StringOption opt, byte[] buf, int pos, int len) throws UnsupportedEncodingException {
		opt.modify(new String(buf, pos, len, "UTF-8"));
	}

	@SuppressWarnings("deprecation")
	void fill(IntOption opt, byte[] buf, int pos, int len) {
		opt.modify((int) readLong(buf, pos, len));
	}

	@SuppressWarnings("deprecation")
	void fill(DateTimeOption opt, byte[] buf, int pos, int len) {
		opt.modify(readLong(buf, pos, len));
	}

	long readLong(byte[] buf, int pos, int len) {
		long n = 0;
		for (int i = 0; i < len; i++) {
			n <<= 8;
			n |= buf[pos + i] & 0xff;
		}
		return n;
	}
	@Override
	public void close() throws IOException {
		stream.close();
	}
}

readTo()メソッドを実装し、バイト列をデータモデルに移送する。

フラグメントサイズを超えたら(それ以上は別タスクが処理するはずなので)処理をやめるようにする。[2012-07-10]
フラグメントサイズぴったりに収まらない場合、最後に超過するレコードだけは処理する。(次のオフセットを処理するタスクでは、半端な先頭レコードはスキップする)
なお、コンストラクターで受け取るようにしているendは負の数(ファイル末尾まで処理)になる可能性があるので注意。

バイナリーデータなので、intに当たるものでもビッグエンディアン・リトルエンディアンとかバイト数とか色々考えられる。
変換メソッドは自分の仕様に合わせて用意する。


ModelOutput

データモデル1個分のデータをファイルに出力する為にModelOutputを用意する。

package com.example.jobflow.desc;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;

import com.asakusafw.runtime.io.ModelOutput;
import com.asakusafw.runtime.value.DateTimeOption;
import com.asakusafw.runtime.value.IntOption;
import com.asakusafw.runtime.value.StringOption;
import com.example.modelgen.dmdl.model.SampleInfo;
public class SampleInfoModelOutput implements ModelOutput<SampleInfo> {

	private OutputStream stream;
	private byte[] buf = new byte[SampleInfoBinaryFormat.RECORD_LENGTH];

	public SampleInfoModelOutput(OutputStream stream) {
		this.stream = stream;
	}
	@Override
	public void write(SampleInfo model) throws IOException {
		emit(model.getItemCodeOption(), buf, 0, 4);
		emit(model.getRegisteredDateOption(), buf, 4, 8);
		emit(model.getUnitSellingPriceOption(), buf, 12, 4);

		stream.write(buf);
	}
	void emit(StringOption opt, byte[] buf, int pos, int len) throws UnsupportedEncodingException {
		byte[] s = opt.getAsString().getBytes("UTF-8");
		if (s.length != len) {
			throw new IllegalArgumentException("s=" + opt.getAsString());
		}
		System.arraycopy(s, 0, buf, pos, len);
	}

	void emit(IntOption opt, byte[] buf, int pos, int len) {
		writeLong(opt.get(), buf, pos, len);
	}

	void emit(DateTimeOption opt, byte[] buf, int pos, int len) {
		writeLong(opt.get().getElapsedSeconds(), buf, pos, len);
	}

	void writeLong(long n, byte[] buf, int pos, int len) {
		for (int i = len - 1; i >= 0; i--) {
			buf[pos + i] = (byte) n;
			n >>>= 8;
		}
	}
	@Override
	public void close() throws IOException {
		stream.close();
	}
}

write()メソッドを実装し、データモデルをバイト列に変換してファイルに出力する。

データモデルの値がnullの場合はget()を使うとNullPointerExceptionが起きるので、それをどうするかは各自の仕様として決めなければならない。
(当サンプルではnullがありえない前提として、特に処理していない)


Description

ファイル名(パス)とフォーマットを結びつけるInputDescriptionやOutputDescriptionを用意する。

これらはDirect I/Oでバイナリーファイルを扱う、すなわちHDFS上のバイナリーファイルを読み書きする為のクラスとなる。

// 入力用
package com.example.jobflow.desc;

import com.asakusafw.runtime.directio.DataFormat;
import com.asakusafw.vocabulary.directio.DirectFileInputDescription;
import com.example.modelgen.dmdl.model.SampleInfo;

public class SampleInfoFromFile extends DirectFileInputDescription {

	@Override
	public Class<?> getModelType() {
		return SampleInfo.class;
	}

	@Override
	public Class<? extends DataFormat<?>> getFormat() {
		return SampleInfoBinaryFormat.class;
	}

	@Override
	public String getBasePath() {
		return "master";
	}

	@Override
	public String getResourcePattern() {
		return "sample.dat";
	}
}
// 出力用
package com.example.jobflow.desc;

import java.util.Arrays;
import java.util.List;

import com.asakusafw.runtime.directio.DataFormat;
import com.asakusafw.vocabulary.directio.DirectFileOutputDescription;
import com.example.modelgen.dmdl.model.SampleInfo;

public class SampleInfoToFile extends DirectFileOutputDescription {

	@Override
	public Class<?> getModelType() {
		return SampleInfo.class;
	}

	@Override
	public Class<? extends DataFormat<?>> getFormat() {
		return SampleInfoBinaryFormat.class;
	}

	@Override
	public String getBasePath() {
		return "result/sample";
	}

	@Override
	public String getResourcePattern() {
		return "output.dat";
	}

	@Override
	public List<String> getOrder() {
		return Arrays.asList("item_code");
	}
}

getOrder()でソート項目を返すようにすれば、ちゃんとそれに応じてソートされるようだ。


ジョブフロー・テストドライバー

ジョブフローやテストドライバーやテストデータ(xlsファイル)は通常(CSVファイル)と同じ。

テストデータ用xlsファイルに書いた入力データは、データモデルに変換されてModelOutput#write()を呼び出し、ファイルとして出力される。


AsakusaFW目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま