HBaseのテーブルでHadoopのWordCountを作ってみる実験。
|
|
最初に、指定されたHBaseのテーブル内の全カラムを読み込んで単語数をカウントし、結果をファイルへ出力するWordCountを作ってみた。
(→HBaseテーブルtoファイルのWordCount)
次に、カウント結果をファイルでなくHBaseのテーブルに出力してみたいと思う。
ここで問題となるのが、出力テーブルをどういうレイアウト(項目定義)にすればよいか。
RDBであれば、以下のようなテーブルレイアウトになると思う。
項目名 | データ型 | 制約 |
---|---|---|
単語 | varchar2(100) | primary key |
件数 | number(9) | not null |
『単語』の最大文字数は何とも言えないので、とりあえず100文字にしておこうか。
この辺り、KVSでは特に決める必要が無いので楽だ。(バイナリーで何でも入れられるから、事前に枠を決めておく必要が無い)
『件数』は数値なので、これも桁数を決めておく必要がある。
KVSでも数値として扱うならintかlongかくらいは決めないといけないが、Hadoop本家本元のWordCountのサンプルではintを使っている。
さて、一応これで良さそうだが、このレイアウトでは、入力テーブル別に単語・件数を保持することが出来ない。
なので『入力テーブル名』(単語を収集した元のテーブル)もプライマリキーに加えてやろう。
項目名 | データ型 | 制約 |
---|---|---|
入力テーブル名 | varchar2(32) | primary key |
単語 | varchar2(100) | |
件数 | number(9) | not null |
そして、RDBとしては、正規化してテーブル名を保持するテーブルを分けるのが正統な感じがする。
項目名 | データ型 | 制約 |
---|---|---|
テーブルID | number(4) | primary key |
テーブル名 | varchar2(32) | unique |
項目名 | データ型 | 制約 |
---|---|---|
入力テーブルID | number(4) | primary key |
単語 | varchar2(100) | |
件数 | number(9) | not null |
create table table1r ( TABLE_ID number(4) primary key, TABLE_NAME varchar2(32) unique ); create table wordcount1r ( TABLE_ID number(4), WORD varchar2(100), CNT number(9) not null, primary key(TABLE_ID, WORD) );
このRDBのテーブル設計を、HBaseのテーブルにそのまま流用することが出来そうだ。
HBaseでは行キー(ROW)が各レコードのプライマリキーとなるので、上記のRDBテーブルのプライマリキーである2項目をつないで1つにして行キーとして使う。
『テーブルID』だったら桁数が固定なので、キーの一部に使っても問題は起きない(区別が付く)。
行キー | カラム |
---|---|
テーブルID(4バイト) | テーブル名 |
行キー | カラム |
---|---|
テーブルID(4バイト) + 単語 | 件数 |
create 'table1', 'TABLE_NAME' create 'wordcount1', 'CNT'
しかしKVSはRDBでいう結合(join)が苦手なので(というかHBaseではサポートされていないので)、『テーブル名』くらいならデータ本体と一緒に持たせてしまった方が良さそう。
(結果を表示する際はテーブル名を使いたい)
ただし「テーブル名 + 単語」では『テーブル名』も『単語』も可変長だから、つないだ文字列が一意に定まるとは限らず、キーには向かない。
絶対に使われない値を決めて、それを区切り文字として使えばいいかな。
WordCountの場合はスペース(0x20)でいい気がする。そもそも単語は空白区切りで分割・認識するからスペースを含まないはずだし、テーブル名にスペースを使うのを禁止しても害は少ないだろう。
行キー | カラム |
---|---|
テーブル名 + 区切り文字 + 単語 | 件数 |
create 'wordcount2', 'CNT'
ここでちょっと発想の転換。
HBaseはカラムファミリーがあって、その下ではqualifierが追加し放題。したがって、qualifierに『テーブル名』を使えばいいんじゃないか。
行キー | family『件数』 |
---|---|
qualifier『テーブル名』 | |
単語 | 件数 |
ん〜、HBaseのテーブルレイアウト(テーブル定義・テーブル設計書)って、どう書けばいい(分かりやすい)んだ(苦笑)
create 'wordcount3', 'CNT'
これはRDBで言えば『テーブル名』を含んだカラム(列)を持つということで、RDBだったら『テーブル名』が増える度に列追加をする必要があるから全然現実的ではないが、カラムファミリータイプのKVS(HBase)では問題無いだろう。
wordcount3r 項目名 データ型 制約 単語 varchar2(100) primary key テーブル0001_件数 number(9) テーブル0002_件数 number(9) テーブル0003_件数 number(9) …
しかしこの場合、HBase Sehllだと、“特定テーブルの単語一覧”という表示の仕方がちょっと面倒。(getでは行キーを指定しないといけないから使えない。scanなら可)
また、再実行しようと思ったら一旦該当テーブルのデータだけ削除したいが、“全行の特定カラム”という消し方も面倒。(deleteallでは行キーを指定する)
そこでまた逆転の発想で、『テーブル名』を行キーにして、『単語』をカラムにする。
こうすればHBase Sehllのgetやdeleteallで『テーブル名』を簡単に指定できる。
行キー | family『件数』 |
---|---|
qualifier『単語』 | |
テーブル名 | 件数 |
create 'wordcount4', 'CNT'
しかしこうなると、qualifierの個数(RDBで言う列数)がかなりの量になると予想され、RDB脳にはなんだか不安な設計だ(苦笑)
でもHBaseのトップページのOverviewには「very large tables -- billions of rows X millions of columns --」(十億行×百万カラム)と書かれており、単語数で使う程度なら特に問題なさそう。(by @miyakawa_takuさん)
これでテーブル設計の案が出揃ったので、実際に使うとしたらどうなるかシミュレートしてみる。
public static final byte[] TABLE_ID = Bytes.toBytes("TABLE_ID"); //テーブルID public static final byte[] TABLE_NAME = Bytes.toBytes("TABLE_NAME"); //テーブル名 public static final byte[] CNT = Bytes.toBytes("CNT"); //件数 protected byte[] tableName = Bytes.toBytes("入力テーブル名");
テーブル案 | 表示方法 | 初期化(削除)方法 | 備考 |
---|---|---|---|
wordcount1r |
select WORD, CNT from wordcount1r w, table1r t where t.TABLE_NAME = '入力テーブル名' and w.TABLE_ID = t.TABLE_ID ; |
delete from wordcount1r where TABLE_ID = ( select TABLE_ID from table1r where TABLE_NAME = '入力テーブル名' ); |
|
wordcount1 (正規化) (行キー:テーブルID+単語) |
//テーブルIDを取得 byte[] tableId = null; { Filter filter = new SingleColumnValueFilter( TABLE_NAME, HConstants.EMPTY_BYTE_ARRAY, CompareOp.EQUAL, tableName ); Scan scan = new Scan(HConstants.EMPTY_START_ROW, filter); scan.addColumn(TABLE_ID, HConstants.EMPTY_BYTE_ARRAY); ResultScanner rs = table1.getScanner(scan); try { for (Result r : rs) { for (KeyValue kv : r.raw()) { tableId = kv.getValue(); } break; } } finally { rs.close(); } } Filter filter = new PrefixFilter(tableId); Scan scan = new Scan(tableId, filter); ResultScanner rs = table.getScanner(scan); try { for (Result r : rs) { byte[] row = r.getRow(); //テーブルIDは4桁であり、単語はそれ以降 String word = Bytes.toString(row, 4, row.length - 4); for (KeyValue kv : r.raw()) { int cnt = Bytes.toInt( kv.getBuffer(), kv.getValueOffset(), kv.getValueLength() ); System.out.printf("%s\t%d%n", word, cnt); } } } finally { rs.close(); } |
//テーブルIDを取得 同左 Filter filter = new PrefixFilter(tableId); Scan scan = new Scan(tableId, filter); ResultScanner rs = table.getScanner(scan); try { for (Result r : rs) { byte[] row = r.getRow(); Delete del = new Delete(row); table.delete(del); } } finally { rs.close(); } table.flushCommits(); |
テーブルIDを取り出すだけでも面倒だが(苦笑) 削除もまた面倒。 本気で使うなら、きっと テーブル名でインデックスを作ることになる。 |
wordcount2 (行キー:テーブル名+単語) |
Filter filter = new PrefixFilter(tableName); Scan scan = new Scan(tableName, filter); ResultScanner rs = table.getScanner(scan); try { for (Result r : rs) { String row = Bytes.toString(r.getRow()); String word = row.substring(row.indexOf(' ') + 1); for (KeyValue kv : r.raw()) { int cnt = Bytes.toInt( kv.getBuffer(), kv.getValueOffset(), kv.getValueLength() ); System.out.printf("%s\t%d%n", word, cnt); } } } finally { rs.close(); } |
Filter filter = new PrefixFilter(tableName); Scan scan = new Scan(tableName, filter); ResultScanner rs = table.getScanner(scan); try { for (Result r : rs) { byte[] row = r.getRow(); Delete del = new Delete(row); table.delete(del); } } finally { rs.close(); } table.flushCommits();
|
|
wordcount3r |
select WORD, テーブル0001_件数 from wordcount3r where テーブル0001_件数 is not null ; |
update wordcount3r set テーブル0001_件数 = null ; |
どのテーブルがどの項目かを 動的に管理するのは無理がある。 |
wordcount3 (行キー:単語) |
Scan scan = new Scan(); scan.addColumn(CNT, tableName); ResultScanner rs = table.getScanner(scan); try { for (Result r : rs) { String word = Bytes.toString(r.getRow()); for (KeyValue kv : r.raw()) { int cnt = Bytes.toInt( kv.getBuffer(), kv.getValueOffset(), kv.getValueLength() ); System.out.printf("%s\t%d%n", word, cnt); } } } finally { rs.close(); } |
Scan scan = new Scan(); scan.addColumn(CNT, tableName); ResultScanner rs = table.getScanner(scan); try { for (Result r : rs) { Delete del = new Delete(r.getRow()); del.deleteColumn(CNT, tableName); table.delete(del); } } finally { rs.close(); } table.flushCommits(); |
|
scan 'wordcount3', { COLUMN=>'CNT:入力テーブル名' } |
|||
wordcount4 (行キー:テーブル名) |
Get get = new Get(tableName); Result r = table.get(get); for (KeyValue kv : r.raw()) { String word = Bytes.toString( kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength() ); int cnt = Bytes.toInt( kv.getBuffer(), kv.getValueOffset(), kv.getValueLength() ); System.out.printf("%s\t%d%n", word, cnt); } |
Delete del = new Delete(tableName); table.delete(del); table.flushCommits(); |
|
get 'wordcount4','入力テーブル名' |
deleteall 'wordcount4','入力テーブル名' |
wordcount1(行キーにテーブルIDを含める方式)に出力するHadoopプログラムは割愛。
とにかく面倒そうなのが、テーブルIDを採番する方法。(そのテーブル名のIDが存在していればそれを使うし、無ければ新しい番号を登録する。この際に排他を考慮する必要あり)
まぁHadoopを使うということはバッチ処理なので、“他処理が同時に動いてテーブルIDを採番する可能性”が無いという前提であれば、それほど難しいことは無いが。
あるいはマスターに事前に登録しておく前提で、IDが取得できなかったら異常終了させる。
テーブルIDさえ採番できてしまえば、wordcount2のプログラムでテーブル名を使う代わりにテーブルIDを使えばいい。
wordcount2(行キー:入力テーブル名+スペース+単語)に出力する例。
public class HBaseWordCount2 extends Configured implements Tool { public static final String OUT_TABLE = "wordcount2"; public static final byte[] OUT_FAMILY = Bytes.toBytes("CNT"); public static final char DELIM = ' ';
public static void main(String[] args) throws Exception { int r = ToolRunner.run(new HBaseWordCount2(), args); System.exit(r); } @Override public int run(String[] args) throws Exception { String tableName = args[0]; //入力テーブル名 clearTable(Bytes.toBytes(tableName)); Job job = new Job(getConf(), "wordcount2(HBase)"); job.setJarByClass(getClass()); TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), Map.class, Text.class, IntWritable.class, job); TableMapReduceUtil.initTableReducerJob(OUT_TABLE, Reduce.class, job, HRegionPartitioner.class); return job.waitForCompletion(true) ? 0 : 1; }
//wordcount2の既存データを削除する(→MapReduceで削除する例) void clearTable(byte[] tableName) throws IOException { Configuration conf = HBaseConfiguration.create(getConf()); HTableFactory factory = new HTableFactory(); HTableInterface table = factory.createHTableInterface(conf, Bytes.toBytes(OUT_TABLE)); try { Filter filter = new PrefixFilter(tableName); Scan scan = new Scan(tableName, filter); ResultScanner rs = table.getScanner(scan); try { for (Result r : rs) { byte[] row = r.getRow(); Delete del = new Delete(row, System.currentTimeMillis(), null); table.delete(del); } } finally { rs.close(); } } finally { factory.releaseHTableInterface(table); } }
//一番最初に作ったWordCountMapと同じ static class Map extends TableMapper<Text, IntWritable> { private static final IntWritable one = new IntWritable(1); private Text word = new Text(); @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { for (KeyValue kv : value.raw()) { String s = Bytes.toString(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); StringTokenizer tokenizer = new StringTokenizer(s); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); } } } }
static class Reduce extends TableReducer<Text, IntWritable, NullWritable> { protected String tableName; @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); tableName = context.getConfiguration().get(TableInputFormat.INPUT_TABLE); //入力テーブル名 } @Override protected void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } byte[] row = Bytes.toBytes(tableName + DELIM + word); Put put = new Put(row); put.add(OUT_FAMILY, HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes(sum)); context.write(NullWritable.get(), put); } } }
wordcount3(行キー:単語)とwordcount4(行キー:テーブル名)に出力する例。
wordcount3とwordcount4は似ているので、MultiTableOutputFormatのサンプルを兼ねて、まとめて出力してみた。
public class HBaseWordCount34 extends Configured implements Tool { public static final String OUT_TABLE3 = "wordcount3"; public static final String OUT_TABLE4 = "wordcount4"; public static final byte[] OUT_FAMILY = Bytes.toBytes("CNT");
public static void main(String[] args) throws Exception { int r = ToolRunner.run(new HBaseWordCount34(), args); System.exit(r); } @Override public int run(String[] args) throws Exception { String tableName = args[0]; //入力テーブル名 { HTableFactory factory = new HTableFactory(); Configuration conf = HBaseConfiguration.create(getConf()); byte[] tableNameBytes = Bytes.toBytes(tableName); clearTable3(factory, conf, tableNameBytes); clearTable4(factory, conf, tableNameBytes); } Job job = new Job(getConf(), "wordcount34(HBase)"); job.setJarByClass(getClass()); TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), Map.class, Text.class, IntWritable.class, job); job.setOutputFormatClass(MultiTableOutputFormat.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Put.class); //× job.setPartitionerClass(HRegionPartitioner.class); //MultiTableOutputFormatの場合はHRegionPartitionerは指定不可 return job.waitForCompletion(true) ? 0 : 1; }
//wordcount3の既存データを削除する(→MapReduceで削除する例) void clearTable3(HTableFactory factory, Configuration conf, byte[] tableName) throws IOException { HTableInterface table = factory.createHTableInterface(conf, Bytes.toBytes(OUT_TABLE3)); try { Scan scan = new Scan(); scan.addColumn(OUT_FAMILY, tableName); ResultScanner rs = table.getScanner(scan); long ts = System.currentTimeMillis(); try { for (Result r : rs) { Delete del = new Delete(r.getRow(), ts, null); del.deleteColumn(OUT_FAMILY, tableName, ts); table.delete(del); } } finally { rs.close(); } } finally { factory.releaseHTableInterface(table); } } //wordcount4の既存データを削除する void clearTable4(HTableFactory factory, Configuration conf, byte[] tableName) throws IOException { HTableInterface table = factory.createHTableInterface(conf, Bytes.toBytes(OUT_TABLE4)); try { Delete del = new Delete(tableName, System.currentTimeMillis(), null); table.delete(del); } finally { factory.releaseHTableInterface(table); } }
//一番最初に作ったWordCountMapと同じ static class Map extends TableMapper<Text, IntWritable> { private static final IntWritable one = new IntWritable(1); private Text word = new Text(); @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { for (KeyValue kv : value.raw()) { String s = Bytes.toString(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); StringTokenizer tokenizer = new StringTokenizer(s); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); } } } }
static class Reduce extends TableReducer<Text, IntWritable, ImmutableBytesWritable> { private static final ImmutableBytesWritable keyOut3 = new ImmutableBytesWritable(Bytes.toBytes(OUT_TABLE3)); private static final ImmutableBytesWritable keyOut4 = new ImmutableBytesWritable(Bytes.toBytes(OUT_TABLE4)); protected byte[] tableName; @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); tableName = Bytes.toBytes(context.getConfiguration().get(TableInputFormat.INPUT_TABLE)); //入力テーブル名 } @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable intw : values) { sum += intw.get(); } byte[] value = Bytes.toBytes(sum); byte[] word = Bytes.toBytes(key.toString()); { Put put = new Put(word); put.add(OUT_FAMILY, tableName, value); context.write(keyOut3, put); } { Put put = new Put(tableName); put.add(OUT_FAMILY, word, value); context.write(keyOut4, put); } } } }
上記のサンプルでは、wordcount2・wordcount3のテーブルのデータ削除(clearTable()・clearTable3())に何気なくScanとDeleteを組み合わせて使ったけれど、もしかしてこういうパターンでもMapReduceを使う方がいいのかも。
TableOutputFormatやMultiTableOutputFormatはDeleteにも対応しているし。
Scanで取ってきたキーに対してDeleteするなら、Scanを分散させた方が全体の処理速度は速いはず。
しかも単語カウントの前処理としてデータ削除を行うのはやめて、
削除ジョブのDelete(のコンストラクターやdeleteColumn()・deleteFamily())で現在時刻を指定しておけば、単語を数えるプログラム(データ作成ジョブ)と並行で動かしても大丈夫かも?
データ作成ジョブを削除ジョブより後で開始させれば、作成時のタイムスタンプは必ず削除日時より新しくなるから。
Deleteでタイムスタンプを指定しない場合、その時点の最新データが削除されるが、
Deleteでタイムスタンプを指定すると、HBaseはデータ内部で履歴を保持しているので、その時点以前のデータだけ消える扱いになる。
データ作成ジョブと並行で動かしても、常に削除のタイムスタンプの方が古ければ、あるカラムに対して削除処理と作成処理のどちらが先に動いても、最終的には作成されたデータだけが残る。
ちなみにDeleteの場合は集計は不要なのでReducerは要らず、Mapperだけでいい。(setNumReduceTasks()で0を指定する)
wordcount2の削除 | wordcount3の削除 | 備考 |
---|---|---|
public class HBaseWordCountDelete extends Configured implements Tool { |
||
public static final String OUT_TABLE = "wordcount2"; |
public static final String OUT_TABLE = "wordcount3"; public static final byte[] OUT_FAMILY = Bytes.toBytes("CNT"); |
|
protected static final String DELETE_TSTAMP = "delete_timestamp"; public static void main(String[] args) throws Exception { int r = ToolRunner.run(new HBaseWordCountDelete(), args); System.exit(r); } @Override public int run(String[] args) throws Exception { byte[] tableName = Bytes.toBytes(args[0]); //入力テーブル名 Job job = new Job(getConf(), "wordcount(HBase) delete"); job.setJarByClass(getClass()); job.getConfiguration().setLong(DELETE_TSTAMP, System.currentTimeMillis()); //削除する日時 |
削除用のタイムスタンプを コンフィグで保持しておく。 |
|
Filter filter = new PrefixFilter(tableName); Scan scan = new Scan(tableName, filter); |
Scan scan = new Scan(); scan.addColumn(OUT_FAMILY, tableName); |
削除対象データの抽出条件 の指定 |
TableMapReduceUtil.initTableMapperJob( OUT_TABLE, scan, DeleteMap.class, NullWritable.class, Delete.class, job ); job.setNumReduceTasks(0); //Reducerは無し job.setOutputFormatClass(TableOutputFormat.class); job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, OUT_TABLE); return job.waitForCompletion(true) ? 0 : 1; } |
||
static class DeleteMap extends TableMapper<NullWritable, Delete> { protected long deleteTimestamp; @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); deleteTimestamp = context.getConfiguration().getLong(DELETE_TSTAMP, HConstants.LATEST_TIMESTAMP); } |
削除用のタイムスタンプを コンフィグから取得しておく。 |
|
@Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { byte[] row = value.getRow(); Delete del = new Delete(row, deleteTimestamp, null); context.write(NullWritable.get(), del); } |
protected byte[] tableName; @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { if (tableName == null) { KeyValue kv = value.raw()[0]; tableName = kv.getQualifier(); } byte[] row = value.getRow(); Delete del = new Delete(row, deleteTimestamp, null); del.deleteColumn(OUT_FAMILY, tableName, deleteTimestamp); context.write(NullWritable.get(), del); } |
|
} } |
いずれにしてもScanで一旦クライアント側に全データを戻して、そのキーに対してDeleteを発行するので、けっこう無駄は多い気がする。
本当は、Deleteに削除条件を指定してHBaseのサーバー側で全部処理してくれると一番速いと思うのだが。
(ScanでFilterを指定するのと同様に、DeleteにFilterを設定してサーバー側で条件判定を行いつつ削除すれば、クライアントは無関係になる)
列ファミリー内の全カラムを削除することに関しては、
テーブル定義の変更として列ファミリーを削除することは出来るので、一旦削除して追加し直すことによってデータをクリアするという手も考えられる。
でもこの方法の場合、テーブルを一旦停止(disable)しないといけないんだよねぇ。
あるいはバルクロードで削除するとか…出来ないかなぁ。