S-JIS[2015-08-29/2015-09-05] 変更履歴

EmbulkEmbed

EmbulkをJavaから実行する方法。


概要

Javaアプリケーションの中からEmbulkを実行する為のクラスとして、Embulk0.6.22でEmbulkEmbedというクラスが導入された。

embulkコマンドのguess・preview・run等と同じことが実行できる。


コーディング手順は以下のような感じ。

  1. EmbulkEmbedインスタンスを生成する
  2. embulkコマンドで指定するconfig.ymlと同等の定義を作成する
  3. コマンド(guess, preview, run等)を実行する。

開発環境の構築

EmbulkEmbedを使うJavaプロジェクトを作成する為のGradleのbuild.gradleは以下のような感じ。

build.gradle

apply plugin: 'java'
apply plugin: 'eclipse'

task wrapper(type: Wrapper) {
    gradleVersion '2.5'
    jarFile file('.buildtools/gradlew.jar')
}

group = 'example-embulk'
version = '0.1-SNAPSHOT'

sourceCompatibility = 1.7
targetCompatibility = 1.7

def defaultEncoding = 'UTF-8'
[compileJava, compileTestJava]*.options*.encoding = defaultEncoding

repositories {
    mavenCentral()
    jcenter()
}

dependencies {
    compile group: 'org.embulk', name : 'embulk-core', version: '0.7.3'
    compile group: 'org.embulk', name : 'embulk-standards', version: '0.7.3'
    testCompile 'junit:junit:4.+'
}

eclipse.classpath.file {
    whenMerged { classpath ->
        classpath.entries.findAll { entry -> entry.kind == 'output' }*.path = 'classes'
    }
}

