Embulkパーサープラグイン作成メモ(Hishidama's Embulk java-parser Memo) S-JIS[2015-10-03] 変更履歴

Embulk Javaパーサープラグイン

Embulk(0.7.5)のパーサープラグインの作成方法のメモ。


概要

パーサープラグイン(java-parser)は、読み込まれたファイルをパースするプラグイン。
(例えばCSVファイルとしてデータを行・カラムごとに分解する)


プロジェクトの作成

Javaパーサープラグインは以下のコマンドでプロジェクトを作成する。

$ embulk new java-parser example-myformat

最後の引数の「example-myformat」は、プラグイン名を表す。
これで、「embulk-parser-example_myformat」というディレクトリーが作られ、その下にソース類一式が生成される。

プロジェクトを作成したら、まず以下のコマンドを実行する。

$ cd embulk-parser-example_myformat
$ ./gradlew package

生成されたプラグインの中核は、src/main/javaの下のParserPluginクラス。

package org.embulk.parser.example_myformat;
〜
public class ExampleMyformatParserPlugin implements ParserPlugin {

	public interface PluginTask extends Task {
〜
	}

	@Override
	public void transaction(ConfigSource config, ParserPlugin.Control control) {
		PluginTask task = config.loadConfig(PluginTask.class);

		Schema schema = task.getColumns().toSchema();

		control.run(task.dump(), schema);
	}

	@Override
	public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
		PluginTask task = taskSource.loadTask(PluginTask.class);

		// Write your code here :)
		throw new UnsupportedOperationException("ExampleMyformatParserPlugin.run method is not implemented yet");
	}
}

PluginTask

YAMLファイルに記述されたパラメーター(オプション)を取得するのに使うのが、PluginTaskインターフェース。

初期状態では、ParserPluginクラスの内部インターフェースとして、以下のように生成されている。

import com.google.common.base.Optional;

import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.Task;

import org.embulk.spi.SchemaConfig;
	public interface PluginTask extends Task {
		// configuration option 1 (required integer)
		@Config("option1")
		public int getOption1();

		// configuration option 2 (optional string, null is not allowed)
		@Config("option2")
		@ConfigDefault("\"myvalue\"")
		public String getOption2();

		// configuration option 3 (optional string, null is allowed)
		@Config("option3")
		@ConfigDefault("null")
		public Optional<String> getOption3();

		// if you get schema from config or data source
		@Config("columns")
		public SchemaConfig getColumns();
	}

このPluginTaskインターフェースでは、ゲッターメソッドだけを定義する。
具象クラスの作成やYAMLファイルから読み込んだ値の設定はEmbulkがやってくれる。

@ConfigアノテーションでYAMLファイル内のパラメーター名を指定する。
ゲッターメソッドの戻り値の型でデータの型を指定する。
省略可能なパラメーターの場合は@ConfigDefaultアノテーションを付けてデフォルト値を指定する。省略時にnullにしたい場合は戻り型をOptional(これはJava8のOptionalではなく、com.google.commonのもの)にするようだ。

パーサープラグインの場合、読み込んだデータがどういうカラム(項目・列)になるのかをYAMLファイルのcolumnsで指定することが多いだろう。
その場合はgetColumns()の定義はそのまま残しておき、他のパラメーターは自分独自のものに置き換えることになる。


transactionメソッド

ParserPluginのtransactionメソッドは特に修正する必要は無く、生成された状態のままで良い。
taskの中の値を変更したい場合に何か実装するようだ。

	@Override
	public void transaction(ConfigSource config, ParserPlugin.Control control) {
		PluginTask task = config.loadConfig(PluginTask.class);

		Schema schema = task.getColumns().toSchema();

		control.run(task.dump(), schema);
	}

transactionメソッドは(runメソッドと違って)、入力ファイルの個数等とは無関係に1回だけ呼ばれるようだ。


runメソッド

ParserPluginのrunメソッドが、ファイルから読み込まれたデータを受け取ってパースを行う本体。

import org.embulk.config.TaskSource;

import org.embulk.spi.FileInput;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
	@Override
	public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
		PluginTask task = taskSource.loadTask(PluginTask.class);

		// Write your code here :)
		throw new UnsupportedOperationException("ExampleMyformatParserPlugin.run method is not implemented yet");
	}

taskSource.loadTask()を使って、YAMLファイルに書かれた自分のプラグイン用の設定PluginTaskに入れる。

生成された初期状態ではUnsupportedOperationExceptionが投げられるようになっているので、ここを自分の処理に書き換えていく。

ちなみにrunメソッドは入力ファイルの個数分のマルチスレッドで呼ばれるが、ParserPluginインスタンスは個別に作られるようなので、staticフィールド以外は特にMTセーフを考慮する必要は無さそうだ。


ファイルの読み込み

パーサープラグインは読み込まれたファイルをパースするものなので、runメソッドにはファイルが渡ってくる。(引数のFileInput)
FileInputは複数のファイルが入っていることがあるようで、nextFileメソッドを使ってファイルを順次処理する。

FileInputからデータを読み込むにはpollメソッドを使うようだが、これはEmbulk独自のBufferを返すので、使い方がいまいち分からない。
org.embulk.spi.utilのFileInputInputStreamLineDecoderを使うのが便利そう。


FileInputInputStream

ファイルをバイナリーファイル(InputStream)として読み込むには、FileInputInputStreamを使う。

import java.io.IOException;

import org.embulk.config.TaskSource;

import org.embulk.spi.FileInput;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;

import org.embulk.spi.util.FileInputInputStream;
	@Override
	public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
		byte[] buf = new byte[4096];
		try (FileInputInputStream is = new FileInputInputStream(input)) {
			while (is.nextFile()) {
				for (;;) {
					int len = is.read(buf, 0, buf.length);
					if (len < 0) {
						break;
					}
					〜
				}
			}
		}
	}

ちなみにFileInputInputStreamでは、引数が0個のread()と引数が3個のread(buf, offset, len)では、IOExceptionが発生しない。( 何か起きたら、たぶんRuntimeExceptionにラップしてスローする)
引数が1個のread(buf)はFileInputInputStreamでオーバーライドされていないので、InputStreamのデフォルトのまま。つまりIOExceptionがthrows宣言されているので、IOExceptionの処理をコーディングしなければならない。


LineDecoder

ファイルをテキストファイルとして読み込むには、FileInputInputStreamを(InputStreamReader経由で)BufferedReaderでラップしてもよいが、LineDecoderを使うという方法もある。
(LineDecoderはReaderのサブクラスではない)

LineDecoderを使う場合は、エンコーディングや改行コードの種類を指定するオブジェクト(LineDecoderクラス内のDecoderTaskインターフェース)が必要になる。
これらはYAMLファイル内に記述しておく前提で、PluginTaskにDecoderTaskを含めてしまえばよい。

import org.embulk.config.Task;
import org.embulk.config.TaskSource;

import org.embulk.spi.FileInput;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;

import org.embulk.spi.util.LineDecoder;
import org.embulk.spi.util.LineDecoder.DecoderTask;
	public interface PluginTask extends Task, DecoderTask {
		// if you get schema from config or data source
		@Config("columns")
		public SchemaConfig getColumns();
	}
	@Override
	public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
		PluginTask task = taskSource.loadTask(PluginTask.class);

		try (LineDecoder reader = new LineDecoder(input, task)) {
			while (reader.nextFile()) {
				for (String line : reader) {
					〜
				}
			}
		}
	}

ちなみにLineDecoder内では、BufferedReaderを使ってデータを読み込んでいる。
LineDecoderのpollメソッドを使えばBufferedReader#readLine()と同じように1行ずつ読み込めるが、LineDecoderはIterable<String>を実装しているので、forループで読み込む方が簡単。
なお、LineDecoderのメソッドはIOExceptionを返すようになっていない。(内部でIOExceptionが発生したら、RuntimeExceptionにラップしてスローしている)


データの出力

データの出力には、runメソッドの引数のPageOutputを使う。

PageOutput#add()でPageを渡すのだが、PageはEmbulk独自のBufferを受け取るようになっていて、使い方がいまいち分からない。
org.embulk.spiのPageBuilderを使うのが便利そう。


PageBuilder

import org.embulk.config.TaskSource;

import org.embulk.spi.Exec;
import org.embulk.spi.FileInput;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
	@Override
	public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
		PluginTask task = taskSource.loadTask(PluginTask.class);

		try (LineDecoder reader = new LineDecoder(input, task);
		     PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output)) {
			while (reader.nextFile()) {
				for (String line : reader) {
					// YAMLファイル上のcolumnsが2つあって、両方ともstringである前提
					pageBuilder.setString(schema.getColumn(0), line.substring(0, 10));
					pageBuilder.setString(schema.getColumn(1), line.substring(10));

					pageBuilder.addRecord();
				}
				pageBuilder.flush();
			}
			pageBuilder.finish();
		}
	}

PageBuilderのコンストラクターにはBufferとrunメソッドの引数であるschema, outputを渡す。
BufferはExecというクラスを使って生成できる。

PageBuilderのsetString()やsetLong()等を使ってカラム毎の値をセットする。
これらのメソッドの第1引数でカラム(schemaから取得する)、第2引数で値を指定する。
値にnullを渡す事は出来ない(setString()でnullを渡すとNullPointerExceptionが発生する)。nullを明示的にセットしたい場合はsetNull()を使う。

1レコード分のデータをセットしたらaddRecord()を呼び出す。

適当な間隔でflushメソッドを呼び出してもよい。(finishメソッドを呼び出せばフラッシュされる)

closeメソッドの中ではfinishメソッドは呼ばれないようなので、finishメソッドの呼び出しは明示的に行う必要がある。


PageBuilderへの各カラムの値の設定は、YAMLファイルのcolumnsで書かれたカラム数の分だけ行う必要がある。
(上記の例では決め打ちで2カラムだけ設定しているが、本来はschema.getColumnCount()の個数分ループする必要がある)

また、setString()やsetLong()等のどのメソッドを呼び出すのかについては、columnsで書かれたデータ型と一致するメソッドを呼び出さなければならない。 (一致していないと例外が発生する)
(上記の例では決め打ちでstringとして扱っている)

つまり、本来は以下のような感じのコードになる。

				for (String line : reader) {
					for (Column column : schema.getColumns()) {
						String value = getStringValue(line, column.getIndex()) {

						Type type = column.getType();
						if (type instanceof BooleanType) {
							pageBuilder.setBoolean(column, Boolean.parseBoolean(value));
						} else if (type instanceof LongType) {
							pageBuilder.setLong(column, Long.parseLong(value));
						} else if (type instanceof DoubleType) {
							pageBuilder.setDouble(column, Double.parseDouble(value));
						} else if (type instanceof StringType) {
							pageBuilder.setString(column, value);
						} else if (type instanceof TimestampType) {
							Timestamp time = ;
							pageBuilder.setTimestamp(column, time);
						} else {
							throw new UnsupportedOperationException(MessageFormat.format("type={0}", type.getName()));
						}
					}
					pageBuilder.addRecord();
				}
	private String getStringValue(String line, int index) {
		switch (index) {
		case 0:
			return line.substring(0, 8);
		case 1:
			return line.substring(8);
		default:
			return "";
		}
	}

さすがにこのif文の羅列は嫌なので、visitorパターンで実装できるようになっている。


ColumnVisitor

YAMLファイルに書かれたcolumnsはvisitorパターンで走査できる。

import org.embulk.spi.Column;
import org.embulk.spi.ColumnVisitor;
	@Override
	public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
		PluginTask task = taskSource.loadTask(PluginTask.class);

		try (LineDecoder reader = new LineDecoder(input, task);
		    final PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output)) {
			while (reader.nextFile()) {
				for (final String line : reader) {
					schema.visitColumns(new ColumnVisitor() {

						@Override
						public void timestampColumn(Column column) {
							String value = getStringValue(line, column.getIndex());
							Timestamp time = ;
							pageBuilder.setTimestamp(column, time);
						}

						@Override
						public void stringColumn(Column column) {
							String value = getStringValue(line, column.getIndex());
							pageBuilder.setString(column, value);
						}

						@Override
						public void longColumn(Column column) {
							String value = getStringValue(line, column.getIndex());
							pageBuilder.setLong(column, Long.parseLong(value));
						}

						@Override
						public void doubleColumn(Column column) {
							String value = getStringValue(line, column.getIndex());
							pageBuilder.setDouble(column, Double.parseDouble(value));
						}

						@Override
						public void booleanColumn(Column column) {
							String value = getStringValue(line, column.getIndex());
							pageBuilder.setBoolean(column, Boolean.parseBoolean(value));
						}
					});
					pageBuilder.addRecord();
				}
				pageBuilder.flush();
			}
			pageBuilder.finish();
		}
	}

ちなみに、nullをセットしたい場合は「pageBuilder.setNull(column)」で出来る。


Timestamp

YAMLファイルのcolumnsでデータ型timestampが指定されたカラムはTimestampクラスで値を扱う。

PageBuilderでtimestampをセットするには、基本的には以下のようにコーディングする。

import org.embulk.spi.Column;
import org.embulk.spi.PageBuilder;

