S-JIS[2010-08-15] 変更履歴

HBaseのWordCountサンプル

HBaseのテーブルでHadoopのWordCountを作ってみる実験。


出力テーブルの構成

最初に、指定されたHBaseのテーブル内の全カラムを読み込んで単語数をカウントし、結果をファイルへ出力するWordCountを作ってみた。
(→HBaseテーブルtoファイルのWordCount

次に、カウント結果をファイルでなくHBaseのテーブルに出力してみたいと思う。
ここで問題となるのが、出力テーブルをどういうレイアウト(項目定義)にすればよいか。


RDBならどういうテーブル設計になるか

RDBであれば、以下のようなテーブルレイアウトになると思う。

項目名 データ型 制約
単語 varchar2(100) primary key
件数 number(9) not null

『単語』の最大文字数は何とも言えないので、とりあえず100文字にしておこうか。
この辺り、KVSでは特に決める必要が無いので楽だ。(バイナリーで何でも入れられるから、事前に枠を決めておく必要が無い)

『件数』は数値なので、これも桁数を決めておく必要がある。
KVSでも数値として扱うならintかlongかくらいは決めないといけないが、Hadoop本家本元のWordCountのサンプルではintを使っている。


さて、一応これで良さそうだが、このレイアウトでは、入力テーブル別に単語・件数を保持することが出来ない。
なので『入力テーブル名』(単語を収集した元のテーブル)もプライマリキーに加えてやろう。

項目名 データ型 制約
入力テーブル名 varchar2(32) primary key
単語 varchar2(100)
件数 number(9) not null

そして、RDBとしては、正規化してテーブル名を保持するテーブルを分けるのが正統な感じがする。

table1r
項目名 データ型 制約
テーブルID number(4) primary key
テーブル名 varchar2(32) unique
wordcount1r
項目名 データ型 制約
入力テーブルID number(4) primary key
単語 varchar2(100)
件数 number(9) not null
create table table1r
(
	TABLE_ID  	number(4) primary key,
	TABLE_NAME	varchar2(32) unique
);
create table wordcount1r
(
	TABLE_ID 	number(4),
	WORD    	varchar2(100),
	CNT	number(9) not null,
	primary key(TABLE_ID, WORD)
);

HBaseのテーブル設計

このRDBのテーブル設計を、HBaseのテーブルにそのまま流用することが出来そうだ。

HBaseでは行キー(ROW)が各レコードのプライマリキーとなるので、上記のRDBテーブルのプライマリキーである2項目をつないで1つにして行キーとして使う。
『テーブルID』だったら桁数が固定なので、キーの一部に使っても問題は起きない(区別が付く)。

table1
行キー カラム
テーブルID(4バイト) テーブル名
wordcount1
行キー カラム
テーブルID(4バイト) + 単語 件数
create 'table1', 'TABLE_NAME'
create 'wordcount1', 'CNT'

しかしKVSはRDBでいう結合(join)が苦手なので(というかHBaseではサポートされていないので)、『テーブル名』くらいならデータ本体と一緒に持たせてしまった方が良さそう。
(結果を表示する際はテーブル名を使いたい)

ただし「テーブル名 + 単語」では『テーブル名』も『単語』も可変長だから、つないだ文字列が一意に定まるとは限らず、キーには向かない。
絶対に使われない値を決めて、それを区切り文字として使えばいいかな。
WordCountの場合はスペース(0x20)でいい気がする。そもそも単語は空白区切りで分割・認識するからスペースを含まないはずだし、テーブル名にスペースを使うのを禁止しても害は少ないだろう。

wordcount2
行キー カラム
テーブル名 + 区切り文字 + 単語 件数
create 'wordcount2', 'CNT'

ここでちょっと発想の転換。
HBaseはカラムファミリーがあって、その下ではqualifierが追加し放題。したがって、qualifierに『テーブル名』を使えばいいんじゃないか。

wordcount3
行キー family『件数』
qualifier『テーブル名』
単語 件数

ん〜、HBaseのテーブルレイアウト(テーブル定義・テーブル設計書)って、どう書けばいい(分かりやすい)んだ(苦笑)

create 'wordcount3', 'CNT'

これはRDBで言えば『テーブル名』を含んだカラム(列)を持つということで、RDBだったら『テーブル名』が増える度に列追加をする必要があるから全然現実的ではないが、カラムファミリータイプのKVS(HBase)では問題無いだろう。

wordcount3r
項目名 データ型 制約
単語 varchar2(100) primary key
テーブル0001_件数 number(9)  
テーブル0002_件数 number(9)  
テーブル0003_件数 number(9)  
   

しかしこの場合、HBase Sehllだと、“特定テーブルの単語一覧”という表示の仕方がちょっと面倒。(getでは行キーを指定しないといけないから使えない。scanなら可)
また、再実行しようと思ったら一旦該当テーブルのデータだけ削除したいが、“全行の特定カラム”という消し方も面倒。(deleteallでは行キーを指定する)


そこでまた逆転の発想で、『テーブル名』を行キーにして、『単語』をカラムにする。
こうすればHBase Sehllgetdeleteallで『テーブル名』を簡単に指定できる。

wordcount4
行キー family『件数』
qualifier『単語』
テーブル名 件数
create 'wordcount4', 'CNT'

しかしこうなると、qualifierの個数(RDBで言う列数)がかなりの量になると予想され、RDB脳にはなんだか不安な設計だ(苦笑)
でもHBaseのトップページのOverviewには「very large tables -- billions of rows X millions of columns --」(十億行×百万カラム)と書かれており、単語数で使う程度なら特に問題なさそう。(by @miyakawa_takuさん)


テーブルの使用例

これでテーブル設計の案が出揃ったので、実際に使うとしたらどうなるかシミュレートしてみる。

public static final byte[] TABLE_ID   = Bytes.toBytes("TABLE_ID");	//テーブルID
public static final byte[] TABLE_NAME = Bytes.toBytes("TABLE_NAME");	//テーブル名
public static final byte[] CNT        = Bytes.toBytes("CNT");		//件数

protected byte[] tableName = Bytes.toBytes("入力テーブル名");
テーブル案 表示方法 初期化(削除)方法 備考
wordcount1r
select WORD, CNT
from wordcount1r w, table1r t
where t.TABLE_NAME = '入力テーブル名'
and   w.TABLE_ID = t.TABLE_ID
;
delete from wordcount1r
where TABLE_ID = (
 select TABLE_ID
 from table1r
 where TABLE_NAME = '入力テーブル名'
);
 
wordcount1
(正規化)
(行キー:テーブルID+単語)
//テーブルIDを取得
byte[] tableId = null;
{
  Filter filter = new SingleColumnValueFilter(
	TABLE_NAME,
	HConstants.EMPTY_BYTE_ARRAY,
	CompareOp.EQUAL,
	tableName
  );
  Scan scan = new Scan(HConstants.EMPTY_START_ROW, filter);
  scan.addColumn(TABLE_ID, HConstants.EMPTY_BYTE_ARRAY);

  ResultScanner rs = table1.getScanner(scan);
  try {
    for (Result r : rs) {
      for (KeyValue kv : r.raw()) {
        tableId = kv.getValue();
      }
      break;
    }
  } finally {
    rs.close();
  }
}
Filter filter = new PrefixFilter(tableId);
Scan scan = new Scan(tableId, filter);
ResultScanner rs = table.getScanner(scan);
try {
  for (Result r : rs) {
    byte[] row = r.getRow();

    //テーブルIDは4桁であり、単語はそれ以降
    String word = Bytes.toString(row, 4, row.length - 4);

    for (KeyValue kv : r.raw()) {
      int cnt = Bytes.toInt(
        kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()
      );
      System.out.printf("%s\t%d%n", word, cnt);
    }
  }
} finally {
  rs.close();
}
//テーブルIDを取得

同左
Filter filter = new PrefixFilter(tableId);
Scan scan = new Scan(tableId, filter);
ResultScanner rs = table.getScanner(scan);
try {
  for (Result r : rs) {
    byte[] row = r.getRow();
    Delete del = new Delete(row);
    table.delete(del);
  }
} finally {
  rs.close();
}
table.flushCommits();
テーブルIDを取り出すだけでも面倒だが(苦笑)
削除もまた面倒。
本気で使うなら、きっと
テーブル名でインデックスを作ることになる。
wordcount2
(行キー:テーブル名+単語)
Filter filter = new PrefixFilter(tableName);
Scan scan = new Scan(tableName, filter);
ResultScanner rs = table.getScanner(scan);
try {
  for (Result r : rs) {
    String row = Bytes.toString(r.getRow());
    String word = row.substring(row.indexOf(' ') + 1);

    for (KeyValue kv : r.raw()) {
      int cnt = Bytes.toInt(
        kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()
      );
      System.out.printf("%s\t%d%n", word, cnt);
    }
  }
} finally {
  rs.close();
}
Filter filter = new PrefixFilter(tableName);
Scan scan = new Scan(tableName, filter);
ResultScanner rs = table.getScanner(scan);
try {
  for (Result r : rs) {
    byte[] row = r.getRow();
    Delete del = new Delete(row);
    table.delete(del);
  }
} finally {
  rs.close();
}
table.flushCommits();

 

 
wordcount3r
select WORD, テーブル0001_件数
from wordcount3r
where テーブル0001_件数 is not null
;
update wordcount3r
set テーブル0001_件数 = null
;
どのテーブルがどの項目かを
動的に管理するのは無理がある。
wordcount3
(行キー:単語)
Scan scan = new Scan();
scan.addColumn(CNT, tableName);
ResultScanner rs = table.getScanner(scan);
try {
  for (Result r : rs) {
    String word = Bytes.toString(r.getRow());

    for (KeyValue kv : r.raw()) {
      int cnt = Bytes.toInt(
        kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()
      );
      System.out.printf("%s\t%d%n", word, cnt);
    }
  }
} finally {
  rs.close();
}
Scan scan = new Scan();
scan.addColumn(CNT, tableName);
ResultScanner rs = table.getScanner(scan);
try {
  for (Result r : rs) {
    Delete del = new Delete(r.getRow());
    del.deleteColumn(CNT, tableName);
    table.delete(del);
  }
} finally {
  rs.close();
}
table.flushCommits();
 
scan 'wordcount3', { COLUMN=>'CNT:入力テーブル名' }    
wordcount4
(行キー:テーブル名)
Get get = new Get(tableName);
Result r = table.get(get);
for (KeyValue kv : r.raw()) {
  String word = Bytes.toString(
    kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength()
  );
  int cnt = Bytes.toInt(
    kv.getBuffer(), kv.getValueOffset(),     kv.getValueLength()
  );
  System.out.printf("%s\t%d%n", word, cnt);
}
Delete del = new Delete(tableName);
table.delete(del);
table.flushCommits();
 
get 'wordcount4','入力テーブル名' deleteall 'wordcount4','入力テーブル名'  

wordcount1

wordcount1(行キーにテーブルIDを含める方式)に出力するHadoopプログラムは割愛。

とにかく面倒そうなのが、テーブルIDを採番する方法。(そのテーブル名のIDが存在していればそれを使うし、無ければ新しい番号を登録する。この際に排他を考慮する必要あり)
まぁHadoopを使うということはバッチ処理なので、“他処理が同時に動いてテーブルIDを採番する可能性”が無いという前提であれば、それほど難しいことは無いが。
あるいはマスターに事前に登録しておく前提で、IDが取得できなかったら異常終了させる。

テーブルIDさえ採番できてしまえば、wordcount2のプログラムでテーブル名を使う代わりにテーブルIDを使えばいい。


wordcount2

wordcount2(行キー:入力テーブル名+スペース+単語)に出力する例。

public class HBaseWordCount2 extends Configured implements Tool {

	public static final String OUT_TABLE  = "wordcount2";
	public static final byte[] OUT_FAMILY = Bytes.toBytes("CNT");
	public static final char DELIM = ' ';
	public static void main(String[] args) throws Exception {
		int r = ToolRunner.run(new HBaseWordCount2(), args);
		System.exit(r);
	}

	@Override
	public int run(String[] args) throws Exception {
		String tableName = args[0]; //入力テーブル名
		clearTable(Bytes.toBytes(tableName));

		Job job = new Job(getConf(), "wordcount2(HBase)");
		job.setJarByClass(getClass());

		TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), Map.class, Text.class, IntWritable.class, job);
		TableMapReduceUtil.initTableReducerJob(OUT_TABLE, Reduce.class, job, HRegionPartitioner.class);

		return job.waitForCompletion(true) ? 0 : 1;
	}
