S-JIS[2010-02-28/2012-04-28] 変更履歴

Hadoop HBase操作サンプル

HadoopのMap/Reduceを使ってHBaseで集計するサンプル。


概要(目的)

このページのサンプルの目的は、HadoopのMap/Reduceを使ってHBase上のデータの集計を行うこと。
集計のコーディング例

集計を行う為には元となるデータを作らなければならないので、CSVファイルから読み込んでHBaseに書き込んでみる。
インポートのコーディング例サンプルテーブル

また、テーブルに書かれたデータを確認するにはHBase Shellから見ればいい
と思ったら、数値等のバイナリデータは数値の形式では表示されないので、テキストファイルへ出力するプログラムも作ってみる。
エクスポートのコーディング例

HBaseとファイル間の入出力であれば HadoopのMap/Reduceを使わなくてもHBaseのデータアクセスだけ出来れば普通に行える事だが、大量データを分散してアクセスできるのがHadoopの売りでもあることだし、Hadoopを使ってコーディングしてみる。
(バイナリーファイルとの入出力であれば、HBase標準でサンプルが用意されている→Export・Importクラス


サンプルデータ・テーブル

Hadoopのファイル入出力で集計するサンプルで使ったような、学生の成績表(試験の点数)を集計する例を考えてみる。

#番号,名前,     数学,国語,理科,社会,英語
  101,天才,      100, 100, 100, 100, 100
  201,無気力,      5,  30,  10,  20,  15
  102,ひしだま,   90,  85,  80,  50,  10
  202,hoge,       10,  30,  25,  45,  20
  103,foo,        60,  60,  60,  60,  25
  204,xxx,        80,  77,  90,  40,  44
  205,yyy,        65,  90,  55,  80,  65
  104,zzz,        40,  60,  60,  70,  20

キーは学生番号で、情報として学生の名前とかを持たせた上で、各教科(数学とか国語とか)の点数と合計点数を入れる項目を用意する。
HBaseは同じ項目(family)に複数のデータを入れられるので、点数という1つのfamilyでqualifierに各教科を入れるという構造も考えられるが、試験ってのは学期毎とかに複数あるので、qualifierには試験名を入れることにする。

当サンプルで使用するstudentテーブルの構成(データイメージ)
ROW
(学生番号)
個人 数学 国語 理科 社会 英語 合計
101
qual cell
名前 天才
性別
qualifier cell
1学期中間 100
1学期期末  
 
qualifier cell
1学期中間 100
1学期期末  
 
qualifier cell
1学期中間 100
1学期期末  
 
qualifier cell
1学期中間 100
1学期期末  
 
qualifier cell
1学期中間 100
1学期期末  
 
qualifier cell
1学期中間 500
1学期期末  
 
102
qual cell
名前 ひしだま
性別
qualifier cell
1学期中間 90
1学期期末  
 
qualifier cell
1学期中間 85
1学期期末  
 
qualifier cell
1学期中間 80
1学期期末  
 
qualifier cell
1学期中間 50
1学期期末  
 
qualifier cell
1学期中間 10
1学期期末  
 
qualifier cell
1学期中間 315
1学期期末  
 
点数という1つのfamilyに入れる例
ROW
(学生番号)
個人 点数
101
qual cell
名前 天才
性別
qualifier cell
数学 100
国語 100
理科 100
社会 100
英語 100
合計 500
102
qual cell
名前 ひしだま
性別
qualifier cell
数学 90
国語 85
理科 80
社会 50
英語 10
合計 315

HBaseのテーブルは、qualifierの中はデータ追加で自動的に増やせるが、family(列)はテーブルを一旦使用不可にして項目追加を行わないと増やせない。
したがって変化の無いものを列(family)にする方がいいんじゃないかと思う。
(試験名の方を可変にしておけば、模試とか補習(苦笑)とかにも対応できるし)


準備:テーブルの作成

今回のサンプル用に、HBase Shellからcreateコマンドを実行するか、HBaseのテーブル作成APIを使ってテーブルを作成しておく。

HBase Shellの場合

hbase(main):001:0> create 'student','personal','suugaku','kokugo','rika','shakai','eigo','total5'

HBase APIの場合

package jp.hishidama.hadoop.hbase;

import org.apache.hadoop.hbase.util.Bytes;

public class Student {
	public static final String TABLE_NAME = "student";
	public static final byte[] TABLE_NAME_BYTES = Bytes.toBytes(TABLE_NAME);

	public static final byte[] 個人 = Bytes.toBytes("personal");
	public static final byte[] 名前 = Bytes.toBytes("name");

	public static final byte[] 数学 = Bytes.toBytes("suugaku");
	public static final byte[] 国語 = Bytes.toBytes("kokugo");
	public static final byte[] 理科 = Bytes.toBytes("rika");
	public static final byte[] 社会 = Bytes.toBytes("shakai");
	public static final byte[] 英語 = Bytes.toBytes("eigo");
	public static final byte[] 合計 = Bytes.toBytes("total5");
}
import static jp.hishidama.hadoop.hbase.Student.*;
〜
	static void createTable() throws IOException {
		// 設定ファイルを用いてHbaseへ接続
		HBaseConfiguration conf = new HBaseConfiguration();
		conf.addResource(new Path("C:/cygwin/usr/local/hbase-0.20.3/conf/hbase-site.xml"));
		HBaseAdmin admin = new HBaseAdmin(conf);

		// 既に存在していたら何もしない
		if (admin.tableExists(TABLE_NAME_BYTES)) {
			return;
		}

		// テーブルを作成
		HTableDescriptor desc = new HTableDescriptor(TABLE_NAME_BYTES);
		desc.addFamily(new HColumnDescriptor(個人));
		desc.addFamily(new HColumnDescriptor(数学));
		desc.addFamily(new HColumnDescriptor(国語));
		desc.addFamily(new HColumnDescriptor(理科));
		desc.addFamily(new HColumnDescriptor(社会));
		desc.addFamily(new HColumnDescriptor(英語));
		desc.addFamily(new HColumnDescriptor(合計));
		admin.createTable(desc);
	}

CSVファイルからHBaseへのインポート

HadoopのMapperでCSVファイルを読み込み、HBaseのテーブルに書き込む例。

package jp.hishidama.hadoop.hbase;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import static jp.hishidama.hadoop.hbase.Student.*;
public class HBaseImport {

	private static final String KEY_試験名 = HBaseImport.class + "_name";
	public static class Map extends Mapper<LongWritable, Text, NullWritable, Put> {

		private static final byte[][] CSV = { 数学, 国語, 理科, 社会, 英語 };

		private byte[] 試験名;

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			super.setup(context);

			試験名 = Bytes.toBytes(context.getConfiguration().get(KEY_試験名));
		}
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String line = value.toString();
			if (line.startsWith("#") || line.trim().isEmpty()) {
				return;
			}
// CSVの複数項目を1回のPutで更新する例

Put put = null;

StringTokenizer tokenizer = new StringTokenizer(line, ",");
for (int i = 0; tokenizer.hasMoreTokens(); i++) {
	String s = tokenizer.nextToken();
	switch (i) {
	case 0: // 学生番号
		byte[] no = Bytes.toBytes(s);
		put = new Put(no);
		break;
	case 1: // 名前

		put.add(個人, 名前, Bytes.toBytes(s));

		break;
	default:

		byte[] family = CSV[i - 2];
		int ten = Integer.parseInt(s.trim());
		put.add(family, 試験名, Bytes.toBytes(ten));

		break;
	}
}

context.write(NullWritable.get(), put);
// CSVの項目毎にPutして更新する例

Put put = null;
byte[] no = null;
StringTokenizer tokenizer = new StringTokenizer(line, ",");
for (int i = 0; tokenizer.hasMoreTokens(); i++) {
	String s = tokenizer.nextToken();
	switch (i) {
	case 0: // 学生番号
		no = Bytes.toBytes(s);

		break;
	case 1: // 名前
		put = new Put(no);
		put.add(個人, 名前, Bytes.toBytes(s));
		context.write(NullWritable.get(), put);
		break;
	default:
		put = new Put(no);
		byte[] family = CSV[i - 2];
		int ten = Integer.parseInt(s.trim());
		put.add(family, 試験名, Bytes.toBytes(ten));
		context.write(NullWritable.get(), put);
		break;
	}
}
		}
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME); //出力先テーブル名を指定
		conf.set(KEY_試験名, args[0]);

		Job job = new Job(conf, "hbase import");
		job.setJarByClass(HBaseImport.class);

		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Put.class);

		job.setMapperClass(Map.class);
		job.setNumReduceTasks(0);

		job.setInputFormatClass(TextInputFormat.class);	// ファイルから入力
		job.setOutputFormatClass(TableOutputFormat.class);	// HBaseへ出力

		FileInputFormat.setInputPaths(job, new Path(args[1]));

		boolean success = job.waitForCompletion(true);
		System.out.println(success);
	}
}

ファイルから入力するので、InputFormatにはTextInputFormatを使う。(ファイル名はmain()のargs[1]で渡すものとする)
この辺りはHadoopでファイルを読み込むのと全く同じ。

HBaseへの出力にはTableOutputFormatを使用する。
このクラスを使う場合、出力先(更新対象)のテーブル名をconfにセットしておく。(パラメーター名TableOutputFormat.OUTPUT_TABLE
TableOutputFormatへの入力型は、値はPut(またはDelete)で、キーは何でもよい。 したがってここではキーにはNullWritableを指定している。
ただ単にデータをテーブルへ登録するだけの場合は集計は無いので、Reducerは不要。なのでMapperの出力(Put)がそのままTableOutputFormatへ渡り、テーブルのデータが更新される。
TableOutputFormatの危険性 [2012-04-28]

今回の例では、main()のargs[0]で試験名を渡すことにしている。
この試験名はテーブルの更新対象項目のqualifierに使いたいので、Mapperに渡す必要がある。
Mapper(やReducer)の引数contextからcontext.getConfiguration()でconfを取得できるので、main()でconfに値をセットしておけばよい。
ただ、Mapper#map()で毎回取り直してbyte[]に変換するのは無駄なので、Mapper#setup()で取り出してフィールドに保持しておけばいい。

Mapper#map()の中でのPutへの値のセットの仕方(context.write()のタイミング)は、2通り考えられる。
1つは(Putには同一のROWに対して複数の更新をadd()できるので、)まとめてadd()して1回だけcontext.write()する方法。
もう1つは更新毎に毎回context.write()を呼び出す方法。
RDBなら1レコード内の複数項目更新なら1回だけUPDATEする(前者の方法)のが常識だが、HBaseはレコード単位でデータを保持しているのではないので、もしかすると後者の方が効率がいいかもしれない。(未検証)
(TableOutputFormatのデフォルト状態では、最後のclose()時にまとめてコミットされる模様。という事は、Putのインスタンス数が減る前者の方がいいのかもしれない。
 むしろ、コミッターを変更して毎回コミットするようにしたら後者の方がいい? でも1レコードずつコミットしてたら負荷がかかるかも?)

※ちなみに、このMapperでは点数と一緒に学生名も登録しているが、あるべき設計の姿で言えば、マスター登録専用(学生の名前とか性別とかの情報を登録する)のプログラムが別にあるべきでしょうね。

このサンプルの実行方法


HBaseからCSVファイルへのエクスポート

HadoopのMapperでHBaseのテーブルからデータを読み込み、CSVファイルとして書き込む例。

package jp.hishidama.hadoop.hbase;
import java.io.IOException;
import java.util.Arrays;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import static jp.hishidama.hadoop.hbase.Student.*;
public class HBaseExport {

	private static final TreeMap<byte[], String> CSV_MAP = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
	static {
		int n = 0;
		CSV_MAP.put(名前, Integer.toString(n++));
		CSV_MAP.put(数学, Integer.toString(n++));
		CSV_MAP.put(国語, Integer.toString(n++));
		CSV_MAP.put(理科, Integer.toString(n++));
		CSV_MAP.put(社会, Integer.toString(n++));
		CSV_MAP.put(英語, Integer.toString(n++));
		CSV_MAP.put(合計, Integer.toString(n++));
	}
	public static class Map extends TableMapper<Text, Text> {

		private Text key  = new Text();
		private Text word = new Text();

		@Override
		protected void map(ImmutableBytesWritable row, Result result, Context context) throws IOException, InterruptedException {

			String no = Bytes.toString(row.get()); //学生番号
			key.set(no);

			for (KeyValue kv : result.raw()) {	//スキャンされた全項目をループ処理
				byte[] k;
				String s;
				if (kv.matchingQualifier(名前)) {
					k = kv.getQualifier();
					s = Bytes.toString(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
				} else {
					k = kv.getFamily();
					int n = Bytes.toInt(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
					s = Integer.toString(n);
				}
				word.set(CSV_MAP.get(k) + "=" + s);
				context.write(key, word);
			}
		}
	}
	public static class Reduce extends Reducer<Text, Text, Text, Text> {
		private Text word = new Text();

		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

			String[] csv = new String[CSV_MAP.size()];
			for (Text value : values) {
				String[] ss = value.toString().split("=");
				csv[Integer.parseInt(ss[0])] = ss[1];
			}

		//	String s = StringUtils.arrayToString(csv);
		//	word.set(Bytes.toBytes(s));
			String s = Arrays.toString(csv);
			word.set(Bytes.toBytes(s.substring(1, s.length() - 1)));
			context.write(key, word);
		}
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME); //入力元テーブル名を指定

		// スキャンする項目を列挙
		String name = args[0]; //試験名
		StringBuilder sb = new StringBuilder(128);
		for (byte[] s : CSV_MAP.keySet()) {
			if (sb.length() > 0) {
				sb.append(' ');
			}
			if (Bytes.equals(s, 名前)) {
				sb.append(Bytes.toString(個人)).append(':').append(Bytes.toString(s));
			} else {
				sb.append(Bytes.toString(s))   .append(':').append(name);
			}
		}
		conf.set(TableInputFormat.SCAN_COLUMNS, sb.toString()); //スキャンする項目を指定

		Job job = new Job(conf, "hbase export");
		job.setJarByClass(HBaseExport.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);

		job.setInputFormatClass(TableInputFormat.class);	// HBaseから入力
		job.setOutputFormatClass(TextOutputFormat.class);	// ファイルへ出力

		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean success = job.waitForCompletion(true);
		System.out.println(success);
	}
}

出力イメージ:

  101	天才, 100, 100, 100, 100, 100, null
  102	ひしだま, 90, 85, 80, 50, 10, null
  103	foo, 60, 60, 60, 60, 25, null
  104	zzz, 40, 60, 60, 70, 20, null
  201	無気力, 5, 30, 10, 20, 15, null
  202	hoge, 10, 30, 25, 45, 20, null
  204	xxx, 80, 77, 90, 40, 44, null
  205	yyy, 65, 90, 55, 80, 65, null

ファイルへ出力するので、OutputFormatにはTextOutputFormatを使う。(ファイル名はmain()のargs[1]で渡すものとする)
この辺りはHadoopでファイルを書き出すのと全く同じ。
(キーと値の間はタブで区切られている。完全なCSVファイルにしたいならCommaTextOutputFormatを使う とか)

HBaseからテーブルを読み込む(スキャンする)にはTableInputFormatを使用する。
このクラスを使う場合、読み込む対象のテーブル名をconfにセットしておく。(パラメーター名TableInputFormat.INPUT_TABLE
TableInputFormatからの出力型(Mapperへの入力型)は、キーはImmutableBytesWritableでROWが入ってくる。値はResult
通常のMapperにそれらの型を指定してもいいのだが、TableMapperというクラスが用意されているので、それを使うのがいいだろう。

テーブルからどの項目(family:qualifier)をスキャンするかは、conf(パラメーター名TableInputFormat.SCAN_COLUMNS)に指定できる。
これは列名「family:qualifier」をスペース区切りで複数指定できる。
今回のサンプルでは試験名をqualifierに使っているので、(インポートのサンプルと同様に)main()のargs[0]で試験名を渡している。

Mapper#map()には、スキャンされたデータがROW毎に入ってくる。
今回のサンプルでは、出力のキーは学生番号、値は「CSVの列番号=点数」という文字列にしている。
そうするとReducerには学生番号毎に全点数が入ってくるので、値を「=」で分離し、CSVの列番号の位置に点数を配置している。
(本来は、こういうのは「=」で区切った文字列にするのではなく、クラスにすべきだと思うけど…)

このサンプルの実行方法


集計してテーブルを更新

HBaseのテーブルの各行を集計してテーブルを更新する例。

package jp.hishidama.hadoop.hbase;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

import static jp.hishidama.hadoop.hbase.Student.*;
public class HBaseRecordSum {

	private static final String KEY_試験名 = HBaseRecordSum.class + "_name";
	public static class Map extends TableMapper<ImmutableBytesWritable, IntWritable> {

		private IntWritable intw = new IntWritable();

		@Override
		protected void map(ImmutableBytesWritable row, Result result, Context context) throws IOException, InterruptedException {

			for (KeyValue kv : result.raw()) {	//スキャンされた全項目をループ処理
				int n = Bytes.toInt(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
				intw.set(n);
				context.write(row, intw);
			}
		}
	}
	public static class Reduce extends TableReducer<ImmutableBytesWritable, IntWritable, NullWritable> {

		private byte[] 試験名;

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			super.setup(context);

			試験名 = Bytes.toBytes(context.getConfiguration().get(KEY_試験名));
		}

		@Override
		protected void reduce(ImmutableBytesWritable row, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

			int sum = 0;
			for (IntWritable value : values) {
				sum += value.get();
			}
			Put put = new Put(row.get());
			put.add(合計, 試験名, Bytes.toBytes(sum));
			context.write(NullWritable.get(), put);
		}
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set(TableInputFormat.INPUT_TABLE,   TABLE_NAME);
		conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);

		String name = args[0];
		conf.set(KEY_試験名, name);
		conf.set(TableInputFormat.SCAN_COLUMNS,
			Bytes.toString(数学) + ":" + name + " " +
			Bytes.toString(国語) + ":" + name + " " +
			Bytes.toString(理科) + ":" + name + " " +
			Bytes.toString(社会) + ":" + name + " " +
			Bytes.toString(英語) + ":" + name
		);

		Job job = new Job(conf, "hbase personal sum");
		job.setJarByClass(HBaseRecordSum.class);

		job.setMapOutputKeyClass  (ImmutableBytesWritable.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass  (NullWritable.class);
		job.setOutputValueClass(Put.class);

		job.setMapperClass  (Map.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass (Reduce.class);

		job.setInputFormatClass (TableInputFormat.class);	// HBaseから入力
		job.setOutputFormatClass(TableOutputFormat.class);	// HBaseへ出力

		boolean success = job.waitForCompletion(true);
		System.out.println(success);
	}
}

エクスポート(テーブルからのスキャン)インポート(テーブルへの更新)のテーブル処理部分を抜き出して合わせただけ。

ただ、集計があるのでReducerを用意している。
そしてテーブルからの入力を受け取るTableMapperと同様に、テーブルを更新する為のTableReducerを使用している。

このサンプルの実行方法


カスタマイズ:TableMapReduceUtil

テーブルを扱う為の初期化処理もある程度共通なので、TableMapReduceUtilという初期化用のクラスが提供されている。

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

import static jp.hishidama.hadoop.hbase.Student.*;
	public static void main(String[] args) throws Exception {
		Job job = new Job();
		job.setJobName("hbase personal sum");
		job.setJarByClass(HBaseRecordSum2.class);

		Configuration conf = job.getConfiguration();
		String name = args[0];
		conf.set(KEY_試験名, name);
		byte[] qualifier = Bytes.toBytes(name);

		Scan scan = new Scan();
		scan.addColumn(数学, qualifier);
		scan.addColumn(国語, qualifier);
		scan.addColumn(理科, qualifier);
		scan.addColumn(社会, qualifier);
		scan.addColumn(英語, qualifier);
		TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, Map.class, ImmutableBytesWritable.class, IntWritable.class, job);

		TableMapReduceUtil.initTableReducerJob(TABLE_NAME, Reduce.class, job /* , HRegionPartitioner.class */);
		job.setCombinerClass(IntSumReducer.class);

		boolean success = job.waitForCompletion(true);
		System.out.println(success);
	}

initTableMapperJob()は、入力(TableInputFormatおよびMapper)の初期化を行う。
すなわち、入力テーブル名とMapperクラス、Mapperの出力型を初期化する。
TableInputFormat.INPUT_TABLEsetMapperClass()setMapOutputKeyClass()setMapOutputValueClass()setInputFormatClass()
また、テーブルからの取得条件はScanクラスで指定する。
Scanの条件指定の例

initTableReducerJob()は、出力(ReducerおよびTableOutputFormat)の初期化を行う。
すなわち、更新テーブル名とReducerクラスを初期化する。必要であればPartitionerも指定できる。HRegionPartitionerはHBase用のクラスなので、効果が期待できるのかな?
TableOutputFormat.OUTPUT_TABLEsetReducerClass()・setPartitionerClass()・setOutputFormatClass()
Combinerは指定できないので、必要であれば別途初期化する。

ちなみに、ConfigurationをJobのコンストラクターに渡すと、内部でクローンが作られるようだ。
つまり以下のようなコーディングでは、セットした値がMap/Reduce実行時には使用されない。

		Configuration conf = new Configuration();
		Job job = new Job(conf, "sample");
		conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);

//		confjob.getConfiguration()はインスタンスが異なる。

サンプル実行用のbuild.xml

上記サンプルを実行する為の(単独環境用)build.xml。
Eclipseのプロジェクトでclassesディレクトリーにコンパイルしたファイルを置き、binディレクトリーの下にこのbuild.xmlを置く想定。
また、入力ファイルの置いてあるディレクトリーはHadoopのファイルから読み込むサンプルの場所を想定している。

C:\workspace\HadoopSample
│  .classpath
│  .project
│
├─bin
│      build.xml
│
├─classes
│
└─src
C:\cygwin\home\hadoop
├─rowsum
│  │  rowsum.jar
│  │
│  ├─input│      file01.utf8.txt
│  │
│  └─output
│          .part-m-00000.crc
│          part-m-00000.sjis.txt
│
└─student
    └─output
            .part-r-00000.crc
            part-r-00000.sjis.txt

build.xmlの内容

  1. HBaseのスタートストップ(停止は、ピリオドが増えていく様子は表示されない(苦笑))
  2. CSVファイルからHBaseへインポート
  3. HBaseからファイルへエクスポート
  4. HBase上で集計
<?xml version="1.0" encoding="Windows-31J"?>
<project name="hadoop hbase" basedir=".">
	<property environment="env" />

	<property name="cygwin"          location="C:\cygwin" />
	<property name="hadoop.home"     location="${cygwin}/usr/local/hadoop-0.20.1" />
	<property name="hbase.home"      location="${cygwin}/usr/local/hbase-0.20.3" />

	<property name="target.home"   location="${cygwin}/home/hadoop/student" />
	<property name="target.input"  location="${target.home}/../rowsum/input" />
	<property name="target.output" location="${target.home}/output" />

	<property name="main.class.import" value="jp.hishidama.hadoop.hbase.HBaseImport" />
	<property name="main.class.export" value="jp.hishidama.hadoop.hbase.HBaseExport" />
	<property name="main.class.sum5"   value="jp.hishidama.hadoop.hbase.HBaseRecordSum" />
	<property name="shiken.name"       value="2010-1-1" />

	<path id="class.path">
		<pathelement location="../classes" />
		<fileset dir="${hadoop.home}">
			<include name="hadoop-*-core.jar" />
			<include name="lib/**/*.jar" />
		</fileset>
		<fileset dir="${hbase.home}">
			<include name="*.jar" />
			<exclude name="*test.jar" />
			<include name="lib/zookeeper*.jar" />
		</fileset>
		<pathelement location="${hbase.home}/conf" /> <!-- 設定ファイルを読み込ませる[2010-03-30] -->
	</path>
	<target name="0.hbase_start">
		<exec executable="${cygwin}/bin/cygpath.exe" outputproperty="start.sh">
			<arg value="-u" />
			<arg value="${hbase.home}/bin/start-hbase.sh" />
		</exec>
		<exec executable="${cygwin}/bin/bash.exe">
			<arg value="--login" />
			<arg value="${start.sh}" />
		</exec>
	</target>

	<target name="0.hbase_stop">
		<exec executable="${cygwin}/bin/cygpath.exe" outputproperty="stop.sh">
			<arg value="-u" />
			<arg value="${hbase.home}/bin/stop-hbase.sh" />
		</exec>
		<exec executable="${cygwin}/bin/bash.exe">
			<arg value="--login" />
			<arg value="${stop.sh}" />
		</exec>
	</target>
	<target name="import" depends="1.1.execute" />

	<target name="1.1.execute" description="import">
		<java classname="${main.class.import}" fork="true" maxmemory="1024m">
			<arg value="${shiken.name}" />
			<arg value="${target.input}" />
			<classpath refid="class.path" />
			<sysproperty key="hadoop.log.dir" path="${hadoop.home}/logs" />
			<sysproperty key="hadoop.log.file" value="hadoop.log" />
			<sysproperty key="hadoop.home.dir" path="${hadoop.home}/" />
			<sysproperty key="hadoop.id.str" value="${env.COMPUTERNAME}" />
			<sysproperty key="hadoop.root.logger" value="INFO,console" />
			<sysproperty key="hadoop.policy.file" value="hadoop-policy.xml" />
			<env key="PATH" path="${cygwin}/bin" />
		</java>
	</target>
	<target name="export" depends="2.1.rm_output,2.2.execute,2.3.output_encode,2.4.cat_output" />

	<target name="2.1.rm_output" description="outputディレクトリーを削除する">
		<delete dir="${target.output}" failonerror="true" />
	</target>

	<target name="2.2.execute" description="hadoopを実行する">
		<java classname="${main.class.export}" fork="true" maxmemory="1024m">
			<arg value="${shiken.name}" />
			<arg value="${target.output}" />
			<classpath refid="class.path" />
			<sysproperty key="hadoop.log.dir" path="${hadoop.home}/logs" />
			<sysproperty key="hadoop.log.file" value="hadoop.log" />
			<sysproperty key="hadoop.home.dir" path="${hadoop.home}/" />
			<sysproperty key="hadoop.id.str" value="${env.COMPUTERNAME}" />
			<sysproperty key="hadoop.root.logger" value="INFO,console" />
			<sysproperty key="hadoop.policy.file" value="hadoop-policy.xml" />
			<env key="PATH" path="${cygwin}/bin" />
		</java>
	</target>

	<target name="2.3.output_encode" description="outputの文字コードをSJISに変換する">
		<copy todir="${target.output}" encoding="UTF-8" outputencoding="Windows-31J">
			<fileset dir="${target.output}">
				<include name="part*" />
				<exclude name="*.sjis.txt" />
			</fileset>
			<mapper type="glob" from="*" to="*.sjis.txt" />
		</copy>
		<delete>
			<fileset dir="${target.output}">
				<include name="part*" />
				<exclude name="*.sjis.txt" />
			</fileset>
		</delete>
	</target>

	<target name="2.4.cat_output" description="outputの内容を表示する">
		<exec executable="${cygwin}/bin/cat.exe" dir="${target.output}">
			<arg value="*" />
		</exec>
	</target>
	<target name="sum" depends="3.1.execute" />

	<target name="3.1.execute" description="sum">
		<java classname="${main.class.sum5}" fork="true" maxmemory="1024m">
			<arg value="${shiken.name}" />
			<classpath refid="class.path" />
			<sysproperty key="hadoop.log.dir" path="${hadoop.home}/logs" />
			<sysproperty key="hadoop.log.file" value="hadoop.log" />
			<sysproperty key="hadoop.home.dir" path="${hadoop.home}/" />
			<sysproperty key="hadoop.id.str" value="${env.COMPUTERNAME}" />
			<sysproperty key="hadoop.root.logger" value="INFO,console" />
			<sysproperty key="hadoop.policy.file" value="hadoop-policy.xml" />
			<env key="PATH" path="${cygwin}/bin" />
		</java>
	</target>
</project>

Hadoopシェルから実行する方法

HBaseを使ったMap/ReduceのプログラムをHadoopのシェルから実行すると、HBaseのjarファイルが見つからなくて例外が発生する。[2010-03-01]
なので、HadoopのクラスパスにHBaseのjarファイルを追加してやる必要がある。
また、HBASE_HOME/confをクラスパスに加えておくと、hbase-default.xmlとhbase-site.xmlがデフォルトで読み込まれる。[2010-03-30]

/usr/local/hadoop-0.20.1/conf/hadoop-env.sh:

# Extra Java CLASSPATH elements. Optional.
# export HADOOP_CLASSPATH=
HBASE_HOME=/usr/local/hbase-0.20.3
export HADOOP_CLASSPATH=$HBASE_HOME/hbase-0.20.3.jar:$HBASE_HOME/lib/zookeeper-3.2.2.jar:$HBASE_HOME/conf

実行例:

$ cd /home/hadoop/student

$ ls
studnet.jar

$ /usr/local/hadoop/bin/hadoop jar studnet.jar jp.hishidama.hadoop.hbase.HBaseExport 2010-1-1 output
〜ログがいっぱい出力される〜

$ cat output/*
  101   天才, 100, 100, 100, 100, 100, 500
  102   ひしだま, 90, 85, 80, 50, 10, 315
  103   foo, 60, 60, 60, 60, 25, 265
  104   zzz, 40, 60, 60, 70, 20, 250
  201   無気力, 5, 30, 10, 20, 15, 80
  202   hoge, 10, 30, 25, 45, 20, 130
  204   xxx, 80, 77, 90, 40, 44, 331
  205   yyy, 65, 90, 55, 80, 65, 355

Driver(Export・Import・RowCounter)

HBaseのHBASE_HOME/hbase-0.20.3.jarには、Map/Reduceのサンプルとなるクラスが入っている。[2010-03-01]
(実際はサンプルというか、ツールだな^^;)
これを実行するにはhadoopシェルにZooKeeperのjarファイルを追加しておく必要がある。

クラス名 内容
org.apache.hadoop.hbase.mapreduce.RowCounter テーブルのレコード数をカウントする。
第1引数にテーブル名、第2引数以降(省略可)には条件となるcolumnを指定する。
org.apache.hadoop.hbase.mapreduce.Export テーブルをバイナリーファイル(シーケンスファイル)に出力する。
第1引数にテーブル名、第2引数に出力先ディレクトリーを指定する。
org.apache.hadoop.hbase.mapreduce.Import エクスポートされたバイナリーファイル(シーケンスファイル)をテーブルにインポートする。
第1引数にテーブル名、第2引数に入力元ディレクトリーを指定する。
org.apache.hadoop.hbase.migration.nineteen.HStoreFileToStoreFile バージョン0.19のHStoreFileを0.20のStoreFileに変換するものらしい。
org.apache.hadoop.hbase.mapreduce.Driver 上記のクラスを実行するクラス。
HBASE_HOME/hbase-0.20.3.jarのMain-Classにはこのクラスが指定されている。
第1引数で実行するコマンド、第2引数以降にそのコマンド専用のオプションを指定する。
rowcounterRowCounter
exportExport
importImport
hsf2sf…HStoreFileToStoreFile
$ /usr/local/hadoop/bin/hadoop jar $(cygpath -w /usr/local/hbase-0.20.3/hbase-0.20.3.jar) export student data

$ ls -l data/
total 8
-rwxrwxrwx 1 hishidama なし 4761 2010-03-01 23:56 part-m-00000
$ /usr/local/hadoop/bin/hadoop jar $(cygpath -w /usr/local/hbase-0.20.3/hbase-0.20.3.jar) import student data

Hadoopへ戻る / HBaseへ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま