Embulk(0.7.5)のパーサープラグインのテスト用クラスのメモ。
|
プラグインの単体テスト用のクラスはEmbulk標準のものは提供されていないので、embulk-input-mysqlのEmbulkPluginTester.javaを参考にしてJavaパーサープラグイン用のテストクラス作ってみた。
EmbulkEmbedに自作プラグインを登録してEmbulkを実行する
という実装方法。
→EmbulkPluginTester
今回作ったEmbulkPluginTesterの特徴は、YAMLファイルを使用しないこと。
パーサープラグインの場合、入力はファイルとなるが、YAMLファイル内にフルパスを記述したくない為。
データの出力先もファイルではなく、Javaオブジェクトのリストとして取得できる。
これを使って期待値との比較を行う。
入力データはファイルにすることも出来るし、List<String>を使うことも出来る。
EmbulkEmbedを使うので、依存ライブラリーにembulk-standardsが必要となる。
〜
dependencies {
compile "org.embulk:embulk-core:0.7.5"
provided "org.embulk:embulk-core:0.7.5"
compile "org.embulk:embulk-standards:0.7.5"
// compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION"
testCompile "junit:junit:4.+"
}
〜
今回作ったJavaパーサープラグイン用EmbulkPluginTesterは以下のようにして使用する。
プロジェクト内のsrc/test/resourcesに配置されたファイルを入力としてテストを実行する例。
import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import java.net.URL; import org.embulk.parser.EmbulkPluginTester; import org.embulk.parser.EmbulkTestOutputPlugin.OutputRecord; import org.embulk.parser.EmbulkTestParserConfig; import org.junit.Test;
public class TestExampleMyformatParserPlugin {
@Test
public void testFile() {
try (EmbulkPluginTester tester = new EmbulkPluginTester()) {
// テスト対象プラグインの登録
tester.addParserPlugin("example_myformat", ExampleMyformatParserPlugin.class);
// parserの定義(YAMLファイルの代わり)
EmbulkTestParserConfig parser = tester.newParserConfig("example_myformat");
parser.addColumn("prefix", "string").set("foo", "abc");
parser.addColumn("suffix", "string");
// 読み込むファイル
URL inFile = getClass().getResource("test1.txt");
// テストの実行
List<OutputRecord> result = tester.runParser(inFile, parser);
assertThat(result.size(), is(2));
OutputRecord r0 = result.get(0);
assertThat(r0.getAsString("prefix"), is("12345678"));
assertThat(r0.getAsString("suffix"), is("90123456789"));
OutputRecord r1 = result.get(1);
assertThat(r1.getAsString("prefix"), is("abcdefgh"));
assertThat(r1.getAsString("suffix"), is("ijklmn"));
}
}
}
「example_myformat」は、プラグインのtype。
EmbulkTestParserConfigは、YAMLファイルのparser部分を表すクラス。
上記のコードは、以下のようなイメージ。
parser:
type: example_myformat
columns:
- {name: prefix, type: string, foo: abc}
- {name: suffix, type: string}
1234567890123456789 abcdefghijklmn
ファイルを使わず、文字列を入力としてテストを実行する例。
import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import org.embulk.parser.EmbulkPluginTester; import org.embulk.parser.EmbulkTestOutputPlugin.OutputRecord; import org.embulk.parser.EmbulkTestParserConfig; import org.junit.Test;
public class TestExampleMyformatParserPlugin {
@Test
public void testText() {
try (EmbulkPluginTester tester = new EmbulkPluginTester()) {
// テスト対象プラグインの登録
tester.addParserPlugin("example_myformat", ExampleMyformatParserPlugin.class);
// parserの定義(YAMLファイルの代わり)
EmbulkTestParserConfig parser = tester.newParserConfig("example_myformat");
parser.addColumn("prefix", "string").set("foo", "abc");
parser.addColumn("suffix", "string");
// 読み込むデータ
List<String> list = new ArrayList<>();
list.add("0123456789012345");
list.add("0123456789abcdef");
// テストの実行
List<OutputRecord> result = tester.runParser(list, parser);
assertThat(result.size(), is(2));
OutputRecord r0 = result.get(0);
assertThat(r0.getAsString("prefix"), is("01234567"));
assertThat(r0.getAsString("suffix"), is("89012345"));
OutputRecord r1 = result.get(1);
assertThat(r1.getAsString("prefix"), is("01234567"));
assertThat(r1.getAsString("suffix"), is("89abcdef"));
}
}
}
「example_myformat」は、プラグインのtype。
EmbulkTestParserConfigは、YAMLファイルのparser部分を表すクラス。
上記のコードは、以下のようなイメージ。
parser:
type: example_myformat
columns:
- {name: prefix, type: string, foo: abc}
- {name: suffix, type: string}
今回作ったEmbulkPluginTesterは以下のようなもの。
テストを実行する入り口。
package org.embulk.parser;
import java.io.Closeable; import java.io.File; import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; import java.util.List; import org.embulk.EmbulkEmbed; import org.embulk.EmbulkEmbed.Bootstrap; import org.embulk.config.ConfigLoader; import org.embulk.config.ConfigSource; import org.embulk.parser.EmbulkTestOutputPlugin.OutputRecord; import org.embulk.plugin.InjectedPluginSource; import org.embulk.spi.InputPlugin; import org.embulk.spi.OutputPlugin; import org.embulk.spi.ParserPlugin; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provider;
// @see https://github.com/embulk/embulk-input-jdbc/blob/master/embulk-input-mysql/src/test/java/org/embulk/input/mysql/EmbulkPluginTester.java
public class EmbulkPluginTester implements Closeable {
private static class PluginDefinition {
public final Class<?> iface;
public final String name;
public final Class<?> impl;
public PluginDefinition(Class<?> iface, String name, Class<?> impl) {
this.iface = iface;
this.name = name;
this.impl = impl;
}
}
private final List<PluginDefinition> plugins = new ArrayList<>(); private EmbulkEmbed embulk; private ConfigLoader configLoader; private EmbulkTestFileInputPlugin embulkTestFileInputPlugin = new EmbulkTestFileInputPlugin(); private EmbulkTestOutputPlugin embulkTestOutputPlugin = new EmbulkTestOutputPlugin();
public EmbulkPluginTester() {
}
public EmbulkPluginTester(Class<?> iface, String name, Class<?> impl) {
addPlugin(iface, name, impl);
}
public void addPlugin(Class<?> iface, String name, Class<?> impl) {
plugins.add(new PluginDefinition(iface, name, impl));
}
public void addParserPlugin(String name, Class<? extends ParserPlugin> impl) {
addPlugin(ParserPlugin.class, name, impl);
}
protected EmbulkEmbed getEmbulkEmbed() {
if (embulk == null) {
Bootstrap bootstrap = new EmbulkEmbed.Bootstrap();
bootstrap.addModules(new Module() {
@Override
public void configure(Binder binder) {
EmbulkPluginTester.this.configurePlugin(binder);
for (PluginDefinition plugin : plugins) {
InjectedPluginSource.registerPluginTo(binder, plugin.iface, plugin.name, plugin.impl);
}
}
});
embulk = bootstrap.initializeCloseable();
}
return embulk;
}
protected void configurePlugin(Binder binder) {
// input plugins
InjectedPluginSource.registerPluginTo(binder, InputPlugin.class, EmbulkTestFileInputPlugin.TYPE, EmbulkTestFileInputPlugin.class);
binder.bind(EmbulkTestFileInputPlugin.class).toProvider(new Provider<EmbulkTestFileInputPlugin>() {
@Override
public EmbulkTestFileInputPlugin get() {
return this.embulkTestFileInputPlugin;
}
});
// output plugins
InjectedPluginSource.registerPluginTo(binder, OutputPlugin.class, EmbulkTestOutputPlugin.TYPE, EmbulkTestOutputPlugin.class);
binder.bind(EmbulkTestOutputPlugin.class).toProvider(new Provider<EmbulkTestOutputPlugin>() {
@Override
public EmbulkTestOutputPlugin get() {
return this.embulkTestOutputPlugin;
}
});
}
public ConfigLoader getConfigLoader() {
if (configLoader == null) {
configLoader = getEmbulkEmbed().newConfigLoader();
}
return configLoader;
}
public ConfigSource newConfigSource() {
return getConfigLoader().newConfigSource();
}
public EmbulkTestParserConfig newParserConfig(String type) {
EmbulkTestParserConfig parser = new EmbulkTestParserConfig();
parser.setType(type);
return parser;
}
public List<OutputRecord> runParser(URL inFile, EmbulkTestParserConfig parser) {
File file;
try {
file = new File(inFile.toURI());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
return runParser(file, parser);
}
public List<OutputRecord> runParser(File inFile, EmbulkTestParserConfig parser) {
ConfigSource in = newConfigSource();
in.set("type", "file");
in.set("path_prefix", inFile.getAbsolutePath());
in.set("parser", parser);
return runInput(in);
}
public List<OutputRecord> runParser(List<String> list, EmbulkTestParserConfig parser) {
ConfigSource in = newConfigSource();
in.set("type", EmbulkTestFileInputPlugin.TYPE);
in.set("parser", parser);
embulkTestFileInputPlugin.setText(list);
return runInput(in);
}
public List<OutputRecord> runInput(ConfigSource in) {
ConfigSource out = newConfigSource();
out.set("type", EmbulkTestOutputPlugin.TYPE);
embulkTestOutputPlugin.clearResult();
run(in, out);
return embulkTestOutputPlugin.getResult();
}
public void run(ConfigSource in, ConfigSource out) {
ConfigSource config = newConfigSource();
config.set("in", in);
config.set("out", out);
run(config);
}
public void run(ConfigSource config) {
getEmbulkEmbed().run(config);
}
@Override
public void close() {
if (embulk != null) {
embulk.destroy();
}
}
}
YAMLファイルのparser部分を表すクラス。
package org.embulk.parser;
import java.util.ArrayList; import java.util.HashMap; import java.util.List;
@SuppressWarnings("serial")
public class EmbulkTestParserConfig extends HashMap<String, Object> {
public void setType(String type) {
set("type", type);
}
public void set(String key, Object value) {
if (value == null) {
super.remove(key);
} else {
super.put(key, value);
}
}
public List<EmbulkTestColumn> getColumns() {
@SuppressWarnings("unchecked")
List<EmbulkTestColumn> columns = (List<EmbulkTestColumn>) super.get("columns");
if (columns == null) {
columns = new ArrayList<>();
super.put("columns", columns);
}
return columns;
}
public EmbulkTestColumn addColumn(String name, String type) {
EmbulkTestColumn column = new EmbulkTestColumn();
column.set("name", name);
column.set("type", type);
getColumns().add(column);
return column;
}
public static class EmbulkTestColumn extends HashMap<String, Object> {
public EmbulkTestColumn set(String key, Object value) {
if (value == null) {
super.remove(key);
} else {
super.put(key, value);
}
return this;
}
}
}
入力データをList<String>で指定できるFileInputPlugin。
package org.embulk.parser;
import java.nio.charset.StandardCharsets; import java.util.List; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigSource; import org.embulk.config.Task; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.spi.Buffer; import org.embulk.spi.Exec; import org.embulk.spi.FileInputPlugin; import org.embulk.spi.TransactionalFileInput;
public class EmbulkTestFileInputPlugin implements FileInputPlugin {
public static final String TYPE = "EmbulkTestFileInputPlugin";
public interface PluginTask extends Task {
}
private List<String> list;
public void setText(List<String> list) {
this.list = list;
}
@Override
public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) {
PluginTask task = config.loadConfig(PluginTask.class);
int taskCount = 1;
return resume(task.dump(), taskCount, control);
}
@Override
public ConfigDiff resume(TaskSource taskSource, int taskCount, FileInputPlugin.Control control) {
control.run(taskSource, taskCount);
return Exec.newConfigDiff();
}
@Override
public void cleanup(TaskSource taskSource, int taskCount, List<TaskReport> successTaskReports) {
}
@Override
public TransactionalFileInput open(TaskSource taskSource, int taskIndex) {
return new TransactionalFileInput() {
private boolean eof = false;
private int index = 0;
@Override
public Buffer poll() {
if (index < list.size()) {
String s = list.get(index++) + "\n";
return Buffer.copyOf(s.getBytes(StandardCharsets.UTF_8));
}
eof = true;
return null;
}
@Override
public boolean nextFile() {
return !eof;
}
@Override
public void close() {
}
@Override
public void abort() {
}
@Override
public TaskReport commit() {
return Exec.newTaskReport();
}
};
}
}
出力されたデータをList<OutputRecord>で取得することが出来るOutputPlugin。
package org.embulk.parser;
import java.text.MessageFormat; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigSource; import org.embulk.config.Task; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.spi.Column; import org.embulk.spi.ColumnVisitor; import org.embulk.spi.Exec; import org.embulk.spi.OutputPlugin; import org.embulk.spi.Page; import org.embulk.spi.PageReader; import org.embulk.spi.Schema; import org.embulk.spi.TransactionalPageOutput; import org.embulk.spi.time.Timestamp; import org.embulk.spi.time.TimestampFormatter;
public class EmbulkTestOutputPlugin implements OutputPlugin {
public static final String TYPE = "EmbulkTestOutputPlugin";
public interface PluginTask extends Task, TimestampFormatter.Task {
}
public static class OutputRecord {
private Map<String, Object> map = new LinkedHashMap<>();
public void set(String name, Object value) {
map.put(name, value);
}
public String getAsString(String name) {
try {
return (String) map.get(name);
} catch (Exception e) {
throw new RuntimeException(MessageFormat.format("name={0}", name), e);
}
}
public Long getAsLong(String name) {
try {
return (Long) map.get(name);
} catch (Exception e) {
throw new RuntimeException(MessageFormat.format("name={0}", name), e);
}
}
public Double getAsDouble(String name) {
try {
return (Double) map.get(name);
} catch (Exception e) {
throw new RuntimeException(MessageFormat.format("name={0}", name), e);
}
}
public Boolean getAsBoolean(String name) {
try {
return (Boolean) map.get(name);
} catch (Exception e) {
throw new RuntimeException(MessageFormat.format("name={0}", name), e);
}
}
public Timestamp getAsTimestamp(String name) {
try {
return (Timestamp) map.get(name);
} catch (Exception e) {
throw new RuntimeException(MessageFormat.format("name={0}", name), e);
}
}
@Override
public String toString() {
return map.toString();
}
}
private final List<OutputRecord> result = new CopyOnWriteArrayList<>();
@Override
public ConfigDiff transaction(ConfigSource config, Schema schema, int taskCount, OutputPlugin.Control control) {
final PluginTask task = config.loadConfig(PluginTask.class);
return resume(task.dump(), schema, taskCount, control);
}
@Override
public ConfigDiff resume(TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) {
control.run(taskSource);
return Exec.newConfigDiff();
}
@Override
public void cleanup(TaskSource taskSource, Schema schema, int taskCount, List<TaskReport> successTaskReports) {
}
@Override
public TransactionalPageOutput open(TaskSource taskSource, final Schema schema, int taskIndex) {
return new TransactionalPageOutput() {
private final PageReader reader = new PageReader(schema);
@Override
public void add(Page page) {
reader.setPage(page);
while (reader.nextRecord()) {
final OutputRecord record = new OutputRecord();
for (Column column : schema.getColumns()) {
column.visit(new ColumnVisitor() {
@Override
public void timestampColumn(Column column) {
if (reader.isNull(column)) {
record.set(column.getName(), null);
return;
}
record.set(column.getName(), reader.getTimestamp(column));
}
@Override
public void stringColumn(Column column) {
if (reader.isNull(column)) {
record.set(column.getName(), null);
return;
}
record.set(column.getName(), reader.getString(column));
}
@Override
public void longColumn(Column column) {
if (reader.isNull(column)) {
record.set(column.getName(), null);
return;
}
record.set(column.getName(), reader.getLong(column));
}
@Override
public void doubleColumn(Column column) {
if (reader.isNull(column)) {
record.set(column.getName(), null);
return;
}
record.set(column.getName(), reader.getDouble(column));
}
@Override
public void booleanColumn(Column column) {
if (reader.isNull(column)) {
record.set(column.getName(), null);
return;
}
record.set(column.getName(), reader.getBoolean(column));
}
});
}
result.add(record);
}
}
@Override
public void finish() {
}
@Override
public void close() {
reader.close();
}
@Override
public void abort() {
}
@Override
public TaskReport commit() {
return Exec.newTaskReport();
}
};
}
public void clearResult() {
result.clear();
}
public List<OutputRecord> getResult() {
return result;
}
}