Embulkフォーマッタープラグイン作成メモ(Hishidama's Embulk java-formatter Memo) S-JIS[2015-10-16] 変更履歴

Embulk Javaフォーマッタープラグイン

Embulk(0.7.5)のフォーマッタープラグインの作成方法のメモ。


概要

フォーマッタープラグイン(java-formatter)は、読み込まれたデータをファイル出力用に整形するプラグイン。
(例えばCSVファイルとしてデータを行・カラムごとに分解する)


プロジェクトの作成

Javaパーサープラグインは以下のコマンドでプロジェクトを作成する。

$ embulk new java-formatter poi-excel

最後の引数の「poi-excel」は、プラグイン名を表す。
これで、「embulk-formatter-poi_excel」というディレクトリーが作られ、その下にソース類一式が生成される。

プロジェクトを作成したら、まず以下のコマンドを実行する。

$ cd embulk-formatter-poi_excel
$ ./gradlew package

生成されたプラグインの中核は、src/main/javaの下のFormatterPluginクラス。

package org.embulk.formatter.poi_excel;
〜
public class PoiExcelFormatterPlugin implements FormatterPlugin {

	public interface PluginTask extends Task {
〜
	}

	@Override
	public void transaction(ConfigSource config, Schema schema, FormatterPlugin.Control control) {
		PluginTask task = config.loadConfig(PluginTask.class);

		control.run(task.dump());
	}

	@Override
	public PageOutput open(TaskSource taskSource, Schema schema, FileOutput output) {
		PluginTask task = taskSource.loadTask(PluginTask.class);

		// Write your code here :)
		throw new UnsupportedOperationException("PoiExcelFormatterPlugin.open method is not implemented yet");
	}
}

PluginTask

YAMLファイルに記述されたパラメーター(オプション)を取得するのに使うのが、PluginTaskインターフェース。

これは基本的にパーサープラグインのPluginTaskと同様。


ファイル出力用にPluginTaskからBufferAllocatorを取得するようにすることも出来る。

import org.embulk.config.ConfigInject;
import org.embulk.config.Task;

import org.embulk.spi.BufferAllocator;
	public interface PluginTask extends Task {
〜
		@ConfigInject
		public BufferAllocator getBufferAllocator();
	}

transactionメソッド

FormatterPluginのtransactionメソッドは特に修正する必要は無く、生成された状態のままで良い。
configの内容をチェックしたりする場合に何か実装するようだ。

	@Override
	public void transaction(ConfigSource config, Schema schema, FormatterPlugin.Control control) {
		PluginTask task = config.loadConfig(PluginTask.class);

		control.run(task.dump());
	}

openメソッド

FormatterPluginのopenメソッドが、Inputプラグインから渡されたデータを受け取って整形を行う本体。

import org.embulk.config.TaskSource;

import org.embulk.spi.FileOutput;
import org.embulk.spi.Schema;
	@Override
	public PageOutput open(TaskSource taskSource, Schema schema, FileOutput output) {
		PluginTask task = taskSource.loadTask(PluginTask.class);

		// Write your code here :)
		throw new UnsupportedOperationException("PoiExcelFormatterPlugin.open method is not implemented yet");
	}

taskSource.loadTask()を使って、YAMLファイルに書かれた自分のプラグイン用の設定PluginTaskに入れる。


PageOutput

openメソッドは、PageOutputを返すようになっている。
PageOutputは以下のようなインターフェース。

package org.embulk.spi;

public interface PageOutput extends AutoCloseable {

	public void add(Page page);

	public void finish();

	public void close();
}

addメソッドでデータを受け取り、ファイル(FileOutput)にデータを出力する。

	@Override
	public PageOutput open(TaskSource taskSource, final Schema schema, FileOutput output) {

		return new PageOutput() {

			@Override
			public void add(Page page) {
				〜FileOutputにデータを出力〜
			}

			@Override
			public void finish() {
			}

			@Override
			public void close() {
			}
		};
	}

addメソッドの引数であるPageは、複数レコードを表すクラスだと思う。
Pageからデータ(1行ずつのレコード)を取得するには、PageReaderを使うのが便利。


PageReader

PageReaderでPageから1レコードずつデータを取得できる。

import org.embulk.spi.PageReader;
	@Override
	public PageOutput open(TaskSource taskSource, final Schema schema, FileOutput output) {

		return new PageOutput() {
			private final PageReader pageReader = new PageReader(schema);

			@Override
			public void add(Page page) {
				pageReader.setPage(page);
				while (pageReader.nextRecord()) {
					// 先頭カラム(index=0)
					if (pageReader.isNull(0)) {
						System.out.print("null");
					} else {
						String s = pageReader.getString(0);
						System.out.print(s);
					}

					System.out.print(", ");

					// 次のカラム(index=1)
					if (pageReader.isNull(1)) {
						System.out.println("null");
					} else {
						String s = pageReader.getString(1);
						System.out.println(s);
					}
				}
			}

			@Override
			public void finish() {
			}

			@Override
			public void close() {
			}
		};
	}

PageReaderのコンストラクターにschemaを渡す。
これで、isNull()やgetString()・getLong()・getDouble()等を使ってカラム毎のデータが取得できる。
データ型に応じたメソッドを呼び出す必要があるので、schemaからデータ型を取得して判定する必要がある。
が、そういった判定はColumnVisitorを使ってビジターパターンで処理することが出来る。

なお、上記の例ではSystem.outを使ってとりあえず標準出力に出してみたが、実際にはここでファイル(FileOutput)に出力する。


ColumnVisitor

YAMLファイルに書かれたcolumnsをvisitorパターンで走査できる。(これはパーサープラグインで使ったColumnVisitorと同じ)

	@Override
	public PageOutput open(TaskSource taskSource, final Schema schema, FileOutput output) {

		return new PageOutput() {
			private final PageReader pageReader = new PageReader(schema);

			@Override
			public void add(Page page) {
				pageReader.setPage(page);
				while (pageReader.nextRecord()) {
					schema.visitColumns(new ColumnVisitor() {

						@Override
						public void booleanColumn(Column column) {
							if (pageReader.isNull(column)) {
								〜ファイルへnullを出力〜
							} else {
								boolean value = pageReader.getBoolean(column);
								〜ファイルへvalueを出力〜
							}
						}

						@Override
						public void longColumn(Column column) {
							if (pageReader.isNull(column)) {
								〜ファイルへnullを出力〜
							} else {
								long value = pageReader.getLong(column);
								〜ファイルへvalueを出力〜
							}
						}

						@Override
						public void doubleColumn(Column column) {
							if (pageReader.isNull(column)) {
								〜ファイルへnullを出力〜
							} else {
								double value = pageReader.getDouble(column);
								〜ファイルへvalueを出力〜
							}
						}

						@Override
						public void stringColumn(Column column) {
							if (pageReader.isNull(column)) {
								〜ファイルへnullを出力〜
							} else {
								String value = pageReader.getString(column);
								〜ファイルへvalueを出力〜
							}
						}

						@Override
						public void timestampColumn(Column column) {
							if (pageReader.isNull(column)) {
								〜ファイルへnullを出力〜
							} else {
								Timestamp value = pageReader.getTimestamp(column);
								〜ファイルへvalueを出力〜
							}
						}
					});
				}
			}

			@Override
			public void finish() {
				〜ファイルのフィニッシュ〜
			}

			@Override
			public void close() {
				〜ファイルのクローズ〜
			}
		};
	}

ファイルへの出力

フォーマッタープラグインはファイルに出力する為にデータを整形するものなので、openメソッドには出力先ファイル(FileOutput)が渡ってくる。

FileOutputへデータを出力するには、 org.embulk.spi.utilのFileOutputOutputStreamLineEncoderを使うのが便利そう。


FileOutputOutputStream

ファイルにバイナリーデータ(OutputStream)を出力するには、FileOutputOutputStreamを使う。

import org.embulk.config.TaskSource;

import org.embulk.spi.FileOutput;
import org.embulk.spi.Page;
import org.embulk.spi.PageOutput;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;

import org.embulk.spi.util.FileOutputOutputStream;
import org.embulk.spi.util.FileOutputOutputStream.CloseMode;
	@Override
	public PageOutput open(TaskSource taskSource, final Schema schema, FileOutput output) {
		PluginTask task = taskSource.loadTask(PluginTask.class);

		final FileOutputOutputStream os = new FileOutputOutputStream(output, task.getBufferAllocator(), CloseMode.FLUSH_FINISH_CLOSE);

		// create a file
		os.nextFile();

		return new PageOutput() {
			private final PageReader pageReader = new PageReader(schema);

			@Override
			public void add(Page page) {
				pageReader.setPage(page);
				while (pageReader.nextRecord()) {
					// とりあえずカラム0だけ
					String s = pageReader.getString(0);

					byte[] buf = s.getBytes(StandardCharsets.UTF_8);
					os.write(buf, 0, buf.length);
				}
			}

			@Override
			public void finish() {
				os.finish();
			}

			@Override
			public void close() {
				os.close();
			}
		};
	}

FileOutputOutputStreamのコンストラクターの第3引数にCloseMode(FLUSH・FLUSH_FINISH・FLUSH_FINISH_CLOSE・CLOSE)を指定する。
これによってcloseメソッドを呼んだときの動作が変わる(FileOutputに対して、指定されたflush・finish・closeメソッドを呼び出す)。

ちなみにFileOutputOutputStreamでは、引数がintのwrite(int)と引数が3個のwrite(buf, offset, len)では、IOExceptionが発生しない。(何か起きたら、RuntimeExceptionにラップしてスローする)
引数が1個のwrite(buf)はFileOutputOutputStreamでオーバーライドされていないので、OutputStreamのデフォルトのまま。つまりIOExceptionがthrows宣言されているので、IOExceptionの処理をコーディングしなければならない。


LineEncoder

ファイルにテキストデータを出力するには、FileOutputOutputStreamを(OutputStreamWriter経由で)BufferedWriterでラップしてもよいが、LineEncoderを使うという方法もある。
(LineEncoderはWriterのサブクラスではない)

LineEncoderを使う場合は、エンコーディングや改行コードの種類を指定するオブジェクト(LineEncoderクラス内のEncoderTaskインターフェース)が必要になる。
これらはYAMLファイル内に記述しておく前提で、PluginTaskにEncoderTaskを含めてしまえばよい。

import org.embulk.config.Task;
import org.embulk.config.TaskSource;

import org.embulk.spi.FileOutput;
import org.embulk.spi.Page;
import org.embulk.spi.PageOutput;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;

import org.embulk.spi.util.LineEncoder;
import org.embulk.spi.util.LineEncoder.EncoderTask;
	public interface PluginTask extends Task, EncoderTask {
〜
	}
	@Override
	public PageOutput open(TaskSource taskSource, final Schema schema, FileOutput output) {
		PluginTask task = taskSource.loadTask(PluginTask.class);

		final LineEncoder encoder = new LineEncoder(output, task);

		// create a file
		encoder.nextFile();

		return new PageOutput() {
			private final PageReader pageReader = new PageReader(schema);

			@Override
			public void add(Page page) {
				pageReader.setPage(page);
				while (pageReader.nextRecord()) {
					// とりあえずカラム0だけ
					String s = pageReader.getString(0);

					encoder.addLine(s);
				}
			}

			@Override
			public void finish() {
				encoder.finish();
			}

			@Override
			public void close() {
				encoder.close();
			}
		};
	}

LineEncoderのaddLineメソッドで文字列(改行あり)を出力する。
addNewLineメソッドで改行のみ、addTextメソッドで文字列のみ(改行なし)を出力する。


Timestamp

YAMLファイルのcolumnsでデータ型timestampが指定されたカラムはTimestampクラスで値が渡ってくる。

PageReaderからtimestampを取得するには、以下のようにコーディングする。

import org.embulk.spi.Column;
import org.embulk.spi.PageReader;

import org.embulk.spi.time.Timestamp;
		Timestamp timestamp = pageReader.getTimestamp(column);

ところで、EmbulkのCSVフォーマッターでは、YAMLファイル上でdefault_timezoneを指定したり、カラム毎にtimezoneや書式を指定したりすることが出来る。

config.yml:

in:
  〜

out:
  type: file
  formatter:
    type: 〜
    default_timezone: 'Asia/Tokyo'
    column_options:
      time: {format: '%Y-%m-%d %H:%M:%S'}
      purchase: {format: '%Y%m%d'}

これらと同様の情報を取得するには、org.embulk.spi.timeのTimestampFormatterを使うのが便利である。


import org.embulk.spi.time.Timestamp;
import org.embulk.spi.time.TimestampFormatter;
import org.embulk.spi.time.TimestampFormatter.TimestampColumnOption;

import org.embulk.spi.util.Timestamps;

まず、PluginTaskにTimestampFormatter.Taskインターフェースを追加し、column_optionsのメソッドも追加しておく。
(これにより、default_timezoneやcolumn_options内のtimezoneやformatが取得できるようになる)

	public interface PluginTask extends Task, TimestampFormatter.Task {

		@Config("column_options")
		@ConfigDefault("{}")
		public Map<String, ColumnOptionTask> getColumnOptions();
	}

	public interface ColumnOptionTask extends Task, TimestampColumnOption {
	}

そして、openメソッドの先頭でTimestampFormatterを用意する。

	@Override
	public PageOutput open(TaskSource taskSource, final Schema schema, FileOutput output) {
		PluginTask task = taskSource.loadTask(PluginTask.class);

		final TimestampFormatter[] timestampFormatters = Timestamps.newTimestampColumnFormatters(task, schema, task.getColumnOptions());

		〜
	}

これはTimestampFormatterの配列であり、使う際はcolumn.getIndex()を添字として指定する。

		schema.visitColumns(new ColumnVisitor() {

			@Override
			public void timestampColumn(Column column) {
				Timestamp timestamp = pageReader.getTimestamp(column);
				TimestampFormatter formatter = timestampFormatters[column.getIndex()];

//				org.joda.time.DateTimeZone dateTimeZone = formatter.getTimeZone();
//				java.util.TimeZone timeZone = dateTimeZone.toTimeZone();

//				java.util.Date value = new org.joda.time.DateTime(timestamp.toEpochMilli(), formatter.getTimeZone()).toDate();

				String value = formatter.format(timestamp);

				// valueをFileOutputに出力する
			}

			〜
		}

タイムゾーンを取得したければ、TimestampFormatterのgetTimeZoneメソッドを呼び出す。
column_optionsでtimezoneが指定されている場合はそのタイムゾーンを返すし、指定されていない場合はdefault_timezoneで指定されているタイムゾーンが返る。それも指定されていない場合はUTCになる。
なお、返ってくるのはjoda.timeライブラリーのDateTimeZoneクラスなので、java.util.TimeZoneが欲しい場合はそこから変換する。

TimestampをStringに変換したければ、TimestampFormatterのformatメソッドを呼び出す。
column_optionsのformat指定に従って整形されるし、もちろんタイムゾーンも考慮される。


プラグイン作成へ戻る / Embulk目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま