|
|
|
|
HBaseのテーブルの全項目を読み込み、単語数を数えるサンプル。
出力先はファイル。(通常のHadoopと同じ)
(→HBaseのテーブルに出力するWordCount)
package jp.hishidama.sample.hbase;
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;
public class HBaseWordCount extends Configured implements Tool { public static void main(String[] args) throws Exception { int r = ToolRunner.run(new HBaseWordCount(), args); System.exit(r); }
@Override public int run(String[] args) throws Exception { Job job = new Job(getConf(), "wordcount(HBase)"); job.setJarByClass(getClass()); Configuration conf = job.getConfiguration(); job.setInputFormatClass(TableInputFormat.class); // HBaseから入力 conf.set(TableInputFormat.INPUT_TABLE, args[0]); // テーブル名 job.setMapperClass(WordCountMap.class); // 単語を分解するMapper job.setReducerClass(IntSumReducer.class); // 集計するReducer job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setOutputFormatClass(TextOutputFormat.class); // ファイルへ出力 FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; }
static class WordCountMap extends TableMapper<Text, IntWritable> { private static final IntWritable one = new IntWritable(1); private Text word = new Text(); @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { for (KeyValue kv : value.raw()) { String s = Bytes.toString(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); StringTokenizer tokenizer = new StringTokenizer(s); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); } } } } }
TableInputFormatを使うと、指定したテーブルの項目を読み込み、Mapperを呼び出す。
Mapperの入力キーは読み込んだレコードの行キー(ROW)、値はその行の全データ(Result)となる。
(TableInputFormat用にScanを設定することにより、読み込むデータを絞ることが可能)
Hadoopのプログラムなので、Hadoopのシェルから実行する。
その際、HBaseのjarファイルをHadoopのクラスパスに加えておく必要がある。
# Extra Java CLASSPATH elements. Optional.
# export HADOOP_CLASSPATH=
HBASE_HOME=/usr/local/hbase-0.89
HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_HOME/hbase-0.89.20100621.jar:$HBASE_HOME/lib/zookeeper-3.3.1.jar:$HBASE_HOME/conf
export HADOOP_CLASSPATH
※Hadoopの起動中に変更した場合は、Hadoopの再起動が必要。
$ hadoop fs -rmr output $ hadoop jar hbase-sample.jar jp.hishidama.sample.hbase.HBaseWordCount TEST_TYPE output $ hadoop fs -cat output/part-r-00000
Hadoop0.20のmapreduceパッケージを使ってHBaseのテーブルから読み込むInputFormat。
クラス名 | リーダー | 出力型 | 説明 | |
---|---|---|---|---|
キー | 値 | |||
TableInputFormatBase |
TableRecordReader (HBase0.89で独立したクラスになった) |
Immutable Bytes Writable |
Result | HBaseのテーブルを入力とする。(抽象クラス) 自分でHBaseのテーブルからデータを読み込むInputFormatを作りたい場合は、このクラスから作ると便利。 キーはROW、値はスキャンされた1レコード。 |
TableInputFormat | 同上 | Immutable Bytes Writable |
Result | HBaseのテーブルを入力とする。(具象クラス) このクラスは、confからテーブル名や検索条件を読み込んでTableInputFormatBaseを初期化するだけ。 (実体はTableInputFormatBaseにある) →TableInputFormatの使用例 |
TableInputFormatを使う際は、job・confに関連情報を自分で設定するか、TableMapReduceUtil#initTableMapperJob()を使用して設定する。
job.setInputFormatClass(TableInputFormat.class); // HBaseから入力 conf.set(TableInputFormat.INPUT_TABLE, "tableName"); // テーブル名 conf.set(TableInputFormat.SCAN, new Scan()); // スキャン条件 job.setMapperClass(WordCountMap.class); // Mapper job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class);
TableMapReduceUtil.initTableMapperJob( "tableName", // テーブル名 new Scan(), // スキャン条件 WordCountMap.class, // Mapper Text.class, IntWritable.class, job);
Hadoop0.20のmapreduceパッケージを使ってHBaseのテーブルを更新するOutputFormat。
クラス名 (ライター・コミッター) |
入力型 | 説明 | |
---|---|---|---|
キー | 値 | ||
TableOutputFormat (TableRecordWriter・ TableOutputCommitter) |
任意 | Writable | HBaseのテーブルへ出力する(HBaseのテーブルを更新する)。 キーは使用されないので何でもいいが、ソート不要という意味でNullWritableを指定するのが良さげ。 値の型はWritableだが、実際に受け入れられるのはPutとDeleteのみ。それ以外を指定するとIOExceptionが発生する。 テーブル名はconfの"hbase.mapred.outputtable"で指定する。 オートフラッシュはオフ(false:自動ではコミットしない)になっているが、デフォルトのコミッターでは何も行わない。 OutputFormatがclose()される際にまとめてコミット(フラッシュ)される。 →TableOutputFormatの使用例 →TableOutputFormatの危険性 [2012-04-28] |
MultiTableOutputFormat (MultiTableRecordWriter・ TableOutputCommitter) |
Immutable Bytes Writable |
Writable | HBase0.89以降。 HBaseのテーブルへ出力する(HBaseのテーブルを更新する)。 キーにテーブル名を指定することにより、複数テーブルに対応している。 それ以外はTableOutputFormatと同様。 →MultiTableOutputFormatの使用例 |
HFileOutputFormat | Immutable Bytes Writable |
KeyValue | HBaseのデータをHFileに出力する。(後続処理でHFileをHBaseへバルクロードする) 使用する場合はHFileOutputFormat.configureIncrementalLoad()で設定する。 →使い方は『HADOOP HACKS』に載っている。[2012-04-28] |
IndexOutputFormat (IndexRecordWriter・ FileOutputCommitter) |
Immutable Bytes Writable |
Lucene Document Wrapper |
HBaseからApache Luceneという全文検索ソフト用のデータを作成する? IndexTableReducerからデータを受け取る想定と思われる。 HBase0.89で廃止された。 |
TableOutputFormatを使う際は、job・confに関連情報を自分で設定するか、TableMapReduceUtil#initTableReducerJob()を使用して設定する。
job.setOutputFormatClass(TableOutputFormat.class); // HBaseを更新 conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); // テーブル名 job.setReducerClass(reducerClass); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Writable.class); job.setPartitionerClass(HRegionPartitioner.class);
TableMapReduceUtil.initTableReducerJob( tableName, // テーブル名 reducerClass, job, HRegionPartitioner.class);
TableMapReduceUtil.initTableReducerJob()
でPartitionerにHRegionPartitionerを指定すると、Reduceタスクの個数の最大値がテーブルのリージョン(データの領域の個数)に抑えられる。[2010-08-12]
ただし何も指定していないとReduceタスクの個数は1個なので、その場合は最大値が1になるから意味が無い(苦笑)
別途TableMapReduceUtil.setNumReduceTasks(tableName,
job)
を呼ぶと、Reduceタスクの個数がテーブルのリージョン数になる。
MultiTableOutputFormatでは、事前にconfにテーブル名をセットする必要は無い。
また、PartitionerにHRegionPartitionerを指定してはいけない。(HRegionPartitionerはTableOutputFormat専用なので)
job.setOutputFormatClass(MultiTableOutputFormat.class); job.setReducerClass(SampleMultiTableReducer.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Put.class); //× job.setPartitionerClass(HRegionPartitioner.class);
public class SampleMultiTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } ImmutableBytesWritable tableName = new ImmutableBytesWritable(Bytes.toBytes("更新テーブル名")); Put put = new Put(〜); 〜 context.write(tableName, put); } }
ちなみにMultiTableOutputFormatではPartitionerを指定しないが、[2010-08-12]
この場合、デフォルトのHashPartitionerによってキー(更新対象テーブル名)のハッシュコードでReduceタスクに振り分けられてしまう事になるので、テーブルのリージョン(データが配置されている領域)とは無関係になってしまうから、微妙に効率悪そう。
(何も指定しないとReduceタスクの個数は1個なので、その場合はPartitionerなんて気にする必要も無いが(苦笑))
TableOutputFormatは分かり易いので試すにはいいが、大規模・正式な業務では使わない方がいいらしい。[2012-04-28]
(@ueshinさんから教えていただいた話。→その他の話)
TableOutputFormatは出力バッファがたまるとflushするので、途中でジョブ(タスク)が落ちたりしたら、半端に更新された状態になってしまう。
また、タスクの投機的実行によって同じタスクが動く場合も更新が二重に行われることになる。
(そういう状態になっても大丈夫な処理・業務内容の場合だけTableOutputFormatを使ってもよい、ということだと思う)
ではどうすればいいかと言うと、HFileOutputFormatを使う。
HFileOutputFormatで一旦HFileを作成し、その後HFileをHBaseへバルクロードする。
HFileを作る部分は、ファイルを作っているだけなのでやり直しもきくしタスクが投機的実行されても問題ない。
HFileをHBaseへバルクロードする部分はアトミックに行われるそうなので、これも大丈夫。
HFileOutputFormatの使い方やバルクロードの方法は『HADOOP HACKS』を参照。
Hadoop0.20のmapreduceパッケージを使ってHBaseのテーブルから読み込んだデータを処理するMapper。
クラス名 | 入力型 | 出力型 | 説明 | ||
---|---|---|---|---|---|
KEYIN | VALIN | KEYOUT | VALOUT | ||
TableMapper | Immutable Bytes Writable |
Result | 任意 | 任意 | HBaseのテーブルを入力とするMapper。(抽象クラス) →TableMapperの使用例 |
IdentityTableMapper | Immutable Bytes Writable |
Result | Immutable Bytes Writable |
Result | HBaseのテーブルからの入力をそのまま出力する。 |
GroupingTableMapper | Immutable Bytes Writable |
Result | Immutable Bytes Writable |
Result | HBaseのテーブルの複数columnをグルーピングする(?) |
public class SampleTableMapper extends TableMapper<Text, Text> { //型引数には出力型を指定する private Text keyOut = new Text(); private Text valOut = new Text(); /** * @param row 行キー(ROW) * @param result 1行分のデータ * @param context MapperのContext */ @Override protected void map(ImmutableBytesWritable row, Result result, Context context) throws IOException, InterruptedException { for (KeyValue kv : result.raw()) { String f = Bytes.toString(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength() ); String q = Bytes.toString(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength()); String v = Bytes.toString(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength() ); keyOut.set(f + ":" + q); valOut.set(v); context.write(keyOut, valOut); } } }
Hadoop0.20のmapreduceパッケージを使ってHBaseのテーブルを更新する為のReducer。
クラス名 | 入力型 | 出力型 | 説明 | ||
---|---|---|---|---|---|
KEYIN | VALIN | KEYOUT | VALOUT | ||
TableReducer | 任意 | 任意 | 任意 | Writable | HBaseのテーブルへ出力する為のReducer。(抽象クラス) →TableReducerの使用例 |
IdentityTableReducer | Writable | Writable | Writable | Writable | 入力のキーと値をそのまま全て出力する。 PutまたはDeleteをそのままTableOutputFormatへ出力する為のReducer。 でもこれを使うくらいなら、Reducerの個数を0個にする方が高速だと思う。 |
KeyValueSortReducer | Immutable Bytes Writable |
KeyValue | Immutable Bytes Writable |
KeyValue | KeyValueの一覧を受け取り、KeyValueに定義されているソート順でソートして全て出力する。 HFileOutputFormat.configureIncrementalLoad()でMapperの出力型がKeyValueの場合に使われる。 |
PutSortReducer | Immutable Bytes Writable |
Put | Immutable Bytes Writable |
KeyValue |
HBase0.89以降。 Put内のKeyValueを抜き取り、KeyValueに定義されているソート順でソートして全て出力する。 HFileOutputFormat.configureIncrementalLoad()でMapperの出力型がPutの場合に使われる。 |
IndexTableReducer | Immutable Bytes Writable |
Result | Immutable Bytes Writable |
Lucene Document Wrapper |
Apache Luceneという全文検索ソフトがあるらしい。 それ用のデータをHBaseから作成する為のReducerか? 出力先はIndexOutputFormatが想定されている模様。 HBase0.89で廃止された。 |
public class SampleTableReducer extends TableReducer<ImmutableBytesWritable, IntWritable, NullWritable> { private static final byte[] FAMILY = Bytes.toBytes("family"); private static final byte[] QUALIFIER = Bytes.toBytes("qualifier"); /** * @param key Mapperで出力されたキー(この例ではROWという想定) * @param values Mapperで出力された値 (この例ではintの値) * @param context ReducerのContext */ @Override protected void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } Put put = new Put(key.get()); put.add(FAMILY, QUALIFIER, Bytes.toBytes(sum)); context.write(NullWritable.get(), put); //TableOutputFormatへ受け渡すので、キーはNullWritable } }
Hadoop0.20のmapreduceパッケージを使用するPartitionerクラス。
クラス名 | 型 | 説明 | |
---|---|---|---|
KEY | VALUE | ||
HRegionPartitioner | Immutable Bytes Writable? |
任意 | リージョン毎に分割するPartitioner。 TableOutputFormatで使用する。 型引数KEYが指定できるようになっているが、使われていないから無意味では? |
SimpleTotalOrderPartitioner | Immutable Bytes Writable |
任意 | HBase0.89以降。 HFileOutputFormatと連動して使用する模様。 |
TableMapReduceUtilは、HBaseのHadoop関連クラスを扱うユーティリティー。
メソッド名 | 説明 | 引数 | |
---|---|---|---|
initTableMapperJob() | TableInputFormatを使用する為の設定を行う。 | table | 読み込むテーブルのテーブル名。 |
scan | スキャン条件。null不可。 | ||
mapper | TableMapperを継承したクラス。 | ||
outputKeyClass | Mapperの出力キーの型。null可。 | ||
outputValueClass | Mapperの出力値の型。null可。 | ||
job | 設定対象のJob。 | ||
initTableReducerJob() | TableOutputFormatを使用する為の設定を行う。 | table | 更新するテーブルのテーブル名。 |
reducer | TableReducerを継承したクラス。 | ||
job | 設定対象のJob。 | ||
partitioner | Partitioner。 nullを指定すると特に設定されない。 HRegionPartitionerを指定すると、Reducerの個数が考慮される。 |
||
quorumAddress | HBase0.89以降。null可。 | ||
serverClass | HBase0.89以降。null可。 | ||
serverImpl | HBase0.89以降。null可。 | ||
limitNumReduceTasks() |
Reducerの個数の上限を設定する。 現在のReducerの個数の設定 (デフォルトは1)が テーブルのリージョン数を上回っている場合、 Reducerの個数をリージョン数に変更する。 |
table | 更新するテーブルのテーブル名。 |
job | 設定対象のJob。 | ||
setNumReduceTasks() | Reducerの個数をテーブルのリージョン数に設定する。 | table | 更新するテーブルのテーブル名。 |
job | 設定対象のJob。 | ||
setScannerCaching() | スキャナーがレコードを返す際のキャッシュされるレコード数を設定する。 | job | 設定対象のJob。 |
batchSize | レコード数。 |