S-JIS[2015-04-16/2017-12-10] 変更履歴

Asakusa Framework Direct I/O API

Asakusa FrameworkDirect I/O APIのメモ。


概要

AsakusaFW 0.7.3で、sandboxにDirect I/O APIというものが追加になった。
これを使うと、Operatorクラスの中から、自由にHDFS上のファイルを読み込むことが出来る。
(ジョブフローを経由せず、オペレーター内で(Direct I/Oのインポーターと同様に)ベースパスとリソースパターンを指定して、データモデルとしてファイルを読み込める)

小さなファイルを読み込んでフィールドに(MapやList等の形で)保持しておき、メソッド内で参照するといった使い方が出来る。
いわゆる「定数表」として使える。

ただし、これはあくまでsandboxで提供されている機能であり、正式なものではない。
(AsakusaFWでは、入出力ファイルは常にジョブフロー上に現れるのだが、Direct I/O APIではファイルがジョブフロー上に出てこないので、一貫性が無いし)
AsakusaFW 0.9.1以降ではGroupViewを使用する。[2017-12-10]
(GroupViewはDirect I/O APIと全く異なるので、ソースコードを書き直す必要がある)

Direct I/O APIの使いどころ


build.gradleの定義

Direct I/O APIはsandboxの機能なので、デフォルトでは使えない。
build.gradleに定義を追加する必要がある。

build.gradle:

〜
dependencies {
    compile group: 'com.asakusafw.sdk', name: 'asakusa-sdk-core', version: asakusafw.asakusafwVersion
    compile group: 'com.asakusafw.sdk', name: 'asakusa-sdk-directio', version: asakusafw.asakusafwVersion
    compile group: 'com.asakusafw.sdk', name: 'asakusa-sdk-windgate', version: asakusafw.asakusafwVersion

    // Direct I/O sandbox api
    compile group: 'com.asakusafw.sandbox', name: 'asakusa-directio-runtime-ext', version: asakusafw.asakusafwVersion
    compile group: 'com.asakusafw.sandbox', name: 'asakusa-directio-test-driver-ext', version: asakusafw.asakusafwVersion
〜
}

定義を追加したら、Eclipseのプロジェクトの定義を作り直しておく。


コーディング例

code_convertというデータモデルをDirect I/O APIで読み込む例。
変換対象コードと変換後コードを持っておき、それを使ってUpdate演算子の中でコードを変換する。


constant_example.dmdl:

"コード変換データ"
@directio.csv
code_convert = {

    "変換対象コード"
    code : TEXT;

    "変換後コード"
    converted_code : TEXT;
};

このデータモデルをコンパイルすると、データモデルクラスと(@directio.csv属性が付けられているので)Importer/Exporterの雛形クラス(抽象クラス)とCsvFormatクラスが生成される。
Direct I/O APIでファイルを読み込むためには、このCsvFormatクラスが必要になる。


ConstantExampleOperator.java:

package com.example.operator;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.asakusafw.runtime.core.util.Shared;
import com.asakusafw.runtime.directio.api.DirectIo;
import com.asakusafw.runtime.io.ModelInput;

import com.asakusafw.vocabulary.operator.Update;

import com.example.modelgen.dmdl.csv.CodeConvertCsvFormat;
import com.example.modelgen.dmdl.model.CodeConvert;
import com.example.modelgen.dmdl.model.SalesDetail;
public abstract class ConstantExampleOperator {

	@Update
	public void updateCode(SalesDetail detail) {
		Map<String, String> map = SHARED_CODE_CONVERT.get();

		String convertedCode = map.get(detail.getStoreCodeAsString());
		if (convertedCode != null) {
			detail.setStoreCodeAsString(convertedCode);
		}
	}

	static final Shared<Map<String, String>> SHARED_CODE_CONVERT = new Shared<Map<String, String>>() {
		@Override
		protected Map<String, String> initialValue() throws IOException {
			Map<String, String> map = new HashMap<>();

			String basePath = "constants/code_convert";
			String resourcePattern = "*.csv";
			try (ModelInput<CodeConvert> input = DirectIo.open(CodeConvertCsvFormat.class, basePath, resourcePattern)) {
				CodeConvert model = new CodeConvert();
				while (input.readTo(model)) {
					map.put(model.getCodeAsString(), model.getConvertedCodeAsString());
				}
			}

			if (map.isEmpty()) {
				throw new IOException("code_convert is empty");
			}

			return map;
		}
	};
}

Direct I/O APIでファイルを読み込む為に、Sharedというクラスが提供されている。
(Sharedはサンドボックスではなく、ユーティリティーAPIとして正式機能扱い)
これはThreadLocalクラスと似た形式のクラスで、initialValueメソッドで値の初期化処理を記述し、getメソッドで値を取得することが出来る。
このinitialValueメソッドの中でファイルを読み込んで使用する形式(今回の例ではMap)に変換しておく。
(Sharedを使うことにより、(概ね各タスク毎に)一度だけ初期化される。Sharedを使わずにDirectIo.open()を毎回呼ぶと、とんでもなく遅くなってしまうし!)

DirectIoクラスのopenメソッドでHDFSからファイルを読み込む。
第1引数はCsvFormatクラス、第2引数・第3引数はベースパスとリソースパターン
これは、普通のインポータークラスで定義するものと同じ。
(ただし、インポータークラスそのものを使うことは出来ない。コンパイルは通るが、実行時にエラーになる。何故なら、インポーターの親クラスはvocabularyに属しており、これはコンパイル時に使われるだけで実行時のライブラリーには含まれないから。
 逆に言えば、vocabularyのjarファイルも実行環境に含めてしまえば使えるのだが^^;)

また、インポータークラスのベースパスやリソースパターンではバッチ引数等の置換機能があるが、DirectIo.open()ではそういった変換はされない。
必要であれば、自分でBatchContext(フレームワークAPI)を使って値を取得し、変換する。

DirectIo.open()を呼び出すと、ModelInputが返ってくる。
これのreadToメソッドを呼び出すと、1レコードずつデータモデルを取得できる。
データモデルインスタンスは自分で作っておく。
データモデルから値を取り出して値だけを保持する場合は1つのインスタンスを使い回せばよいが、データモデルのリストを保持するような場合はインスタンスを毎回作る。

インスタンス使い回しの例 インスタンス毎回生成の例
List<String> list = new ArrayList<>();
try (ModelInput<Hoge> input = DirectIo.open(〜)) {
    Hoge object = new Hoge();
    while (input.readTo(object)) {
        list.add(object.getValueAsString());
    }
}
List<Hoge> list = new ArrayList<>();
try (ModelInput<Hoge> input = DirectIo.open(〜)) {
    for (;;) {
        Hoge object = new Hoge();
        if (!input.readTo(object)) {
            break;
        }
        list.add(object);
    }
}

なお、読み込むファイルが見つからなくてもopenメソッドで例外は発生せず、readToメソッドを呼んだ時にEOF扱いになるだけ。[2015-07-04]
なので、データが取得できたがどうかのチェックは自分で行う必要がある。


Operatorのテストの例

Direct I/O APIを使ったOperatorをテストする方法。

package com.example.operator;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import java.util.ArrayList;
import java.util.List;

import org.junit.Rule;
import org.junit.Test;

import com.asakusafw.testdriver.OperatorTestEnvironment;
import com.asakusafw.testdriver.SharedObjectCleaner;
import com.asakusafw.testdriver.directio.api.DirectIoTester;

import com.example.jobflow.CodeConvertFromCsv;
import com.example.modelgen.dmdl.model.CodeConvert;
import com.example.modelgen.dmdl.model.SalesDetail;
public class ConstantExampleOperatorTest {

	@Rule
	public final SharedObjectCleaner cleaner = new SharedObjectCleaner().add(ConstantExampleOperator.class);

SharedObjectCleanerを定義しておくと、Operatorクラス内で定義したSharedフィールドのクリーニング(初期化)を行ってくれる。

	@Rule
	public final OperatorTestEnvironment environment = new OperatorTestEnvironment();
	@Test
	public void testUpdateCode() {
		// ファイルの準備
		CodeConvertFromCsv importer = new CodeConvertFromCsv();
		DirectIoTester.with(environment)
			.resource(importer.getFormat(), importer.getBasePath(), importer.getResourcePattern())
			.prepare(getCodeConvertList());

		// テスト実行
		ConstantExampleOperator operator = new ConstantExampleOperatorImpl();
		SalesDetail detail = new SalesDetail();
		detail.setStoreCodeAsString("123");
		operator.updateCode(detail);
		assertThat(detail.getStoreCodeAsString(), is("abc"));
	}

	// ファイル内容の準備
	private List<CodeConvert> getCodeConvertList() {
		List<CodeConvert> list = new ArrayList<>();

		CodeConvert object0 = new CodeConvert();
		object0.setCodeAsString("123");
		object0.setConvertedCodeAsString("abc");
		list.add(object0);

		CodeConvert object1 = new CodeConvert();
		object1.setCodeAsString("456");
		object1.setConvertedCodeAsString("def");
		list.add(object1);

		return list;
	}
}

Direct I/O APIで読み込むファイルは、DirectIoTesterを使ってOperatorTestEnvironmentに登録しておく。
withメソッドでOperatorTestEnvironmentを指定し、
resourceメソッドでCsvFormatクラス・ベースパス・リソースパターンを指定し、
prepareメソッドで読み込まれるデータ(データモデルのリスト)を指定する。


resourceメソッドにはベースパスやリソースパターンを文字列で指定してもよいが、インポーターのインスタンスを生成してそこから取得することも出来る。
(実行環境にはインポーターの親クラスのライブラリーが無いのでOperator本体でインポーターを使うことは出来ないが、テストクラスでは使える)

CodeConvertFromCsv.java:

package com.example.jobflow;

import com.example.modelgen.dmdl.csv.AbstractCodeConvertCsvInputDescription;
public class CodeConvertFromCsv extends AbstractCodeConvertCsvInputDescription {

	@Override
	public String getBasePath() {
		return "constants/code_convert";
	}

	@Override
	public String getResourcePattern() {
		return "*.csv";
	}

	@Override
	public DataSize getDataSize() {
		return DataSize.TINY;
	}
}

基本的に、AsakusaFWでは全ての入出力ファイルでインポーター・エクスポータークラスを用意するので、Direct I/O APIを使う場合もインポータークラスを用意しておいた方が統一がとれていいのではないかと思う。

Direct I/O APIで読み込む場合はインポーターは必要ない、にも関わらずインポーターを用意しておくべき理由は、後から調査するときに役立つと思うから。[2015-04-24]
あるファイル(データモデル)が使われているかどうかは、インポーター・エクスポーターの有無でだいたい分かる。
インポーター・エクスポータークラスが見つかれば、それを使っているジョブフローを検索すれば、どのジョブで使われているのか確実に分かる。
だったのだが、Direct I/O APIを使っていると、この調査方法からは漏れてしまう。
そのため、Direct I/O APIを使う場合にインポーターが不要であっても、インポータークラスだけは用意しておき、せめてテストクラスで使うようにしておけば、検索で引っかかると思う。


Operatorテスト用のデータソースの用意

DirectIoTesterを使ったOperatorのテストを行う場合、実行環境のパス(データソース)を定義する必要がある。
データソースを定義せずにテストを実行すると、以下のような例外が発生する。

java.lang.IllegalStateException: failed to initialize Direct I/O environment: CodeConvert@constants/code_convert:*.csv
	at com.asakusafw.testdriver.directio.api.DirectIoOperatorTester$Resource.prepare0(DirectIoOperatorTester.java:88)
	at com.asakusafw.testdriver.directio.api.DirectIoOperatorTester$Resource.prepare(DirectIoOperatorTester.java:58)
	at com.asakusafw.testdriver.directio.api.DirectIoTester$DirectIoResource.prepare(DirectIoTester.java:160)
	at com.example.operator.ConstantExampleOperatorTest.testUpdateCode(ConstantExampleOperatorTest.java:49)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at com.asakusafw.testdriver.OperatorTestEnvironment$1.evaluate(OperatorTestEnvironment.java:141)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: java.io.IOException: Failed to initialize Direct I/O for "constants/code_convert", please check Direct I/O configuration
	at com.asakusafw.testdriver.directio.DirectIoTestHelper.<init>(DirectIoTestHelper.java:136)
	at com.asakusafw.testdriver.directio.DirectIoTestHelper.<init>(DirectIoTestHelper.java:111)
	at com.asakusafw.testdriver.directio.api.DirectIoOperatorTester$Resource.prepare0(DirectIoOperatorTester.java:64)
	... 31 more
Caused by: java.io.IOException: There are no data sources for path: constants/code_convert
	at com.asakusafw.runtime.directio.DirectDataSourceRepository.findNode(DirectDataSourceRepository.java:95)
	at com.asakusafw.runtime.directio.DirectDataSourceRepository.getRelatedId(DirectDataSourceRepository.java:114)
	at com.asakusafw.testdriver.directio.DirectIoTestHelper.<init>(DirectIoTestHelper.java:132)
	... 33 more

DirectIoTesterを使ったOperatorのテストを行う為のデータソースの定義方法は2つある。
1つはasakusa-resources.xmlを用意する方法、もう1つはOperatorTestEnvironmentに設定する方法


asakura-resources.xmlを用意する方法

「プロジェクト/src/test/resources」直下にasakura-resources.xmlを配置する。
asakusa-resources.xmlは、ASAKUSA_HOME/core/conf/asakusa-resources.xml(フローのテスト環境用)をそのままコピーすればよい。
ただし、必要なのはデータソース定義のみなので、以下のような定義で充分。

プロジェクト/src/test/resources/asakusa-resources.xml:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
	<property>
		<name>com.asakusafw.directio.root</name>
		<value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value>
	</property>
	<property>
		<name>com.asakusafw.directio.root.path</name>
		<value>/</value>
	</property>
	<property>
		<name>com.asakusafw.directio.root.fs.path</name>
		<value>target/testing/operator/directio</value>
	</property>
</configuration>

基本的に、Operatorのテストを行う場合はAsakusa Frameworkのテスト環境(環境変数ASAKUSA_HOMEの下にインストールするもの)は必要ない。
逆に言えば、ASAKUSA_HOME配下の環境が無くてもOperatorのテストは実行できるべきなので、ASAKUSA_HOME/core/conf/asakusa-resources.xmlを参照する仕組みになっていないのだと思われる。


OperatorTestEnvironmentに設定する方法

OperatorTestEnvironment#configure()を使ってデータソース定義を設定する。

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import com.asakusafw.testdriver.OperatorTestEnvironment;
public class ConstantExampleOperatorTest {
〜
	@Rule
	public final OperatorTestEnvironment environment = new OperatorTestEnvironment();

	@Rule
	public final TemporaryFolder folder = new TemporaryFolder();

	@Before
	public void setUp() throws IOException {
		environment.configure("com.asakusafw.directio.root", "com.asakusafw.runtime.directio.hadoop.HadoopDataSource");
		environment.configure("com.asakusafw.directio.root.path", "/");
		environment.configure("com.asakusafw.directio.root.fs.path", folder.newFolder().toURI().toString());
		environment.reload();
	}
〜
}

com.asakusafw.directio.root.fs.pathにはテストデータを配置するディレクトリーを指定するので、文字列でパスを直接書いてもよい。
が、TemporaryFolderを使うと自動的に一時ディレクトリーが作られ、テストの実行が終わると自動的に削除される。


フローのテストの例

“「DirectIo.open()でファイルを読み込むオペレーター」を使っているフロー”のテストを行う場合、testerに対してファイルを登録する。
ファイルのデータを用意する方法は、データモデルのリストを登録する方法と、Excelファイルから読み込む方法がある。


データモデルのリストとして用意する方法

Operatorのテストと同様に、DirectIo.open()で読み込むデータをデータモデルのリストとして用意する 。

ConstantExampleJobTest.java:

package com.example.jobflow;
import java.util.ArrayList;
import java.util.List;

import com.asakusafw.testdriver.JobFlowTester;
import com.asakusafw.testdriver.core.PropertyName;
import com.asakusafw.testdriver.directio.api.DirectIoTester;

import com.example.jobflow.ConstantExampleJob;
import com.example.modelgen.dmdl.model.CodeConvert;
import com.example.modelgen.dmdl.model.SalesDetail;

import org.junit.Test;
/**
 * {@link ConstantExampleJob}のテスト。
 */
public class ConstantExampleJobTest {

	static {
		System.setProperty(PropertyName.KEY_SEGMENT_SEPARATOR, "_");
	}
	@Test
	public void describe() {
		JobFlowTester tester = new JobFlowTester(getClass());

		// ファイルの準備
		CodeConvertFromCsv importer = new CodeConvertFromCsv();
		DirectIoTester.with(tester)
			.resource(importer.getFormat(), importer.getBasePath(), importer.getResourcePattern())
			.prepare(getCodeConvertList());

		tester.input("salesDetail", SalesDetail.class).prepare("ConstantExampleJobTest.xls#salesDetail");
		tester.output("salesDetailOut", SalesDetail.class).verify("ConstantExampleJobTest.xls#salesDetailOut", "ConstantExampleJobTest.xls#salesDetailOut_rule");

		tester.runTest(ConstantExampleJob.class);
	}

	// ファイル内容の準備
	private List<CodeConvert> getCodeConvertList() {
		List<CodeConvert> list = new ArrayList<>();

		CodeConvert object0 = new CodeConvert();
		object0.setCodeAsString("123");
		object0.setConvertedCodeAsString("abc");
		list.add(object0);

		CodeConvert object1 = new CodeConvert();
		object1.setCodeAsString("456");
		object1.setConvertedCodeAsString("def");
		list.add(object1);

		return list;
	}
}

Excelファイルのシートとして用意する方法

DirectIo.open()で読み込むデータをExcelファイルで用意する。

ConstantExampleJobTest.java:

package com.example.jobflow;
import org.junit.Test;

import com.asakusafw.testdriver.JobFlowTester;
import com.asakusafw.testdriver.core.PropertyName;
import com.asakusafw.testdriver.directio.api.DirectIoTester;

import com.example.modelgen.dmdl.model.SalesDetail;
/**
 * {@link ConstantExampleJob}のテスト。
 */
public class ConstantExampleJobTest {

	static {
		System.setProperty(PropertyName.KEY_SEGMENT_SEPARATOR, "_");
	}
	@Test
	public void describe() {
		JobFlowTester tester = new JobFlowTester(getClass());

		CodeConvertFromCsv importer = new CodeConvertFromCsv();
		DirectIoTester.with(tester)
			.resource(importer.getFormat(), importer.getBasePath(), importer.getResourcePattern())
			.prepare("ConstantExampleJobTest.xls#codeConvert");

		tester.input("salesDetail", SalesDetail.class).prepare("ConstantExampleJobTest.xls#salesDetail");
		tester.output("salesDetailOut", SalesDetail.class).verify("ConstantExampleJobTest.xls#salesDetailOut", "ConstantExampleJobTest.xls#salesDetailOut_rule");

		tester.runTest(ConstantExampleJob.class);
	}
}

src/test/resources/com/example/jobflow/ConstantExampleJobTest.xlsのcodeConvertシート:

  A B C
1 code converted_code  
2 123 abc  
3 456 def  
4      

使いどころ

Direct I/O APIは、基本的には「Direct I/O APIでないとどうしようもない」という箇所で使うべき。[2015-04-24]

やろうと思えばOperatorの中だけで全てのファイルを読み込めるので、「フローとは何なのか」状態になってしまう(苦笑)
AsakusaFWのフローの最適化も一切効かないだろうし。

それやこれやで、Direct I/O APIはサンドボックスに入っていて正式機能扱いではないのだと思う。


前述の例では、code_convertというファイルに入っている値を使ってトランザクションデータのコードを変換している。
この使い方であれば、MasterJoinUpdate演算子で実現できる。
(マスター側(code_convert)のインポーターでデータサイズをTINYにしておけば、サイドデータ結合(ハッシュジョイン)で結合されるし)

Direct I/O APIを使わないと無理そうなのは、例えばCoGroup演算子で3種類の入力をマッチさせたいが結合キーの種類が異なる場合。
CoGroupの引数では2種類だけを入力とし、3種類目だけをDirect I/O APIで読み込んでおく、というのは考えられる。(ただし当然小さいデータ(マスターデータ)に限る)
(そもそも2つの演算子に分割できないか、という事をまず考えるところだが)

また、MasterJoinUpdate演算子で結合キーが無い状態 「@Key(group = {})」で結合させて更新しているような処理だと、(キーの値が一種類という状態なので)実行時のHadoopのタスクが分散しない。
こういうときはUpdate演算子に変更してDirect I/O APIでマスターデータを読み込んでおく方が実行効率が良くなるかもしれない。


Direct I/Oへ戻る / AsakusaFW目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま