Embulk(0.7.5)のフォーマッタープラグインの作成方法のメモ。
|
|
フォーマッタープラグイン(java-formatter)は、読み込まれたデータをファイル出力用に整形するプラグイン。
(例えばCSVファイルとしてデータを行・カラムごとに分解する)
Javaパーサープラグインは以下のコマンドでプロジェクトを作成する。
$ embulk new java-formatter poi-excel
最後の引数の「poi-excel」は、プラグイン名を表す。
これで、「embulk-formatter-poi_excel」というディレクトリーが作られ、その下にソース類一式が生成される。
プロジェクトを作成したら、まず以下のコマンドを実行する。
$ cd embulk-formatter-poi_excel $ ./gradlew package
生成されたプラグインの中核は、src/main/javaの下のFormatterPluginクラス。
package org.embulk.formatter.poi_excel; 〜 public class PoiExcelFormatterPlugin implements FormatterPlugin { public interface PluginTask extends Task { 〜 } @Override public void transaction(ConfigSource config, Schema schema, FormatterPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); control.run(task.dump()); } @Override public PageOutput open(TaskSource taskSource, Schema schema, FileOutput output) { PluginTask task = taskSource.loadTask(PluginTask.class); // Write your code here :) throw new UnsupportedOperationException("PoiExcelFormatterPlugin.open method is not implemented yet"); } }
YAMLファイルに記述されたパラメーター(オプション)を取得するのに使うのが、PluginTaskインターフェース。
これは基本的にパーサープラグインのPluginTaskと同様。
ファイル出力用にPluginTaskからBufferAllocatorを取得するようにすることも出来る。
import org.embulk.config.ConfigInject; import org.embulk.config.Task; import org.embulk.spi.BufferAllocator;
public interface PluginTask extends Task {
〜
@ConfigInject
public BufferAllocator getBufferAllocator();
}
FormatterPluginのtransactionメソッドは特に修正する必要は無く、生成された状態のままで良い。
configの内容をチェックしたりする場合に何か実装するようだ。
@Override public void transaction(ConfigSource config, Schema schema, FormatterPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); control.run(task.dump()); }
FormatterPluginのopenメソッドが、Inputプラグインから渡されたデータを受け取って整形を行う本体。
import org.embulk.config.TaskSource; import org.embulk.spi.FileOutput; import org.embulk.spi.Schema;
@Override
public PageOutput open(TaskSource taskSource, Schema schema, FileOutput output) {
PluginTask task = taskSource.loadTask(PluginTask.class);
// Write your code here :)
throw new UnsupportedOperationException("PoiExcelFormatterPlugin.open method is not implemented yet");
}
taskSource.loadTask()を使って、YAMLファイルに書かれた自分のプラグイン用の設定をPluginTaskに入れる。
openメソッドは、PageOutputを返すようになっている。
PageOutputは以下のようなインターフェース。
package org.embulk.spi; public interface PageOutput extends AutoCloseable { public void add(Page page); public void finish(); public void close(); }
addメソッドでデータを受け取り、ファイル(FileOutput)にデータを出力する。
@Override public PageOutput open(TaskSource taskSource, final Schema schema, FileOutput output) { return new PageOutput() { @Override public void add(Page page) { 〜FileOutputにデータを出力〜 } @Override public void finish() { } @Override public void close() { } }; }
addメソッドの引数であるPageは、複数レコードを表すクラスだと思う。
Pageからデータ(1行ずつのレコード)を取得するには、PageReaderを使うのが便利。
PageReaderでPageから1レコードずつデータを取得できる。
import org.embulk.spi.PageReader;
@Override public PageOutput open(TaskSource taskSource, final Schema schema, FileOutput output) { return new PageOutput() { private final PageReader pageReader = new PageReader(schema); @Override public void add(Page page) { pageReader.setPage(page); while (pageReader.nextRecord()) { // 先頭カラム(index=0) if (pageReader.isNull(0)) { System.out.print("null"); } else { String s = pageReader.getString(0); System.out.print(s); } System.out.print(", "); // 次のカラム(index=1) if (pageReader.isNull(1)) { System.out.println("null"); } else { String s = pageReader.getString(1); System.out.println(s); } } } @Override public void finish() { } @Override public void close() { } }; }
PageReaderのコンストラクターにschemaを渡す。
これで、isNull()やgetString()・getLong()・getDouble()等を使ってカラム毎のデータが取得できる。
データ型に応じたメソッドを呼び出す必要があるので、schemaからデータ型を取得して判定する必要がある。
が、そういった判定はColumnVisitorを使ってビジターパターンで処理することが出来る。
なお、上記の例ではSystem.outを使ってとりあえず標準出力に出してみたが、実際にはここでファイル(FileOutput)に出力する。
YAMLファイルに書かれたcolumnsをvisitorパターンで走査できる。(これはパーサープラグインで使ったColumnVisitorと同じ)
@Override public PageOutput open(TaskSource taskSource, final Schema schema, FileOutput output) { return new PageOutput() { private final PageReader pageReader = new PageReader(schema); @Override public void add(Page page) { pageReader.setPage(page); while (pageReader.nextRecord()) { schema.visitColumns(new ColumnVisitor() { @Override public void booleanColumn(Column column) { if (pageReader.isNull(column)) { 〜ファイルへnullを出力〜 } else { boolean value = pageReader.getBoolean(column); 〜ファイルへvalueを出力〜 } } @Override public void longColumn(Column column) { if (pageReader.isNull(column)) { 〜ファイルへnullを出力〜 } else { long value = pageReader.getLong(column); 〜ファイルへvalueを出力〜 } } @Override public void doubleColumn(Column column) { if (pageReader.isNull(column)) { 〜ファイルへnullを出力〜 } else { double value = pageReader.getDouble(column); 〜ファイルへvalueを出力〜 } } @Override public void stringColumn(Column column) { if (pageReader.isNull(column)) { 〜ファイルへnullを出力〜 } else { String value = pageReader.getString(column); 〜ファイルへvalueを出力〜 } } @Override public void timestampColumn(Column column) { if (pageReader.isNull(column)) { 〜ファイルへnullを出力〜 } else { Timestamp value = pageReader.getTimestamp(column); 〜ファイルへvalueを出力〜 } } }); } } @Override public void finish() { 〜ファイルのフィニッシュ〜 } @Override public void close() { 〜ファイルのクローズ〜 } }; }
フォーマッタープラグインはファイルに出力する為にデータを整形するものなので、openメソッドには出力先ファイル(FileOutput)が渡ってくる。
FileOutputへデータを出力するには、 org.embulk.spi.utilのFileOutputOutputStreamやLineEncoderを使うのが便利そう。
ファイルにバイナリーデータ(OutputStream)を出力するには、FileOutputOutputStreamを使う。
import org.embulk.config.TaskSource; import org.embulk.spi.FileOutput; import org.embulk.spi.Page; import org.embulk.spi.PageOutput; import org.embulk.spi.PageReader; import org.embulk.spi.Schema; import org.embulk.spi.util.FileOutputOutputStream; import org.embulk.spi.util.FileOutputOutputStream.CloseMode;
@Override public PageOutput open(TaskSource taskSource, final Schema schema, FileOutput output) { PluginTask task = taskSource.loadTask(PluginTask.class); final FileOutputOutputStream os = new FileOutputOutputStream(output, task.getBufferAllocator(), CloseMode.FLUSH_FINISH_CLOSE); // create a file os.nextFile(); return new PageOutput() { private final PageReader pageReader = new PageReader(schema); @Override public void add(Page page) { pageReader.setPage(page); while (pageReader.nextRecord()) { // とりあえずカラム0だけ String s = pageReader.getString(0); byte[] buf = s.getBytes(StandardCharsets.UTF_8); os.write(buf, 0, buf.length); } } @Override public void finish() { os.finish(); } @Override public void close() { os.close(); } }; }
FileOutputOutputStreamのコンストラクターの第3引数にCloseMode(FLUSH・FLUSH_FINISH・FLUSH_FINISH_CLOSE・CLOSE)を指定する。
これによってcloseメソッドを呼んだときの動作が変わる(FileOutputに対して、指定されたflush・finish・closeメソッドを呼び出す)。
ちなみにFileOutputOutputStreamでは、引数がintのwrite(int)と引数が3個のwrite(buf, offset, len)では、IOExceptionが発生しない。(何か起きたら、RuntimeExceptionにラップしてスローする)
引数が1個のwrite(buf)はFileOutputOutputStreamでオーバーライドされていないので、OutputStreamのデフォルトのまま。つまりIOExceptionがthrows宣言されているので、IOExceptionの処理をコーディングしなければならない。
ファイルにテキストデータを出力するには、FileOutputOutputStreamを(OutputStreamWriter経由で)BufferedWriterでラップしてもよいが、LineEncoderを使うという方法もある。
(LineEncoderはWriterのサブクラスではない)
LineEncoderを使う場合は、エンコーディングや改行コードの種類を指定するオブジェクト(LineEncoderクラス内のEncoderTaskインターフェース)が必要になる。
これらはYAMLファイル内に記述しておく前提で、PluginTaskにEncoderTaskを含めてしまえばよい。
import org.embulk.config.Task; import org.embulk.config.TaskSource; import org.embulk.spi.FileOutput; import org.embulk.spi.Page; import org.embulk.spi.PageOutput; import org.embulk.spi.PageReader; import org.embulk.spi.Schema; import org.embulk.spi.util.LineEncoder; import org.embulk.spi.util.LineEncoder.EncoderTask;
public interface PluginTask extends Task, EncoderTask { 〜 }
@Override public PageOutput open(TaskSource taskSource, final Schema schema, FileOutput output) { PluginTask task = taskSource.loadTask(PluginTask.class); final LineEncoder encoder = new LineEncoder(output, task); // create a file encoder.nextFile(); return new PageOutput() { private final PageReader pageReader = new PageReader(schema); @Override public void add(Page page) { pageReader.setPage(page); while (pageReader.nextRecord()) { // とりあえずカラム0だけ String s = pageReader.getString(0); encoder.addLine(s); } } @Override public void finish() { encoder.finish(); } @Override public void close() { encoder.close(); } }; }
LineEncoderのaddLineメソッドで文字列(改行あり)を出力する。
addNewLineメソッドで改行のみ、addTextメソッドで文字列のみ(改行なし)を出力する。
YAMLファイルのcolumnsでデータ型timestampが指定されたカラムはTimestampクラスで値が渡ってくる。
PageReaderからtimestampを取得するには、以下のようにコーディングする。
import org.embulk.spi.Column; import org.embulk.spi.PageReader; import org.embulk.spi.time.Timestamp;
Timestamp timestamp = pageReader.getTimestamp(column);
ところで、EmbulkのCSVフォーマッターでは、YAMLファイル上でdefault_timezoneを指定したり、カラム毎にtimezoneや書式を指定したりすることが出来る。
in: 〜 out: type: file formatter: type: 〜 default_timezone: 'Asia/Tokyo' column_options: time: {format: '%Y-%m-%d %H:%M:%S'} purchase: {format: '%Y%m%d'}
これらと同様の情報を取得するには、org.embulk.spi.timeのTimestampFormatterを使うのが便利である。
import org.embulk.spi.time.Timestamp; import org.embulk.spi.time.TimestampFormatter; import org.embulk.spi.time.TimestampFormatter.TimestampColumnOption; import org.embulk.spi.util.Timestamps;
まず、PluginTaskにTimestampFormatter.Taskインターフェースを追加し、column_optionsのメソッドも追加しておく。
(これにより、default_timezoneやcolumn_options内のtimezoneやformatが取得できるようになる)
public interface PluginTask extends Task, TimestampFormatter.Task { @Config("column_options") @ConfigDefault("{}") public Map<String, ColumnOptionTask> getColumnOptions(); } public interface ColumnOptionTask extends Task, TimestampColumnOption { }
そして、openメソッドの先頭でTimestampFormatterを用意する。
@Override public PageOutput open(TaskSource taskSource, final Schema schema, FileOutput output) { PluginTask task = taskSource.loadTask(PluginTask.class); final TimestampFormatter[] timestampFormatters = Timestamps.newTimestampColumnFormatters(task, schema, task.getColumnOptions()); 〜 }
これはTimestampFormatterの配列であり、使う際はcolumn.getIndex()を添字として指定する。
schema.visitColumns(new ColumnVisitor() { @Override public void timestampColumn(Column column) { Timestamp timestamp = pageReader.getTimestamp(column); TimestampFormatter formatter = timestampFormatters[column.getIndex()]; // org.joda.time.DateTimeZone dateTimeZone = formatter.getTimeZone(); // java.util.TimeZone timeZone = dateTimeZone.toTimeZone(); // java.util.Date value = new org.joda.time.DateTime(timestamp.toEpochMilli(), formatter.getTimeZone()).toDate(); String value = formatter.format(timestamp); // valueをFileOutputに出力する } 〜 }
タイムゾーンを取得したければ、TimestampFormatterのgetTimeZoneメソッドを呼び出す。
column_optionsでtimezoneが指定されている場合はそのタイムゾーンを返すし、指定されていない場合はdefault_timezoneで指定されているタイムゾーンが返る。それも指定されていない場合はUTCになる。
なお、返ってくるのはjoda.timeライブラリーのDateTimeZoneクラスなので、java.util.TimeZoneが欲しい場合はそこから変換する。
TimestampをStringに変換したければ、TimestampFormatterのformatメソッドを呼び出す。
column_optionsのformat指定に従って整形されるし、もちろんタイムゾーンも考慮される。