Asakusa FrameworkのDirect I/OのDataFilterのメモ。
|
AsakusaFW 0.7.3で、Direct I/Oのインポーターにフィルターを指定できるようになった。
これにより、入力ファイルのパスを条件にしてファイルを選択したり(パスフィルター)、必要なデータだけを取得したり(データフィルター)することが出来る。
(データフィルターは、WindGate JDBCにおけるgetConditionメソッドによるWHERE条件と似たような位置付けの機能)
なお、ジョブフローのテストでは、フィルターは適用されない。
フィルターを使う場合、インポーターにフィルタークラスを指定する。
そのため、フィルタークラスを事前に実装しておく必要がある。
フィルターはDataFilterクラスを継承して作成する。
DataFilterには、どのデータモデルを対象とするかをジェネリクスで指定する。
以下の例は、データモデルItemInfoを対象とするDataFilter。
package com.example.jobflow; import com.asakusafw.runtime.directio.DataFilter; import com.example.modelgen.dmdl.model.ItemInfo;
public class ItemInfoDataFilter extends DataFilter<ItemInfo> {
// バッチ引数の取得
// パスフィルター
// データフィルター
}
package com.example.jobflow; import com.asakusafw.runtime.directio.DataFilter; import com.example.modelgen.dmdl.csv.AbstractItemInfoCsvInputDescription;
/**
* 商品マスターをDirect I/Oで入力する。
*/
public class ItemInfoFromCsv extends AbstractItemInfoCsvInputDescription {
@Override
public String getBasePath() {
return "master";
}
@Override
public String getResourcePattern() {
return "*/item_info.csv";
}
@Override
public Class<? extends DataFilter<?>> getFilter() {
return ItemInfoDataFilter.class;
}
@Override
public DataSize getDataSize() {
return DataSize.SMALL;
}
}
インポーターで指定するDataSizeは、フィルタリング後のデータのサイズを示すことになる。
なお、ジョブフローのテストでは、ここで指定したフィルタークラスは一切使用されないので注意。
(パスフィルターが適用されない理由は、フローのテストを実行する時には、入出力ファイルのパスをAsakusaFWが自動的に設定するので、フィルターに適するパスになるとは限らない為。
プログラマーがパスを指定するのも面倒そうだし)
データフィルターも適用されないので、ジョブフローテスト用の入力データはフィルター済みのものを用意する必要がある。
入力ファイルのパスを使ってフィルタリングする例。
package com.example.jobflow; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import com.asakusafw.runtime.directio.DataFilter; import com.example.modelgen.dmdl.model.ItemInfo;
public class ItemInfoDataFilter extends DataFilter<Object> {
private static final Pattern PATH_PATTERN = Pattern.compile(".*/(?<dir>[^/]*)/.*\\.csv");
@Override
public boolean acceptsPath(String path) {
Matcher matcher = PATH_PATTERN.matcher(path);
if (matcher.matches()) {
String dir = matcher.group("dir");
return dir.equals("latest");
}
return false;
}
}
パスフィルターではデータモデルを使わないので、DataFilterの型引数には適当なクラス(Objectでよい)を指定しておく。
パスフィルターでは、DataFilterのacceptsPathメソッドをオーバーライドする。
引数のpathが入力ファイルのパス。
このメソッドでtrueを返すと、そのファイルは読み込み対象になる。
この例では正規表現を使い、パスをスラッシュ「/」で区切ってファイル名の直前のディレクトリー名を取得し、条件判定をしている。
上記の例だと、インポーターのgetResourcePatternメソッドに「latest/item_info.csv」と書いておけば、パスフィルターで判定する意味は無い^^;
getResourcePattern()ではバッチ引数を使うことも出来るので、「${date}/item_info.csv」のように書けば、バッチ引数dateで指定したディレクトリーのファイルだけを対象とすることも出来る。
しかし、ディレクトリー名を範囲で指定したいような場合は、このような記述方法では対応できない。パスフィルターの出番である。
入力データでフィルタリングする例。
条件に合致するデータだけを入力データとして有効にする。
package com.example.jobflow; import com.asakusafw.runtime.directio.DataFilter; import com.example.modelgen.dmdl.model.ItemInfo;
public class ItemInfoDataFilter extends DataFilter<ItemInfo> {
@Override
public boolean acceptsData(ItemInfo data) {
// 削除日時が入っていないものだけ抽出
return data.getDeleteDateOption().isNull();
}
}
データフィルターではデータモデルを使うので、DataFilterの型引数には対象のデータモデルクラスを指定しておく。
データフィルターでは、DataFilterのacceptsDataメソッドをオーバーライドする。
引数のdataがデータモデル。
このメソッドでtrueを返すと、そのデータは有効になる。
ジョブフローのテストではフィルターは呼ばれないので、フィルター済みのデータを用意する必要がある。
このデータフィルター機能を使わなくても、自分のジョブフローの先頭でBranch等の演算子を使ってデータを絞る(フィルターする)ことは出来る。
しかし現状のAsakusaFWでは、演算子を通すとデータサイズの情報が消えてしまう。
すなわち、インポーターのデータサイズにTINYを指定している場合、そのデータを直接使うMasterJoin系演算子では(効率の良い)サイドデータ結合(ハッシュジョイン)になるのだが、他の演算子をはさんでしまうとサイドデータ結合にならない。
(サイドデータジョインにしたい場合、別ジョブで前処理して小さなデータとして出力し、インポーターでTINYを指定して読み込む、というのも定石のひとつだった)
データフィルター機能を使うと、データをフィルターした後でもインポーターで指定したデータサイズ情報が残る。
バッチ引数を取得する例。
DataFilterではBatchContext(フレームワークAPI)を使用することは出来ないが、initializeメソッドでバッチ引数を受け取ることが出来る。
package com.example.jobflow; import java.text.MessageFormat; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import com.asakusafw.runtime.directio.DataFilter; import com.asakusafw.runtime.value.StringOption; import com.example.modelgen.dmdl.model.ItemInfo;
public class ItemInfoDataFilter extends DataFilter<ItemInfo> {
private String startDate;
private String endDate;
private StringOption category;
@Override
public void initialize(Context context) {
Map<String, String> args = context.getBatchArguments();
this.startDate = getValue(args, "startDate");
this.endDate = getValue(args, "endDate");
this.category = new StringOption(getValue(args, "category"));
}
private String getValue(Map<String, String> args, String key) {
String value = args.get(key);
if (value == null) {
throw new IllegalArgumentException(MessageFormat.format("-A {0} not specified", key));
}
return value;
}
private static final Pattern PATH_PATTERN = Pattern.compile(".*/(?<dir>[^/]*)/.*\\.csv");
@Override
public boolean acceptsPath(String path) {
Matcher matcher = PATH_PATTERN.matcher(path);
if (matcher.matches()) {
String dir = matcher.group("dir");
return startDate.compareTo(dir) <= 0 && dir.compareTo(endDate) <= 0;
}
return false;
}
@Override
public boolean acceptsData(ItemInfo data) {
return category.equals(data.getCategoryCodeOption());
}
}
initializeメソッドでContextを受け取り、そこからバッチ引数が取得できる。(ContextはDataFilter内で定義されている内部クラスなので、importせずに使える)
これをフィールドに保持しておき、パスフィルターメソッドやデータフィルターメソッドから使用する。
ただしこれらのフィルターメソッドはスレッドセーフではない(一応、マルチスレッドで呼ばれる可能性がある)ので、フィールドに保持するものはスレッドセーフなものだけにするよう気をつける必要がある。
(Patternのインスタンスはスレッドセーフなのでフィールドに保持していても問題ない。StringOptionも大丈夫)
DataFilterのテストは、通常のJavaのクラスのテストと同じ。
package com.example.jobflow; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import java.util.HashMap; import java.util.Map; import org.junit.Test; import com.example.modelgen.dmdl.model.ItemInfo;
public class ItemInfoDataFilterTest {
@Test
public void testAcceptsPath() {
ItemInfoDataFilter filter = new ItemInfoDataFilter();
Map<String, String> args = new HashMap<>();
args.put("startDate", "20150301");
args.put("endDate", "20150430");
args.put("category", "123");
filter.initialize(new ItemInfoDataFilter.Context(args));
assertThat(filter.acceptsPath("hadoop/20150228/item_info.csv"), is(false));
assertThat(filter.acceptsPath("hadoop/20150301/item_info.csv"), is(true));
assertThat(filter.acceptsPath("hadoop/20150331/item_info.csv"), is(true));
assertThat(filter.acceptsPath("hadoop/20150401/item_info.csv"), is(true));
assertThat(filter.acceptsPath("hadoop/20150430/item_info.csv"), is(true));
assertThat(filter.acceptsPath("hadoop/20150501/item_info.csv"), is(false));
}
@Test
public void testAcceptsData() {
ItemInfoDataFilter filter = new ItemInfoDataFilter();
Map<String, String> args = new HashMap<>();
args.put("startDate", "20150301");
args.put("endDate", "20150430");
args.put("category", "123");
filter.initialize(new ItemInfoDataFilter.Context(args));
ItemInfo data = new ItemInfo();
data.setCategoryCodeAsString("123");
assertThat(filter.acceptsData(data), is(true));
data.setCategoryCodeAsString("456");
assertThat(filter.acceptsData(data), is(false));
}
}