CascadingのTapクラスについて。
Cascadingにおいて、データの入出力先(つまりファイル)を表すのがTap。
Cascadingでは、データの入力元をsource、出力先をsinkと呼ぶ。
sourceのデータがPipeに渡され、Pipeの最終的な出力先がsinkになる。
実際にファイルを表すには、Hfsクラスを使う。
Hfsに渡すパス(URI)にプロトコル(fileやhdfs等)を指定することで、Hadoopで扱えるURIを使うことが出来る。
sourceは、Tapをデータの入力として使用する場合の呼び方。
import org.apache.hadoop.fs.Path; import cascading.scheme.TextLine; import cascading.tap.Hfs; import cascading.tap.Tap; import cascading.tuple.Fields;
String path = new Path("hdfs://localhost/user/hishidama/input").toUri().toString();
// String path = "hdfs://localhost/user/hishidama/input";
Tap source = new Hfs(new TextLine(new Fields("line")), path);
FlowConnector flowConnector = new FlowConnector();
Flow flow = flowConnector.connect("flow", source, sink, pipe);
flow.complete(); //実行開始
Hfsの第2引数pathにはパス(URI)を表すの文字列を渡す。
相対パス(スキーム指定なし)は扱えないので、絶対パス形式にする必要がある。
Hadoopが扱えるURIはPathクラスで判別してくれる(相対パスでもデフォルト値に応じてfileやhdfsの絶対パスに変換してくれる)ので、それを使うのがいいと思う。
しかしHfsは直接Pathを受け取ってくれないので、toString()で文字列に変換する必要がある。
しかもPath#toString()は相対パスだと相対パスのままなので、toUri()を使って絶対パス形式にしている。
なお、ディレクトリーを指定すると、その下にあるファイル全てからデータが読み込まれる。
Hfsの第1引数は、データを受け取る型を指定する。
TextLineは、ファイルから1行ずつデータを取得するときに使う。
TextLineのコンストラクターにFieldsを指定すると、その行データに名前を付けることができ、後続のPipe処理ではその名前を使ってデータを取得することが出来る。
| 引数 | 例 | 説明 |
|---|---|---|
| 1個 | new TextLine(new Fields("line")) |
行データを「line」という名前で取得できる。 |
| 2個 | new TextLine(new Fields("offset", "line")) |
データの位置を「offset」、データ自体を「line」という名前で取得できる。 |
| 0個 | new TextLine() |
「new TextLine(TextLine.DEFAULT_SOURCE_FIELDS)」すなわち「 new TextLine(new Fields("offset", "line"))」と同じ。 |
| その他 | IllegalArgumentException | TextLineのFieldsには、1個か2個しか項目を指定できない。 それ以外を指定すると例外が発生する。 |
要するに、TextLineはHadoopのTextInputFormatの出力をイメージしているのだろう。
(TextInputFormatを使うと、MapperにはキーがoffsetでデータがTextとなって渡されてくる)
Cascadingでは、複数のPipeを別々のsourceから入力させる事が出来る。
Tap source1 = new Hfs(new TextLine(new Fields("line")), path1);
Tap source2 = new Hfs(new TextLine(new Fields("line")), path2);
Map<String, Tap> sources = new HashMap<String, Tap>();
sources.put(pipe1.getName(), source1);
sources.put(pipe2.getName(), source2);
FlowConnector flowConnector = new FlowConnector();
Flow flow = flowConnector.connect("flow", sources, sink, pipe);
flow.complete(); //実行開始
パイプの名前をキー、sourceをデータとするMapを用意し、それをFlowConnector#connect()に渡してやればよい。
Cascading1.1で、区切り文字を指定できるTextDelimitedが追加された。(TextLineの代わりに使用する)[2010-05-01]
import cascading.scheme.TextDelimited;
Tap source = new Hfs(
new TextDelimited(new Fields("col1", "col2", "col3"), true, ","),
path
);
| 引数 | 説明 | デフォルト |
|---|---|---|
| fields | 区切られた各項目の名前を定義する。 | 必須 |
| skipHeader | trueにすると、先頭行をスキップする。 | false |
| delimiter | 区切り文字。 | 必須 |
| strict | trueにすると、区切ったデータ数とフィールドの項目数が違う場合にエラーとする。 | true |
| quote | 各項目を囲む文字。 | なし(null) |
| types | 型の指定。各項目のデータを文字列から指定された型に変換する。 | なし(null) |
| safe | falseにすると、typesによって変換する際の例外を無視する。 | true |
sinkは、Tapをデータの出力先として使用する場合の呼び方。
import org.apache.hadoop.fs.Path; import cascading.scheme.TextLine; import cascading.tap.Hfs; import cascading.tap.SinkMode; import cascading.tap.Tap;
String path = new Path("hdfs://localhost/user/hishidama/output").toUri().toString();
// String path = "hdfs://localhost/user/hishidama/output";
Tap sink = new Hfs(new TextLine(), path, SinkMode.REPLACE);
FlowConnector flowConnector = new FlowConnector();
Flow flow = flowConnector.connect("flow", source, sink, pipe);
flow.complete(); //実行開始
Hfsの第2引数pathについては、sourceのpathと同様。
Hfsの第1引数のTextLineもsourceのTextLineとほぼ同様だが、
入力項目名は使わないので、TextLineに項目名を指定する必要は無い(デフォルト状態でよい)。
デフォルトでは、出力時はPipeのデータ(全項目)を項目毎にタブ区切りで結合した1つの文字列として出力される。(タブ区切りはTuple#toString()の中で行
われている)
→出力形式の加工
Hfsの第3引数には、出力モード(SinkMode列挙型)を指定できる。
| SinkMode | 説明 |
|---|---|
| KEEP | デフォルト。 出力先ディレクトリーが既に存在していたら、FileAlreadyExistsExceptionが発生する。 (Hadoopのデフォルトの処理と同じ) |
| REPLACE | 出力先ディレクトリーが既に存在していたら、削除してから実行する。これは便利。 |
| APPEND | 出力先が既に存在していたら、追加する。 ファイルシステムがそれに対応している場合のみ有効。 Cascading1.1で非推奨になった(UPDATEを使用する)。[2010-04-25] |
| UPDATE | Cascading1.1から使用可能。[2010-04-25] DB等でデータを更新(上書き)することを示す。 |
sinkのファイルを指定する際に、出力する項目を指定する(絞り込む)ことも出来る。
TextLineの第1引数の項目群は入力用の名前付けで、第2引数の項目群が出力する項目名。
import cascading.scheme.TextLine;
Tap sink = new Hfs(
new TextLine(TextLine.DEFAULT_SOURCE_FIELDS, new Fields("col1", "col2", "col3")),
path,
SinkMode.REPLACE
);
また、デフォルトでは出力はタブ区切りとなるが、これはTuple#toString()がそうなっている為。
したがって、Pipeの一番最後の処理の出力Tupleを専用のクラスにしてしまえばよい。
public class CommaFormatFunction extends BaseOperation<Object> implements Function<Object> {
private static final long serialVersionUID = 1L;
public CommaFormatFunction() {
// 出力するフィールド名を指定
super(Fields.ALL);
}
@Override
public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {
TupleEntry args = functionCall.getArguments();
Tuple tuple = new Tuple(args.getTuple()) {
private static final long serialVersionUID = 1L;
@Override
public String toString() {
return super.toString(", ");
}
};
functionCall.getOutputCollector().add(tuple);
}
}
pipe = new Each(pipe, new CommaFormatFunction());
入ってきたTupleから独自のTupleクラス(toString()をオーバーライド)にデータを置き換えている。
(toString()そのものは区切り文字を変更できないが、区切り文字を引数に指定できるメソッドはTuple内に用意されている。なんだか半端だなぁ)
このデータ変換処理は、Pipeの一番最後でないと意味が無い。
というのは、別の処理が後に入ると、そちらで新しいTupleインスタンスが作られる可能性があって、当然そちらではこのクラスは使われない為。
sink用のTapに出力項目を指定した場合、そこでもTupleが新しくなってしまうので、この方法との併用は出来ない。
しかしまぁたぶん、Cascading1.0での出力の整形に関しては、用意されている関数(FieldFormatterやFieldJoiner)を使うのがCascadingの意図に沿っているのだろう。[/2010-04-11]
これらの関数は、出力項目は1項目のみになる。
出力項目名を指定するコンストラクターもある。
import cascading.operation.text.FieldJoiner;
pipe = new Each(pipe, new FieldJoiner(",")); //カンマ区切りで全項目を結合
import cascading.operation.text.FieldFormatter;
pipe = new Each(pipe, new FieldFormatter("%s, %5d"));
Cascading1.1では、出力を整形できるTextDelimitedが追加された。[2010-05-01]
import cascading.scheme.TextDelimited;
Tap sink = new Hfs(
new TextDelimited(new Fields("col1", "col2", "col3"), ", "),
path,
SinkMode.REPLACE
);
出力する項目の絞り込み・順序と、区切る文字列を指定できる。
ただし、Fields.ALL等は指定できない。必ず項目を1つ以上指定する必要がある。
せっかく便利なクラスなのに、ちょっと片手落ちな感じがする…。
Exception in thread "main" java.lang.IllegalArgumentException: may not be zero declared fields, found: [{?}:ALL]
at cascading.scheme.TextDelimited.<init>(TextDelimited.java:442)
at cascading.scheme.TextDelimited.<init>(TextDelimited.java:364)
at cascading.scheme.TextDelimited.<init>(TextDelimited.java:107)
at jp.hishidama.hadoop.cascading.sample.Sample.main(Sample.java:253)