Asakusa FrameworkのDirect I/O APIのメモ。
|
|
AsakusaFW 0.7.3で、sandboxにDirect I/O APIというものが追加になった。
これを使うと、Operatorクラスの中から、自由にHDFS上のファイルを読み込むことが出来る。
(ジョブフローを経由せず、オペレーター内で(Direct I/Oのインポーターと同様に)ベースパスとリソースパターンを指定して、データモデルとしてファイルを読み込める)
小さなファイルを読み込んでフィールドに(MapやList等の形で)保持しておき、メソッド内で参照するといった使い方が出来る。
いわゆる「定数表」として使える。
ただし、これはあくまでsandboxで提供されている機能であり、正式なものではない。
(AsakusaFWでは、入出力ファイルは常にジョブフロー上に現れるのだが、Direct I/O APIではファイルがジョブフロー上に出てこないので、一貫性が無いし)
AsakusaFW 0.9.1以降ではGroupViewを使用する。[2017-12-10]
(GroupViewはDirect I/O APIと全く異なるので、ソースコードを書き直す必要がある)
Direct I/O APIはsandboxの機能なので、デフォルトでは使えない。
build.gradleに定義を追加する必要がある。
〜
dependencies {
compile group: 'com.asakusafw.sdk', name: 'asakusa-sdk-core', version: asakusafw.asakusafwVersion
compile group: 'com.asakusafw.sdk', name: 'asakusa-sdk-directio', version: asakusafw.asakusafwVersion
compile group: 'com.asakusafw.sdk', name: 'asakusa-sdk-windgate', version: asakusafw.asakusafwVersion
// Direct I/O sandbox api
compile group: 'com.asakusafw.sandbox', name: 'asakusa-directio-runtime-ext', version: asakusafw.asakusafwVersion
compile group: 'com.asakusafw.sandbox', name: 'asakusa-directio-test-driver-ext', version: asakusafw.asakusafwVersion
〜
}
定義を追加したら、Eclipseのプロジェクトの定義を作り直しておく。
code_convertというデータモデルをDirect I/O APIで読み込む例。
変換対象コードと変換後コードを持っておき、それを使ってUpdate演算子の中でコードを変換する。
"コード変換データ" @directio.csv code_convert = { "変換対象コード" code : TEXT; "変換後コード" converted_code : TEXT; };
このデータモデルをコンパイルすると、データモデルクラスと(@directio.csv属性が付けられているので)Importer/Exporterの雛形クラス(抽象クラス)とCsvFormatクラスが生成される。
Direct I/O APIでファイルを読み込むためには、このCsvFormatクラスが必要になる。
package com.example.operator;
import java.io.IOException; import java.util.HashMap; import java.util.Map; import com.asakusafw.runtime.core.util.Shared; import com.asakusafw.runtime.directio.api.DirectIo; import com.asakusafw.runtime.io.ModelInput; import com.asakusafw.vocabulary.operator.Update; import com.example.modelgen.dmdl.csv.CodeConvertCsvFormat; import com.example.modelgen.dmdl.model.CodeConvert; import com.example.modelgen.dmdl.model.SalesDetail;
public abstract class ConstantExampleOperator { @Update public void updateCode(SalesDetail detail) { Map<String, String> map = SHARED_CODE_CONVERT.get(); String convertedCode = map.get(detail.getStoreCodeAsString()); if (convertedCode != null) { detail.setStoreCodeAsString(convertedCode); } } static final Shared<Map<String, String>> SHARED_CODE_CONVERT = new Shared<Map<String, String>>() { @Override protected Map<String, String> initialValue() throws IOException { Map<String, String> map = new HashMap<>(); String basePath = "constants/code_convert"; String resourcePattern = "*.csv"; try (ModelInput<CodeConvert> input = DirectIo.open(CodeConvertCsvFormat.class, basePath, resourcePattern)) { CodeConvert model = new CodeConvert(); while (input.readTo(model)) { map.put(model.getCodeAsString(), model.getConvertedCodeAsString()); } } if (map.isEmpty()) { throw new IOException("code_convert is empty"); } return map; } }; }
Direct I/O APIでファイルを読み込む為に、Sharedというクラスが提供されている。
(Sharedはサンドボックスではなく、ユーティリティーAPIとして正式機能扱い)
これはThreadLocalクラスと似た形式のクラスで、initialValueメソッドで値の初期化処理を記述し、getメソッドで値を取得することが出来る。
このinitialValueメソッドの中でファイルを読み込んで使用する形式(今回の例ではMap)に変換しておく。
(Sharedを使うことにより、(概ね各タスク毎に)一度だけ初期化される。Sharedを使わずにDirectIo.open()を毎回呼ぶと、とんでもなく遅くなってしまうし!)
DirectIoクラスのopenメソッドでHDFSからファイルを読み込む。
第1引数はCsvFormatクラス、第2引数・第3引数はベースパスとリソースパターン。
これは、普通のインポータークラスで定義するものと同じ。
(ただし、インポータークラスそのものを使うことは出来ない。コンパイルは通るが、実行時にエラーになる。何故なら、インポーターの親クラスはvocabularyに属しており、これはコンパイル時に使われるだけで実行時のライブラリーには含まれないから。
逆に言えば、vocabularyのjarファイルも実行環境に含めてしまえば使えるのだが^^;)
また、インポータークラスのベースパスやリソースパターンではバッチ引数等の置換機能があるが、DirectIo.open()ではそういった変換はされない。
必要であれば、自分でBatchContext(フレームワークAPI)を使って値を取得し、変換する。
DirectIo.open()を呼び出すと、ModelInputが返ってくる。
これのreadToメソッドを呼び出すと、1レコードずつデータモデルを取得できる。
データモデルインスタンスは自分で作っておく。
データモデルから値を取り出して値だけを保持する場合は1つのインスタンスを使い回せばよいが、データモデルのリストを保持するような場合はインスタンスを毎回作る。
インスタンス使い回しの例 | インスタンス毎回生成の例 |
---|---|
List<String> list = new ArrayList<>(); |
List<Hoge> list = new ArrayList<>(); |
なお、読み込むファイルが見つからなくてもopenメソッドで例外は発生せず、readToメソッドを呼んだ時にEOF扱いになるだけ。[2015-07-04]
なので、データが取得できたがどうかのチェックは自分で行う必要がある。
Direct I/O APIを使ったOperatorをテストする方法。
package com.example.operator;
import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import java.util.ArrayList; import java.util.List; import org.junit.Rule; import org.junit.Test; import com.asakusafw.testdriver.OperatorTestEnvironment; import com.asakusafw.testdriver.SharedObjectCleaner; import com.asakusafw.testdriver.directio.api.DirectIoTester; import com.example.jobflow.CodeConvertFromCsv; import com.example.modelgen.dmdl.model.CodeConvert; import com.example.modelgen.dmdl.model.SalesDetail;
public class ConstantExampleOperatorTest { @Rule public final SharedObjectCleaner cleaner = new SharedObjectCleaner().add(ConstantExampleOperator.class);
SharedObjectCleanerを定義しておくと、Operatorクラス内で定義したSharedフィールドのクリーニング(初期化)を行ってくれる。
@Rule public final OperatorTestEnvironment environment = new OperatorTestEnvironment();
@Test public void testUpdateCode() { // ファイルの準備 CodeConvertFromCsv importer = new CodeConvertFromCsv(); DirectIoTester.with(environment) .resource(importer.getFormat(), importer.getBasePath(), importer.getResourcePattern()) .prepare(getCodeConvertList()); // テスト実行 ConstantExampleOperator operator = new ConstantExampleOperatorImpl(); SalesDetail detail = new SalesDetail(); detail.setStoreCodeAsString("123"); operator.updateCode(detail); assertThat(detail.getStoreCodeAsString(), is("abc")); } // ファイル内容の準備 private List<CodeConvert> getCodeConvertList() { List<CodeConvert> list = new ArrayList<>(); CodeConvert object0 = new CodeConvert(); object0.setCodeAsString("123"); object0.setConvertedCodeAsString("abc"); list.add(object0); CodeConvert object1 = new CodeConvert(); object1.setCodeAsString("456"); object1.setConvertedCodeAsString("def"); list.add(object1); return list; } }
Direct I/O APIで読み込むファイルは、DirectIoTesterを使ってOperatorTestEnvironmentに登録しておく。
withメソッドでOperatorTestEnvironmentを指定し、
resourceメソッドでCsvFormatクラス・ベースパス・リソースパターンを指定し、
prepareメソッドで読み込まれるデータ(データモデルのリスト)を指定する。
resourceメソッドにはベースパスやリソースパターンを文字列で指定してもよいが、インポーターのインスタンスを生成してそこから取得することも出来る。
(実行環境にはインポーターの親クラスのライブラリーが無いのでOperator本体でインポーターを使うことは出来ないが、テストクラスでは使える)
package com.example.jobflow; import com.example.modelgen.dmdl.csv.AbstractCodeConvertCsvInputDescription;
public class CodeConvertFromCsv extends AbstractCodeConvertCsvInputDescription { @Override public String getBasePath() { return "constants/code_convert"; } @Override public String getResourcePattern() { return "*.csv"; } @Override public DataSize getDataSize() { return DataSize.TINY; } }
基本的に、AsakusaFWでは全ての入出力ファイルでインポーター・エクスポータークラスを用意するので、Direct I/O APIを使う場合もインポータークラスを用意しておいた方が統一がとれていいのではないかと思う。
Direct I/O APIで読み込む場合はインポーターは必要ない、にも関わらずインポーターを用意しておくべき理由は、後から調査するときに役立つと思うから。[2015-04-24]
あるファイル(データモデル)が使われているかどうかは、インポーター・エクスポーターの有無でだいたい分かる。
インポーター・エクスポータークラスが見つかれば、それを使っているジョブフローを検索すれば、どのジョブで使われているのか確実に分かる。
だったのだが、Direct I/O APIを使っていると、この調査方法からは漏れてしまう。
そのため、Direct I/O APIを使う場合にインポーターが不要であっても、インポータークラスだけは用意しておき、せめてテストクラスで使うようにしておけば、検索で引っかかると思う。
DirectIoTesterを使ったOperatorのテストを行う場合、実行環境のパス(データソース)を定義する必要がある。
データソースを定義せずにテストを実行すると、以下のような例外が発生する。
java.lang.IllegalStateException: failed to initialize Direct I/O environment: CodeConvert@constants/code_convert:*.csv at com.asakusafw.testdriver.directio.api.DirectIoOperatorTester$Resource.prepare0(DirectIoOperatorTester.java:88) at com.asakusafw.testdriver.directio.api.DirectIoOperatorTester$Resource.prepare(DirectIoOperatorTester.java:58) at com.asakusafw.testdriver.directio.api.DirectIoTester$DirectIoResource.prepare(DirectIoTester.java:160) at com.example.operator.ConstantExampleOperatorTest.testUpdateCode(ConstantExampleOperatorTest.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.lang.reflect.Method.invoke(Unknown Source) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at com.asakusafw.testdriver.OperatorTestEnvironment$1.evaluate(OperatorTestEnvironment.java:141) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50) at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197) Caused by: java.io.IOException: Failed to initialize Direct I/O for "constants/code_convert", please check Direct I/O configuration at com.asakusafw.testdriver.directio.DirectIoTestHelper.<init>(DirectIoTestHelper.java:136) at com.asakusafw.testdriver.directio.DirectIoTestHelper.<init>(DirectIoTestHelper.java:111) at com.asakusafw.testdriver.directio.api.DirectIoOperatorTester$Resource.prepare0(DirectIoOperatorTester.java:64) ... 31 more Caused by: java.io.IOException: There are no data sources for path: constants/code_convert at com.asakusafw.runtime.directio.DirectDataSourceRepository.findNode(DirectDataSourceRepository.java:95) at com.asakusafw.runtime.directio.DirectDataSourceRepository.getRelatedId(DirectDataSourceRepository.java:114) at com.asakusafw.testdriver.directio.DirectIoTestHelper.<init>(DirectIoTestHelper.java:132) ... 33 more
DirectIoTesterを使ったOperatorのテストを行う為のデータソースの定義方法は2つある。
1つはasakusa-resources.xmlを用意する方法、もう1つはOperatorTestEnvironmentに設定する方法。
「プロジェクト/src/test/resources」直下にasakura-resources.xmlを配置する。
asakusa-resources.xmlは、ASAKUSA_HOME/core/conf/asakusa-resources.xml(フローのテスト環境用)をそのままコピーすればよい。
ただし、必要なのはデータソース定義のみなので、以下のような定義で充分。
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>com.asakusafw.directio.root</name> <value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value> </property> <property> <name>com.asakusafw.directio.root.path</name> <value>/</value> </property> <property> <name>com.asakusafw.directio.root.fs.path</name> <value>target/testing/operator/directio</value> </property> </configuration>
基本的に、Operatorのテストを行う場合はAsakusa
Frameworkのテスト環境(環境変数ASAKUSA_HOMEの下にインストールするもの)は必要ない。
逆に言えば、ASAKUSA_HOME配下の環境が無くてもOperatorのテストは実行できるべきなので、ASAKUSA_HOME/core/conf/asakusa-resources.xmlを参照する仕組みになっていないのだと思われる。
OperatorTestEnvironment#configure()を使ってデータソース定義を設定する。
import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import com.asakusafw.testdriver.OperatorTestEnvironment;
public class ConstantExampleOperatorTest { 〜 @Rule public final OperatorTestEnvironment environment = new OperatorTestEnvironment(); @Rule public final TemporaryFolder folder = new TemporaryFolder(); @Before public void setUp() throws IOException { environment.configure("com.asakusafw.directio.root", "com.asakusafw.runtime.directio.hadoop.HadoopDataSource"); environment.configure("com.asakusafw.directio.root.path", "/"); environment.configure("com.asakusafw.directio.root.fs.path", folder.newFolder().toURI().toString()); environment.reload(); } 〜 }
com.asakusafw.directio.root.fs.path
にはテストデータを配置するディレクトリーを指定するので、文字列でパスを直接書いてもよい。
が、TemporaryFolderを使うと自動的に一時ディレクトリーが作られ、テストの実行が終わると自動的に削除される。
“「DirectIo.open()でファイルを読み込むオペレーター」を使っているフロー”のテストを行う場合、testerに対してファイルを登録する。
ファイルのデータを用意する方法は、データモデルのリストを登録する方法と、Excelファイルから読み込む方法がある。
Operatorのテストと同様に、DirectIo.open()で読み込むデータをデータモデルのリストとして用意する 。
package com.example.jobflow;
import java.util.ArrayList; import java.util.List; import com.asakusafw.testdriver.JobFlowTester; import com.asakusafw.testdriver.core.PropertyName; import com.asakusafw.testdriver.directio.api.DirectIoTester; import com.example.jobflow.ConstantExampleJob; import com.example.modelgen.dmdl.model.CodeConvert; import com.example.modelgen.dmdl.model.SalesDetail; import org.junit.Test;
/** * {@link ConstantExampleJob}のテスト。 */ public class ConstantExampleJobTest { static { System.setProperty(PropertyName.KEY_SEGMENT_SEPARATOR, "_"); }
@Test public void describe() { JobFlowTester tester = new JobFlowTester(getClass()); // ファイルの準備 CodeConvertFromCsv importer = new CodeConvertFromCsv(); DirectIoTester.with(tester) .resource(importer.getFormat(), importer.getBasePath(), importer.getResourcePattern()) .prepare(getCodeConvertList()); tester.input("salesDetail", SalesDetail.class).prepare("ConstantExampleJobTest.xls#salesDetail"); tester.output("salesDetailOut", SalesDetail.class).verify("ConstantExampleJobTest.xls#salesDetailOut", "ConstantExampleJobTest.xls#salesDetailOut_rule"); tester.runTest(ConstantExampleJob.class); } // ファイル内容の準備 private List<CodeConvert> getCodeConvertList() { List<CodeConvert> list = new ArrayList<>(); CodeConvert object0 = new CodeConvert(); object0.setCodeAsString("123"); object0.setConvertedCodeAsString("abc"); list.add(object0); CodeConvert object1 = new CodeConvert(); object1.setCodeAsString("456"); object1.setConvertedCodeAsString("def"); list.add(object1); return list; } }
DirectIo.open()で読み込むデータをExcelファイルで用意する。
package com.example.jobflow;
import org.junit.Test; import com.asakusafw.testdriver.JobFlowTester; import com.asakusafw.testdriver.core.PropertyName; import com.asakusafw.testdriver.directio.api.DirectIoTester; import com.example.modelgen.dmdl.model.SalesDetail;
/** * {@link ConstantExampleJob}のテスト。 */ public class ConstantExampleJobTest { static { System.setProperty(PropertyName.KEY_SEGMENT_SEPARATOR, "_"); }
@Test public void describe() { JobFlowTester tester = new JobFlowTester(getClass()); CodeConvertFromCsv importer = new CodeConvertFromCsv(); DirectIoTester.with(tester) .resource(importer.getFormat(), importer.getBasePath(), importer.getResourcePattern()) .prepare("ConstantExampleJobTest.xls#codeConvert"); tester.input("salesDetail", SalesDetail.class).prepare("ConstantExampleJobTest.xls#salesDetail"); tester.output("salesDetailOut", SalesDetail.class).verify("ConstantExampleJobTest.xls#salesDetailOut", "ConstantExampleJobTest.xls#salesDetailOut_rule"); tester.runTest(ConstantExampleJob.class); } }
A | B | C | |
1 | code | converted_code | |
2 | 123 | abc | |
3 | 456 | def | |
4 |
Direct I/O APIは、基本的には「Direct I/O APIでないとどうしようもない」という箇所で使うべき。[2015-04-24]
やろうと思えばOperatorの中だけで全てのファイルを読み込めるので、「フローとは何なのか」状態になってしまう(苦笑)
AsakusaFWのフローの最適化も一切効かないだろうし。
それやこれやで、Direct I/O APIはサンドボックスに入っていて正式機能扱いではないのだと思う。
前述の例では、code_convertというファイルに入っている値を使ってトランザクションデータのコードを変換している。
この使い方であれば、MasterJoinUpdate演算子で実現できる。
(マスター側(code_convert)のインポーターでデータサイズをTINYにしておけば、サイドデータ結合(ハッシュジョイン)で結合されるし)
Direct I/O APIを使わないと無理そうなのは、例えばCoGroup演算子で3種類の入力をマッチさせたいが結合キーの種類が異なる場合。
CoGroupの引数では2種類だけを入力とし、3種類目だけをDirect I/O APIで読み込んでおく、というのは考えられる。(ただし当然小さいデータ(マスターデータ)に限る)
(そもそも2つの演算子に分割できないか、という事をまず考えるところだが)
また、MasterJoinUpdate演算子で結合キーが無い状態
「@Key(group = {})
」で結合させて更新しているような処理だと、(キーの値が一種類という状態なので)実行時のHadoopのタスクが分散しない。
こういうときはUpdate演算子に変更してDirect I/O APIでマスターデータを読み込んでおく方が実行効率が良くなるかもしれない。