S-JIS[2010-07-08] 変更履歴

HTablePool

HBaseJava APIHTablePoolクラスは、HTableインスタンスをプール(キャッシュ)するもの。
HTable自身は書き込みに関してスレッドセーフ(MTセーフ)ではない(HTableクラスのJavadocに書いてある)ので、インスタンスを複数スレッドで使い回すことは出来ない。
HTablePoolのJavadocにはスレッドセーフであるとか書かれていないが、使用目的と実装内容からして、スレッドセーフであると思われる。


コーディング例(HBase0.20)

testテーブルに対し、10個のスレッドで100件ずつのデータをputする例。

import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
public class HTablePoolSample {

	public static void main(String[] args) {
		HBaseConfiguration conf = new HBaseConfiguration();
		HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE);
// confがデフォルトでいい場合は、HTablePool pool = new HTablePool();でよい

		// スレッドで実行
		Thread[] ts = new Thread[10];
		int start = 0;
		for (int i = 0; i < ts.length; i++) {
			int end = start + 100;
			ts[i] = new SampleThread(pool, "test", start, end);

			ts[i].start();

			start = end;
		}

		// スレッドの終了待ち
		for (Thread t : ts) {
			try {
				t.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	static class SampleThread extends Thread {
		private HTablePool pool;
		private String tableName;
		private int start;
		private int end;

		public SampleThread(HTablePool pool, String tableName, int start, int end) {
			this.pool = pool;
			this.tableName = tableName;
			this.start = start;
			this.end = end;
		}

		@Override
		public void run() {
			HTable table = pool.getTable(tableName);
			try {
				for (int i = start; i < end; i++) {
					Put p = new Put(Bytes.toBytes("key" + i));
					p.add(	Bytes.toBytes("data1"),
						Bytes.toBytes("pool"),
						Bytes.toBytes("poolSample " + new Date()));
					table.put(p);
				}
				table.flushCommits();
			} catch (IOException e) {
				throw new RuntimeException(e);
			} finally {
				pool.putTable(table);
			}
		}
	}
}

HTablePoolから、getTable()でHTableインスタンスを取得する。
使い終わったら、putTable()でインスタンスをプールに戻す。

本来であれば、一番最後(プログラムの終了時)でプール内に格納されているHTableをclose()した方がいいと思うのだが、HBase0.20のHTablePoolにはそういった機能は無い。

仮にHTable以外のインスタンス(TransactionalTableとか)を使いたい場合、HTablePool#newHTable()をオーバーライドした新しいクラスを作り、個別のHTableインスタンスを生成するようにする。


コーディング例(HBase0.89)

testテーブルに対し、10個のスレッドで100件ずつのデータをputする例。

import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
public class HTablePoolSample {

	public static void main(String[] args) {
		Configuration conf = HBaseConfiguration.create();
		HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE);
// confがデフォルトでいい場合は、HTablePool pool = new HTablePool();でよい

		String tableName = "test";

		try {
			// スレッドの実行
			Thread[] ts = new Thread[10];
			int start = 0;
			for (int i = 0; i < ts.length; i++) {
				int end = start + 100;
				ts[i] = new SampleThread(pool, tableName, start, end);

				ts[i].start();

				start = end;
			}

			// スレッドの終了待ち
			for (Thread t : ts) {
				try {
					t.join();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		} finally {
			pool.closeTablePool(tableName);
		}
	}
	static class SampleThread extends Thread {
		private HTablePool pool;
		private String tableName;
		private int start;
		private int end;

		public SampleThread(HTablePool pool, String tableName, int start, int end) {
			this.pool = pool;
			this.tableName = tableName;
			this.start = start;
			this.end = end;
		}

		@Override
		public void run() {
			HTableInterface table = pool.getTable(tableName);
			try {
				for (int i = start; i < end; i++) {
					Put p = new Put(Bytes.toBytes("key" + i));
					p.add(	Bytes.toBytes("data1"),
						Bytes.toBytes("pool"),
						Bytes.toBytes("poolSample " + new Date()));
					table.put(p);
				}
				table.flushCommits();
			} catch (IOException e) {
				throw new RuntimeException(e);
			} finally {
				pool.putTable(table);
			}
		}
	}
}

HTablePoolに渡すconfがConfigurationクラスになっている。
また、getTable()はHTableを返すのではなく、HTableInterfaceを返すようになっている。
(いずれもHBase0.89の全般的な変更)

HBase0.20と0.89の大きな違いは、プールを使い終わった後にHTableをクローズするcloseTablePool()が加わったこと。
ただしプール内で保持しているインスタンスをクローズするだけなので、get中のインスタンスは対象外。
したがって、closeTablePool()を呼ぶタイミングは、全インスタンスが回収された後でないといけないだろう。

HTablePoolのコンストラクターにHTableInterfaceFactoryが渡せるようにもなっているので、HTable以外のインスタンス(TransactionalTableとか)を使いたい場合は、それを返すFactoryを用意し、HTablePoolに渡す。

		HTableInterfaceFactory factory = new HTableFactory() {
			@Override
			public HTableInterface createHTableInterface(Configuration config, byte[] tableName) {
				try {
					return new HTableインスタンス生成;
				} catch (IOException e) {
					throw new RuntimeException(e);
				}
			}
		};
		HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE, factory);

HTablePoolの実装

HBase0.89.20100621のHTablePoolがどういう実装になっているかをちょっと見てみた。

ソース 感想
private final Map<String, LinkedList<HTableInterface>> tables =
Collections.synchronizedMap(new HashMap<String, LinkedList<HTableInterface>>());
HTableインスタンスをプールする為のマップ。
Collections.synchronizedMap()を使っているが、HBaseはJDK1.6以降なんだし、ConcurrentHashMapでいいんじゃないかな。
あと、型引数にLinkedListという具象クラス名をそのまま使うかねぇ。

private final Map<String, Queue<HTableInterface>> tables = new ConcurrentHashMap<String, Queue<HTableInterface>>();
private HTableInterfaceFactory tableFactory = new HTableFactory();
 
factoryを外部から受け渡すコンストラクターを使う場合、この初期化は無駄なのではないかと思う。
public HTableInterface getTable(String tableName) {
  LinkedList<HTableInterface> queue = tables.get(tableName);
  if(queue == null) {
    queue = new LinkedList<HTableInterface>();
    tables.put(tableName, queue);
    return createHTable(tableName);
  }
 
getTable()が初めて呼ばれる際に複数スレッドから同時に呼ばれた場合、
tablesに別々にLinkedListインスタンスを入れることになるので、競合しそうに見える。
が、生成したLinkedListの中に何かを入れているわけではないので、
他のスレッドで作ったLinkedListインスタンスが無駄になるだけで、それ以外の害は無い。
あと、LinkedListでなくConcurrentLinkedQueueを使えば、synchronizedする必要が無い…かな?
(削除時の同期化まで考えると、結局synchronizedが要るかも)
個人的にはLinkedListはArrayListよりメモリーを喰うから好きではないので、
synchronized有りのQueueで使うならArrayDequeの方がいいかも?
public void putTable(HTableInterface table) {
  LinkedList<HTableInterface> queue = tables.get(Bytes.toString(table.getTableName()));
  synchronized(queue) {
    if(queue.size() >= maxSize) return;
    queue.add(table);
  }
使わなくなったHTableインスタンスをキューに戻す処理だが。
キューの最大サイズ(maxSize)を超えた場合、何もせずにreturnしている。
しかしtable.close()くらいは呼んだ方がいいんじゃないかと思う。
まぁ、実際には最大サイズを超えることは無い気もするし^^;、
クローズもflushCommits()を呼んでいるだけなので、明示的にフラッシュしていれば特に何もする必要は無いし。
んー、それともフラッシュ(コミット)だから、あえて呼んでいないのかな?DBに反映させない為に…。
(自動コミットがオンだったら、いずれにしても適当な時点でコミットされる?)
public void closeTablePool(final String tableName) { closeTablePool()は、テーブル名を渡すようになっている。
しかし最後にまとめてclose()することを考えるなら、
テーブル名に関わらず、保持している全てのHTableをクローズするメソッドがあった方が便利じゃないかと思う。
同期化することを考えると面倒かもしれないが。
少なくとも保持しているテーブル名一覧を取得するメソッドくらいはあってもいいと思う。

しかしまぁ、プールのget/putが呼ばれる頻度はそんなに高くはならないだろうから、あまり同期化の部分のスピードにはこだわらなくてもいいのかも(苦笑)


Java APIへ戻る / HBaseへ戻る / HBase変更点へ / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま