HBaseのJava APIのHTablePoolクラスは、HTableインスタンスをプール(キャッシュ)するもの。
HTable自身は書き込みに関してスレッドセーフ(MTセーフ)ではない(HTableクラスのJavadocに書いてある)ので、インスタンスを複数スレッドで使い回すことは出来ない。
HTablePoolのJavadocにはスレッドセーフであるとか書かれていないが、使用目的と実装内容からして、スレッドセーフであると思われる。
|
testテーブルに対し、10個のスレッドで100件ずつのデータをputする例。
import java.io.IOException; import java.util.Date; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes;
public class HTablePoolSample { public static void main(String[] args) { HBaseConfiguration conf = new HBaseConfiguration(); HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE); // confがデフォルトでいい場合は、HTablePool pool = new HTablePool();でよい // スレッドで実行 Thread[] ts = new Thread[10]; int start = 0; for (int i = 0; i < ts.length; i++) { int end = start + 100; ts[i] = new SampleThread(pool, "test", start, end); ts[i].start(); start = end; } // スレッドの終了待ち for (Thread t : ts) { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }
static class SampleThread extends Thread { private HTablePool pool; private String tableName; private int start; private int end; public SampleThread(HTablePool pool, String tableName, int start, int end) { this.pool = pool; this.tableName = tableName; this.start = start; this.end = end; } @Override public void run() { HTable table = pool.getTable(tableName); try { for (int i = start; i < end; i++) { Put p = new Put(Bytes.toBytes("key" + i)); p.add( Bytes.toBytes("data1"), Bytes.toBytes("pool"), Bytes.toBytes("poolSample " + new Date())); table.put(p); } table.flushCommits(); } catch (IOException e) { throw new RuntimeException(e); } finally { pool.putTable(table); } } } }
HTablePoolから、getTable()でHTableインスタンスを取得する。
使い終わったら、putTable()でインスタンスをプールに戻す。
本来であれば、一番最後(プログラムの終了時)でプール内に格納されているHTableをclose()した方がいいと思うのだが、HBase0.20のHTablePoolにはそういった機能は無い。
仮にHTable以外のインスタンス(TransactionalTableとか)を使いたい場合、HTablePool#newHTable()をオーバーライドした新しいクラスを作り、個別のHTableインスタンスを生成するようにする。
testテーブルに対し、10個のスレッドで100件ずつのデータをputする例。
import java.io.IOException; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes;
public class HTablePoolSample { public static void main(String[] args) { Configuration conf = HBaseConfiguration.create(); HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE); // confがデフォルトでいい場合は、HTablePool pool = new HTablePool();でよい String tableName = "test"; try { // スレッドの実行 Thread[] ts = new Thread[10]; int start = 0; for (int i = 0; i < ts.length; i++) { int end = start + 100; ts[i] = new SampleThread(pool, tableName, start, end); ts[i].start(); start = end; } // スレッドの終了待ち for (Thread t : ts) { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } } } finally { pool.closeTablePool(tableName); } }
static class SampleThread extends Thread { private HTablePool pool; private String tableName; private int start; private int end; public SampleThread(HTablePool pool, String tableName, int start, int end) { this.pool = pool; this.tableName = tableName; this.start = start; this.end = end; } @Override public void run() { HTableInterface table = pool.getTable(tableName); try { for (int i = start; i < end; i++) { Put p = new Put(Bytes.toBytes("key" + i)); p.add( Bytes.toBytes("data1"), Bytes.toBytes("pool"), Bytes.toBytes("poolSample " + new Date())); table.put(p); } table.flushCommits(); } catch (IOException e) { throw new RuntimeException(e); } finally { pool.putTable(table); } } } }
HTablePoolに渡すconfがConfigurationクラスになっている。
また、getTable()はHTableを返すのではなく、HTableInterfaceを返すようになっている。
(いずれもHBase0.89の全般的な変更)
HBase0.20と0.89の大きな違いは、プールを使い終わった後にHTableをクローズするcloseTablePool()が加わったこと。
ただしプール内で保持しているインスタンスをクローズするだけなので、get中のインスタンスは対象外。
したがって、closeTablePool()を呼ぶタイミングは、全インスタンスが回収された後でないといけないだろう。
HTablePoolのコンストラクターにHTableInterfaceFactoryが渡せるようにもなっているので、HTable以外のインスタンス(TransactionalTableとか)を使いたい場合は、それを返すFactoryを用意し、HTablePoolに渡す。
HTableInterfaceFactory factory = new HTableFactory() { @Override public HTableInterface createHTableInterface(Configuration config, byte[] tableName) { try { return new HTableインスタンス生成; } catch (IOException e) { throw new RuntimeException(e); } } }; HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE, factory);
HBase0.89.20100621のHTablePoolがどういう実装になっているかをちょっと見てみた。
ソース | 感想 |
---|---|
private final Map<String, LinkedList<HTableInterface>> tables = |
HTableインスタンスをプールする為のマップ。 Collections.synchronizedMap()を使っているが、HBaseはJDK1.6以降なんだし、ConcurrentHashMapでいいんじゃないかな。 あと、型引数にLinkedListという具象クラス名をそのまま使うかねぇ。 ↓ private final Map<String, Queue<HTableInterface>> tables = new
ConcurrentHashMap<String, Queue<HTableInterface>>(); |
private HTableInterfaceFactory tableFactory = new
HTableFactory(); |
factoryを外部から受け渡すコンストラクターを使う場合、この初期化は無駄なのではないかと思う。 |
public HTableInterface getTable(String tableName) { |
getTable()が初めて呼ばれる際に複数スレッドから同時に呼ばれた場合、 tablesに別々にLinkedListインスタンスを入れることになるので、競合しそうに見える。 が、生成したLinkedListの中に何かを入れているわけではないので、 他のスレッドで作ったLinkedListインスタンスが無駄になるだけで、それ以外の害は無い。 あと、LinkedListでなくConcurrentLinkedQueueを使えば、synchronizedする必要が無い…かな? (削除時の同期化まで考えると、結局synchronizedが要るかも) 個人的にはLinkedListはArrayListよりメモリーを喰うから好きではないので、 synchronized有りのQueueで使うならArrayDequeの方がいいかも? |
public void putTable(HTableInterface table) { |
使わなくなったHTableインスタンスをキューに戻す処理だが。 キューの最大サイズ(maxSize)を超えた場合、何もせずにreturnしている。 しかし table.close() くらいは呼んだ方がいいんじゃないかと思う。まぁ、実際には最大サイズを超えることは無い気もするし^^;、 クローズもflushCommits()を呼んでいるだけなので、明示的にフラッシュしていれば特に何もする必要は無いし。 んー、それともフラッシュ(コミット)だから、あえて呼んでいないのかな?DBに反映させない為に…。 (自動コミットがオンだったら、いずれにしても適当な時点でコミットされる?) |
public void closeTablePool(final String tableName) { |
closeTablePool()は、テーブル名を渡すようになっている。 しかし最後にまとめてclose()することを考えるなら、 テーブル名に関わらず、保持している全てのHTableをクローズするメソッドがあった方が便利じゃないかと思う。 同期化することを考えると面倒かもしれないが。 少なくとも保持しているテーブル名一覧を取得するメソッドくらいはあってもいいと思う。 |
しかしまぁ、プールのget/putが呼ばれる頻度はそんなに高くはならないだろうから、あまり同期化の部分のスピードにはこだわらなくてもいいのかも(苦笑)