Embulk(0.7.5)のパーサープラグインの作成方法のメモ。
|
|
パーサープラグイン(java-parser)は、読み込まれたファイルをパースするプラグイン。
(例えばCSVファイルとしてデータを行・カラムごとに分解する)
Javaパーサープラグインは以下のコマンドでプロジェクトを作成する。
$ embulk new java-parser example-myformat
最後の引数の「example-myformat」は、プラグイン名を表す。
これで、「embulk-parser-example_myformat」というディレクトリーが作られ、その下にソース類一式が生成される。
プロジェクトを作成したら、まず以下のコマンドを実行する。
$ cd embulk-parser-example_myformat $ ./gradlew package
生成されたプラグインの中核は、src/main/javaの下のParserPluginクラス。
package org.embulk.parser.example_myformat; 〜 public class ExampleMyformatParserPlugin implements ParserPlugin { public interface PluginTask extends Task { 〜 } @Override public void transaction(ConfigSource config, ParserPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); Schema schema = task.getColumns().toSchema(); control.run(task.dump(), schema); } @Override public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) { PluginTask task = taskSource.loadTask(PluginTask.class); // Write your code here :) throw new UnsupportedOperationException("ExampleMyformatParserPlugin.run method is not implemented yet"); } }
YAMLファイルに記述されたパラメーター(オプション)を取得するのに使うのが、PluginTaskインターフェース。
初期状態では、ParserPluginクラスの内部インターフェースとして、以下のように生成されている。
import com.google.common.base.Optional; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.Task; import org.embulk.spi.SchemaConfig;
public interface PluginTask extends Task { // configuration option 1 (required integer) @Config("option1") public int getOption1(); // configuration option 2 (optional string, null is not allowed) @Config("option2") @ConfigDefault("\"myvalue\"") public String getOption2(); // configuration option 3 (optional string, null is allowed) @Config("option3") @ConfigDefault("null") public Optional<String> getOption3(); // if you get schema from config or data source @Config("columns") public SchemaConfig getColumns(); }
このPluginTaskインターフェースでは、ゲッターメソッドだけを定義する。
具象クラスの作成やYAMLファイルから読み込んだ値の設定はEmbulkがやってくれる。
@ConfigアノテーションでYAMLファイル内のパラメーター名を指定する。
ゲッターメソッドの戻り値の型でデータの型を指定する。
省略可能なパラメーターの場合は@ConfigDefaultアノテーションを付けてデフォルト値を指定する。省略時にnullにしたい場合は戻り型をOptional(これはJava8のOptionalではなく、com.google.commonのもの)にするようだ。
パーサープラグインの場合、読み込んだデータがどういうカラム(項目・列)になるのかをYAMLファイルのcolumnsで指定することが多いだろう。
その場合はgetColumns()の定義はそのまま残しておき、他のパラメーターは自分独自のものに置き換えることになる。
ParserPluginのtransactionメソッドは特に修正する必要は無く、生成された状態のままで良い。
taskの中の値を変更したい場合に何か実装するようだ。
@Override public void transaction(ConfigSource config, ParserPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); Schema schema = task.getColumns().toSchema(); control.run(task.dump(), schema); }
transactionメソッドは(runメソッドと違って)、入力ファイルの個数等とは無関係に1回だけ呼ばれるようだ。
ParserPluginのrunメソッドが、ファイルから読み込まれたデータを受け取ってパースを行う本体。
import org.embulk.config.TaskSource; import org.embulk.spi.FileInput; import org.embulk.spi.PageOutput; import org.embulk.spi.Schema;
@Override
public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
PluginTask task = taskSource.loadTask(PluginTask.class);
// Write your code here :)
throw new UnsupportedOperationException("ExampleMyformatParserPlugin.run method is not implemented yet");
}
taskSource.loadTask()を使って、YAMLファイルに書かれた自分のプラグイン用の設定をPluginTaskに入れる。
生成された初期状態ではUnsupportedOperationExceptionが投げられるようになっているので、ここを自分の処理に書き換えていく。
ちなみにrunメソッドは入力ファイルの個数分のマルチスレッドで呼ばれるが、ParserPluginインスタンスは個別に作られるようなので、staticフィールド以外は特にMTセーフを考慮する必要は無さそうだ。
パーサープラグインは読み込まれたファイルをパースするものなので、runメソッドにはファイルが渡ってくる。(引数のFileInput)
FileInputは複数のファイルが入っていることがあるようで、nextFileメソッドを使ってファイルを順次処理する。
FileInputからデータを読み込むにはpollメソッドを使うようだが、これはEmbulk独自のBufferを返すので、使い方がいまいち分からない。
org.embulk.spi.utilのFileInputInputStreamやLineDecoderを使うのが便利そう。
ファイルをバイナリーファイル(InputStream)として読み込むには、FileInputInputStreamを使う。
import java.io.IOException; import org.embulk.config.TaskSource; import org.embulk.spi.FileInput; import org.embulk.spi.PageOutput; import org.embulk.spi.Schema; import org.embulk.spi.util.FileInputInputStream;
@Override public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) { byte[] buf = new byte[4096]; try (FileInputInputStream is = new FileInputInputStream(input)) { while (is.nextFile()) { for (;;) { int len = is.read(buf, 0, buf.length); if (len < 0) { break; } 〜 } } } }
ちなみにFileInputInputStreamでは、引数が0個のread()と引数が3個のread(buf, offset, len)では、IOExceptionが発生しない。(
何か起きたら、たぶんRuntimeExceptionにラップしてスローする)
引数が1個のread(buf)はFileInputInputStreamでオーバーライドされていないので、InputStreamのデフォルトのまま。つまりIOExceptionがthrows宣言されているので、IOExceptionの処理をコーディングしなければならない。
ファイルをテキストファイルとして読み込むには、FileInputInputStreamを(InputStreamReader経由で)BufferedReaderでラップしてもよいが、LineDecoderを使うという方法もある。
(LineDecoderはReaderのサブクラスではない)
LineDecoderを使う場合は、エンコーディングや改行コードの種類を指定するオブジェクト(LineDecoderクラス内のDecoderTaskインターフェース)が必要になる。
これらはYAMLファイル内に記述しておく前提で、PluginTaskにDecoderTaskを含めてしまえばよい。
import org.embulk.config.Task; import org.embulk.config.TaskSource; import org.embulk.spi.FileInput; import org.embulk.spi.PageOutput; import org.embulk.spi.Schema; import org.embulk.spi.util.LineDecoder; import org.embulk.spi.util.LineDecoder.DecoderTask;
public interface PluginTask extends Task, DecoderTask {
// if you get schema from config or data source
@Config("columns")
public SchemaConfig getColumns();
}
@Override public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) { PluginTask task = taskSource.loadTask(PluginTask.class); try (LineDecoder reader = new LineDecoder(input, task)) { while (reader.nextFile()) { for (String line : reader) { 〜 } } } }
ちなみにLineDecoder内では、BufferedReaderを使ってデータを読み込んでいる。
LineDecoderのpollメソッドを使えばBufferedReader#readLine()と同じように1行ずつ読み込めるが、LineDecoderはIterable<String>を実装しているので、forループで読み込む方が簡単。
なお、LineDecoderのメソッドはIOExceptionを返すようになっていない。(内部でIOExceptionが発生したら、RuntimeExceptionにラップしてスローしている)
データの出力には、runメソッドの引数のPageOutputを使う。
PageOutput#add()でPageを渡すのだが、PageはEmbulk独自のBufferを受け取るようになっていて、使い方がいまいち分からない。
org.embulk.spiのPageBuilderを使うのが便利そう。
import org.embulk.config.TaskSource; import org.embulk.spi.Exec; import org.embulk.spi.FileInput; import org.embulk.spi.PageBuilder; import org.embulk.spi.PageOutput; import org.embulk.spi.Schema;
@Override public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) { PluginTask task = taskSource.loadTask(PluginTask.class); try (LineDecoder reader = new LineDecoder(input, task); PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output)) { while (reader.nextFile()) { for (String line : reader) { // YAMLファイル上のcolumnsが2つあって、両方ともstringである前提 pageBuilder.setString(schema.getColumn(0), line.substring(0, 10)); pageBuilder.setString(schema.getColumn(1), line.substring(10)); pageBuilder.addRecord(); } pageBuilder.flush(); } pageBuilder.finish(); } }
PageBuilderのコンストラクターにはBufferとrunメソッドの引数であるschema, outputを渡す。
BufferはExecというクラスを使って生成できる。
PageBuilderのsetString()やsetLong()等を使ってカラム毎の値をセットする。
これらのメソッドの第1引数でカラム(schemaから取得する)、第2引数で値を指定する。
値にnullを渡す事は出来ない(setString()でnullを渡すとNullPointerExceptionが発生する)。nullを明示的にセットしたい場合はsetNull()を使う。
1レコード分のデータをセットしたらaddRecord()を呼び出す。
適当な間隔でflushメソッドを呼び出してもよい。(finishメソッドを呼び出せばフラッシュされる)
closeメソッドの中ではfinishメソッドは呼ばれないようなので、finishメソッドの呼び出しは明示的に行う必要がある。
PageBuilderへの各カラムの値の設定は、YAMLファイルのcolumnsで書かれたカラム数の分だけ行う必要がある。
(上記の例では決め打ちで2カラムだけ設定しているが、本来はschema.getColumnCount()の個数分ループする必要がある)
また、setString()やsetLong()等のどのメソッドを呼び出すのかについては、columnsで書かれたデータ型と一致するメソッドを呼び出さなければならない。
(一致していないと例外が発生する)
(上記の例では決め打ちでstringとして扱っている)
つまり、本来は以下のような感じのコードになる。
for (String line : reader) { for (Column column : schema.getColumns()) { String value = getStringValue(line, column.getIndex()) { Type type = column.getType(); if (type instanceof BooleanType) { pageBuilder.setBoolean(column, Boolean.parseBoolean(value)); } else if (type instanceof LongType) { pageBuilder.setLong(column, Long.parseLong(value)); } else if (type instanceof DoubleType) { pageBuilder.setDouble(column, Double.parseDouble(value)); } else if (type instanceof StringType) { pageBuilder.setString(column, value); } else if (type instanceof TimestampType) { Timestamp time = 〜; pageBuilder.setTimestamp(column, time); } else { throw new UnsupportedOperationException(MessageFormat.format("type={0}", type.getName())); } } pageBuilder.addRecord(); }
private String getStringValue(String line, int index) { switch (index) { case 0: return line.substring(0, 8); case 1: return line.substring(8); default: return ""; } }
さすがにこのif文の羅列は嫌なので、visitorパターンで実装できるようになっている。
YAMLファイルに書かれたcolumnsはvisitorパターンで走査できる。
import org.embulk.spi.Column; import org.embulk.spi.ColumnVisitor;
@Override public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) { PluginTask task = taskSource.loadTask(PluginTask.class); try (LineDecoder reader = new LineDecoder(input, task); final PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output)) { while (reader.nextFile()) { for (final String line : reader) { schema.visitColumns(new ColumnVisitor() { @Override public void timestampColumn(Column column) { String value = getStringValue(line, column.getIndex()); Timestamp time = 〜; pageBuilder.setTimestamp(column, time); } @Override public void stringColumn(Column column) { String value = getStringValue(line, column.getIndex()); pageBuilder.setString(column, value); } @Override public void longColumn(Column column) { String value = getStringValue(line, column.getIndex()); pageBuilder.setLong(column, Long.parseLong(value)); } @Override public void doubleColumn(Column column) { String value = getStringValue(line, column.getIndex()); pageBuilder.setDouble(column, Double.parseDouble(value)); } @Override public void booleanColumn(Column column) { String value = getStringValue(line, column.getIndex()); pageBuilder.setBoolean(column, Boolean.parseBoolean(value)); } }); pageBuilder.addRecord(); } pageBuilder.flush(); } pageBuilder.finish(); } }
ちなみに、nullをセットしたい場合は「pageBuilder.setNull(column)
」で出来る。
YAMLファイルのcolumnsでデータ型timestampが指定されたカラムはTimestampクラスで値を扱う。
PageBuilderでtimestampをセットするには、基本的には以下のようにコーディングする。
import org.embulk.spi.Column; import org.embulk.spi.PageBuilder; import org.embulk.spi.time.Timestamp;
long msec = System.currentTimeMillis(); Timestamp time = Timestamp.ofEpochMilli(msec); pageBuilder.setTimestamp(column, time);
ところで、EmbulkではYAMLファイル上でdefault_timezoneを指定したり、カラム毎にtimezoneを指定したりすることが出来る。
in: type: file 〜 parser: type: example_myformat default_timezone: Asia/Tokyo columns: - {name: date, type: timestamp, timezone: Asia/Tokyo, format: '%Y/%m/%d %H:%M:%S'} out: type: stdout
これらの情報を取得するには、org.embulk.spi.timeのTimestampParserを使うのが便利である。
import org.embulk.spi.time.Timestamp; import org.embulk.spi.time.TimestampParser; import org.embulk.spi.util.Timestamps;
まず、PluginTaskにTimestampParser.Taskインターフェースを追加しておく。
(これにより、default_timezoneやcolumns内のtimezoneが取得できるようになる)
public interface PluginTask extends Task, TimestampParser.Task { @Config("columns") public SchemaConfig getColumns(); }
そして、runメソッドの先頭でTimestampParserを用意する。
@Override public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) { PluginTask task = taskSource.loadTask(PluginTask.class); final TimestampParser[] timestampParsers = Timestamps.newTimestampColumnParsers(task, task.getColumns()); 〜 }
これはTimestampParserの配列であり、使う際はcolumn.getIndex()を添字として指定する。
schema.visitColumns(new ColumnVisitor() { @Override public void timestampColumn(Column column) { TimestampParser parser = timestampParsers[column.getIndex()]; // org.joda.time.DateTimeZone dateTimeZone = parser.getDefaultTimeZone(); // java.util.TimeZone timeZone = dateTimeZone.toTimeZone(); String value = getStringValue(line, column.getIndex()); Timestamp time = parser.parse(value); pageBuilder.setTimestamp(column, time); } 〜 }
タイムゾーンを取得したければ、TimestampParserのgetDefaultTimeZoneメソッドを呼び出す。
「DefaultTimeZone」というメソッド名だが、columnsでtimezoneが指定されている場合はそのタイムゾーンを返すし、指定されていない場合はdefault_timezoneで指定されているタイムゾーンが返る。それも指定されていない場合はUTCになる。
なお、返ってくるのはjoda.timeライブラリーのDateTimeZoneクラスなので、java.util.TimeZoneが欲しい場合はそこから変換する。
Stringの値をTimestampに変換したければ、TimestampParserのparseメソッドを呼び出す。
columnsのformat指定に従ってパースされるし、もちろんタイムゾーンも考慮される。
YAMLファイルのcolumnsに独自のパラメーターを定義させることが出来る。
(columnsに余計なパラメーターが付いていても無視されるので、自分が必要なものだけ取得する)
in: type: file 〜 parser: type: example_myformat columns: - {name: text, type: string, foo: abc} out: type: stdout
このfooを取得する為に、まず、(PluginTaskと同様に)カラムのオプション用インターフェースを作成する。
public interface ColumnOptionTask extends Task { @Config("foo") @ConfigDefault("\"defaultFoo\"") public String getFoo(); }
そして、runメソッドの先頭でこのColumnOptionTaskを初期化する。
@Override public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) { PluginTask task = taskSource.loadTask(PluginTask.class); SchemaConfig schemaConfig = task.getColumns(); final List<ColumnOptionTask> columnOptions = new ArrayList<>(schemaConfig.getColumnCount()); for (ColumnConfig column : schemaConfig.getColumns()) { ColumnOptionTask option = column.getOption().loadConfig(ColumnOptionTask.class); columnOptions.add(option); } 〜 }
あとは、欲しい場面で使うだけ。
schema.visitColumns(new ColumnVisitor() { @Override public void stringColumn(Column column) { String value = getStringValue(line, column.getIndex()); if (value.isEmpty()) { ColumnOptionTask option = columnOptions.get(column.getIndex()); value = option.getFoo(); } pageBuilder.setString(column, value); } 〜 }