import org.embulk.spi.time.Timestamp;
		long msec = System.currentTimeMillis();
		Timestamp time = Timestamp.ofEpochMilli(msec);
		pageBuilder.setTimestamp(column, time);

ところで、EmbulkではYAMLファイル上でdefault_timezoneを指定したり、カラム毎にtimezoneを指定したりすることが出来る。

config.yml:

in:
  type: file
  〜
  parser:
    type: example_myformat
    default_timezone: Asia/Tokyo
    columns:
    - {name: date, type: timestamp, timezone: Asia/Tokyo, format: '%Y/%m/%d %H:%M:%S'}

out:
  type: stdout

これらの情報を取得するには、org.embulk.spi.timeのTimestampParserを使うのが便利である。


import org.embulk.spi.time.Timestamp;
import org.embulk.spi.time.TimestampParser;

import org.embulk.spi.util.Timestamps;

まず、PluginTaskにTimestampParser.Taskインターフェースを追加しておく。
(これにより、default_timezoneやcolumns内のtimezoneが取得できるようになる)

	public interface PluginTask extends Task, TimestampParser.Task {

		@Config("columns")
		public SchemaConfig getColumns();
	}

そして、runメソッドの先頭でTimestampParserを用意する。

	@Override
	public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
		PluginTask task = taskSource.loadTask(PluginTask.class);

		final TimestampParser[] timestampParsers = Timestamps.newTimestampColumnParsers(task, task.getColumns());

		〜
	}

これはTimestampParserの配列であり、使う際はcolumn.getIndex()を添字として指定する。

		schema.visitColumns(new ColumnVisitor() {

			@Override
			public void timestampColumn(Column column) {
				TimestampParser parser = timestampParsers[column.getIndex()];

//				org.joda.time.DateTimeZone dateTimeZone = parser.getDefaultTimeZone();
//				java.util.TimeZone timeZone = dateTimeZone.toTimeZone();

				String value = getStringValue(line, column.getIndex());
				Timestamp time = parser.parse(value);
				pageBuilder.setTimestamp(column, time);
			}

			〜
		}

タイムゾーンを取得したければ、TimestampParserのgetDefaultTimeZoneメソッドを呼び出す。
「DefaultTimeZone」というメソッド名だが、columnsでtimezoneが指定されている場合はそのタイムゾーンを返すし、指定されていない場合はdefault_timezoneで指定されているタイムゾーンが返る。それも指定されていない場合はUTCになる。
なお、返ってくるのはjoda.timeライブラリーのDateTimeZoneクラスなので、java.util.TimeZoneが欲しい場合はそこから変換する。

Stringの値をTimestampに変換したければ、TimestampParserのparseメソッドを呼び出す。
columnsのformat指定に従ってパースされるし、もちろんタイムゾーンも考慮される。


カラムオプション

YAMLファイルのcolumnsに独自のパラメーターを定義させることが出来る。
(columnsに余計なパラメーターが付いていても無視されるので、自分が必要なものだけ取得する)

config.yml:

in:
  type: file
〜
  parser:
    type: example_myformat
    columns:
    - {name: text, type: string, foo: abc}

out:
  type: stdout

このfooを取得する為に、まず、(PluginTaskと同様に)カラムのオプション用インターフェースを作成する。

	public interface ColumnOptionTask extends Task {

		@Config("foo")
		@ConfigDefault("\"defaultFoo\"")
		public String getFoo();
	}

そして、runメソッドの先頭でこのColumnOptionTaskを初期化する。

	@Override
	public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
		PluginTask task = taskSource.loadTask(PluginTask.class);

		SchemaConfig schemaConfig = task.getColumns();
		final List<ColumnOptionTask> columnOptions = new ArrayList<>(schemaConfig.getColumnCount());
		for (ColumnConfig column : schemaConfig.getColumns()) {
			ColumnOptionTask option = column.getOption().loadConfig(ColumnOptionTask.class);
			columnOptions.add(option);
		}

		〜
	}

あとは、欲しい場面で使うだけ。

		schema.visitColumns(new ColumnVisitor() {

			@Override
			public void stringColumn(Column column) {
				String value = getStringValue(line, column.getIndex());
				if (value.isEmpty()) {
					ColumnOptionTask option = columnOptions.get(column.getIndex());
					value = option.getFoo();
				}
				pageBuilder.setString(column, value);
			}

			〜
		}

プラグイン作成へ戻る / Embulk目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま