S-JIS[2010-04-30] 変更履歴

Cascading HBase操作サンプル

Cascadingを使ってHBaseで集計するサンプル。


ダウンロード・インストール

Cascading本体にはHBase用のクラスは入っていない(と思われる)が、
Cascadingの拡張としてHBase用のSchemeおよびTapを公開している人(cwenselさん?)がいるので、それをダウンロードしてくる。
どうもコンパイルされた形式(jarファイル)では提供されていないようなので、自分でコンパイル・jarファイル化する。

  1. Cascading ModulesのページでIntegrationのCascading.HBaseをクリックする。
  2. cwensel / cascading.hbaseのページの右上の「Download Source」をクリックする。
    (スクリプトエラーが出るかもしれないが、続行できるなら無視)
  3. ZIPとTARのアイコンが表示されるので、どちらかを選択する。
    すると、数秒後にダウンロードが始まる。(cwensel-cascading.hbase-fb99199.zip)
  4. ダウンロードしてきたファイルを適当な場所に解凍する。
  5. 解凍されたディレクトリーにはソースしか入っていない(コンパイルされていない)ので、EclipseにJavaプロジェクト(JDK1.6)を作ってコピーする。
    cwensel-cascading.hbase-fb99199/src/java/ cascading/hbase/ HBaseScheme.java
    HBaseTap.java
    Eclipseのworkspace/cascading-hbase/src/ cascading/hbase/ HBaseScheme.java
    HBaseTap.java
  6. ビルドパスに下記のライブラリーを追加する。
    Hadoop HADOOP_HOME/hadoop-0.20.2-core.jar Hadoop本体(ver0.19以降)
    HADOOP_HOME/lib/slf4j-api-1.4.3.jar ロギング
    HBase HBASE_HOME/hbase-0.20.3.jar HBase本体
    Cascading CASCADING_HOME/cascading-1.1.0.jar Cascading本体(ver1.0.1以降)
  7. jarファイル化してcascading-hbase.jarを作っておく。

テスト用ソース(cwensel-cascading.hbase-fb99199/src/test/)をコンパイルする為には、さらに以下のライブラリーも必要。

HBase HBASE_HOME/hbase-0.20.3-test.jar HBaseのテスト
JUnit JUnit3ライブラリー JUnit

「Download Source」をクリックした際にエラーが出て続行できない場合は、個々のソースを直接ダウンロードする。
ちょっと面倒だけど、幸い2ファイルしか必要ない。

  1. cwensel / cascading.hbaseのファイル・ディレクトリー一覧から「src/」を選択する。
  2. 同様に「java/」→「cascading/」→「hbase/」→「HBaseScheme.java」を選択していく。
  3. HBaseScheme.javaのソースが表示される画面になるので、(それをコピペしてもいいんだけど、)
    ソースの右上の「raw」を右クリックして「対象をファイルに保存」すると、HBaseScheme.javaがダウンロードできる。
  4. 同様にHBaseTap.javaもダウンロードしてくる。

サンプル:準備

例によって、HBaseをHadoopで扱うサンプル(学生の成績の加工)と同様の処理をCascadingで作ってみる。

あちらで作ったテーブル・データをそのまま使いたいところだが、
あちらは成績(点数)をintで保存しているのに対し、HBaseTapはデータをStringでしか扱えない
そこで、テーブル構造はそのままで、データだけStringで扱うことにする。

したがって、まずCSVファイルからHBaseにCascadingを使って取り込むプログラムを作成する。

HBase Shellでテーブル作成

hbase(main):001:0> create 'student','personal','suugaku','kokugo','rika','shakai','eigo','total5'

サンプル:HBaseへのインポート

以下のようなCSVファイルからHBaseのテーブルへインポートするサンプル。

#番号,名前,     数学,国語,理科,社会,英語
  101,天才,      100, 100, 100, 100, 100
  201,無気力,      5,  30,  10,  20,  15
  102,ひしだま,   90,  85,  80,  50,  10
  202,hoge,       10,  30,  25,  45,  20
  103,foo,        60,  60,  60,  60,  25
  204,xxx,        80,  77,  90,  40,  44
  205,yyy,        65,  90,  55,  80,  65
  104,zzz,        40,  60,  60,  70,  20
学生番号
student-id
(ROW)
個人
personal
数学
suugaku
国語
koguko
理科
rika
社会
shakai
英語
eigo
合計
total5
102
qualifier cell
name ひしだま
qualifier cell
試験名 90
qualifier cell
試験名 85
qualifier cell
試験名 80
qualifier cell
試験名 50
qualifier cell
試験名 10
 
import java.util.StringTokenizer;

import jp.hishidama.hadoop.cascading.conf.CascadingConfigured;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowProcess;
import cascading.hbase.HBaseScheme;
import cascading.hbase.HBaseTap;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
public class CBaseImport extends CascadingConfigured implements Tool {

	public static final String TABLE_NAME = "student";

	public static final String F_SID  = "student-id";	//学生番号
	public static final String F_NAME = "personal:name";	//学生名
	/**
	 * カンマで分割して出力するFunction
	 */
	protected static class SplitFunction extends BaseOperation<Object> implements Function<Object> {
		private static final long serialVersionUID = 1L;

		public SplitFunction(Fields fieldDeclaration) {
			super(1, fieldDeclaration);
		}

		@Override
		public void operate(FlowProcess flowProcess, FunctionCall<Object> functionCall) {
			String line = functionCall.getArguments().getTuple().getString(0);
			if (line.isEmpty() || line.startsWith("#")) {
				return;
			}

			Tuple output = Tuple.size(fieldDeclaration.size());

			StringTokenizer tokenizer = new StringTokenizer(line, ",");
			for (int i = 0; tokenizer.hasMoreTokens(); i++) {
				String token = tokenizer.nextToken().trim();
				output.set(i, token);
			}

			functionCall.getOutputCollector().add(output);
		}
	}
	@Override
	public int run(String[] args) throws Exception {
		String 試験名 = args[0];
		final String F_数学 = "suugaku:" + 試験名;
		final String F_国語 = "kokugo:"  + 試験名;
		final String F_理科 = "rika:"    + 試験名;
		final String F_社会 = "shakai:"  + 試験名;
		final String F_英語 = "eigo:"    + 試験名;

		Tap source = new Hfs(new TextLine(new Fields("line")), makeQualifiedPath(args[1]));

		Fields keyFields   = new Fields(F_SID);
		Fields valueFields = new Fields(F_NAME, F_数学, F_国語, F_理科, F_社会, F_英語);
		Tap sink = new HBaseTap(TABLE_NAME, new HBaseScheme(keyFields, valueFields), SinkMode.UPDATE);

		Pipe pipe = new Pipe("pipe");
		pipe = new Each(pipe, new SplitFunction(new Fields(F_SID, F_NAME, F_数学, F_国語, F_理科, F_社会, F_英語)));

		FlowConnector flowConnector = new FlowConnector(getProperties());
		Flow flow = flowConnector.connect("student-import", source, sink, pipe);
		flow.complete();

		return 0;
	}

	public static void main(String[] args) throws Exception {
		int r = ToolRunner.run(new CBaseImport(), args);
		System.exit(r);
	}
}

HBaseTapの第1引数にテーブル名、第2引数にHBaseSchemeを指定する。
HBaseSchemeでは、キー(HBaseのROW)を示すFieldsと、更新するHBaseの項目名(family:qualifierを指定する。
この更新項目名は、テーブルの項目名であると同時に、Cascadingのフィールド名でもある。
つまりパイプから そのフィールド名でデータを出力しておく必要がある。

上記の例では、パイプ(SplitFunction)から、学生番号(F_SID)・学生名(F_NAME)・各教科の点数を出力している。
このうち、HBaseSchemeでは学生番号(F_SID)をキー項目に指定して、その他を値項目に列挙している。
すると、HBaseのPut(実際は古いバージョンのクラスであるBatchUpdate)では、ROWを学生番号にして、その他の項目を更新項目としてテーブルを更新する。


ところで、これを実行すると、例外が発生した。

Exception in thread "flow student-import" java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: hbase:student
	at org.apache.hadoop.fs.Path.initialize(Path.java:140)
	at org.apache.hadoop.fs.Path.<init>(Path.java:126)
	at cascading.hbase.HBaseTap.getPath(HBaseTap.java:94)
	at cascading.tap.hadoop.Hadoop18TapUtil.cleanupTap(Hadoop18TapUtil.java:182)
	at cascading.flow.FlowStep.cleanTap(FlowStep.java:509)
	at cascading.flow.FlowStep.clean(FlowStep.java:494)
	at cascading.flow.Flow.cleanTemporaryFiles(Flow.java:1118)
	at cascading.flow.Flow.run(Flow.java:950)
	at java.lang.Thread.run(Thread.java:619)
Caused by: java.net.URISyntaxException: Relative path in absolute URI: hbase:student
	at java.net.URI.checkPath(URI.java:1787)
	at java.net.URI.<init>(URI.java:735)
	at org.apache.hadoop.fs.Path.initialize(Path.java:137)
	... 8 more

hbase:student」というURI絶対URIでない(パス部分が「/」で始まっていない)のでエラーになっているらしい。

HBaseTapでは、スキーム名を「hbase:」、テーブル名をパスとしてURIを形成している。
HBase0.20以降では(?)、これが上手く認識されないらしい。
参考: HBaseTap fails to cleanup due to hadoop Path throwing an exception on supplied URI

とりあえず、HBaseTapを以下のように書き換えたら動くようになった。

  private URI getURI()
    {
    try
      {
//    return new URI( SCHEME, tableName, null );
      return new URI( SCHEME, "/" + tableName, null);
      }
    catch( URISyntaxException exception )
      {
      throw new TapException( "unable to create uri", exception );
      }
    }

エラーになっていたのはパスのクリーンナップ処理。
ファイルを削除する為にURIを取得しているが、スキームが「hbase:」だと何もしないので、パスの内容は何でもいいようだ。


サンプル:HBaseからエクスポート

HBaseのテーブルからファイルへ出力する例。

import jp.hishidama.hadoop.cascading.conf.CascadingConfigured;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.hbase.HBaseScheme;
import cascading.hbase.HBaseTap;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
public class CBaseExport extends CascadingConfigured implements Tool {

	public static final String TABLE_NAME = "student";

	public static final String F_SID  = "student-id";
	public static final String F_NAME = "personal:name";

	@Override
	public int run(String[] args) throws Exception {
		String 試験名 = args[0];
		final String F_数学 = "suugaku:" + 試験名;
		final String F_国語 = "kokugo:"  + 試験名;
		final String F_理科 = "rika:"    + 試験名;
		final String F_社会 = "shakai:"  + 試験名;
		final String F_英語 = "eigo:"    + 試験名;
		final String F_合計 = "total5:"  + 試験名;

		Fields keyFields   = new Fields(F_SID);
		Fields valueFields = new Fields(F_NAME, F_数学, F_国語, F_理科, F_社会, F_英語, F_合計);
		Tap source = new HBaseTap(TABLE_NAME, new HBaseScheme(keyFields, valueFields));

		Tap sink = new Hfs(new TextLine(), makeQualifiedPath(args[1]), SinkMode.REPLACE);

		Pipe pipe = new Pipe("pipe");
		pipe = new GroupBy(pipe, Fields.ALL);

		FlowConnector flowConnector = new FlowConnector(getProperties());
		Flow flow = flowConnector.connect("student-export", source, sink, pipe);
		flow.complete();

		return 0;
	}

	public static void main(String[] args) throws Exception {
		int r = ToolRunner.run(new CBaseExport(), args);
		System.exit(r);
	}
}

HBaseのテーブルから読み込む場合もHBaseTapとHBaseSchemeを使う。
取得元のテーブル名と項目名は指定できるが、その他の抽出条件は指定できないっぽい。
(プロパティーに条件を入れておけば出来るかも?)

この例ではDBから取って来た値をそのまま出力しているだけなので、パイプ処理は最小限しか無い。
ソートも不要なら、GroupByすら要らない(笑)


サンプル:合計を算出

各学生の5教科合計を算出する例。

public class CBaseTotal5 extends CascadingConfigured implements Tool {

	public static final String TABLE_NAME = "student";

	public static final String F_SID = "student-id";
	public static final String F_TEN = "ten";

	@Override
	public int run(String[] args) throws Exception {
		String 試験名 = args[0];
		final String F_数学 = "suugaku:" + 試験名;
		final String F_国語 = "kokugo:"  + 試験名;
		final String F_理科 = "rika:"    + 試験名;
		final String F_社会 = "shakai:"  + 試験名;
		final String F_英語 = "eigo:"    + 試験名;
		final String F_合計 = "total5:"  + 試験名;

		Fields keyFields   = new Fields(F_SID);
		Fields valueFields = new Fields(F_数学, F_国語, F_理科, F_社会, F_英語);
		Tap source = new HBaseTap(TABLE_NAME, new HBaseScheme(keyFields, valueFields));

		Tap sink   = new HBaseTap(TABLE_NAME, new HBaseScheme(keyFields, new Fields(F_合計)), SinkMode.UPDATE);

		Pipe pipe = new Pipe("pipe");
		Pipe pipe1 = new Each(pipe, new Fields(F_SID, F_数学), new Identity(new Fields(F_SID, F_TEN), Object.class, int.class));
		Pipe pipe2 = new Each(pipe, new Fields(F_SID, F_国語), new Identity(new Fields(F_SID, F_TEN), Object.class, int.class));
		Pipe pipe3 = new Each(pipe, new Fields(F_SID, F_理科), new Identity(new Fields(F_SID, F_TEN), Object.class, int.class));
		Pipe pipe4 = new Each(pipe, new Fields(F_SID, F_社会), new Identity(new Fields(F_SID, F_TEN), Object.class, int.class));
		Pipe pipe5 = new Each(pipe, new Fields(F_SID, F_英語), new Identity(new Fields(F_SID, F_TEN), Object.class, int.class));
		pipe = new GroupBy("grouping", Pipe.pipes(pipe1, pipe2, pipe3, pipe4, pipe5), new Fields(F_SID));
		pipe = new Every(pipe, new Fields(F_TEN), new Sum(new Fields(F_合計), int.class));

		FlowConnector flowConnector = new FlowConnector(getProperties());
		Flow flow = flowConnector.connect("student-total5", source, sink, pipe);
		flow.complete();

		return 0;
	}

	public static void main(String[] args) throws Exception {
		int r = ToolRunner.run(new CBaseTotal5(), args);
		System.exit(r);
	}
}

HBaseから読み込んだデータを科目毎の点数としてパイプ分割し、学生番号でグルーピングして集計している。

GroupByでは複数のパイプを指定できるが、全パイプの項目名が一致している必要がある。
そこで、Identity関数を使って項目名を変更している。
ついでに、点数は整数なので、intに変換している。(Object.classを指定した項目は、型の変換は行わない)


sourceから5教科分作る方法も考えられる。

	@Override
	public int run(String[] args) throws Exception {
		String 試験名 = args[0];
		final String[] kname = { "suugaku:" + 試験名, "kokugo:" + 試験名, "rika:" + 試験名, "shakai:" + 試験名, "eigo:" + 試験名 };
		final String F_合計 = "total5:" + 試験名;

		Fields keyFields = new Fields(F_SID);
		Tap[] source = new Tap[kname.length];
		for (int i = 0; i < source.length; i++) {
			source[i] = new HBaseTap(TABLE_NAME, new HBaseScheme(keyFields, new Fields(kname[i])));
		}

		Tap sink = new HBaseTap(TABLE_NAME, new HBaseScheme(keyFields, new Fields(F_合計)), SinkMode.UPDATE);

		Pipe[] pipes = new Pipe[kname.length];
		for (int i = 0; i < pipes.length; i++) {
			pipes[i] = new Each("pipe" + i, new Identity(new Fields(F_SID, F_TEN), Object.class, int.class));
		}

		Pipe pipe = new GroupBy("pipe", pipes, new Fields(F_SID));
		pipe = new Every(pipe, new Fields(F_TEN), new Sum(new Fields(F_合計), int.class));

		// パイプとsourceを紐付ける
		Map<String, Tap> sources = Cascades.tapsMap(pipes, source);

		FlowConnector flowConnector = new FlowConnector(getProperties());
		Flow flow = flowConnector.connect("student-total5", sources, sink, pipe);
		flow.complete();

		return 0;
	}

サンプル:偏差値の算出

各学生の偏差値を算出する例。

せっかくなので、各教科毎に偏差値を算出してみる。
算出した偏差値を保存する場所は、科目をテーブルのfamilyにしているので、qualifierで偏差値を指定することにする。
(こういった風に、後から適当に値を登録できるのは、テーブル構造ががっちり固まっていないHBaseならでは)

学生番号
student-id
(ROW)
個人
personal
数学
suugaku
国語
koguko
理科
rika
社会
shakai
英語
eigo
合計
total5
102
qualifier cell
name ひしだま
qualifier cell
試験名 90
試験名.偏差値 60.2
qualifier cell
試験名 85
試験名.偏差値 57.4
qualifier cell
試験名 80
試験名.偏差値 56.9
qualifier cell
試験名 50
試験名.偏差値 46.5
qualifier cell
試験名 10
試験名.偏差値 40.5
qualifier cell
試験名 315
試験名.偏差値 52.9
public class CBaseScore extends CascadingConfigured implements Tool {

	public static final String TABLE_NAME = "student";

	public static final Fields F_SID = new Fields("student-id");

	protected static final String[] KNAME = { "suugaku", "kokugo", "rika", "shakai", "eigo", "total5" };

	@Override
	public int run(String[] args) throws Exception {
		String 試験名 = args[0];

		Flow[] flow = new Flow[KNAME.length];
		for (int i = 0; i < flow.length; i++) {
			flow[i] = createFlow(試験名, KNAME[i]);
		}

		Cascade cascade = new CascadeConnector().connect(flow);
		cascade.complete(); // 実行

		return 0;
	}

	protected Flow createFlow(String 試験名, String kname) {
		final Fields F_KEY   = new Fields("dummy");
		final Fields F_TEN   = new Fields(kname + ":" + 試験名);		//HBaseの項目名とCascadingのフィールド名を兼ねる
		final Fields F_SCORE = new Fields(kname + ":" + 試験名 + ".score");	//HBaseの項目名とCascadingのフィールド名を兼ねる

		Tap source = new HBaseTap(TABLE_NAME, new HBaseScheme(F_SID, F_TEN));
		Tap sink   = new HBaseTap(TABLE_NAME, new HBaseScheme(F_SID, F_SCORE), SinkMode.UPDATE);

		Pipe pipe = new Pipe("pipe");
		pipe = new Each(pipe, new Insert(F_KEY, 0), Fields.ALL);
		pipe = new StandardScore(pipe,
			F_KEY,					//グルーピング項目
			F_TEN,					//点数項目
			new Fields("ave", "sigma").append(F_SCORE),	//平均・標準偏差・偏差値に使う項目名
			Fields.join(F_SID, F_SCORE)			//出力項目
		);

		FlowConnector flowConnector = new FlowConnector(getProperties());
		Flow flow = flowConnector.connect(kname + "-flow", source, sink, pipe);
		return flow;
	}

	public static void main(String[] args) throws Exception {
		int r = ToolRunner.run(new CBaseScore(), args);
		System.exit(r);
	}
}

フローを実行するのに、Cascadeを使っている。(→Flowの並列実行
このプログラムは全体として「偏差値を算出する」ものだが、実体としては各教科毎に算出する。
しかし各教科間には直接の入出力関係が無いので、並列で実行できる。
そこで、教科毎のFlowを作成し、Cascadeを使って実行するようにした。

偏差値の算出ロジック本体は、前に作った偏差値を計算するサブアセンブリー(StandardScore)をそのまま使用できた。
(→StandardScoreを使ってCSVファイルから偏差値を計算する例
なお、ソートキー(グルーピングキー)となるものが無いので、Insert関数を使って固定値(ここでは0にした)を入れ込んだ


Cascadingへ戻る / Hadoopへ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま