CascadingのTapクラスについて。
Cascadingにおいて、データの入出力先(つまりファイル)を表すのがTap。
Cascadingでは、データの入力元をsource、出力先をsinkと呼ぶ。
sourceのデータがPipeに渡され、Pipeの最終的な出力先がsinkになる。
実際にファイルを表すには、Hfsクラスを使う。
Hfsに渡すパス(URI)にプロトコル(fileやhdfs等)を指定することで、Hadoopで扱えるURIを使うことが出来る。
sourceは、Tapをデータの入力として使用する場合の呼び方。
import org.apache.hadoop.fs.Path; import cascading.scheme.TextLine; import cascading.tap.Hfs; import cascading.tap.Tap; import cascading.tuple.Fields;
String path = new Path("hdfs://localhost/user/hishidama/input").toUri().toString(); // String path = "hdfs://localhost/user/hishidama/input"; Tap source = new Hfs(new TextLine(new Fields("line")), path); FlowConnector flowConnector = new FlowConnector(); Flow flow = flowConnector.connect("flow", source, sink, pipe); flow.complete(); //実行開始
Hfsの第2引数pathにはパス(URI)を表すの文字列を渡す。
相対パス(スキーム指定なし)は扱えないので、絶対パス形式にする必要がある。
Hadoopが扱えるURIはPathクラスで判別してくれる(相対パスでもデフォルト値に応じてfileやhdfsの絶対パスに変換してくれる)ので、それを使うのがいいと思う。
しかしHfsは直接Pathを受け取ってくれないので、toString()で文字列に変換する必要がある。
しかもPath#toString()は相対パスだと相対パスのままなので、toUri()を使って絶対パス形式にしている。
なお、ディレクトリーを指定すると、その下にあるファイル全てからデータが読み込まれる。
Hfsの第1引数は、データを受け取る型を指定する。
TextLineは、ファイルから1行ずつデータを取得するときに使う。
TextLineのコンストラクターにFieldsを指定すると、その行データに名前を付けることができ、後続のPipe処理ではその名前を使ってデータを取得することが出来る。
引数 | 例 | 説明 |
---|---|---|
1個 | new TextLine(new Fields("line")) |
行データを「line」という名前で取得できる。 |
2個 | new TextLine(new Fields("offset", "line")) |
データの位置を「offset」、データ自体を「line」という名前で取得できる。 |
0個 | new TextLine() |
「new TextLine(TextLine.DEFAULT_SOURCE_FIELDS) 」すなわち「 new TextLine(new Fields("offset", "line")) 」と同じ。 |
その他 | IllegalArgumentException | TextLineのFieldsには、1個か2個しか項目を指定できない。 それ以外を指定すると例外が発生する。 |
要するに、TextLineはHadoopのTextInputFormatの出力をイメージしているのだろう。
(TextInputFormatを使うと、MapperにはキーがoffsetでデータがTextとなって渡されてくる)
Cascadingでは、複数のPipeを別々のsourceから入力させる事が出来る。
Tap source1 = new Hfs(new TextLine(new Fields("line")), path1); Tap source2 = new Hfs(new TextLine(new Fields("line")), path2); Map<String, Tap> sources = new HashMap<String, Tap>(); sources.put(pipe1.getName(), source1); sources.put(pipe2.getName(), source2); FlowConnector flowConnector = new FlowConnector(); Flow flow = flowConnector.connect("flow", sources, sink, pipe); flow.complete(); //実行開始
パイプの名前をキー、sourceをデータとするMapを用意し、それをFlowConnector#connect()に渡してやればよい。
Cascading1.1で、区切り文字を指定できるTextDelimitedが追加された。(TextLineの代わりに使用する)[2010-05-01]
import cascading.scheme.TextDelimited;
Tap source = new Hfs( new TextDelimited(new Fields("col1", "col2", "col3"), true, ","), path );
引数 | 説明 | デフォルト |
---|---|---|
fields | 区切られた各項目の名前を定義する。 | 必須 |
skipHeader | trueにすると、先頭行をスキップする。 | false |
delimiter | 区切り文字。 | 必須 |
strict | trueにすると、区切ったデータ数とフィールドの項目数が違う場合にエラーとする。 | true |
quote | 各項目を囲む文字。 | なし(null) |
types | 型の指定。各項目のデータを文字列から指定された型に変換する。 | なし(null) |
safe | falseにすると、typesによって変換する際の例外を無視する。 | true |
sinkは、Tapをデータの出力先として使用する場合の呼び方。
import org.apache.hadoop.fs.Path; import cascading.scheme.TextLine; import cascading.tap.Hfs; import cascading.tap.SinkMode; import cascading.tap.Tap;
String path = new Path("hdfs://localhost/user/hishidama/output").toUri().toString(); // String path = "hdfs://localhost/user/hishidama/output"; Tap sink = new Hfs(new TextLine(), path, SinkMode.REPLACE); FlowConnector flowConnector = new FlowConnector(); Flow flow = flowConnector.connect("flow", source, sink, pipe); flow.complete(); //実行開始
Hfsの第2引数pathについては、sourceのpathと同様。
Hfsの第1引数のTextLineもsourceのTextLineとほぼ同様だが、
入力項目名は使わないので、TextLineに項目名を指定する必要は無い(デフォルト状態でよい)。
デフォルトでは、出力時はPipeのデータ(全項目)を項目毎にタブ区切りで結合した1つの文字列として出力される。(タブ区切りはTuple#toString()の中で行
われている)
→出力形式の加工
Hfsの第3引数には、出力モード(SinkMode列挙型)を指定できる。
SinkMode | 説明 |
---|---|
KEEP | デフォルト。 出力先ディレクトリーが既に存在していたら、FileAlreadyExistsExceptionが発生する。 (Hadoopのデフォルトの処理と同じ) |
REPLACE | 出力先ディレクトリーが既に存在していたら、削除してから実行する。これは便利。 |
APPEND | 出力先が既に存在していたら、追加する。 ファイルシステムがそれに対応している場合のみ有効。 Cascading1.1で非推奨になった(UPDATEを使用する)。[2010-04-25] |
UPDATE | Cascading1.1から使用可能。[2010-04-25] DB等でデータを更新(上書き)することを示す。 |
sinkのファイルを指定する際に、出力する項目を指定する(絞り込む)ことも出来る。
TextLineの第1引数の項目群は入力用の名前付けで、第2引数の項目群が出力する項目名。
import cascading.scheme.TextLine;
Tap sink = new Hfs( new TextLine(TextLine.DEFAULT_SOURCE_FIELDS, new Fields("col1", "col2", "col3")), path, SinkMode.REPLACE );
また、デフォルトでは出力はタブ区切りとなるが、これはTuple#toString()がそうなっている為。
したがって、Pipeの一番最後の処理の出力Tupleを専用のクラスにしてしまえばよい。
public class CommaFormatFunction extends BaseOperation<Object> implements Function<Object> { private static final long serialVersionUID = 1L; public CommaFormatFunction() { // 出力するフィールド名を指定 super(Fields.ALL); } @Override public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) { TupleEntry args = functionCall.getArguments(); Tuple tuple = new Tuple(args.getTuple()) { private static final long serialVersionUID = 1L; @Override public String toString() { return super.toString(", "); } }; functionCall.getOutputCollector().add(tuple); } }
pipe = new Each(pipe, new CommaFormatFunction());
入ってきたTupleから独自のTupleクラス(toString()をオーバーライド)にデータを置き換えている。
(toString()そのものは区切り文字を変更できないが、区切り文字を引数に指定できるメソッドはTuple内に用意されている。なんだか半端だなぁ)
このデータ変換処理は、Pipeの一番最後でないと意味が無い。
というのは、別の処理が後に入ると、そちらで新しいTupleインスタンスが作られる可能性があって、当然そちらではこのクラスは使われない為。
sink用のTapに出力項目を指定した場合、そこでもTupleが新しくなってしまうので、この方法との併用は出来ない。
しかしまぁたぶん、Cascading1.0での出力の整形に関しては、用意されている関数(FieldFormatterやFieldJoiner)を使うのがCascadingの意図に沿っているのだろう。[/2010-04-11]
これらの関数は、出力項目は1項目のみになる。
出力項目名を指定するコンストラクターもある。
import cascading.operation.text.FieldJoiner;
pipe = new Each(pipe, new FieldJoiner(",")); //カンマ区切りで全項目を結合
import cascading.operation.text.FieldFormatter;
pipe = new Each(pipe, new FieldFormatter("%s, %5d"));
Cascading1.1では、出力を整形できるTextDelimitedが追加された。[2010-05-01]
import cascading.scheme.TextDelimited;
Tap sink = new Hfs(
new TextDelimited(new Fields("col1", "col2", "col3"), ", "),
path,
SinkMode.REPLACE
);
出力する項目の絞り込み・順序と、区切る文字列を指定できる。
ただし、Fields.ALL等は指定できない。必ず項目を1つ以上指定する必要がある。
せっかく便利なクラスなのに、ちょっと片手落ちな感じがする…。
Exception in thread "main" java.lang.IllegalArgumentException: may not be zero declared fields, found: [{?}:ALL]
at cascading.scheme.TextDelimited.<init>(TextDelimited.java:442)
at cascading.scheme.TextDelimited.<init>(TextDelimited.java:364)
at cascading.scheme.TextDelimited.<init>(TextDelimited.java:107)
at jp.hishidama.hadoop.cascading.sample.Sample.main(Sample.java:253)