S-JIS[2010-07-11/2012-04-28] 変更履歴

HBaseのHadoop関連クラス

HBaseHadoopでアクセスする為のクラス。


WordCountサンプル

HBaseのテーブルの全項目を読み込み、単語数を数えるサンプル。
出力先はファイル。(通常のHadoopと同じ)
(→HBaseのテーブルに出力するWordCount

package jp.hishidama.sample.hbase;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HBaseWordCount extends Configured implements Tool {

	public static void main(String[] args) throws Exception {
		int r = ToolRunner.run(new HBaseWordCount(), args);
		System.exit(r);
	}
	@Override
	public int run(String[] args) throws Exception {
		Job job = new Job(getConf(), "wordcount(HBase)");
		job.setJarByClass(getClass());
		Configuration conf = job.getConfiguration();

		job.setInputFormatClass(TableInputFormat.class);	// HBaseから入力
		conf.set(TableInputFormat.INPUT_TABLE, args[0]);	// テーブル名

		job.setMapperClass(WordCountMap.class);	// 単語を分解するMapper

		job.setReducerClass(IntSumReducer.class);	// 集計するReducer
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setOutputFormatClass(TextOutputFormat.class);	// ファイルへ出力
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		return job.waitForCompletion(true) ? 0 : 1;
	}
	static class WordCountMap extends TableMapper<Text, IntWritable> {
		private static final IntWritable one = new IntWritable(1);
		private Text word = new Text();

		@Override
		protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
			for (KeyValue kv : value.raw()) {
				String s = Bytes.toString(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());

				StringTokenizer tokenizer = new StringTokenizer(s);
				while (tokenizer.hasMoreTokens()) {
					word.set(tokenizer.nextToken());
					context.write(word, one);
				}
			}
		}
	}
}

TableInputFormatを使うと、指定したテーブルの項目を読み込み、Mapperを呼び出す。
Mapperの入力キーは読み込んだレコードの行キー(ROW)、値はその行の全データ(Result)となる。
(TableInputFormat用にScanを設定することにより、読み込むデータを絞ることが可能)


実行方法

Hadoopのプログラムなので、Hadoopのシェルから実行する。
その際、HBaseのjarファイルをHadoopのクラスパスに加えておく必要がある。

HADOOP_HOME/conf/hadoop-env.sh:

# Extra Java CLASSPATH elements. Optional.
# export HADOOP_CLASSPATH=
HBASE_HOME=/usr/local/hbase-0.89
HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_HOME/hbase-0.89.20100621.jar:$HBASE_HOME/lib/zookeeper-3.3.1.jar:$HBASE_HOME/conf
export HADOOP_CLASSPATH

※Hadoopの起動中に変更した場合は、Hadoopの再起動が必要。

Windowsの場合、Cygwin(bash)から実行:

$ hadoop fs -rmr output
$ hadoop jar hbase-sample.jar jp.hishidama.sample.hbase.HBaseWordCount TEST_TYPE output
$ hadoop fs -cat output/part-r-00000

InputFormat

Hadoop0.20のmapreduceパッケージを使ってHBaseのテーブルから読み込むInputFormat。

クラス名 リーダー 出力型 説明
キー
TableInputFormatBase TableRecordReader
(HBase0.89で独立したクラスになった)
Immutable
Bytes
Writable
Result HBaseのテーブルを入力とする。(抽象クラス)
自分でHBaseのテーブルからデータを読み込むInputFormatを作りたい場合は、このクラスから作ると便利。
キーはROW、値はスキャンされた1レコード。
TableInputFormat 同上 Immutable
Bytes
Writable
Result HBaseのテーブルを入力とする。(具象クラス)
このクラスは、confからテーブル名や検索条件を読み込んでTableInputFormatBaseを初期化するだけ。
(実体はTableInputFormatBaseにある)
TableInputFormatの使用例

TableInputFormatの使用例

TableInputFormatを使う際は、job・confに関連情報を自分で設定するか、TableMapReduceUtil#initTableMapperJob()を使用して設定する。

		job.setInputFormatClass(TableInputFormat.class);		// HBaseから入力
		conf.set(TableInputFormat.INPUT_TABLE, "tableName");	// テーブル名
		conf.set(TableInputFormat.SCAN, new Scan());		// スキャン条件
		job.setMapperClass(WordCountMap.class);	// Mapper
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		TableMapReduceUtil.initTableMapperJob(
			"tableName",	// テーブル名
			new Scan(),	// スキャン条件
			WordCountMap.class,	// Mapper
			Text.class,
			IntWritable.class,
			job);

OutputFormat

Hadoop0.20のmapreduceパッケージを使ってHBaseのテーブルを更新するOutputFormat。

クラス名
(ライター・コミッター)
入力型 説明
キー
TableOutputFormat
TableRecordWriter
TableOutputCommitter
任意 Writable HBaseのテーブルへ出力する(HBaseのテーブルを更新する)。
キーは使用されないので何でもいいが、ソート不要という意味でNullWritableを指定するのが良さげ。
値の型はWritableだが、実際に受け入れられるのはPutDeleteのみ。それ以外を指定するとIOExceptionが発生する。
テーブル名はconf"hbase.mapred.outputtable"で指定する。
オートフラッシュはオフ(false:自動ではコミットしない)になっているが、デフォルトのコミッターでは何も行わない。
OutputFormatがclose()される際にまとめてコミット(フラッシュ)される。
TableOutputFormatの使用例
TableOutputFormatの危険性 [2012-04-28]
MultiTableOutputFormat
(MultiTableRecordWriter・
TableOutputCommitter
Immutable
Bytes
Writable
Writable HBase0.89以降。
HBaseのテーブルへ出力する(HBaseのテーブルを更新する)。
キーにテーブル名を指定することにより、複数テーブルに対応している。
それ以外はTableOutputFormatと同様。
MultiTableOutputFormatの使用例
HFileOutputFormat Immutable
Bytes
Writable
KeyValue HBaseのデータをHFileに出力する。(後続処理でHFileをHBaseへバルクロードする)
使用する場合はHFileOutputFormat.configureIncrementalLoad()で設定する。
→使い方は『HADOOP HACKS』に載っている。[2012-04-28]
IndexOutputFormat
(IndexRecordWriter・
FileOutputCommitter)
Immutable
Bytes
Writable
Lucene
Document
Wrapper
HBaseからApache Luceneという全文検索ソフト用のデータを作成する?
IndexTableReducerからデータを受け取る想定と思われる。
HBase0.89で廃止された。

TableOutputFormatの使用例

TableOutputFormatを使う際は、job・confに関連情報を自分で設定するか、TableMapReduceUtil#initTableReducerJob()を使用して設定する。

		job.setOutputFormatClass(TableOutputFormat.class);		// HBaseを更新
		conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);	// テーブル名
		job.setReducerClass(reducerClass);
		job.setOutputKeyClass(ImmutableBytesWritable.class);
		job.setOutputValueClass(Writable.class);
		job.setPartitionerClass(HRegionPartitioner.class);
		TableMapReduceUtil.initTableReducerJob(
			tableName,	// テーブル名
			reducerClass,
			job,
			HRegionPartitioner.class);

TableMapReduceUtil.initTableReducerJob()でPartitionerにHRegionPartitionerを指定すると、Reduceタスクの個数の最大値がテーブルのリージョン(データの領域の個数)に抑えられる。[2010-08-12]
ただし何も指定していないとReduceタスクの個数は1個なので、その場合は最大値が1になるから意味が無い(苦笑)
別途TableMapReduceUtil.setNumReduceTasks(tableName, job)を呼ぶと、Reduceタスクの個数がテーブルのリージョン数になる。

HBaseへインポートする例集計結果を保存する例


MultiTableOutputFormatの使用例

MultiTableOutputFormatでは、事前にconfにテーブル名をセットする必要は無い。
また、PartitionerにHRegionPartitionerを指定してはいけない。(HRegionPartitionerTableOutputFormat専用なので)

		job.setOutputFormatClass(MultiTableOutputFormat.class);
		job.setReducerClass(SampleMultiTableReducer.class);
		job.setOutputKeyClass(ImmutableBytesWritable.class);
		job.setOutputValueClass(Put.class);
//×		job.setPartitionerClass(HRegionPartitioner.class);
public class SampleMultiTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable value : values) {
			sum += value.get();
		}

		ImmutableBytesWritable tableName = new ImmutableBytesWritable(Bytes.toBytes("更新テーブル名"));
		Put put = new Put(〜);
		〜
		context.write(tableName, put);
	}
}

ちなみにMultiTableOutputFormatではPartitionerを指定しないが、[2010-08-12]
この場合、デフォルトのHashPartitionerによってキー(更新対象テーブル名)のハッシュコードでReduceタスクに振り分けられてしまう事になるので、テーブルのリージョン(データが配置されている領域)とは無関係になってしまうから、微妙に効率悪そう。
何も指定しないとReduceタスクの個数は1個なので、その場合はPartitionerなんて気にする必要も無いが(苦笑))


TableOutputFormatの危険性

TableOutputFormatは分かり易いので試すにはいいが、大規模・正式な業務では使わない方がいいらしい。[2012-04-28]
@ueshinさんから教えていただいた話。→その他の話

TableOutputFormatは出力バッファがたまるとflushするので、途中でジョブ(タスク)が落ちたりしたら、半端に更新された状態になってしまう。
また、タスクの投機的実行によって同じタスクが動く場合も更新が二重に行われることになる。
(そういう状態になっても大丈夫な処理・業務内容の場合だけTableOutputFormatを使ってもよい、ということだと思う)

ではどうすればいいかと言うと、HFileOutputFormatを使う。
HFileOutputFormatで一旦HFileを作成し、その後HFileをHBaseへバルクロードする。
HFileを作る部分は、ファイルを作っているだけなのでやり直しもきくしタスクが投機的実行されても問題ない。
HFileをHBaseへバルクロードする部分はアトミックに行われるそうなので、これも大丈夫。

HFileOutputFormatの使い方やバルクロードの方法は『HADOOP HACKS』を参照。


Mapper

Hadoop0.20のmapreduceパッケージを使ってHBaseのテーブルから読み込んだデータを処理するMapper。

クラス名 入力型 出力型 説明
KEYIN VALIN KEYOUT VALOUT
TableMapper Immutable
Bytes
Writable
Result 任意 任意 HBaseのテーブルを入力とするMapper。(抽象クラス)
TableMapperの使用例
IdentityTableMapper Immutable
Bytes
Writable
Result Immutable
Bytes
Writable
Result HBaseのテーブルからの入力をそのまま出力する。
GroupingTableMapper Immutable
Bytes
Writable
Result Immutable
Bytes
Writable
Result HBaseのテーブルの複数columnをグルーピングする(?)

TableMapperの使用例

public class SampleTableMapper extends TableMapper<Text, Text> { //型引数には出力型を指定する
	private Text keyOut = new Text();
	private Text valOut = new Text();

	/**
	 * @param row	行キー(ROW)
	 * @param result	1行分のデータ
	 * @param context	MapperのContext
	 */
	@Override
	protected void map(ImmutableBytesWritable row, Result result, Context context) throws IOException, InterruptedException {
		for (KeyValue kv : result.raw()) {
			String f = Bytes.toString(kv.getBuffer(), kv.getFamilyOffset(),    kv.getFamilyLength()   );
			String q = Bytes.toString(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
			String v = Bytes.toString(kv.getBuffer(), kv.getValueOffset(),     kv.getValueLength()    );

			keyOut.set(f + ":" + q);
			valOut.set(v);
			context.write(keyOut, valOut);
		}
	}
}

Reducer

Hadoop0.20のmapreduceパッケージを使ってHBaseのテーブルを更新する為のReducer。

クラス名 入力型 出力型 説明
KEYIN VALIN KEYOUT VALOUT
TableReducer 任意 任意 任意 Writable HBaseのテーブルへ出力する為のReducer。(抽象クラス)
TableReducerの使用例
IdentityTableReducer Writable Writable Writable Writable 入力のキーと値をそのまま全て出力する。
PutまたはDeleteをそのままTableOutputFormatへ出力する為のReducer。
でもこれを使うくらいなら、Reducerの個数を0個にする方が高速だと思う。
KeyValueSortReducer Immutable
Bytes
Writable
KeyValue Immutable
Bytes
Writable
KeyValue KeyValueの一覧を受け取り、KeyValueに定義されているソート順でソートして全て出力する。
HFileOutputFormat.configureIncrementalLoad()でMapperの出力型がKeyValueの場合に使われる。
PutSortReducer Immutable
Bytes
Writable
Put Immutable
Bytes
Writable
KeyValue HBase0.89以降。
Put内のKeyValueを抜き取り、KeyValueに定義されているソート順でソートして全て出力する。
HFileOutputFormat.configureIncrementalLoad()でMapperの出力型がPutの場合に使われる。
IndexTableReducer Immutable
Bytes
Writable
Result Immutable
Bytes
Writable
Lucene
Document
Wrapper
Apache Luceneという全文検索ソフトがあるらしい。
それ用のデータをHBaseから作成する為のReducerか?
出力先はIndexOutputFormatが想定されている模様。
HBase0.89で廃止された。

TableReducerの使用例

public class SampleTableReducer extends TableReducer<ImmutableBytesWritable, IntWritable, NullWritable> {
	private static final byte[] FAMILY    = Bytes.toBytes("family");
	private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");

	/**
	 * @param key	Mapperで出力されたキー(この例ではROWという想定)
	 * @param values	Mapperで出力された値 (この例ではintの値)
	 * @param context	ReducerのContext
	 */
	@Override
	protected void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable value : values) {
			sum += value.get();
		}

		Put put = new Put(key.get());
		put.add(FAMILY, QUALIFIER, Bytes.toBytes(sum));
		context.write(NullWritable.get(), put);	//TableOutputFormatへ受け渡すので、キーはNullWritable
	}
}

Partitioner

Hadoop0.20のmapreduceパッケージを使用するPartitionerクラス。

クラス名 説明
KEY VALUE
HRegionPartitioner Immutable
Bytes
Writable?
任意 リージョン毎に分割するPartitioner。
TableOutputFormatで使用する。
型引数KEYが指定できるようになっているが、使われていないから無意味では?
SimpleTotalOrderPartitioner Immutable
Bytes
Writable
任意 HBase0.89以降。
HFileOutputFormatと連動して使用する模様。

TableMapReduceUtil

TableMapReduceUtilは、HBaseのHadoop関連クラスを扱うユーティリティー。

メソッド名 説明 引数
initTableMapperJob() TableInputFormatを使用する為の設定を行う。 table 読み込むテーブルのテーブル名。
scan スキャン条件。null不可。
mapper TableMapperを継承したクラス。
outputKeyClass Mapperの出力キーの型。null可。
outputValueClass Mapperの出力値の型。null可。
job 設定対象のJob。
initTableReducerJob() TableOutputFormatを使用する為の設定を行う。 table 更新するテーブルのテーブル名。
reducer TableReducerを継承したクラス。
job 設定対象のJob。
partitioner Partitioner
nullを指定すると特に設定されない。
HRegionPartitionerを指定すると、Reducerの個数が考慮される。
quorumAddress HBase0.89以降。null可。
serverClass HBase0.89以降。null可。
serverImpl HBase0.89以降。null可。
limitNumReduceTasks() Reducerの個数の上限を設定する。
現在のReducerの個数の設定 (デフォルトは1)が
テーブルのリージョン数を上回っている場合、
Reducerの個数をリージョン数に変更する。
table 更新するテーブルのテーブル名。
job 設定対象のJob。
setNumReduceTasks() Reducerの個数をテーブルのリージョン数に設定する。 table 更新するテーブルのテーブル名。
job 設定対象のJob。
setScannerCaching() スキャナーがレコードを返す際のキャッシュされるレコード数を設定する。 job 設定対象のJob。
batchSize レコード数。

Java APIへ戻る / HBaseへ戻る / Cassandraへ行く / 技術メモへ戻る
メールの送信先:ひしだま