Asakusa FrameworkのDirect I/OのDataFilterのメモ。
|
AsakusaFW 0.7.3で、Direct I/Oのインポーターにフィルターを指定できるようになった。
これにより、入力ファイルのパスを条件にしてファイルを選択したり(パスフィルター)、必要なデータだけを取得したり(データフィルター)することが出来る。
(データフィルターは、WindGate JDBCにおけるgetConditionメソッドによるWHERE条件と似たような位置付けの機能)
なお、ジョブフローのテストでは、フィルターは適用されない。
フィルターを使う場合、インポーターにフィルタークラスを指定する。
そのため、フィルタークラスを事前に実装しておく必要がある。
フィルターはDataFilterクラスを継承して作成する。
DataFilterには、どのデータモデルを対象とするかをジェネリクスで指定する。
以下の例は、データモデルItemInfoを対象とするDataFilter。
package com.example.jobflow; import com.asakusafw.runtime.directio.DataFilter; import com.example.modelgen.dmdl.model.ItemInfo;
public class ItemInfoDataFilter extends DataFilter<ItemInfo> { // バッチ引数の取得 // パスフィルター // データフィルター }
package com.example.jobflow; import com.asakusafw.runtime.directio.DataFilter; import com.example.modelgen.dmdl.csv.AbstractItemInfoCsvInputDescription;
/** * 商品マスターをDirect I/Oで入力する。 */ public class ItemInfoFromCsv extends AbstractItemInfoCsvInputDescription { @Override public String getBasePath() { return "master"; } @Override public String getResourcePattern() { return "*/item_info.csv"; } @Override public Class<? extends DataFilter<?>> getFilter() { return ItemInfoDataFilter.class; } @Override public DataSize getDataSize() { return DataSize.SMALL; } }
インポーターで指定するDataSizeは、フィルタリング後のデータのサイズを示すことになる。
なお、ジョブフローのテストでは、ここで指定したフィルタークラスは一切使用されないので注意。
(パスフィルターが適用されない理由は、フローのテストを実行する時には、入出力ファイルのパスをAsakusaFWが自動的に設定するので、フィルターに適するパスになるとは限らない為。
プログラマーがパスを指定するのも面倒そうだし)
データフィルターも適用されないので、ジョブフローテスト用の入力データはフィルター済みのものを用意する必要がある。
入力ファイルのパスを使ってフィルタリングする例。
package com.example.jobflow; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import com.asakusafw.runtime.directio.DataFilter; import com.example.modelgen.dmdl.model.ItemInfo;
public class ItemInfoDataFilter extends DataFilter<Object> { private static final Pattern PATH_PATTERN = Pattern.compile(".*/(?<dir>[^/]*)/.*\\.csv"); @Override public boolean acceptsPath(String path) { Matcher matcher = PATH_PATTERN.matcher(path); if (matcher.matches()) { String dir = matcher.group("dir"); return dir.equals("latest"); } return false; } }
パスフィルターではデータモデルを使わないので、DataFilterの型引数には適当なクラス(Objectでよい)を指定しておく。
パスフィルターでは、DataFilterのacceptsPathメソッドをオーバーライドする。
引数のpathが入力ファイルのパス。
このメソッドでtrueを返すと、そのファイルは読み込み対象になる。
この例では正規表現を使い、パスをスラッシュ「/」で区切ってファイル名の直前のディレクトリー名を取得し、条件判定をしている。
上記の例だと、インポーターのgetResourcePatternメソッドに「latest/item_info.csv
」と書いておけば、パスフィルターで判定する意味は無い^^;
getResourcePattern()ではバッチ引数を使うことも出来るので、「${date}/item_info.csv
」のように書けば、バッチ引数dateで指定したディレクトリーのファイルだけを対象とすることも出来る。
しかし、ディレクトリー名を範囲で指定したいような場合は、このような記述方法では対応できない。パスフィルターの出番である。
入力データでフィルタリングする例。
条件に合致するデータだけを入力データとして有効にする。
package com.example.jobflow; import com.asakusafw.runtime.directio.DataFilter; import com.example.modelgen.dmdl.model.ItemInfo;
public class ItemInfoDataFilter extends DataFilter<ItemInfo> { @Override public boolean acceptsData(ItemInfo data) { // 削除日時が入っていないものだけ抽出 return data.getDeleteDateOption().isNull(); } }
データフィルターではデータモデルを使うので、DataFilterの型引数には対象のデータモデルクラスを指定しておく。
データフィルターでは、DataFilterのacceptsDataメソッドをオーバーライドする。
引数のdataがデータモデル。
このメソッドでtrueを返すと、そのデータは有効になる。
ジョブフローのテストではフィルターは呼ばれないので、フィルター済みのデータを用意する必要がある。
このデータフィルター機能を使わなくても、自分のジョブフローの先頭でBranch等の演算子を使ってデータを絞る(フィルターする)ことは出来る。
しかし現状のAsakusaFWでは、演算子を通すとデータサイズの情報が消えてしまう。
すなわち、インポーターのデータサイズにTINYを指定している場合、そのデータを直接使うMasterJoin系演算子では(効率の良い)サイドデータ結合(ハッシュジョイン)になるのだが、他の演算子をはさんでしまうとサイドデータ結合にならない。
(サイドデータジョインにしたい場合、別ジョブで前処理して小さなデータとして出力し、インポーターでTINYを指定して読み込む、というのも定石のひとつだった)
データフィルター機能を使うと、データをフィルターした後でもインポーターで指定したデータサイズ情報が残る。
バッチ引数を取得する例。
DataFilterではBatchContext(フレームワークAPI)を使用することは出来ないが、initializeメソッドでバッチ引数を受け取ることが出来る。
package com.example.jobflow; import java.text.MessageFormat; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import com.asakusafw.runtime.directio.DataFilter; import com.asakusafw.runtime.value.StringOption; import com.example.modelgen.dmdl.model.ItemInfo;
public class ItemInfoDataFilter extends DataFilter<ItemInfo> { private String startDate; private String endDate; private StringOption category; @Override public void initialize(Context context) { Map<String, String> args = context.getBatchArguments(); this.startDate = getValue(args, "startDate"); this.endDate = getValue(args, "endDate"); this.category = new StringOption(getValue(args, "category")); } private String getValue(Map<String, String> args, String key) { String value = args.get(key); if (value == null) { throw new IllegalArgumentException(MessageFormat.format("-A {0} not specified", key)); } return value; }
private static final Pattern PATH_PATTERN = Pattern.compile(".*/(?<dir>[^/]*)/.*\\.csv"); @Override public boolean acceptsPath(String path) { Matcher matcher = PATH_PATTERN.matcher(path); if (matcher.matches()) { String dir = matcher.group("dir"); return startDate.compareTo(dir) <= 0 && dir.compareTo(endDate) <= 0; } return false; }
@Override public boolean acceptsData(ItemInfo data) { return category.equals(data.getCategoryCodeOption()); } }
initializeメソッドでContextを受け取り、そこからバッチ引数が取得できる。(ContextはDataFilter内で定義されている内部クラスなので、importせずに使える)
これをフィールドに保持しておき、パスフィルターメソッドやデータフィルターメソッドから使用する。
ただしこれらのフィルターメソッドはスレッドセーフではない(一応、マルチスレッドで呼ばれる可能性がある)ので、フィールドに保持するものはスレッドセーフなものだけにするよう気をつける必要がある。
(Patternのインスタンスはスレッドセーフなのでフィールドに保持していても問題ない。StringOptionも大丈夫)
DataFilterのテストは、通常のJavaのクラスのテストと同じ。
package com.example.jobflow; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import java.util.HashMap; import java.util.Map; import org.junit.Test; import com.example.modelgen.dmdl.model.ItemInfo;
public class ItemInfoDataFilterTest { @Test public void testAcceptsPath() { ItemInfoDataFilter filter = new ItemInfoDataFilter(); Map<String, String> args = new HashMap<>(); args.put("startDate", "20150301"); args.put("endDate", "20150430"); args.put("category", "123"); filter.initialize(new ItemInfoDataFilter.Context(args)); assertThat(filter.acceptsPath("hadoop/20150228/item_info.csv"), is(false)); assertThat(filter.acceptsPath("hadoop/20150301/item_info.csv"), is(true)); assertThat(filter.acceptsPath("hadoop/20150331/item_info.csv"), is(true)); assertThat(filter.acceptsPath("hadoop/20150401/item_info.csv"), is(true)); assertThat(filter.acceptsPath("hadoop/20150430/item_info.csv"), is(true)); assertThat(filter.acceptsPath("hadoop/20150501/item_info.csv"), is(false)); }
@Test public void testAcceptsData() { ItemInfoDataFilter filter = new ItemInfoDataFilter(); Map<String, String> args = new HashMap<>(); args.put("startDate", "20150301"); args.put("endDate", "20150430"); args.put("category", "123"); filter.initialize(new ItemInfoDataFilter.Context(args)); ItemInfo data = new ItemInfo(); data.setCategoryCodeAsString("123"); assertThat(filter.acceptsData(data), is(true)); data.setCategoryCodeAsString("456"); assertThat(filter.acceptsData(data), is(false)); } }