S-JIS[2015-04-15] 変更履歴

Asakusa Framework Direct I/O DataFilter

Asakusa FrameworkDirect I/OのDataFilterのメモ。


概要

AsakusaFW 0.7.3で、Direct I/Oのインポーターにフィルターを指定できるようになった。
これにより、入力ファイルのパスを条件にしてファイルを選択したり(パスフィルター)、必要なデータだけを取得したり(データフィルター)することが出来る。
(データフィルターは、WindGate JDBCにおけるgetConditionメソッドによるWHERE条件と似たような位置付けの機能)

なお、ジョブフローのテストでは、フィルターは適用されない。


インポーター記述の例

フィルターを使う場合、インポーターにフィルタークラスを指定する。
そのため、フィルタークラスを事前に実装しておく必要がある。

フィルターはDataFilterクラスを継承して作成する。
DataFilterには、どのデータモデルを対象とするかをジェネリクスで指定する。

以下の例は、データモデルItemInfoを対象とするDataFilter。

ItemInfoDataFilter.java:

package com.example.jobflow;

import com.asakusafw.runtime.directio.DataFilter;

import com.example.modelgen.dmdl.model.ItemInfo;
public class ItemInfoDataFilter extends DataFilter<ItemInfo> {
	// バッチ引数の取得
	// パスフィルター
	// データフィルター
}

ItemInfoFromCsv.java(インポーター):

package com.example.jobflow;

import com.asakusafw.runtime.directio.DataFilter;

import com.example.modelgen.dmdl.csv.AbstractItemInfoCsvInputDescription;
/**
 * 商品マスターをDirect I/Oで入力する。
 */
public class ItemInfoFromCsv extends AbstractItemInfoCsvInputDescription {

	@Override
	public String getBasePath() {
		return "master";
	}

	@Override
	public String getResourcePattern() {
		return "*/item_info.csv";
	}

	@Override
	public Class<? extends DataFilter<?>> getFilter() {
		return ItemInfoDataFilter.class;
	}

	@Override
	public DataSize getDataSize() {
		return DataSize.SMALL;
	}
}

インポーターで指定するDataSizeは、フィルタリング後のデータのサイズを示すことになる。


なお、ジョブフローのテストでは、ここで指定したフィルタークラスは一切使用されないので注意。

パスフィルターが適用されない理由は、フローのテストを実行する時には、入出力ファイルのパスをAsakusaFWが自動的に設定するので、フィルターに適するパスになるとは限らない為。
 プログラマーがパスを指定するのも面倒そうだし)

データフィルターも適用されないので、ジョブフローテスト用の入力データはフィルター済みのものを用意する必要がある。


パスフィルターの例

入力ファイルのパスを使ってフィルタリングする例。

ItemInfoDataFilter.java:

package com.example.jobflow;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.asakusafw.runtime.directio.DataFilter;

import com.example.modelgen.dmdl.model.ItemInfo;
public class ItemInfoDataFilter extends DataFilter<Object> {

	private static final Pattern PATH_PATTERN = Pattern.compile(".*/(?<dir>[^/]*)/.*\\.csv");

	@Override
	public boolean acceptsPath(String path) {
		Matcher matcher = PATH_PATTERN.matcher(path);
		if (matcher.matches()) {
			String dir = matcher.group("dir");
			return dir.equals("latest");
		}
		return false;
	}
}

パスフィルターではデータモデルを使わないので、DataFilterの型引数には適当なクラス(Objectでよい)を指定しておく。

パスフィルターでは、DataFilterのacceptsPathメソッドをオーバーライドする。
引数のpathが入力ファイルのパス。
このメソッドでtrueを返すと、そのファイルは読み込み対象になる。

この例では正規表現を使い、パスをスラッシュ「/」で区切ってファイル名の直前のディレクトリー名を取得し、条件判定をしている。


上記の例だと、インポーターのgetResourcePatternメソッドに「latest/item_info.csv」と書いておけば、パスフィルターで判定する意味は無い^^;
getResourcePattern()ではバッチ引数を使うことも出来るので、「${date}/item_info.csv」のように書けば、バッチ引数dateで指定したディレクトリーのファイルだけを対象とすることも出来る。
しかし、ディレクトリー名を範囲で指定したいような場合は、このような記述方法では対応できない。パスフィルターの出番である。


データフィルターの例

入力データでフィルタリングする例。
条件に合致するデータだけを入力データとして有効にする。

ItemInfoDataFilter.java:

package com.example.jobflow;

import com.asakusafw.runtime.directio.DataFilter;

import com.example.modelgen.dmdl.model.ItemInfo;
public class ItemInfoDataFilter extends DataFilter<ItemInfo> {

	@Override
	public boolean acceptsData(ItemInfo data) {
		// 削除日時が入っていないものだけ抽出
		return data.getDeleteDateOption().isNull();
	}
}

データフィルターではデータモデルを使うので、DataFilterの型引数には対象のデータモデルクラスを指定しておく。

データフィルターでは、DataFilterのacceptsDataメソッドをオーバーライドする。
引数のdataがデータモデル。
このメソッドでtrueを返すと、そのデータは有効になる。


ジョブフローのテストではフィルターは呼ばれないので、フィルター済みのデータを用意する必要がある。


このデータフィルター機能を使わなくても、自分のジョブフローの先頭でBranch等の演算子を使ってデータを絞る(フィルターする)ことは出来る。

しかし現状のAsakusaFWでは、演算子を通すとデータサイズの情報が消えてしまう。
すなわち、インポーターのデータサイズにTINYを指定している場合、そのデータを直接使うMasterJoin系演算子では(効率の良い)サイドデータ結合(ハッシュジョイン)になるのだが、他の演算子をはさんでしまうとサイドデータ結合にならない。
(サイドデータジョインにしたい場合、別ジョブで前処理して小さなデータとして出力し、インポーターでTINYを指定して読み込む、というのも定石のひとつだった)

データフィルター機能を使うと、データをフィルターした後でもインポーターで指定したデータサイズ情報が残る。


バッチ引数を使用する例

バッチ引数を取得する例。
DataFilterではBatchContext(フレームワークAPI)を使用することは出来ないが、initializeメソッドでバッチ引数を受け取ることが出来る。

ItemInfoDataFilter.java:

package com.example.jobflow;

import java.text.MessageFormat;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.asakusafw.runtime.directio.DataFilter;
import com.asakusafw.runtime.value.StringOption;

import com.example.modelgen.dmdl.model.ItemInfo;
public class ItemInfoDataFilter extends DataFilter<ItemInfo> {

	private String startDate;
	private String endDate;

	private StringOption category;

	@Override
	public void initialize(Context context) {
		Map<String, String> args = context.getBatchArguments();
		this.startDate = getValue(args, "startDate");
		this.endDate   = getValue(args, "endDate");
		this.category = new StringOption(getValue(args, "category"));
	}

	private String getValue(Map<String, String> args, String key) {
		String value = args.get(key);
		if (value == null) {
			throw new IllegalArgumentException(MessageFormat.format("-A {0} not specified", key));
		}
		return value;
	}
	private static final Pattern PATH_PATTERN = Pattern.compile(".*/(?<dir>[^/]*)/.*\\.csv");

	@Override
	public boolean acceptsPath(String path) {
		Matcher matcher = PATH_PATTERN.matcher(path);
		if (matcher.matches()) {
			String dir = matcher.group("dir");
			return startDate.compareTo(dir) <= 0 && dir.compareTo(endDate) <= 0;
		}
		return false;
	}
	@Override
	public boolean acceptsData(ItemInfo data) {
		return category.equals(data.getCategoryCodeOption());
	}
}

initializeメソッドでContextを受け取り、そこからバッチ引数が取得できる。(ContextはDataFilter内で定義されている内部クラスなので、importせずに使える)
これをフィールドに保持しておき、パスフィルターメソッドやデータフィルターメソッドから使用する。

ただしこれらのフィルターメソッドはスレッドセーフではない(一応、マルチスレッドで呼ばれる可能性がある)ので、フィールドに保持するものはスレッドセーフなものだけにするよう気をつける必要がある。
Patternのインスタンスはスレッドセーフなのでフィールドに保持していても問題ない。StringOptionも大丈夫)


DataFilterのテストは、通常のJavaのクラスのテストと同じ。

ItemInfoDataFilterTest.java:

package com.example.jobflow;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import java.util.HashMap;
import java.util.Map;

import org.junit.Test;

import com.example.modelgen.dmdl.model.ItemInfo;
public class ItemInfoDataFilterTest {

	@Test
	public void testAcceptsPath() {
		ItemInfoDataFilter filter = new ItemInfoDataFilter();

		Map<String, String> args = new HashMap<>();
		args.put("startDate", "20150301");
		args.put("endDate",   "20150430");
		args.put("category", "123");
		filter.initialize(new ItemInfoDataFilter.Context(args));

		assertThat(filter.acceptsPath("hadoop/20150228/item_info.csv"), is(false));
		assertThat(filter.acceptsPath("hadoop/20150301/item_info.csv"), is(true));
		assertThat(filter.acceptsPath("hadoop/20150331/item_info.csv"), is(true));
		assertThat(filter.acceptsPath("hadoop/20150401/item_info.csv"), is(true));
		assertThat(filter.acceptsPath("hadoop/20150430/item_info.csv"), is(true));
		assertThat(filter.acceptsPath("hadoop/20150501/item_info.csv"), is(false));
	}
	@Test
	public void testAcceptsData() {
		ItemInfoDataFilter filter = new ItemInfoDataFilter();

		Map<String, String> args = new HashMap<>();
		args.put("startDate", "20150301");
		args.put("endDate",   "20150430");
		args.put("category", "123");
		filter.initialize(new ItemInfoDataFilter.Context(args));

		ItemInfo data = new ItemInfo();
		data.setCategoryCodeAsString("123");
		assertThat(filter.acceptsData(data), is(true));
		data.setCategoryCodeAsString("456");
		assertThat(filter.acceptsData(data), is(false));
	}
}

Direct I/Oへ戻る / AsakusaFW目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま