Embulk(0.7.5)のパーサープラグインの作成方法のメモ。
|
|
パーサープラグイン(java-parser)は、読み込まれたファイルをパースするプラグイン。
(例えばCSVファイルとしてデータを行・カラムごとに分解する)
Javaパーサープラグインは以下のコマンドでプロジェクトを作成する。
$ embulk new java-parser example-myformat
最後の引数の「example-myformat」は、プラグイン名を表す。
これで、「embulk-parser-example_myformat」というディレクトリーが作られ、その下にソース類一式が生成される。
プロジェクトを作成したら、まず以下のコマンドを実行する。
$ cd embulk-parser-example_myformat $ ./gradlew package
生成されたプラグインの中核は、src/main/javaの下のParserPluginクラス。
package org.embulk.parser.example_myformat;
〜
public class ExampleMyformatParserPlugin implements ParserPlugin {
public interface PluginTask extends Task {
〜
}
@Override
public void transaction(ConfigSource config, ParserPlugin.Control control) {
PluginTask task = config.loadConfig(PluginTask.class);
Schema schema = task.getColumns().toSchema();
control.run(task.dump(), schema);
}
@Override
public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
PluginTask task = taskSource.loadTask(PluginTask.class);
// Write your code here :)
throw new UnsupportedOperationException("ExampleMyformatParserPlugin.run method is not implemented yet");
}
}
YAMLファイルに記述されたパラメーター(オプション)を取得するのに使うのが、PluginTaskインターフェース。
初期状態では、ParserPluginクラスの内部インターフェースとして、以下のように生成されている。
import com.google.common.base.Optional; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.Task; import org.embulk.spi.SchemaConfig;
public interface PluginTask extends Task {
// configuration option 1 (required integer)
@Config("option1")
public int getOption1();
// configuration option 2 (optional string, null is not allowed)
@Config("option2")
@ConfigDefault("\"myvalue\"")
public String getOption2();
// configuration option 3 (optional string, null is allowed)
@Config("option3")
@ConfigDefault("null")
public Optional<String> getOption3();
// if you get schema from config or data source
@Config("columns")
public SchemaConfig getColumns();
}
このPluginTaskインターフェースでは、ゲッターメソッドだけを定義する。
具象クラスの作成やYAMLファイルから読み込んだ値の設定はEmbulkがやってくれる。
@ConfigアノテーションでYAMLファイル内のパラメーター名を指定する。
ゲッターメソッドの戻り値の型でデータの型を指定する。
省略可能なパラメーターの場合は@ConfigDefaultアノテーションを付けてデフォルト値を指定する。省略時にnullにしたい場合は戻り型をOptional(これはJava8のOptionalではなく、com.google.commonのもの)にするようだ。
パーサープラグインの場合、読み込んだデータがどういうカラム(項目・列)になるのかをYAMLファイルのcolumnsで指定することが多いだろう。
その場合はgetColumns()の定義はそのまま残しておき、他のパラメーターは自分独自のものに置き換えることになる。
ParserPluginのtransactionメソッドは特に修正する必要は無く、生成された状態のままで良い。
taskの中の値を変更したい場合に何か実装するようだ。
@Override
public void transaction(ConfigSource config, ParserPlugin.Control control) {
PluginTask task = config.loadConfig(PluginTask.class);
Schema schema = task.getColumns().toSchema();
control.run(task.dump(), schema);
}
transactionメソッドは(runメソッドと違って)、入力ファイルの個数等とは無関係に1回だけ呼ばれるようだ。
ParserPluginのrunメソッドが、ファイルから読み込まれたデータを受け取ってパースを行う本体。
import org.embulk.config.TaskSource; import org.embulk.spi.FileInput; import org.embulk.spi.PageOutput; import org.embulk.spi.Schema;
@Override
public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
PluginTask task = taskSource.loadTask(PluginTask.class);
// Write your code here :)
throw new UnsupportedOperationException("ExampleMyformatParserPlugin.run method is not implemented yet");
}
taskSource.loadTask()を使って、YAMLファイルに書かれた自分のプラグイン用の設定をPluginTaskに入れる。
生成された初期状態ではUnsupportedOperationExceptionが投げられるようになっているので、ここを自分の処理に書き換えていく。
ちなみにrunメソッドは入力ファイルの個数分のマルチスレッドで呼ばれるが、ParserPluginインスタンスは個別に作られるようなので、staticフィールド以外は特にMTセーフを考慮する必要は無さそうだ。
パーサープラグインは読み込まれたファイルをパースするものなので、runメソッドにはファイルが渡ってくる。(引数のFileInput)
FileInputは複数のファイルが入っていることがあるようで、nextFileメソッドを使ってファイルを順次処理する。
FileInputからデータを読み込むにはpollメソッドを使うようだが、これはEmbulk独自のBufferを返すので、使い方がいまいち分からない。
org.embulk.spi.utilのFileInputInputStreamやLineDecoderを使うのが便利そう。
ファイルをバイナリーファイル(InputStream)として読み込むには、FileInputInputStreamを使う。
import java.io.IOException; import org.embulk.config.TaskSource; import org.embulk.spi.FileInput; import org.embulk.spi.PageOutput; import org.embulk.spi.Schema; import org.embulk.spi.util.FileInputInputStream;
@Override
public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
byte[] buf = new byte[4096];
try (FileInputInputStream is = new FileInputInputStream(input)) {
while (is.nextFile()) {
for (;;) {
int len = is.read(buf, 0, buf.length);
if (len < 0) {
break;
}
〜
}
}
}
}
ちなみにFileInputInputStreamでは、引数が0個のread()と引数が3個のread(buf, offset, len)では、IOExceptionが発生しない。(
何か起きたら、たぶんRuntimeExceptionにラップしてスローする)
引数が1個のread(buf)はFileInputInputStreamでオーバーライドされていないので、InputStreamのデフォルトのまま。つまりIOExceptionがthrows宣言されているので、IOExceptionの処理をコーディングしなければならない。
ファイルをテキストファイルとして読み込むには、FileInputInputStreamを(InputStreamReader経由で)BufferedReaderでラップしてもよいが、LineDecoderを使うという方法もある。
(LineDecoderはReaderのサブクラスではない)
LineDecoderを使う場合は、エンコーディングや改行コードの種類を指定するオブジェクト(LineDecoderクラス内のDecoderTaskインターフェース)が必要になる。
これらはYAMLファイル内に記述しておく前提で、PluginTaskにDecoderTaskを含めてしまえばよい。
import org.embulk.config.Task; import org.embulk.config.TaskSource; import org.embulk.spi.FileInput; import org.embulk.spi.PageOutput; import org.embulk.spi.Schema; import org.embulk.spi.util.LineDecoder; import org.embulk.spi.util.LineDecoder.DecoderTask;
public interface PluginTask extends Task, DecoderTask {
// if you get schema from config or data source
@Config("columns")
public SchemaConfig getColumns();
}
@Override
public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
PluginTask task = taskSource.loadTask(PluginTask.class);
try (LineDecoder reader = new LineDecoder(input, task)) {
while (reader.nextFile()) {
for (String line : reader) {
〜
}
}
}
}
ちなみにLineDecoder内では、BufferedReaderを使ってデータを読み込んでいる。
LineDecoderのpollメソッドを使えばBufferedReader#readLine()と同じように1行ずつ読み込めるが、LineDecoderはIterable<String>を実装しているので、forループで読み込む方が簡単。
なお、LineDecoderのメソッドはIOExceptionを返すようになっていない。(内部でIOExceptionが発生したら、RuntimeExceptionにラップしてスローしている)
データの出力には、runメソッドの引数のPageOutputを使う。
PageOutput#add()でPageを渡すのだが、PageはEmbulk独自のBufferを受け取るようになっていて、使い方がいまいち分からない。
org.embulk.spiのPageBuilderを使うのが便利そう。
import org.embulk.config.TaskSource; import org.embulk.spi.Exec; import org.embulk.spi.FileInput; import org.embulk.spi.PageBuilder; import org.embulk.spi.PageOutput; import org.embulk.spi.Schema;
@Override
public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
PluginTask task = taskSource.loadTask(PluginTask.class);
try (LineDecoder reader = new LineDecoder(input, task);
PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output)) {
while (reader.nextFile()) {
for (String line : reader) {
// YAMLファイル上のcolumnsが2つあって、両方ともstringである前提
pageBuilder.setString(schema.getColumn(0), line.substring(0, 10));
pageBuilder.setString(schema.getColumn(1), line.substring(10));
pageBuilder.addRecord();
}
pageBuilder.flush();
}
pageBuilder.finish();
}
}
PageBuilderのコンストラクターにはBufferとrunメソッドの引数であるschema, outputを渡す。
BufferはExecというクラスを使って生成できる。
PageBuilderのsetString()やsetLong()等を使ってカラム毎の値をセットする。
これらのメソッドの第1引数でカラム(schemaから取得する)、第2引数で値を指定する。
値にnullを渡す事は出来ない(setString()でnullを渡すとNullPointerExceptionが発生する)。nullを明示的にセットしたい場合はsetNull()を使う。
1レコード分のデータをセットしたらaddRecord()を呼び出す。
適当な間隔でflushメソッドを呼び出してもよい。(finishメソッドを呼び出せばフラッシュされる)
closeメソッドの中ではfinishメソッドは呼ばれないようなので、finishメソッドの呼び出しは明示的に行う必要がある。
PageBuilderへの各カラムの値の設定は、YAMLファイルのcolumnsで書かれたカラム数の分だけ行う必要がある。
(上記の例では決め打ちで2カラムだけ設定しているが、本来はschema.getColumnCount()の個数分ループする必要がある)
また、setString()やsetLong()等のどのメソッドを呼び出すのかについては、columnsで書かれたデータ型と一致するメソッドを呼び出さなければならない。
(一致していないと例外が発生する)
(上記の例では決め打ちでstringとして扱っている)
つまり、本来は以下のような感じのコードになる。
for (String line : reader) {
for (Column column : schema.getColumns()) {
String value = getStringValue(line, column.getIndex()) {
Type type = column.getType();
if (type instanceof BooleanType) {
pageBuilder.setBoolean(column, Boolean.parseBoolean(value));
} else if (type instanceof LongType) {
pageBuilder.setLong(column, Long.parseLong(value));
} else if (type instanceof DoubleType) {
pageBuilder.setDouble(column, Double.parseDouble(value));
} else if (type instanceof StringType) {
pageBuilder.setString(column, value);
} else if (type instanceof TimestampType) {
Timestamp time = 〜;
pageBuilder.setTimestamp(column, time);
} else {
throw new UnsupportedOperationException(MessageFormat.format("type={0}", type.getName()));
}
}
pageBuilder.addRecord();
}
private String getStringValue(String line, int index) {
switch (index) {
case 0:
return line.substring(0, 8);
case 1:
return line.substring(8);
default:
return "";
}
}
さすがにこのif文の羅列は嫌なので、visitorパターンで実装できるようになっている。
YAMLファイルに書かれたcolumnsはvisitorパターンで走査できる。
import org.embulk.spi.Column; import org.embulk.spi.ColumnVisitor;
@Override
public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
PluginTask task = taskSource.loadTask(PluginTask.class);
try (LineDecoder reader = new LineDecoder(input, task);
final PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output)) {
while (reader.nextFile()) {
for (final String line : reader) {
schema.visitColumns(new ColumnVisitor() {
@Override
public void timestampColumn(Column column) {
String value = getStringValue(line, column.getIndex());
Timestamp time = 〜;
pageBuilder.setTimestamp(column, time);
}
@Override
public void stringColumn(Column column) {
String value = getStringValue(line, column.getIndex());
pageBuilder.setString(column, value);
}
@Override
public void longColumn(Column column) {
String value = getStringValue(line, column.getIndex());
pageBuilder.setLong(column, Long.parseLong(value));
}
@Override
public void doubleColumn(Column column) {
String value = getStringValue(line, column.getIndex());
pageBuilder.setDouble(column, Double.parseDouble(value));
}
@Override
public void booleanColumn(Column column) {
String value = getStringValue(line, column.getIndex());
pageBuilder.setBoolean(column, Boolean.parseBoolean(value));
}
});
pageBuilder.addRecord();
}
pageBuilder.flush();
}
pageBuilder.finish();
}
}
ちなみに、nullをセットしたい場合は「pageBuilder.setNull(column)」で出来る。
YAMLファイルのcolumnsでデータ型timestampが指定されたカラムはTimestampクラスで値を扱う。
PageBuilderでtimestampをセットするには、基本的には以下のようにコーディングする。
import org.embulk.spi.Column; import org.embulk.spi.PageBuilder; import org.embulk.spi.time.Timestamp;
long msec = System.currentTimeMillis(); Timestamp time = Timestamp.ofEpochMilli(msec); pageBuilder.setTimestamp(column, time);
ところで、EmbulkではYAMLファイル上でdefault_timezoneを指定したり、カラム毎にtimezoneを指定したりすることが出来る。
in:
type: file
〜
parser:
type: example_myformat
default_timezone: Asia/Tokyo
columns:
- {name: date, type: timestamp, timezone: Asia/Tokyo, format: '%Y/%m/%d %H:%M:%S'}
out:
type: stdout
これらの情報を取得するには、org.embulk.spi.timeのTimestampParserを使うのが便利である。
import org.embulk.spi.time.Timestamp; import org.embulk.spi.time.TimestampParser; import org.embulk.spi.util.Timestamps;
まず、PluginTaskにTimestampParser.Taskインターフェースを追加しておく。
(これにより、default_timezoneやcolumns内のtimezoneが取得できるようになる)
public interface PluginTask extends Task, TimestampParser.Task {
@Config("columns")
public SchemaConfig getColumns();
}
そして、runメソッドの先頭でTimestampParserを用意する。
@Override
public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
PluginTask task = taskSource.loadTask(PluginTask.class);
final TimestampParser[] timestampParsers = Timestamps.newTimestampColumnParsers(task, task.getColumns());
〜
}
これはTimestampParserの配列であり、使う際はcolumn.getIndex()を添字として指定する。
schema.visitColumns(new ColumnVisitor() {
@Override
public void timestampColumn(Column column) {
TimestampParser parser = timestampParsers[column.getIndex()];
// org.joda.time.DateTimeZone dateTimeZone = parser.getDefaultTimeZone();
// java.util.TimeZone timeZone = dateTimeZone.toTimeZone();
String value = getStringValue(line, column.getIndex());
Timestamp time = parser.parse(value);
pageBuilder.setTimestamp(column, time);
}
〜
}
タイムゾーンを取得したければ、TimestampParserのgetDefaultTimeZoneメソッドを呼び出す。
「DefaultTimeZone」というメソッド名だが、columnsでtimezoneが指定されている場合はそのタイムゾーンを返すし、指定されていない場合はdefault_timezoneで指定されているタイムゾーンが返る。それも指定されていない場合はUTCになる。
なお、返ってくるのはjoda.timeライブラリーのDateTimeZoneクラスなので、java.util.TimeZoneが欲しい場合はそこから変換する。
Stringの値をTimestampに変換したければ、TimestampParserのparseメソッドを呼び出す。
columnsのformat指定に従ってパースされるし、もちろんタイムゾーンも考慮される。
YAMLファイルのcolumnsに独自のパラメーターを定義させることが出来る。
(columnsに余計なパラメーターが付いていても無視されるので、自分が必要なものだけ取得する)
in:
type: file
〜
parser:
type: example_myformat
columns:
- {name: text, type: string, foo: abc}
out:
type: stdout
このfooを取得する為に、まず、(PluginTaskと同様に)カラムのオプション用インターフェースを作成する。
public interface ColumnOptionTask extends Task {
@Config("foo")
@ConfigDefault("\"defaultFoo\"")
public String getFoo();
}
そして、runメソッドの先頭でこのColumnOptionTaskを初期化する。
@Override
public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutput output) {
PluginTask task = taskSource.loadTask(PluginTask.class);
SchemaConfig schemaConfig = task.getColumns();
final List<ColumnOptionTask> columnOptions = new ArrayList<>(schemaConfig.getColumnCount());
for (ColumnConfig column : schemaConfig.getColumns()) {
ColumnOptionTask option = column.getOption().loadConfig(ColumnOptionTask.class);
columnOptions.add(option);
}
〜
}
あとは、欲しい場面で使うだけ。
schema.visitColumns(new ColumnVisitor() {
@Override
public void stringColumn(Column column) {
String value = getStringValue(line, column.getIndex());
if (value.isEmpty()) {
ColumnOptionTask option = columnOptions.get(column.getIndex());
value = option.getFoo();
}
pageBuilder.setString(column, value);
}
〜
}