//wordcount2の既存データを削除する(→MapReduceで削除する例)

	void clearTable(byte[] tableName) throws IOException {
		Configuration conf = HBaseConfiguration.create(getConf());
		HTableFactory factory = new HTableFactory();
		HTableInterface table = factory.createHTableInterface(conf, Bytes.toBytes(OUT_TABLE));
		try {
			Filter filter = new PrefixFilter(tableName);
			Scan scan = new Scan(tableName, filter);
			ResultScanner rs = table.getScanner(scan);
			try {
				for (Result r : rs) {
					byte[] row = r.getRow();
					Delete del = new Delete(row, System.currentTimeMillis(), null);
					table.delete(del);
				}
			} finally {
				rs.close();
			}
		} finally {
			factory.releaseHTableInterface(table);
		}
	}
//一番最初に作ったWordCountMapと同じ

	static class Map extends TableMapper<Text, IntWritable> {
		private static final IntWritable one = new IntWritable(1);
		private Text word = new Text();

		@Override
		protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
			for (KeyValue kv : value.raw()) {
				String s = Bytes.toString(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());

				StringTokenizer tokenizer = new StringTokenizer(s);
				while (tokenizer.hasMoreTokens()) {
					word.set(tokenizer.nextToken());
					context.write(word, one);
				}
			}
		}
	}
	static class Reduce extends TableReducer<Text, IntWritable, NullWritable> {

		protected String tableName;

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			super.setup(context);

			tableName = context.getConfiguration().get(TableInputFormat.INPUT_TABLE); //入力テーブル名
		}

		@Override
		protected void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable value : values) {
				sum += value.get();
			}

			byte[] row = Bytes.toBytes(tableName + DELIM + word);

			Put put = new Put(row);
			put.add(OUT_FAMILY, HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes(sum));
			context.write(NullWritable.get(), put);
		}
	}
}

wordcount3・wordcount4

wordcount3(行キー:単語)とwordcount4(行キー:テーブル名)に出力する例。
wordcount3とwordcount4は似ているので、MultiTableOutputFormatのサンプルを兼ねて、まとめて出力してみた。

public class HBaseWordCount34 extends Configured implements Tool {

	public static final String OUT_TABLE3 = "wordcount3";
	public static final String OUT_TABLE4 = "wordcount4";
	public static final byte[] OUT_FAMILY = Bytes.toBytes("CNT");
	public static void main(String[] args) throws Exception {
		int r = ToolRunner.run(new HBaseWordCount34(), args);
		System.exit(r);
	}

	@Override
	public int run(String[] args) throws Exception {
		String tableName = args[0];	//入力テーブル名
		{
			HTableFactory factory = new HTableFactory();
			Configuration conf = HBaseConfiguration.create(getConf());
			byte[] tableNameBytes = Bytes.toBytes(tableName);
			clearTable3(factory, conf, tableNameBytes);
			clearTable4(factory, conf, tableNameBytes);
		}

		Job job = new Job(getConf(), "wordcount34(HBase)");
		job.setJarByClass(getClass());

		TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), Map.class, Text.class, IntWritable.class, job);

		job.setOutputFormatClass(MultiTableOutputFormat.class);
		job.setReducerClass(Reduce.class);
		job.setOutputKeyClass(ImmutableBytesWritable.class);
		job.setOutputValueClass(Put.class);
//×		job.setPartitionerClass(HRegionPartitioner.class); //MultiTableOutputFormatの場合はHRegionPartitionerは指定不可

		return job.waitForCompletion(true) ? 0 : 1;
	}
//wordcount3の既存データを削除する(→MapReduceで削除する例)

	void clearTable3(HTableFactory factory, Configuration conf, byte[] tableName) throws IOException {
		HTableInterface table = factory.createHTableInterface(conf, Bytes.toBytes(OUT_TABLE3));
		try {
			Scan scan = new Scan();
			scan.addColumn(OUT_FAMILY, tableName);
			ResultScanner rs = table.getScanner(scan);
			long ts = System.currentTimeMillis();
			try {
				for (Result r : rs) {
					Delete del = new Delete(r.getRow(), ts, null);
					del.deleteColumn(OUT_FAMILY, tableName, ts);
					table.delete(del);
				}
			} finally {
				rs.close();
			}
		} finally {
			factory.releaseHTableInterface(table);
		}
	}


//wordcount4の既存データを削除する

	void clearTable4(HTableFactory factory, Configuration conf, byte[] tableName) throws IOException {
		HTableInterface table = factory.createHTableInterface(conf, Bytes.toBytes(OUT_TABLE4));
		try {
			Delete del = new Delete(tableName, System.currentTimeMillis(), null);
			table.delete(del);
		} finally {
			factory.releaseHTableInterface(table);
		}
	}
//一番最初に作ったWordCountMapと同じ

	static class Map extends TableMapper<Text, IntWritable> {
		private static final IntWritable one = new IntWritable(1);
		private Text word = new Text();

		@Override
		protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
			for (KeyValue kv : value.raw()) {
				String s = Bytes.toString(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());

				StringTokenizer tokenizer = new StringTokenizer(s);
				while (tokenizer.hasMoreTokens()) {
					word.set(tokenizer.nextToken());
					context.write(word, one);
				}
			}
		}
	}
	static class Reduce extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
		private static final ImmutableBytesWritable keyOut3 = new ImmutableBytesWritable(Bytes.toBytes(OUT_TABLE3));
		private static final ImmutableBytesWritable keyOut4 = new ImmutableBytesWritable(Bytes.toBytes(OUT_TABLE4));

		protected byte[] tableName;

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			super.setup(context);

			tableName = Bytes.toBytes(context.getConfiguration().get(TableInputFormat.INPUT_TABLE));	//入力テーブル名
		}

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable intw : values) {
				sum += intw.get();
			}
			byte[] value = Bytes.toBytes(sum);

			byte[] word  = Bytes.toBytes(key.toString());

			{
				Put put = new Put(word);
				put.add(OUT_FAMILY, tableName, value);
				context.write(keyOut3, put);
			}
			{
				Put put = new Put(tableName);
				put.add(OUT_FAMILY, word, value);
				context.write(keyOut4, put);
			}
		}
	}
}

データクリア(カラム削除)のMapReduce

上記のサンプルでは、wordcount2wordcount3のテーブルのデータ削除(clearTable()clearTable3())に何気なくScanとDeleteを組み合わせて使ったけれど、もしかしてこういうパターンでもMapReduceを使う方がいいのかも。
TableOutputFormatMultiTableOutputFormatはDeleteにも対応しているし。
Scanで取ってきたキーに対してDeleteするなら、Scanを分散させた方が全体の処理速度は速いはず。

しかも単語カウントの前処理としてデータ削除を行うのはやめて、
削除ジョブのDelete(のコンストラクターやdeleteColumn()・deleteFamily())で現在時刻を指定しておけば、単語を数えるプログラム(データ作成ジョブ)と並行で動かしても大丈夫かも?
データ作成ジョブを削除ジョブより後で開始させれば、作成時のタイムスタンプは必ず削除日時より新しくなるから。

Deleteでタイムスタンプを指定しない場合、その時点の最新データが削除されるが、
Deleteでタイムスタンプを指定すると、HBaseはデータ内部で履歴を保持しているので、その時点以前のデータだけ消える扱いになる。
データ作成ジョブと並行で動かしても、常に削除のタイムスタンプの方が古ければ、あるカラムに対して削除処理と作成処理のどちらが先に動いても、最終的には作成されたデータだけが残る。

ちなみにDeleteの場合は集計は不要なのでReducerは要らず、Mapperだけでいい。(setNumReduceTasks()で0を指定する)

wordcount2の削除 wordcount3の削除 備考
public class HBaseWordCountDelete extends Configured implements Tool {
 
public static final String OUT_TABLE = "wordcount2";
public static final String OUT_TABLE = "wordcount3";
public static final byte[] OUT_FAMILY = Bytes.toBytes("CNT");
 
	protected static final String DELETE_TSTAMP = "delete_timestamp";

	public static void main(String[] args) throws Exception {
		int r = ToolRunner.run(new HBaseWordCountDelete(), args);
		System.exit(r);
	}

	@Override
	public int run(String[] args) throws Exception {
		byte[] tableName = Bytes.toBytes(args[0]); //入力テーブル名

		Job job = new Job(getConf(), "wordcount(HBase) delete");
		job.setJarByClass(getClass());

		job.getConfiguration().setLong(DELETE_TSTAMP, System.currentTimeMillis()); //削除する日時
削除用のタイムスタンプを
コンフィグで保持しておく。
Filter filter = new PrefixFilter(tableName);
Scan scan = new Scan(tableName, filter);
Scan scan = new Scan();
scan.addColumn(OUT_FAMILY, tableName);
削除対象データの抽出条件
の指定
		TableMapReduceUtil.initTableMapperJob(
			OUT_TABLE, scan,
			DeleteMap.class, NullWritable.class, Delete.class,
			job
		);

		job.setNumReduceTasks(0); //Reducerは無し

		job.setOutputFormatClass(TableOutputFormat.class);
		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, OUT_TABLE);

		return job.waitForCompletion(true) ? 0 : 1;
	}
 
	static class DeleteMap extends TableMapper<NullWritable, Delete> {

	protected long deleteTimestamp;

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		super.setup(context);

		deleteTimestamp = context.getConfiguration().getLong(DELETE_TSTAMP, HConstants.LATEST_TIMESTAMP);
	}
削除用のタイムスタンプを
コンフィグから取得しておく。


@Override
protected void map(ImmutableBytesWritable key, Result value,
  Context context) throws IOException, InterruptedException {




	byte[] row = value.getRow();
	Delete del = new Delete(row, deleteTimestamp, null);

	context.write(NullWritable.get(), del);
}
protected byte[] tableName;

@Override
protected void map(ImmutableBytesWritable key, Result value,
  Context context) throws IOException, InterruptedException {
	if (tableName == null) {
		KeyValue kv = value.raw()[0];
		tableName = kv.getQualifier();
	}
	byte[] row = value.getRow();
	Delete del = new Delete(row, deleteTimestamp, null);
	del.deleteColumn(OUT_FAMILY, tableName, deleteTimestamp);
	context.write(NullWritable.get(), del);
}
 
	}
}
 

いずれにしてもScanで一旦クライアント側に全データを戻して、そのキーに対してDeleteを発行するので、けっこう無駄は多い気がする。

本当は、Deleteに削除条件を指定してHBaseのサーバー側で全部処理してくれると一番速いと思うのだが。
ScanFilterを指定するのと同様に、DeleteにFilterを設定してサーバー側で条件判定を行いつつ削除すれば、クライアントは無関係になる)

列ファミリー内の全カラムを削除することに関しては、
テーブル定義の変更として列ファミリーを削除することは出来るので、一旦削除して追加し直すことによってデータをクリアするという手も考えられる。
でもこの方法の場合、テーブルを一旦停止(disable)しないといけないんだよねぇ。

あるいはバルクロードで削除するとか…出来ないかなぁ。


Hadoop関連クラスへ戻る / Java APIへ戻る / HBaseへ戻る / Cassandraへ行く / 技術メモへ戻る
メールの送信先:ひしだま