S-JIS[2010-04-05/2010-05-01] 変更履歴

Cascading Tap

CascadingのTapクラスについて。

  • Cascading APIのTap
  • Cascading APIのHfs

Tapの概要

Cascadingにおいて、データの入出力先(つまりファイル)を表すのがTap。

Cascadingでは、データの入力元をsource、出力先をsinkと呼ぶ。
sourceのデータがPipeに渡され、Pipeの最終的な出力先がsinkになる。

実際にファイルを表すには、Hfsクラスを使う。
Hfsに渡すパス(URI)にプロトコル(fileやhdfs等)を指定することで、Hadoopで扱えるURIを使うことが出来る。


sourceの使用方法

sourceは、Tapをデータの入力として使用する場合の呼び方。

import org.apache.hadoop.fs.Path;

import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;
	String path = new Path("hdfs://localhost/user/hishidama/input").toUri().toString();
//	String path = "hdfs://localhost/user/hishidama/input";
	Tap source = new Hfs(new TextLine(new Fields("line")), path);

	FlowConnector flowConnector = new FlowConnector();
	Flow flow = flowConnector.connect("flow", source, sink, pipe);
	flow.complete();	//実行開始

Hfsの第2引数pathにはパス(URI)を表すの文字列を渡す。
相対パス(スキーム指定なし)は扱えないので、絶対パス形式にする必要がある。
Hadoopが扱えるURIはPathクラスで判別してくれる(相対パスでもデフォルト値に応じてfileやhdfsの絶対パスに変換してくれる)ので、それを使うのがいいと思う。
しかしHfsは直接Pathを受け取ってくれないので、toString()で文字列に変換する必要がある。
しかもPath#toString()は相対パスだと相対パスのままなので、toUri()を使って絶対パス形式にしている。
なお、ディレクトリーを指定すると、その下にあるファイル全てからデータが読み込まれる。

Hfsの第1引数は、データを受け取る型を指定する。
TextLineは、ファイルから1行ずつデータを取得するときに使う。
TextLineのコンストラクターにFieldsを指定すると、その行データに名前を付けることができ、後続のPipe処理ではその名前を使ってデータを取得することが出来る。

引数 説明
1個 new TextLine(new Fields("line")) 行データを「line」という名前で取得できる。
2個 new TextLine(new Fields("offset", "line")) データの位置を「offset」、データ自体を「line」という名前で取得できる。
0個 new TextLine() new TextLine(TextLine.DEFAULT_SOURCE_FIELDS)」すなわち
new TextLine(new Fields("offset", "line"))」と同じ。
その他 IllegalArgumentException TextLineのFieldsには、1個か2個しか項目を指定できない。
それ以外を指定すると例外が発生する。

要するに、TextLineはHadoopのTextInputFormatの出力をイメージしているのだろう。
TextInputFormatを使うと、MapperにはキーがoffsetでデータがTextとなって渡されてくる)


Cascadingでは、複数のPipeを別々のsourceから入力させる事が出来る。

	Tap source1 = new Hfs(new TextLine(new Fields("line")), path1);
	Tap source2 = new Hfs(new TextLine(new Fields("line")), path2);

	Map<String, Tap> sources = new HashMap<String, Tap>();
	sources.put(pipe1.getName(), source1);
	sources.put(pipe2.getName(), source2);

	FlowConnector flowConnector = new FlowConnector();
	Flow flow = flowConnector.connect("flow", sources, sink, pipe);
	flow.complete();	//実行開始

パイプの名前をキー、sourceをデータとするMapを用意し、それをFlowConnector#connect()に渡してやればよい。


TextDelimited

Cascading1.1で、区切り文字を指定できるTextDelimitedが追加された。(TextLineの代わりに使用する)[2010-05-01]

import cascading.scheme.TextDelimited;
	Tap source = new Hfs(
		new TextDelimited(new Fields("col1", "col2", "col3"), true, ","),
		path
	);
引数 説明 デフォルト
fields 区切られた各項目の名前を定義する。 必須
skipHeader trueにすると、先頭行をスキップする。 false
delimiter 区切り文字。 必須
strict trueにすると、区切ったデータ数とフィールドの項目数が違う場合にエラーとする。 true
quote 各項目を囲む文字。 なし(null)
types 型の指定。各項目のデータを文字列から指定された型に変換する。 なし(null)
safe falseにすると、typesによって変換する際の例外を無視する。 true

sinkとして使用するTextDelimited


sinkの使用方法

sinkは、Tapをデータの出力先として使用する場合の呼び方。

import org.apache.hadoop.fs.Path;

import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
	String path = new Path("hdfs://localhost/user/hishidama/output").toUri().toString();
//	String path = "hdfs://localhost/user/hishidama/output";
	Tap sink = new Hfs(new TextLine(), path, SinkMode.REPLACE);

	FlowConnector flowConnector = new FlowConnector();
	Flow flow = flowConnector.connect("flow", source, sink, pipe);
	flow.complete();	//実行開始

Hfsの第2引数pathについては、sourceのpathと同様。

Hfsの第1引数のTextLineもsourceのTextLineとほぼ同様だが、 入力項目名は使わないので、TextLineに項目名を指定する必要は無い(デフォルト状態でよい)。
デフォルトでは、出力時はPipeのデータ(全項目)を項目毎にタブ区切りで結合した1つの文字列として出力される。(タブ区切りはTuple#toString()の中で行 われている)
出力形式の加工

Hfsの第3引数には、出力モード(SinkMode列挙型)を指定できる。

SinkMode 説明
KEEP デフォルト。
出力先ディレクトリーが既に存在していたら、FileAlreadyExistsExceptionが発生する。
(Hadoopのデフォルトの処理と同じ)
REPLACE 出力先ディレクトリーが既に存在していたら、削除してから実行する。これは便利。
APPEND 出力先が既に存在していたら、追加する。
ファイルシステムがそれに対応している場合のみ有効。
Cascading1.1で非推奨になった(UPDATEを使用する)。[2010-04-25]
UPDATE Cascading1.1から使用可能。[2010-04-25]
DB等でデータを更新(上書き)することを示す。

出力形式の加工・整形

sinkのファイルを指定する際に、出力する項目を指定する(絞り込む)ことも出来る。
TextLineの第1引数の項目群は入力用の名前付けで、第2引数の項目群が出力する項目名。

import cascading.scheme.TextLine;
	Tap sink = new Hfs(
		new TextLine(TextLine.DEFAULT_SOURCE_FIELDS, new Fields("col1", "col2", "col3")),
		path,
		SinkMode.REPLACE
	);

また、デフォルトでは出力はタブ区切りとなるが、これはTuple#toString()がそうなっている為。

したがって、Pipeの一番最後の処理の出力Tupleを専用のクラスにしてしまえばよい。

public class CommaFormatFunction extends BaseOperation<Object> implements Function<Object> {
	private static final long serialVersionUID = 1L;

	public CommaFormatFunction() {
		// 出力するフィールド名を指定
		super(Fields.ALL);
	}

	@Override
	public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {

		TupleEntry args = functionCall.getArguments();
		Tuple tuple = new Tuple(args.getTuple()) {
			private static final long serialVersionUID = 1L;

			@Override
			public String toString() {
				return super.toString(", ");
			}
		};

		functionCall.getOutputCollector().add(tuple);
	}
}
		pipe = new Each(pipe, new CommaFormatFunction());

入ってきたTupleから独自のTupleクラス(toString()をオーバーライド)にデータを置き換えている。
(toString()そのものは区切り文字を変更できないが、区切り文字を引数に指定できるメソッドはTuple内に用意されている。なんだか半端だなぁ

このデータ変換処理は、Pipeの一番最後でないと意味が無い。
というのは、別の処理が後に入ると、そちらで新しいTupleインスタンスが作られる可能性があって、当然そちらではこのクラスは使われない為。

sink用のTapに出力項目を指定した場合、そこでもTupleが新しくなってしまうので、この方法との併用は出来ない。


しかしまぁたぶん、Cascading1.0での出力の整形に関しては、用意されている関数(FieldFormatterやFieldJoiner)を使うのがCascadingの意図に沿っているのだろう。[/2010-04-11]

これらの関数は、出力項目は1項目のみになる。
出力項目名を指定するコンストラクターもある。

FieldJoinerによる結合

import cascading.operation.text.FieldJoiner;
		pipe = new Each(pipe, new FieldJoiner(",")); //カンマ区切りで全項目を結合

FieldFormatterによる整形

import cascading.operation.text.FieldFormatter;
		pipe = new Each(pipe, new FieldFormatter("%s, %5d"));

String#format()


Cascading1.1では、出力を整形できるTextDelimitedが追加された。[2010-05-01]

import cascading.scheme.TextDelimited;
	Tap sink = new Hfs(
		new TextDelimited(new Fields("col1", "col2", "col3"), ", "),
		path,
		SinkMode.REPLACE
	);

出力する項目の絞り込み・順序と、区切る文字列を指定できる。

ただし、Fields.ALL等は指定できない。必ず項目を1つ以上指定する必要がある。
せっかく便利なクラスなのに、ちょっと片手落ちな感じがする…。

Exception in thread "main" java.lang.IllegalArgumentException: may not be zero declared fields, found: [{?}:ALL]
	at cascading.scheme.TextDelimited.<init>(TextDelimited.java:442)
	at cascading.scheme.TextDelimited.<init>(TextDelimited.java:364)
	at cascading.scheme.TextDelimited.<init>(TextDelimited.java:107)
	at jp.hishidama.hadoop.cascading.sample.Sample.main(Sample.java:253)

sourceとして使用するTextDelimited


Cascadingへ戻る / Hadoopへ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま