Embulk(0.7.5)のパーサープラグインのテスト用クラスのメモ。
|
プラグインの単体テスト用のクラスはEmbulk標準のものは提供されていないので、embulk-input-mysqlのEmbulkPluginTester.javaを参考にしてJavaパーサープラグイン用のテストクラス作ってみた。
EmbulkEmbedに自作プラグインを登録してEmbulkを実行する
という実装方法。
→EmbulkPluginTester
今回作ったEmbulkPluginTesterの特徴は、YAMLファイルを使用しないこと。
パーサープラグインの場合、入力はファイルとなるが、YAMLファイル内にフルパスを記述したくない為。
データの出力先もファイルではなく、Javaオブジェクトのリストとして取得できる。
これを使って期待値との比較を行う。
入力データはファイルにすることも出来るし、List<String>を使うことも出来る。
EmbulkEmbedを使うので、依存ライブラリーにembulk-standardsが必要となる。
〜 dependencies { compile "org.embulk:embulk-core:0.7.5" provided "org.embulk:embulk-core:0.7.5" compile "org.embulk:embulk-standards:0.7.5" // compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION" testCompile "junit:junit:4.+" } 〜
今回作ったJavaパーサープラグイン用EmbulkPluginTesterは以下のようにして使用する。
プロジェクト内のsrc/test/resourcesに配置されたファイルを入力としてテストを実行する例。
import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import java.net.URL; import org.embulk.parser.EmbulkPluginTester; import org.embulk.parser.EmbulkTestOutputPlugin.OutputRecord; import org.embulk.parser.EmbulkTestParserConfig; import org.junit.Test;
public class TestExampleMyformatParserPlugin { @Test public void testFile() { try (EmbulkPluginTester tester = new EmbulkPluginTester()) { // テスト対象プラグインの登録 tester.addParserPlugin("example_myformat", ExampleMyformatParserPlugin.class); // parserの定義(YAMLファイルの代わり) EmbulkTestParserConfig parser = tester.newParserConfig("example_myformat"); parser.addColumn("prefix", "string").set("foo", "abc"); parser.addColumn("suffix", "string"); // 読み込むファイル URL inFile = getClass().getResource("test1.txt"); // テストの実行 List<OutputRecord> result = tester.runParser(inFile, parser); assertThat(result.size(), is(2)); OutputRecord r0 = result.get(0); assertThat(r0.getAsString("prefix"), is("12345678")); assertThat(r0.getAsString("suffix"), is("90123456789")); OutputRecord r1 = result.get(1); assertThat(r1.getAsString("prefix"), is("abcdefgh")); assertThat(r1.getAsString("suffix"), is("ijklmn")); } } }
「example_myformat」は、プラグインのtype。
EmbulkTestParserConfigは、YAMLファイルのparser部分を表すクラス。
上記のコードは、以下のようなイメージ。
parser: type: example_myformat columns: - {name: prefix, type: string, foo: abc} - {name: suffix, type: string}
1234567890123456789 abcdefghijklmn
ファイルを使わず、文字列を入力としてテストを実行する例。
import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import org.embulk.parser.EmbulkPluginTester; import org.embulk.parser.EmbulkTestOutputPlugin.OutputRecord; import org.embulk.parser.EmbulkTestParserConfig; import org.junit.Test;
public class TestExampleMyformatParserPlugin { @Test public void testText() { try (EmbulkPluginTester tester = new EmbulkPluginTester()) { // テスト対象プラグインの登録 tester.addParserPlugin("example_myformat", ExampleMyformatParserPlugin.class); // parserの定義(YAMLファイルの代わり) EmbulkTestParserConfig parser = tester.newParserConfig("example_myformat"); parser.addColumn("prefix", "string").set("foo", "abc"); parser.addColumn("suffix", "string"); // 読み込むデータ List<String> list = new ArrayList<>(); list.add("0123456789012345"); list.add("0123456789abcdef"); // テストの実行 List<OutputRecord> result = tester.runParser(list, parser); assertThat(result.size(), is(2)); OutputRecord r0 = result.get(0); assertThat(r0.getAsString("prefix"), is("01234567")); assertThat(r0.getAsString("suffix"), is("89012345")); OutputRecord r1 = result.get(1); assertThat(r1.getAsString("prefix"), is("01234567")); assertThat(r1.getAsString("suffix"), is("89abcdef")); } } }
「example_myformat」は、プラグインのtype。
EmbulkTestParserConfigは、YAMLファイルのparser部分を表すクラス。
上記のコードは、以下のようなイメージ。
parser: type: example_myformat columns: - {name: prefix, type: string, foo: abc} - {name: suffix, type: string}
今回作ったEmbulkPluginTesterは以下のようなもの。
テストを実行する入り口。
package org.embulk.parser;
import java.io.Closeable; import java.io.File; import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; import java.util.List; import org.embulk.EmbulkEmbed; import org.embulk.EmbulkEmbed.Bootstrap; import org.embulk.config.ConfigLoader; import org.embulk.config.ConfigSource; import org.embulk.parser.EmbulkTestOutputPlugin.OutputRecord; import org.embulk.plugin.InjectedPluginSource; import org.embulk.spi.InputPlugin; import org.embulk.spi.OutputPlugin; import org.embulk.spi.ParserPlugin; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provider;
// @see https://github.com/embulk/embulk-input-jdbc/blob/master/embulk-input-mysql/src/test/java/org/embulk/input/mysql/EmbulkPluginTester.java public class EmbulkPluginTester implements Closeable {
private static class PluginDefinition { public final Class<?> iface; public final String name; public final Class<?> impl; public PluginDefinition(Class<?> iface, String name, Class<?> impl) { this.iface = iface; this.name = name; this.impl = impl; } }
private final List<PluginDefinition> plugins = new ArrayList<>(); private EmbulkEmbed embulk; private ConfigLoader configLoader; private EmbulkTestFileInputPlugin embulkTestFileInputPlugin = new EmbulkTestFileInputPlugin(); private EmbulkTestOutputPlugin embulkTestOutputPlugin = new EmbulkTestOutputPlugin();
public EmbulkPluginTester() { } public EmbulkPluginTester(Class<?> iface, String name, Class<?> impl) { addPlugin(iface, name, impl); }
public void addPlugin(Class<?> iface, String name, Class<?> impl) { plugins.add(new PluginDefinition(iface, name, impl)); } public void addParserPlugin(String name, Class<? extends ParserPlugin> impl) { addPlugin(ParserPlugin.class, name, impl); }
protected EmbulkEmbed getEmbulkEmbed() { if (embulk == null) { Bootstrap bootstrap = new EmbulkEmbed.Bootstrap(); bootstrap.addModules(new Module() { @Override public void configure(Binder binder) { EmbulkPluginTester.this.configurePlugin(binder); for (PluginDefinition plugin : plugins) { InjectedPluginSource.registerPluginTo(binder, plugin.iface, plugin.name, plugin.impl); } } }); embulk = bootstrap.initializeCloseable(); } return embulk; } protected void configurePlugin(Binder binder) { // input plugins InjectedPluginSource.registerPluginTo(binder, InputPlugin.class, EmbulkTestFileInputPlugin.TYPE, EmbulkTestFileInputPlugin.class); binder.bind(EmbulkTestFileInputPlugin.class).toProvider(new Provider<EmbulkTestFileInputPlugin>() { @Override public EmbulkTestFileInputPlugin get() { return this.embulkTestFileInputPlugin; } }); // output plugins InjectedPluginSource.registerPluginTo(binder, OutputPlugin.class, EmbulkTestOutputPlugin.TYPE, EmbulkTestOutputPlugin.class); binder.bind(EmbulkTestOutputPlugin.class).toProvider(new Provider<EmbulkTestOutputPlugin>() { @Override public EmbulkTestOutputPlugin get() { return this.embulkTestOutputPlugin; } }); }
public ConfigLoader getConfigLoader() { if (configLoader == null) { configLoader = getEmbulkEmbed().newConfigLoader(); } return configLoader; } public ConfigSource newConfigSource() { return getConfigLoader().newConfigSource(); } public EmbulkTestParserConfig newParserConfig(String type) { EmbulkTestParserConfig parser = new EmbulkTestParserConfig(); parser.setType(type); return parser; }
public List<OutputRecord> runParser(URL inFile, EmbulkTestParserConfig parser) { File file; try { file = new File(inFile.toURI()); } catch (URISyntaxException e) { throw new RuntimeException(e); } return runParser(file, parser); } public List<OutputRecord> runParser(File inFile, EmbulkTestParserConfig parser) { ConfigSource in = newConfigSource(); in.set("type", "file"); in.set("path_prefix", inFile.getAbsolutePath()); in.set("parser", parser); return runInput(in); } public List<OutputRecord> runParser(List<String> list, EmbulkTestParserConfig parser) { ConfigSource in = newConfigSource(); in.set("type", EmbulkTestFileInputPlugin.TYPE); in.set("parser", parser); embulkTestFileInputPlugin.setText(list); return runInput(in); } public List<OutputRecord> runInput(ConfigSource in) { ConfigSource out = newConfigSource(); out.set("type", EmbulkTestOutputPlugin.TYPE); embulkTestOutputPlugin.clearResult(); run(in, out); return embulkTestOutputPlugin.getResult(); } public void run(ConfigSource in, ConfigSource out) { ConfigSource config = newConfigSource(); config.set("in", in); config.set("out", out); run(config); } public void run(ConfigSource config) { getEmbulkEmbed().run(config); }
@Override public void close() { if (embulk != null) { embulk.destroy(); } } }
YAMLファイルのparser部分を表すクラス。
package org.embulk.parser;
import java.util.ArrayList; import java.util.HashMap; import java.util.List;
@SuppressWarnings("serial") public class EmbulkTestParserConfig extends HashMap<String, Object> {
public void setType(String type) { set("type", type); } public void set(String key, Object value) { if (value == null) { super.remove(key); } else { super.put(key, value); } }
public List<EmbulkTestColumn> getColumns() { @SuppressWarnings("unchecked") List<EmbulkTestColumn> columns = (List<EmbulkTestColumn>) super.get("columns"); if (columns == null) { columns = new ArrayList<>(); super.put("columns", columns); } return columns; } public EmbulkTestColumn addColumn(String name, String type) { EmbulkTestColumn column = new EmbulkTestColumn(); column.set("name", name); column.set("type", type); getColumns().add(column); return column; }
public static class EmbulkTestColumn extends HashMap<String, Object> { public EmbulkTestColumn set(String key, Object value) { if (value == null) { super.remove(key); } else { super.put(key, value); } return this; } } }
入力データをList<String>で指定できるFileInputPlugin。
package org.embulk.parser;
import java.nio.charset.StandardCharsets; import java.util.List; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigSource; import org.embulk.config.Task; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.spi.Buffer; import org.embulk.spi.Exec; import org.embulk.spi.FileInputPlugin; import org.embulk.spi.TransactionalFileInput;
public class EmbulkTestFileInputPlugin implements FileInputPlugin { public static final String TYPE = "EmbulkTestFileInputPlugin";
public interface PluginTask extends Task { }
private List<String> list; public void setText(List<String> list) { this.list = list; }
@Override public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); int taskCount = 1; return resume(task.dump(), taskCount, control); } @Override public ConfigDiff resume(TaskSource taskSource, int taskCount, FileInputPlugin.Control control) { control.run(taskSource, taskCount); return Exec.newConfigDiff(); } @Override public void cleanup(TaskSource taskSource, int taskCount, List<TaskReport> successTaskReports) { } @Override public TransactionalFileInput open(TaskSource taskSource, int taskIndex) { return new TransactionalFileInput() { private boolean eof = false; private int index = 0; @Override public Buffer poll() { if (index < list.size()) { String s = list.get(index++) + "\n"; return Buffer.copyOf(s.getBytes(StandardCharsets.UTF_8)); } eof = true; return null; } @Override public boolean nextFile() { return !eof; } @Override public void close() { } @Override public void abort() { } @Override public TaskReport commit() { return Exec.newTaskReport(); } }; } }
出力されたデータをList<OutputRecord>で取得することが出来るOutputPlugin。
package org.embulk.parser;
import java.text.MessageFormat; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigSource; import org.embulk.config.Task; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.spi.Column; import org.embulk.spi.ColumnVisitor; import org.embulk.spi.Exec; import org.embulk.spi.OutputPlugin; import org.embulk.spi.Page; import org.embulk.spi.PageReader; import org.embulk.spi.Schema; import org.embulk.spi.TransactionalPageOutput; import org.embulk.spi.time.Timestamp; import org.embulk.spi.time.TimestampFormatter;
public class EmbulkTestOutputPlugin implements OutputPlugin { public static final String TYPE = "EmbulkTestOutputPlugin";
public interface PluginTask extends Task, TimestampFormatter.Task { }
public static class OutputRecord { private Map<String, Object> map = new LinkedHashMap<>(); public void set(String name, Object value) { map.put(name, value); } public String getAsString(String name) { try { return (String) map.get(name); } catch (Exception e) { throw new RuntimeException(MessageFormat.format("name={0}", name), e); } } public Long getAsLong(String name) { try { return (Long) map.get(name); } catch (Exception e) { throw new RuntimeException(MessageFormat.format("name={0}", name), e); } } public Double getAsDouble(String name) { try { return (Double) map.get(name); } catch (Exception e) { throw new RuntimeException(MessageFormat.format("name={0}", name), e); } } public Boolean getAsBoolean(String name) { try { return (Boolean) map.get(name); } catch (Exception e) { throw new RuntimeException(MessageFormat.format("name={0}", name), e); } } public Timestamp getAsTimestamp(String name) { try { return (Timestamp) map.get(name); } catch (Exception e) { throw new RuntimeException(MessageFormat.format("name={0}", name), e); } } @Override public String toString() { return map.toString(); } }
private final List<OutputRecord> result = new CopyOnWriteArrayList<>();
@Override public ConfigDiff transaction(ConfigSource config, Schema schema, int taskCount, OutputPlugin.Control control) { final PluginTask task = config.loadConfig(PluginTask.class); return resume(task.dump(), schema, taskCount, control); } @Override public ConfigDiff resume(TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) { control.run(taskSource); return Exec.newConfigDiff(); } @Override public void cleanup(TaskSource taskSource, Schema schema, int taskCount, List<TaskReport> successTaskReports) { } @Override public TransactionalPageOutput open(TaskSource taskSource, final Schema schema, int taskIndex) { return new TransactionalPageOutput() { private final PageReader reader = new PageReader(schema); @Override public void add(Page page) { reader.setPage(page); while (reader.nextRecord()) { final OutputRecord record = new OutputRecord(); for (Column column : schema.getColumns()) { column.visit(new ColumnVisitor() { @Override public void timestampColumn(Column column) { if (reader.isNull(column)) { record.set(column.getName(), null); return; } record.set(column.getName(), reader.getTimestamp(column)); } @Override public void stringColumn(Column column) { if (reader.isNull(column)) { record.set(column.getName(), null); return; } record.set(column.getName(), reader.getString(column)); } @Override public void longColumn(Column column) { if (reader.isNull(column)) { record.set(column.getName(), null); return; } record.set(column.getName(), reader.getLong(column)); } @Override public void doubleColumn(Column column) { if (reader.isNull(column)) { record.set(column.getName(), null); return; } record.set(column.getName(), reader.getDouble(column)); } @Override public void booleanColumn(Column column) { if (reader.isNull(column)) { record.set(column.getName(), null); return; } record.set(column.getName(), reader.getBoolean(column)); } }); } result.add(record); } } @Override public void finish() { } @Override public void close() { reader.close(); } @Override public void abort() { } @Override public TaskReport commit() { return Exec.newTaskReport(); } }; }
public void clearResult() { result.clear(); } public List<OutputRecord> getResult() { return result; } }