EmbulkをJavaから実行する方法。
|
|
|
Javaアプリケーションの中からEmbulkを実行する為のクラスとして、Embulk0.6.22でEmbulkEmbedというクラスが導入された。
embulkコマンドのguess・preview・run等と同じことが実行できる。
コーディング手順は以下のような感じ。
EmbulkEmbedを使うJavaプロジェクトを作成する為のGradleのbuild.gradleは以下のような感じ。
apply plugin: 'java' apply plugin: 'eclipse' task wrapper(type: Wrapper) { gradleVersion '2.5' jarFile file('.buildtools/gradlew.jar') } group = 'example-embulk' version = '0.1-SNAPSHOT' sourceCompatibility = 1.7 targetCompatibility = 1.7 def defaultEncoding = 'UTF-8' [compileJava, compileTestJava]*.options*.encoding = defaultEncoding repositories { mavenCentral() jcenter() } dependencies { compile group: 'org.embulk', name : 'embulk-core', version: '0.7.3' compile group: 'org.embulk', name : 'embulk-standards', version: '0.7.3' testCompile 'junit:junit:4.+' } eclipse.classpath.file { whenMerged { classpath -> classpath.entries.findAll { entry -> entry.kind == 'output' }*.path = 'classes' } }
dependenciesにEmbulkのcoreとstandardsを指定する。(Embulk0.7.3では、standardsが無いと実行時にエラーになる)
これらのライブラリーをダウンロードする為に、repositoriesにjcenterを指定する。
Embulk0.7.0で、EmbulkEmbedクラス内にBootstrapクラスが追加された。
これを使ってEmbulkEmbedインスタンスを生成する。
import org.embulk.EmbulkEmbed; import org.embulk.EmbulkEmbed.Bootstrap;
Bootstrap bootstrap = new EmbulkEmbed.Bootstrap(); // bootstrap = bootstrap.addModules(new MyModule()); // EmbulkEmbed embulk = bootstrap.initialize(); // シャットダウンフックを登録する EmbulkEmbed embulk = bootstrap.initializeCloseable(); // シャットダウンフックを登録しない try { 〜 } finally { embulk.destroy(); }
まず、Bootstrapインスタンスを生成する。
例えばインジェクション用の独自のModuleを追加したい場合はaddModules()を呼んでおく。
その後、Bootstrapのinitialize()またはinitializeCloseable()でEmbulkEmbedインスタンスを生成する。
initialize()とinitializeCloseable()の違いは、Javaアプリケーションの終了時にdestroy()が呼ばれるかどうか。(シャットダウンフックにdestroy()の呼び出しが登録されるかどうか)
個人的にはリソースのクローズはプログラマーが明示的に呼び出すべきだと思う。
なお、EmbulkEmbedはAutoCloseableを実装していないので、Java7のtry-with-resource構文は使えない。
データ入出力の定義(YAMLファイル相当)はConfigSource(インターフェース)で扱う。
import org.embulk.config.ConfigSource;
具象クラスはDataSourceImplで、内部ではJSONオブジェクトとして保持しているようだ。
ConfigSourceの生成にはEmbulkEmbedから取得したConfigLoaderを使う。[2015-08-30]
(Bootstrapから取得したConfigLoaderを使うとエラーが発生するので注意)
import org.embulk.config.ConfigLoader;
ConfigLoader loader = embulk.newConfigLoader();
ConfigSourceを作るには、YAMLファイルに定義を書いておいて、それを読み込むのが一番分かりやすい。
import java.io.File;
ConfigSource config = loader.fromYamlFile(new File("/tmp/embulk/embulk-example/config.yml")); // InputStreamから作成する例 // ConfigSource config = loader.fromYaml(new FileInputStream("/tmp/embulk/embulk-example/config.yml"));
なお、ファイルはUnicode(UTF-8やUTF-16に対応しているっぽい)として読み込むようだ。
ファイルからでなく、YAMLの文字列を指定するメソッドもある。
他にも、JSONファイルを読み込んだり、JSON文字列から解析したりするメソッドが用意されている。
ConfigSource内の値はgetメソッドで取得できる。
値の型が分かっている場合は、その型を指定してget()を呼び出す。
値の構造をそのままで取得する場合はgetNested()を呼び出す。
Map<?, ?> inMap = config.get(Map.class, "in"); // inをMapとして取得 System.out.println(inMap); ConfigSource in = config.getNested("in"); // inを構造そのままで取得 String type = in.get(String.class, "type"); PluginType pluginType = in.get(PluginType.class, "type"); // PluginTypeがJSON変換に対応しているので、PluginTypeとして取得可能
(ファイルから読み込んだりせずに)空の状態のConfigSourceを作るには、ConfigLoaderのnewConfigSource()を使う。
ConfigSource config = loader.newConfigSource(); Map<String, Object> inMap = new HashMap<>(); inMap.put("type", "file"); config.set("in", inMap); Map<String, Object> outMap = new HashMap<>(); outMap.put("type", "stdout"); config.set("out", outMap);
embulkコマンドで指定するコマンド名とEmbulkEmbedクラスのメソッド名は大体同じなので、分かりやすい。
データを読み込んで定義を推測する。
推測した定義がConfigDiffとして返ってくるので、元の定義とマージすれば、後続のpreviewやrunで使用できる。 [2015-08-30]
import org.embulk.config.ConfigDiff;
ConfigDiff diff = embulk.guess(config); config.merge(diff); // configをpreviewやrunで使う
元の定義をとっておきたい場合は、deepCopy()で複製を作れる。[2015-08-30]
ConfigSource copy = config.deepCopy(); ConfigDiff diff = embulk.guess(config); copy.merge(diff); // copyをpreviewやrunで使う
データの読み込みだけ行う。
import org.embulk.exec.PreviewResult; import org.embulk.spi.Column; import org.embulk.spi.Page; import org.embulk.spi.Schema;
PreviewResult result = embulk.preview(config); Schema schema = result.getSchema(); List<Column> columns = schema.getColumns(); System.out.println(columns); for (Page page : result.getPages()) { List<String> list = page.getStringReferences(); for (String s : list) { System.out.println(s); } }
ただ、page.getStringReferences()で取れる値は、スキーマでstringとして定義されているデータだけのようだ。[2015-08-30]
(レコード(行)の終わりも判別できない)
レコード(行)毎に全カラムのデータを取得するには、PageReaderを使うのが良さそう。[2015-08-30]
import org.embulk.spi.Column; import org.embulk.spi.ColumnVisitor; import org.embulk.spi.Page; import org.embulk.spi.PageReader; import org.embulk.spi.Schema; import org.embulk.spi.time.Timestamp;
PreviewResult result = embulk.preview(config); Schema schema = result.getSchema(); // @see org.embulk.standards.StdoutOutputPlugin // @see org.embulk.spi.util.PagePrinter try (PageReader reader = new PageReader(schema)) { MyColumnVisitor visitor = new MyColumnVisitor(reader); for (Page page : result.getPages()) { reader.setPage(page); while (reader.nextRecord()) { System.out.println("----"); schema.visitColumns(visitor); } } }
static class MyColumnVisitor implements ColumnVisitor { private PageReader reader; public MyColumnVisitor(PageReader reader) { this.reader = reader; } @Override public void timestampColumn(Column column) { Timestamp value = reader.getTimestamp(column); System.out.println(value); } @Override public void stringColumn(Column column) { String value = reader.getString(column); System.out.println(value); } @Override public void longColumn(Column column) { long value = reader.getLong(column); System.out.println(value); } @Override public void doubleColumn(Column column) { double value = reader.getDouble(column); System.out.println(value); } @Override public void booleanColumn(Column column) { boolean value = reader.getBoolean(column); System.out.println(value); } }
schema.visitColumns()では、(YAMLファイルの)columnsで定義された順番で呼ばれる。
データを読み込み、出力する。
import org.embulk.exec.ExecutionResult;
ExecutionResult result = embulk.run(config); List<Throwable> exceptions = result.getIgnoredExceptions(); System.out.println(exceptions);
特にエラーが起きなければ、exceptionsは空リストになっている。[2015-08-30]
import org.embulk.EmbulkEmbed.ResumableResult; import org.embulk.exec.ResumeState;
ResumableResult result = embulk.runResumable(config); ResumeState state = result.getResumeState();
EmbulkEmbedを使ってデータを読み込み、出力は自分のプログラムで行う、という例を考えてみる。[2015-09-05]
いわば、ローカルの(自分のプログラムでしか使わない)アウトプットプラグインを作るような感じ。
まず、呼び出すプラグインを作る。
import java.util.List; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigSource; import org.embulk.config.Task; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.spi.Exec; import org.embulk.spi.OutputPlugin; import org.embulk.spi.Page; import org.embulk.spi.PageReader; import org.embulk.spi.Schema; import org.embulk.spi.TransactionalPageOutput;
public class MyOutputPlugin implements OutputPlugin { /** プラグインのタイプ名 */ public static final String TYPE = "myout";
/** プラグインで使用するデータ */ public interface PluginTask extends Task { }
@Override public ConfigDiff transaction(ConfigSource config, Schema schema, int taskCount, OutputPlugin.Control control) { final PluginTask task = config.loadConfig(PluginTask.class); return resume(task.dump(), schema, taskCount, control); }
@Override public ConfigDiff resume(TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) { control.run(taskSource); return Exec.newConfigDiff(); }
@Override public void cleanup(TaskSource taskSource, Schema schema, int taskCount, List<TaskReport> successTaskReports) { }
@Override public TransactionalPageOutput open(TaskSource taskSource, final Schema schema, int taskIndex) { // taskSourceはtransaction()のtask.dump()とは別インスタンス // taskSourceは複数のOutputPluginスレッドで同じインスタンス // final PluginTask task = taskSource.loadTask(PluginTask.class); final PageReader reader = new PageReader(schema); final MyColumnVisitor visitor = new MyColumnVisitor(reader); return new TransactionalPageOutput() { @Override public void add(Page page) { reader.setPage(page); while (reader.nextRecord()) { // ここで自前の出力を行う schema.visitColumns(visitor); } } @Override public void finish() { // ここで自前の出力のフラッシュを行う System.out.flush(); } @Override public void close() { reader.close(); } @Override public void abort() { } @Override public TaskReport commit() { return Exec.newTaskReport(); } }; } }
アウトプットプラグインはOutputPluginインターフェースを実装する。
主に独自実装しないといけないのはopenメソッド。
この中で、読み込まれたデータを受け取って出力する。
アウトプットプラグインのインスタンスは実行するスレッドの個数分作られる。つまりマルチスレッドで動かせるようになっていなければならない。
スレッド数は、主に入力データを分割した個数から算出されるようだ。(つまりインプットプラグイン依存)
また、これらのスレッドはメインスレッドとも別なので、メインスレッドとデータの受け渡しをしたい場合は考慮する必要がある。
自作したプラグインをEmbulkEmbedから呼び出すようにするには、インジェクション機構の「モジュール」を使用する。
import org.embulk.plugin.InjectedPluginSource; import org.embulk.spi.OutputPlugin; import com.google.inject.Binder; import com.google.inject.Module;
// @see org.embulk.standards.StandardPluginModule static class MyModule implements Module { @Override public void configure(Binder binder) { // output plugins InjectedPluginSource.registerPluginTo(binder, OutputPlugin.class, MyOutputPlugin.TYPE, MyOutputPlugin.class); } }
registerPluginToメソッドの第2引数は登録したいプラグインの型(インターフェース)、
第3引数はデータ定義(YAMLファイル)の「type」で指定するタイプ名、
第4引数が登録したいプラグインクラスとなる。
プラグインインスタンスを自分で作りたい場合は、インジェクション機構のプロバイダー機能を使う。
import org.embulk.spi.OutputPlugin; import com.google.inject.Provider;
private static final Provider<? extends MyOutputPlugin> MY_OUTPUT_PLUGIN_PROVIDER = new Provider<MyOutputPlugin>() { @Override public MyOutputPlugin get() { return new MyOutputPlugin(); } };
// @see org.embulk.standards.StandardPluginModule static class MyModule implements Module { @Override public void configure(Binder binder) { // output plugins InjectedPluginSource.registerPluginTo(binder, OutputPlugin.class, MyOutputPlugin.TYPE, MyOutputPlugin.class); binder.bind(MyOutputPlugin.class).toProvider(MY_OUTPUT_PLUGIN_PROVIDER); } }
プラグインインスタンスの生成を自分の制御下に置けば、メインスレッドとデータの受け渡しをするのに使えそうな気がする。
作成したモジュールは、Bootstrap(EmbulkEmbedのインスタンスを作る際に使用する)に登録する。
import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.embulk.EmbulkEmbed; import org.embulk.EmbulkEmbed.Bootstrap; import org.embulk.config.ConfigLoader; import org.embulk.config.ConfigSource;
public class EmbulkOutputExample { private static final Provider<? extends MyOutputPlugin> MY_OUTPUT_PLUGIN_PROVIDER = 〜; static class MyModule implements Module { 〜 }
public static void main(String... args) throws IOException { File file = new File("D:/cygwin/tmp/embulk/embulk-example/config.yml"); EmbulkEmbed embulk = newEmbulkEmbed(); try { ConfigLoader loader = embulk.newConfigLoader(); ConfigSource config = loader.fromYamlFile(file); Map<String, Object> out = new HashMap<>(); out.put("type", MyOutputPlugin.TYPE); config.set("out", out); embulk.run(config); } finally { embulk.destroy(); } }
static EmbulkEmbed newEmbulkEmbed() { Bootstrap bootstrap = new EmbulkEmbed.Bootstrap(); bootstrap = bootstrap.addModules(new MyModule()); EmbulkEmbed embulk = bootstrap.initializeCloseable(); return embulk; }
読み込んだYAMLファイルに対し、outのtypeを自作プラグインのタイプ名に変更している。これで、自分のアウトプットプラグインが呼ばれるようになる。
Set<PluginType>のインジェクションが出来なくて?、GuessExecutorが生成できないというエラー。(Embulk0.7.3)
Exception in thread "main" com.google.inject.ConfigurationException: Guice configuration errors: 1) No implementation for java.util.Set<org.embulk.plugin.PluginType> annotated with @org.embulk.exec.ForGuess() was bound. while locating java.util.Set<org.embulk.plugin.PluginType> annotated with @org.embulk.exec.ForGuess() for parameter 1 at org.embulk.exec.GuessExecutor.<init>(GuessExecutor.java:68) while locating org.embulk.exec.GuessExecutor 1 error at com.google.inject.internal.InjectorImpl.getProvider(InjectorImpl.java:1042) at com.google.inject.internal.InjectorImpl.getProvider(InjectorImpl.java:1001) at com.google.inject.internal.InjectorImpl.getInstance(InjectorImpl.java:1051) at org.embulk.guice.InjectorProxy.getInstance(InjectorProxy.java:113) at org.embulk.EmbulkEmbed.<init>(EmbulkEmbed.java:129) at org.embulk.EmbulkEmbed$Bootstrap.build(EmbulkEmbed.java:115) at org.embulk.EmbulkEmbed$Bootstrap.initializeCloseable(EmbulkEmbed.java:95) at com.example.embulk.EmbulkExample.main(EmbulkExample.java:39)
エラー発生箇所はEmbulkEmbed.Bootstrapのinitialize()あるいはinitializeCloseable()を呼び出した所。
build.gradleのdependenciesに「embulk-standards」が入っていないと(embulk-coreのみだと)発生する。
embulk-standardsにはMETA-INF/services/org.embulk.spi.ExtensionというSPIのファイルが入っている。
この中にStandardPluginExtensionというクラスが指定されていて、そのクラスにはインジェクション定義用のModuleとしてStandardPluginModuleを返すように記述されている。
StandardPluginModuleの中でPluginTypeをいくつか定義しているようなので、これが影響しているのだと思われる。
コマンド実行時(guess, preview, run)に発生したエラー。(Embulk0.7.3)
Exception in thread "main" org.embulk.config.ConfigException: com.fasterxml.jackson.databind.JsonMappingException: Setting null to a task field is not allowed. Use Optional<T> (com.google.common.base.Optional) to represent null. at org.embulk.config.ModelManager.readObjectWithConfigSerDe(ModelManager.java:75) at org.embulk.config.DataSourceImpl.loadConfig(DataSourceImpl.java:208) at org.embulk.standards.LocalFileInputPlugin.transaction(LocalFileInputPlugin.java:60) at org.embulk.spi.FileInputRunner.transaction(FileInputRunner.java:64) at org.embulk.exec.PreviewExecutor.doPreview(PreviewExecutor.java:93) at org.embulk.exec.PreviewExecutor.access$000(PreviewExecutor.java:27) at org.embulk.exec.PreviewExecutor$1.run(PreviewExecutor.java:67) at org.embulk.exec.PreviewExecutor$1.run(PreviewExecutor.java:63) at org.embulk.spi.Exec.doWith(Exec.java:25) at org.embulk.exec.PreviewExecutor.preview(PreviewExecutor.java:63) at org.embulk.EmbulkEmbed.preview(EmbulkEmbed.java:168) at com.example.embulk.EmbulkExample.main(EmbulkExample.java:65) Caused by: com.fasterxml.jackson.databind.JsonMappingException: Setting null to a task field is not allowed. Use Optional<T> (com.google.common.base.Optional) to represent null. at org.embulk.config.TaskSerDe$TaskDeserializer.deserialize(TaskSerDe.java:172) at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3534) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:1870) at org.embulk.config.ModelManager.readObjectWithConfigSerDe(ModelManager.java:72) ... 11 more
Optional(これはJava8のOptionalではなく、google.commonのライブラリー)にnullをセットしようとしてエラーになっているらしい。
エラーメッセージ上にはどのフィールドでエラーになったのかは出ていないが、デバッグで見てみたところ、last_pathというフィールドのようだった。
ここではexampleコマンドで作ったconfig.ymlを読み込んでいるのだが、確かにconfig.ymlにはlast_pathは指定されていない。
で、config.ymlに「last_path: ''
」を追加して再実行したところ、エラー内容が変わった。
Exception in thread "main" org.embulk.config.ConfigException: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of com.google.common.base.Optional, problem: abstract types either need to be mapped to concrete types, have custom deserializer, or be instantiated with additional type information at [Source: N/A; line: -1, column: -1] at org.embulk.config.ModelManager.readObjectWithConfigSerDe(ModelManager.java:75) at org.embulk.config.DataSourceImpl.loadConfig(DataSourceImpl.java:208) at org.embulk.standards.LocalFileInputPlugin.transaction(LocalFileInputPlugin.java:60) at org.embulk.spi.FileInputRunner.transaction(FileInputRunner.java:64) at org.embulk.exec.PreviewExecutor.doPreview(PreviewExecutor.java:93) at org.embulk.exec.PreviewExecutor.access$000(PreviewExecutor.java:27) at org.embulk.exec.PreviewExecutor$1.run(PreviewExecutor.java:67) at org.embulk.exec.PreviewExecutor$1.run(PreviewExecutor.java:63) at org.embulk.spi.Exec.doWith(Exec.java:25) at org.embulk.exec.PreviewExecutor.preview(PreviewExecutor.java:63) at org.embulk.EmbulkEmbed.preview(EmbulkEmbed.java:168) at com.example.embulk.EmbulkExample.main(EmbulkExample.java:65) Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of com.google.common.base.Optional, problem: abstract types either need to be mapped to concrete types, have custom deserializer, or be instantiated with additional type information at [Source: N/A; line: -1, column: -1] at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148) at com.fasterxml.jackson.databind.DeserializationContext.instantiationException(DeserializationContext.java:857) at com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserialize(AbstractDeserializer.java:139) at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3534) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:1894) at org.embulk.config.TaskSerDe$TaskDeserializer.deserialize(TaskSerDe.java:157) at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3534) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:1870) at org.embulk.config.ModelManager.readObjectWithConfigSerDe(ModelManager.java:72) ... 11 more
AbstractDeserializerのdeserialize()に行っちゃって、これが常に例外を発生させるようになっている(要するにそこへ来てはいけない)。
いずれにしてもlast_pathの値をOptionalに入れるところでエラーになってるっぽい。
ConfigSourceを作る為のローダーにbootstrapのSystemConfigLoaderを使っていると、上記のようなエラーになるらしい。(by frsyukiさん)[2015-08-30]
EmbulkEmbedのnewConfigLoader()を使うのが正しい。
ConfigLoader loader = bootstrap.getSystemConfigLoader(); // NG ConfigSource config = loader.fromYamlFile(new File("/tmp/embulk/embulk-example/config.yml"));
↓
ConfigLoader loader = embulk.newConfigLoader(); // OK ConfigSource config = loader.fromYamlFile(new File("/tmp/embulk/embulk-example/config.yml"));