Asakusa FrameworkのDirect I/Oのメモ。
|
|
|
|
Direct I/OはHDFS(あるいはMapR FSやAmazon S3)上のファイルを読み書きする機能。
(Asakusaアプリケーションを実行したマシン上のローカルファイルを読み書きしたい場合はWindGateを使う)
AsakusaFWの歴史的な経緯により、Direct I/OはWindGateより後に使えるようになった。
Direct I/OのDirectの意味は、「WindGateのような連携コンポーネントを介さず、直接HDFSにアクセスする」といったニュアンスだと思う。
(Direct I/OはHDFS APIを使って普通にHDFS上のファイルにアクセスしており、特殊なことをしてファイルシステムに直接アクセスしている訳ではない)
DMDLファイル上のデータモデルに@directio.csv等の属性を付けることにより、Direct I/Oを使用する為のクラスが自動生成される。
その生成されたクラスをジョブフローのImporter/Exporterとして記述することで、HDFS上のファイルが読み書きできる。
Direct I/Oを使ったAsakusaアプリケーションを作成するには、0.6.0より前はMavenを使ってDirect I/Oアーキタイプを指定する必要があった。[2014-12-13]
0.6.0以降はGradleを使うようになり、アプリケーションプロジェクトのテンプレートファイルが提供されるようになった。
テンプレートファイルではデフォルトでDirect I/Oが使える。
(テンプレートファイルのダウンロードや新規プロジェクトの作成は、Shafuを使うのが便利)
Direct I/O用のImporter/Exporterクラスを作成する場合、DMDLの属性を利用するのが便利。[2015-03-15]
データモデル定義(dmdlファイル)に@directioの属性を付けておくと、DMDLのコンパイル時にImporter/Exporterの雛形クラスが生成される。
このクラスを継承して具体的なImporter/Exporterクラスを作成する。
※全て自前でImporter/Exporterクラスを作るのであれば、雛形クラスを生成する必要は無いので、@directio属性を付けなくてよい。→独自にテキストファイルを読み込むImporterを作成する例
@directio.csv example = { 〜 };
Importerの例 | Exporterの例 |
---|---|
public class ExampleFromCsv extends
AbstractExampleCsvInputDescription { |
public class ExampleToCsv extends
AbstractExampleCsvOutputDescription { |
ベースパス・リソースパターンや削除パターンは、Direct I/O固有の定義。入出力ファイルのパスを指定する。
→ベースパスとリソースパターンの使い分け
Direct
I/Oでは、ファイルを出力する前に既存ファイルを削除する機能がある。その削除するファイル名のパターンをgetDeletePatternsメソッドで指定する。
削除パターンには入力ファイル用のリソースパターンと同じ形式が使える。
Direct I/OのImporter/Exporterクラスには、入出力ファイルの場所をベースパスとリソースパターンで指定する。[2015-03-15]
→Direct I/Oユーザーガイドのファイルの入出力
Importerの例 | Exporterの例 |
---|---|
public class ItemInfoFromCsv extends
AbstractItemInfoCsvInputDescription { |
public class CategorySummaryToCsv extends
AbstractCategorySummaryCsvOutputDescription { |
Asakusaアプリケーションの実行時には、ベースパスとリソースパターン(および、asakusa-resources.xml)を組み合わせて入出力ファイルのパスを生成する。
(→ベースパスと実際のパス)
ベースパスとリソースパターンの使い分けは、Hadoopの入出力方法を例に考えると分かり易いと思う。
Hadoopでは、出力にパス(ディレクトリー)を指定すると、その下にpart-m-00000とか00001とかのファイルを出力する。
例えば、「output/hoge」を指定すると、「output/hoge/part-m-00000」が作られる。
この「output/hoge」がベースパスに当たり、「part-m-00000」の部分がリソースパターンに当たる。
「part-m-00000」というファイル名にしようと思ったら、リソースパターンに「part-m-*」を指定する。という具合に、ファイル名(特に可変部分)を指定するのがリソースパターン。
つまり、Direct I/Oでは、ベースパスだけで出力先(論理的なファイル)を区別する。
したがって、ベースパスは各論理ファイル(Exporter)毎に一意になるように(重複しないように)設定する必要がある。
複数種類のファイルを出力するバッチの場合、その出力先のベースパスが重複していると、バッチのコンパイル時にエラーになる。
11:47:14 ERROR 2つの出力のベースパスが重複しています: com.example.jobflow.CategorySummary2ToCsv[result/category] <-> com.example.jobflow.CategorySummaryToCsv[result/category]
11:47:14 ERROR コンパイルはエラーにより中断しました (com.example.batch.SummarizeBatch2)
java.io.IOException: フローの入出力が正しくないため、コンパイルを中止します (2つの出力のベースパスが重複しています: com.example.jobflow.CategorySummary2ToCsv[result/category] <-> com.example.jobflow.CategorySummaryToCsv[result/category])
at com.asakusafw.compiler.flow.FlowCompiler.validate(FlowCompiler.java:172) ~[ashigel-compiler-0.7.0-hadoop1.jar:na]
at com.asakusafw.compiler.flow.FlowCompiler.compile(FlowCompiler.java:84) ~[ashigel-compiler-0.7.0-hadoop1.jar:na]
at com.asakusafw.compiler.batch.processor.JobFlowWorkDescriptionProcessor.build(JobFlowWorkDescriptionProcessor.java:67) ~[ashigel-compiler-0.7.0-hadoop1.jar:na]
at com.asakusafw.compiler.batch.processor.JobFlowWorkDescriptionProcessor.process(JobFlowWorkDescriptionProcessor.java:55) ~[ashigel-compiler-0.7.0-hadoop1.jar:na]
at com.asakusafw.compiler.batch.processor.JobFlowWorkDescriptionProcessor.process(JobFlowWorkDescriptionProcessor.java:39) ~[ashigel-compiler-0.7.0-hadoop1.jar:na]
at com.asakusafw.compiler.batch.BatchCompiler.processUnit(BatchCompiler.java:109) ~[ashigel-compiler-0.7.0-hadoop1.jar:na]
at com.asakusafw.compiler.batch.BatchCompiler.processUnits(BatchCompiler.java:98) ~[ashigel-compiler-0.7.0-hadoop1.jar:na]
at com.asakusafw.compiler.batch.BatchCompiler.compile(BatchCompiler.java:60) ~[ashigel-compiler-0.7.0-hadoop1.jar:na]
at com.asakusafw.compiler.testing.DirectBatchCompiler.compile(DirectBatchCompiler.java:105) ~[ashigel-compiler-0.7.0-hadoop1.jar:na]
at jp.hishidama.asakusafw_wrapper.batch.BatchCompilerDriver.compile(BatchCompilerDriver.java:183) [dmdlparser-caller.jar:na]
at jp.hishidama.asakusafw_wrapper.batch.BatchCompilerDriver.start(BatchCompilerDriver.java:100) [dmdlparser-caller.jar:na]
at jp.hishidama.asakusafw_wrapper.batch.BatchCompilerDriver.main(BatchCompilerDriver.java:41) [dmdlparser-caller.jar:na]
11:47:14 ERROR バッチをコンパイルする際にエラーが発生しました: [com.example.batch.SummarizeBatch2]
こういう出力先の重複チェックが入っているのは、Direct I/Oのトランザクション制御の一環らしい。
→Direct I/Oユーザーガイドの簡易的な出力のトランザクション
基本的には、出力のリソースパターン(ファイル名)には「*」を入れておくのがよい。実行基盤によって複数のファイルが出力されるから。[/2017-04-30]
「*」が入っていない場合、実行基盤が出力した複数ファイルをひとつにまとめる処理が入るので、ファイルサイズが大きいと実行が遅くなる。
Exporterのリソースパターンに「*」が入っている場合、実行基盤のタスク毎にファイルが出力される。[2021-06-08]
基本的には、Asakusa on M3BPならスレッド数、Asakusa
on SparkならExecutorのタスク数に分かれる。
(各スレッド・タスクがそれぞれファイルを出力するので、ファイルを統合したりする処理が無くて早い)
ただし、Asakusa on
Sparkの場合、デフォルトでは並列数が2になっているので、出力ファイルは2分割しかされない(2ファイルに集約されるので、集約処理が入って遅くなる)。
com.asakusafw.spark.parallelismを設定すると、概ねその個数のファイルに分割される。(com.asakusafw.spark.parallelismのデフォルト値は2)
ワイルドカード「*」の他にも、リソースパターンに使用できる特殊な記号がある。
(リソースパターンに指定できる特殊な記号は、ImporterとExporterで異なる。→Direct I/Oユーザーガイドの入力ファイル名のパターン・出力ファイル名のパターン)
例えば、出力データの値(日付とかIDとか)をディレクトリー名やファイル名に使用することも出来る。
ただし、これはソート処理を伴うので、「*」と併用することは出来ない。
出力データの値を使いつつファイルを分割したい場合は、「[開始番号..終了番号]
」でランダムな値を付けることが出来る。
Direct I/Oユーザーガイドを見ると、ベースパスには「論理的なパス(論理パス)」を指定する、とある。[2015-03-15]
Asakusaアプリケーションの実行時には、論理パスを「実際のファイルシステム上の具体的なパス(ファイルシステムパス)」に変換して使用される。
これは、プログラム内には論理パス(のみ)を指定しておき、実行環境(テスト環境・ステージング環境・本番環境等)に応じて具体的なパスを切り替えられるようにする為だと思われる。
Asakusaアプリケーションを実行するには、実行環境にAsakusa Framework一式をインストールしておく必要がある。(その場所を環境変数ASAKUSA_HOMEで指定する)
Direct I/Oで論理パスをファイルシステムパスへ変換するには、ASAKUSA_HOME/core/conf/asakusa-resources.xmlが使われる。
→Direct I/Oユーザーガイドのデータソースの設定
Asakusaアプリケーションを実行するときにはその環境のasakusa-resources.xmlが読み込まれるので、環境毎に異なるファイルシステムパスを指定することが出来るわけだ。
Direct I/Oでは、asakusa-resources.xml内にファイルシステムパスを複数定義することが出来る。その定義を「データソース」と呼んでいるようだ。
例えば論理パスが「/」になるデータソースを定義してみる。名前(データソースID)は「root」としておこう。
すると、このデータソースのasakusa-resources.xml内の定義方法は以下のようになる。
<!-- rootデータソースの実装クラス名 --> <property> <name>com.asakusafw.directio.root</name> <value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value> </property> <!-- rootデータソースの論理パス --> <property> <name>com.asakusafw.directio.root.path</name> <value>/</value> </property> <!-- rootデータソースのファイルシステムパス --> <property> <name>com.asakusafw.directio.root.fs.path</name> <value>hdfs://hadoopCluster:8020/user/hishidama</value> </property>
実装クラス名は、HDFSを使う場合はHadoopDataSourceで固定。
Imorter/Exporterのベースパスで指定されたパスのうち、先頭部分がデータソース定義で指定された論理パスに一致すると、その部分がデータソース定義で指定されたファイルシステムパスに置換され、具体的なパスとなる。
例えばベースパスが「master」の場合、「/master」と見なされ、先頭の「/」が置換されて「hdfs://hadoopCluster:8020/user/hishidama/master」となる。
ちなみにデータソース定義で指定されたファイルシステムパスが「target/testing/directio」だと、変換された具体的なパスは「target/testing/directio/master」のようになる。
これは相対パスと見なされ、テスト環境(UNIX)だと実際には「$HOME/target/testing/directio/master」になるし、HDFSだったら実行したユーザーのホームディレクトリー下(例えば「hdfs://hadoopCluster:8020/user/hishidama/target/testing/directio/master」)になる。
もうひとつ、「result」というデータソースの例。
<!-- rootデータソースの定義 --> 〜 <!-- resultデータソースの実装クラス名 --> <property> <name>com.asakusafw.directio.result</name> <value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value> </property> <!-- resultデータソースの論理パス --> <property> <name>com.asakusafw.directio.result.path</name> <value>result</value> </property> <!-- resultデータソースのファイルシステムパス --> <property> <name>com.asakusafw.directio.result.fs.path</name> <value>hdfs://hadoopCluster:8020/user/hishidama/output</value> </property>
ベースパスが「result/category」の場合、先頭の「result」が置換されて「hdfs://hadoopCluster:8020/user/hishidama/output/category」となる。
asakura-resources.xml内に複数のデータソースが定義されている場合、一致する論理パスの中で最も長いものが使われる、と思われる。
ベースパスが「result/category」の場合、論理パス「result」と一致したのでresultデータソースが使われたが、このデータソース定義が無かったらrootデータソースと一致し、そちらが使われる。
さらにもうひとつ、「category」というデータソースの例。
<!-- rootデータソースの定義 --> 〜 <!-- categoryデータソースの実装クラス名 --> <property> <name>com.asakusafw.directio.category</name> <value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value> </property> <!-- categoryデータソースの論理パス --> <property> <name>com.asakusafw.directio.category.path</name> <value>result/category</value> </property> <!-- categoryデータソースのファイルシステムパス --> <property> <name>com.asakusafw.directio.category.fs.path</name> <value>hdfs://hadoopCluster:8020/user/hishidama/output/summary</value> </property>
ベースパスが「result/category」の場合、先頭(というか全体だが^^;)の「result/category」が置換されて「hdfs://hadoopCluster:8020/user/hishidama/output/summary」となる。
このように、入出力するファイルの種類数分だけデータソースを定義することも可能である。
(つまり、Importer/Exporterのベースパスには論理ファイル名だけを書いておき、asakusa-resources.xmlで全ファイルのファイルシステムパスを定義する、といった使い方が出来る)