dependenciesにEmbulkのcoreとstandardsを指定する。(Embulk0.7.3では、standardsが無いと実行時にエラーになる
これらのライブラリーをダウンロードする為に、repositoriesにjcenterを指定する。


EmbulkEmbedインスタンスの生成

Embulk0.7.0で、EmbulkEmbedクラス内にBootstrapクラスが追加された。
これを使ってEmbulkEmbedインスタンスを生成する。

import org.embulk.EmbulkEmbed;
import org.embulk.EmbulkEmbed.Bootstrap;
		Bootstrap bootstrap = new EmbulkEmbed.Bootstrap();
//		bootstrap = bootstrap.addModules(new MyModule());

//		EmbulkEmbed embulk = bootstrap.initialize();           // シャットダウンフックを登録する
		EmbulkEmbed embulk = bootstrap.initializeCloseable(); // シャットダウンフックを登録しない
		try {
			
		} finally {
			embulk.destroy();
		}

まず、Bootstrapインスタンスを生成する。
例えばインジェクション用の独自のModuleを追加したい場合はaddModules()を呼んでおく。

その後、Bootstrapのinitialize()またはinitializeCloseable()でEmbulkEmbedインスタンスを生成する。
initialize()とinitializeCloseable()の違いは、Javaアプリケーションの終了時にdestroy()が呼ばれるかどうか。(シャットダウンフックにdestroy()の呼び出しが登録されるかどうか)

個人的にはリソースのクローズはプログラマーが明示的に呼び出すべきだと思う。
なお、EmbulkEmbedはAutoCloseableを実装していないので、Java7のtry-with-resource構文は使えない。


ConfigSourceの作成

データ入出力の定義(YAMLファイル相当)はConfigSource(インターフェース)で扱う。

import org.embulk.config.ConfigSource;

具象クラスはDataSourceImplで、内部ではJSONオブジェクトとして保持しているようだ。


ConfigSourceの生成にはEmbulkEmbedから取得したConfigLoaderを使う。[2015-08-30]
(Bootstrapから取得したConfigLoaderを使うとエラーが発生するので注意)

import org.embulk.config.ConfigLoader;
		ConfigLoader loader = embulk.newConfigLoader();

ConfigSourceを作るには、YAMLファイルに定義を書いておいて、それを読み込むのが一番分かりやすい。

import java.io.File;
		ConfigSource config = loader.fromYamlFile(new File("/tmp/embulk/embulk-example/config.yml"));

		// InputStreamから作成する例
//		ConfigSource config = loader.fromYaml(new FileInputStream("/tmp/embulk/embulk-example/config.yml"));

なお、ファイルはUnicode(UTF-8やUTF-16に対応しているっぽい)として読み込むようだ。

ファイルからでなく、YAMLの文字列を指定するメソッドもある。

他にも、JSONファイルを読み込んだり、JSON文字列から解析したりするメソッドが用意されている。


ConfigSource内の値はgetメソッドで取得できる。
値の型が分かっている場合は、その型を指定してget()を呼び出す。
値の構造をそのままで取得する場合はgetNested()を呼び出す。

		Map<?, ?> inMap = config.get(Map.class, "in"); // inをMapとして取得
		System.out.println(inMap);

		ConfigSource in = config.getNested("in");	// inを構造そのままで取得
		String type = in.get(String.class, "type");

		PluginType pluginType = in.get(PluginType.class, "type"); // PluginTypeがJSON変換に対応しているので、PluginTypeとして取得可能

(ファイルから読み込んだりせずに)空の状態のConfigSourceを作るには、ConfigLoaderのnewConfigSource()を使う。

		ConfigSource config = loader.newConfigSource();

		Map<String, Object> inMap = new HashMap<>();
		inMap.put("type", "file");
		config.set("in", inMap);

		Map<String, Object> outMap = new HashMap<>();
		outMap.put("type", "stdout");
		config.set("out", outMap);

コマンドの実行

embulkコマンドで指定するコマンド名とEmbulkEmbedクラスのメソッド名は大体同じなので、分かりやすい。


guess

データを読み込んで定義を推測する。
推測した定義がConfigDiffとして返ってくるので、元の定義とマージすれば、後続のpreviewrunで使用できる。 [2015-08-30]

import org.embulk.config.ConfigDiff;
		ConfigDiff diff = embulk.guess(config);
		config.merge(diff);
		// configをpreviewやrunで使う

元の定義をとっておきたい場合は、deepCopy()で複製を作れる。[2015-08-30]

		ConfigSource copy = config.deepCopy();
		ConfigDiff diff = embulk.guess(config);
		copy.merge(diff);
		// copyをpreviewやrunで使う

preview

データの読み込みだけ行う。

import org.embulk.exec.PreviewResult;
import org.embulk.spi.Column;
import org.embulk.spi.Page;
import org.embulk.spi.Schema;
		PreviewResult result = embulk.preview(config);

		Schema schema = result.getSchema();
		List<Column> columns = schema.getColumns();
		System.out.println(columns);

		for (Page page : result.getPages()) {
			List<String> list = page.getStringReferences();
			for (String s : list) {
				System.out.println(s);
			}
		}

ただ、page.getStringReferences()で取れる値は、スキーマでstringとして定義されているデータだけのようだ。[2015-08-30]
(レコード(行)の終わりも判別できない)


レコード(行)毎に全カラムのデータを取得するには、PageReaderを使うのが良さそう。[2015-08-30]

import org.embulk.spi.Column;
import org.embulk.spi.ColumnVisitor;
import org.embulk.spi.Page;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.time.Timestamp;
		PreviewResult result = embulk.preview(config);
		Schema schema = result.getSchema();

		// @see org.embulk.standards.StdoutOutputPlugin
		// @see org.embulk.spi.util.PagePrinter
		try (PageReader reader = new PageReader(schema)) {
			MyColumnVisitor visitor = new MyColumnVisitor(reader);

			for (Page page : result.getPages()) {
				reader.setPage(page);

				while (reader.nextRecord()) {
					System.out.println("----");
					schema.visitColumns(visitor);
				}
			}
		}
	static class MyColumnVisitor implements ColumnVisitor {

		private PageReader reader;

		public MyColumnVisitor(PageReader reader) {
			this.reader = reader;
		}

		@Override
		public void timestampColumn(Column column) {
			Timestamp value = reader.getTimestamp(column);
			System.out.println(value);
		}

		@Override
		public void stringColumn(Column column) {
			String value = reader.getString(column);
			System.out.println(value);
		}

		@Override
		public void longColumn(Column column) {
			long value = reader.getLong(column);
			System.out.println(value);
		}

		@Override
		public void doubleColumn(Column column) {
			double value = reader.getDouble(column);
			System.out.println(value);
		}

		@Override
		public void booleanColumn(Column column) {
			boolean value = reader.getBoolean(column);
			System.out.println(value);
		}
	}

schema.visitColumns()では、(YAMLファイルの)columnsで定義された順番で呼ばれる。


run

データを読み込み、出力する。

import org.embulk.exec.ExecutionResult;
		ExecutionResult result = embulk.run(config);
		List<Throwable> exceptions = result.getIgnoredExceptions();
		System.out.println(exceptions);

特にエラーが起きなければ、exceptionsは空リストになっている。[2015-08-30]


runResumable

import org.embulk.EmbulkEmbed.ResumableResult;
import org.embulk.exec.ResumeState;
		ResumableResult result = embulk.runResumable(config);
		ResumeState state = result.getResumeState();

プラグイン呼び出し

EmbulkEmbedを使ってデータを読み込み、出力は自分のプログラムで行う、という例を考えてみる。[2015-09-05]
いわば、ローカルの(自分のプログラムでしか使わない)アウトプットプラグインを作るような感じ。


プラグインの作成

まず、呼び出すプラグインを作る。

MyOutputPlugin.java:

import java.util.List;

import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.Exec;
import org.embulk.spi.OutputPlugin;
import org.embulk.spi.Page;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
public class MyOutputPlugin implements OutputPlugin {

	/** プラグインのタイプ名 */
	public static final String TYPE = "myout";
	/** プラグインで使用するデータ */
	public interface PluginTask extends Task {
	}
	@Override
	public ConfigDiff transaction(ConfigSource config, Schema schema, int taskCount, OutputPlugin.Control control) {
		final PluginTask task = config.loadConfig(PluginTask.class);
		return resume(task.dump(), schema, taskCount, control);
	}
	@Override
	public ConfigDiff resume(TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) {
		control.run(taskSource);
		return Exec.newConfigDiff();
	}
	@Override
	public void cleanup(TaskSource taskSource, Schema schema, int taskCount, List<TaskReport> successTaskReports) {
	}
	@Override
	public TransactionalPageOutput open(TaskSource taskSource, final Schema schema, int taskIndex) {
		// taskSourceはtransaction()のtask.dump()とは別インスタンス
		// taskSourceは複数のOutputPluginスレッドで同じインスタンス

		// final PluginTask task = taskSource.loadTask(PluginTask.class);

		final PageReader reader = new PageReader(schema);
		final MyColumnVisitor visitor = new MyColumnVisitor(reader);

		return new TransactionalPageOutput() {

			@Override
			public void add(Page page) {
				reader.setPage(page);
				while (reader.nextRecord()) {
					// ここで自前の出力を行う
					schema.visitColumns(visitor);
				}
			}

			@Override
			public void finish() {
				// ここで自前の出力のフラッシュを行う
				System.out.flush();
			}

			@Override
			public void close() {
				reader.close();
			}

			@Override
			public void abort() {
			}

			@Override
			public TaskReport commit() {
				return Exec.newTaskReport();
			}
		};
	}
}

アウトプットプラグインはOutputPluginインターフェースを実装する。
主に独自実装しないといけないのはopenメソッド。
この中で、読み込まれたデータを受け取って出力する。

アウトプットプラグインのインスタンスは実行するスレッドの個数分作られる。つまりマルチスレッドで動かせるようになっていなければならない。
スレッド数は、主に入力データを分割した個数から算出されるようだ。(つまりインプットプラグイン依存)
また、これらのスレッドはメインスレッドとも別なので、メインスレッドとデータの受け渡しをしたい場合は考慮する必要がある。


モジュールの作成

自作したプラグインをEmbulkEmbedから呼び出すようにするには、インジェクション機構の「モジュール」を使用する。

import org.embulk.plugin.InjectedPluginSource;

import org.embulk.spi.OutputPlugin;

import com.google.inject.Binder;
import com.google.inject.Module;
	// @see org.embulk.standards.StandardPluginModule
	static class MyModule implements Module {

		@Override
		public void configure(Binder binder) {
			// output plugins
			InjectedPluginSource.registerPluginTo(binder, OutputPlugin.class, MyOutputPlugin.TYPE, MyOutputPlugin.class);
		}
	}

registerPluginToメソッドの第2引数は登録したいプラグインの型(インターフェース)、
第3引数はデータ定義(YAMLファイル)の「type」で指定するタイプ名、
第4引数が登録したいプラグインクラスとなる。


プラグインインスタンスを自分で作りたい場合は、インジェクション機構のプロバイダー機能を使う。

import org.embulk.spi.OutputPlugin;

import com.google.inject.Provider;
	private static final Provider<? extends MyOutputPlugin> MY_OUTPUT_PLUGIN_PROVIDER = new Provider<MyOutputPlugin>() {

		@Override
		public MyOutputPlugin get() {
			return new MyOutputPlugin();
		}
	};
	// @see org.embulk.standards.StandardPluginModule
	static class MyModule implements Module {

		@Override
		public void configure(Binder binder) {
			// output plugins
			InjectedPluginSource.registerPluginTo(binder, OutputPlugin.class, MyOutputPlugin.TYPE, MyOutputPlugin.class);

			binder.bind(MyOutputPlugin.class).toProvider(MY_OUTPUT_PLUGIN_PROVIDER);
		}
	}

プラグインインスタンスの生成を自分の制御下に置けば、メインスレッドとデータの受け渡しをするのに使えそうな気がする。


Bootstrapへの登録

作成したモジュールは、Bootstrap(EmbulkEmbedのインスタンスを作る際に使用する)に登録する。

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.embulk.EmbulkEmbed;
import org.embulk.EmbulkEmbed.Bootstrap;
import org.embulk.config.ConfigLoader;
import org.embulk.config.ConfigSource;
public class EmbulkOutputExample {

	private static final Provider<? extends MyOutputPlugin> MY_OUTPUT_PLUGIN_PROVIDER = 〜;

	static class MyModule implements Module { 〜 }
	public static void main(String... args) throws IOException {
		File file = new File("D:/cygwin/tmp/embulk/embulk-example/config.yml");

		EmbulkEmbed embulk = newEmbulkEmbed();
		try {
			ConfigLoader loader = embulk.newConfigLoader();
			ConfigSource config = loader.fromYamlFile(file);
			Map<String, Object> out = new HashMap<>();
			out.put("type", MyOutputPlugin.TYPE);
			config.set("out", out);

			embulk.run(config);
		} finally {
			embulk.destroy();
		}
	}
	static EmbulkEmbed newEmbulkEmbed() {
		Bootstrap bootstrap = new EmbulkEmbed.Bootstrap();
		bootstrap = bootstrap.addModules(new MyModule());
		EmbulkEmbed embulk = bootstrap.initializeCloseable();
		return embulk;
	}

読み込んだYAMLファイルに対し、outのtypeを自作プラグインのタイプ名に変更している。これで、自分のアウトプットプラグインが呼ばれるようになる。


試していたときに発生したエラー

GuessExecutor生成エラー

Set<PluginType>のインジェクションが出来なくて?、GuessExecutorが生成できないというエラー。(Embulk0.7.3)

Exception in thread "main" com.google.inject.ConfigurationException: Guice configuration errors:

1) No implementation for java.util.Set<org.embulk.plugin.PluginType> annotated with @org.embulk.exec.ForGuess() was bound.
  while locating java.util.Set<org.embulk.plugin.PluginType> annotated with @org.embulk.exec.ForGuess()
    for parameter 1 at org.embulk.exec.GuessExecutor.<init>(GuessExecutor.java:68)
  while locating org.embulk.exec.GuessExecutor

1 error
	at com.google.inject.internal.InjectorImpl.getProvider(InjectorImpl.java:1042)
	at com.google.inject.internal.InjectorImpl.getProvider(InjectorImpl.java:1001)
	at com.google.inject.internal.InjectorImpl.getInstance(InjectorImpl.java:1051)
	at org.embulk.guice.InjectorProxy.getInstance(InjectorProxy.java:113)
	at org.embulk.EmbulkEmbed.<init>(EmbulkEmbed.java:129)
	at org.embulk.EmbulkEmbed$Bootstrap.build(EmbulkEmbed.java:115)
	at org.embulk.EmbulkEmbed$Bootstrap.initializeCloseable(EmbulkEmbed.java:95)
	at com.example.embulk.EmbulkExample.main(EmbulkExample.java:39)

エラー発生箇所はEmbulkEmbed.Bootstrapのinitialize()あるいはinitializeCloseable()を呼び出した所。

build.gradleのdependenciesに「embulk-standards」が入っていないと(embulk-coreのみだと)発生する。


embulk-standardsにはMETA-INF/services/org.embulk.spi.ExtensionというSPIのファイルが入っている。
この中にStandardPluginExtensionというクラスが指定されていて、そのクラスにはインジェクション定義用のModuleとしてStandardPluginModuleを返すように記述されている。
StandardPluginModuleの中でPluginTypeをいくつか定義しているようなので、これが影響しているのだと思われる。


Optional初期化エラー

コマンド実行時(guess, preview, run)に発生したエラー。(Embulk0.7.3)

Exception in thread "main" org.embulk.config.ConfigException: com.fasterxml.jackson.databind.JsonMappingException: Setting null to a task field is not allowed. Use Optional<T> (com.google.common.base.Optional) to represent null.
	at org.embulk.config.ModelManager.readObjectWithConfigSerDe(ModelManager.java:75)
	at org.embulk.config.DataSourceImpl.loadConfig(DataSourceImpl.java:208)
	at org.embulk.standards.LocalFileInputPlugin.transaction(LocalFileInputPlugin.java:60)
	at org.embulk.spi.FileInputRunner.transaction(FileInputRunner.java:64)
	at org.embulk.exec.PreviewExecutor.doPreview(PreviewExecutor.java:93)
	at org.embulk.exec.PreviewExecutor.access$000(PreviewExecutor.java:27)
	at org.embulk.exec.PreviewExecutor$1.run(PreviewExecutor.java:67)
	at org.embulk.exec.PreviewExecutor$1.run(PreviewExecutor.java:63)
	at org.embulk.spi.Exec.doWith(Exec.java:25)
	at org.embulk.exec.PreviewExecutor.preview(PreviewExecutor.java:63)
	at org.embulk.EmbulkEmbed.preview(EmbulkEmbed.java:168)
	at com.example.embulk.EmbulkExample.main(EmbulkExample.java:65)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Setting null to a task field is not allowed. Use Optional<T> (com.google.common.base.Optional) to represent null.
	at org.embulk.config.TaskSerDe$TaskDeserializer.deserialize(TaskSerDe.java:172)
	at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3534)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:1870)
	at org.embulk.config.ModelManager.readObjectWithConfigSerDe(ModelManager.java:72)
	... 11 more

Optional(これはJava8のOptionalではなく、google.commonのライブラリー)にnullをセットしようとしてエラーになっているらしい。
エラーメッセージ上にはどのフィールドでエラーになったのかは出ていないが、デバッグで見てみたところ、last_pathというフィールドのようだった。
ここではexampleコマンドで作ったconfig.ymlを読み込んでいるのだが、確かにconfig.ymlにはlast_pathは指定されていない。


で、config.ymlに「last_path: ''」を追加して再実行したところ、エラー内容が変わった。

Exception in thread "main" org.embulk.config.ConfigException: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of com.google.common.base.Optional, problem: abstract types either need to be mapped to concrete types, have custom deserializer, or be instantiated with additional type information
 at [Source: N/A; line: -1, column: -1]
	at org.embulk.config.ModelManager.readObjectWithConfigSerDe(ModelManager.java:75)
	at org.embulk.config.DataSourceImpl.loadConfig(DataSourceImpl.java:208)
	at org.embulk.standards.LocalFileInputPlugin.transaction(LocalFileInputPlugin.java:60)
	at org.embulk.spi.FileInputRunner.transaction(FileInputRunner.java:64)
	at org.embulk.exec.PreviewExecutor.doPreview(PreviewExecutor.java:93)
	at org.embulk.exec.PreviewExecutor.access$000(PreviewExecutor.java:27)
	at org.embulk.exec.PreviewExecutor$1.run(PreviewExecutor.java:67)
	at org.embulk.exec.PreviewExecutor$1.run(PreviewExecutor.java:63)
	at org.embulk.spi.Exec.doWith(Exec.java:25)
	at org.embulk.exec.PreviewExecutor.preview(PreviewExecutor.java:63)
	at org.embulk.EmbulkEmbed.preview(EmbulkEmbed.java:168)
	at com.example.embulk.EmbulkExample.main(EmbulkExample.java:65)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of com.google.common.base.Optional, problem: abstract types either need to be mapped to concrete types, have custom deserializer, or be instantiated with additional type information
 at [Source: N/A; line: -1, column: -1]
	at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
	at com.fasterxml.jackson.databind.DeserializationContext.instantiationException(DeserializationContext.java:857)
	at com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserialize(AbstractDeserializer.java:139)
	at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3534)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:1894)
	at org.embulk.config.TaskSerDe$TaskDeserializer.deserialize(TaskSerDe.java:157)
	at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3534)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:1870)
	at org.embulk.config.ModelManager.readObjectWithConfigSerDe(ModelManager.java:72)
	... 11 more

AbstractDeserializerのdeserialize()に行っちゃって、これが常に例外を発生させるようになっている(要するにそこへ来てはいけない)。
いずれにしてもlast_pathの値をOptionalに入れるところでエラーになってるっぽい。


ConfigSourceを作る為のローダーにbootstrapのSystemConfigLoaderを使っていると、上記のようなエラーになるらしい。(by frsyukiさん)[2015-08-30]
EmbulkEmbedのnewConfigLoader()を使うのが正しい。

		ConfigLoader loader = bootstrap.getSystemConfigLoader(); // NG
		ConfigSource config = loader.fromYamlFile(new File("/tmp/embulk/embulk-example/config.yml"));

		ConfigLoader loader = embulk.newConfigLoader(); // OK
		ConfigSource config = loader.fromYamlFile(new File("/tmp/embulk/embulk-example/config.yml"));

Embulk目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま