HBaseのJava APIのHTablePoolクラスは、HTableインスタンスをプール(キャッシュ)するもの。
HTable自身は書き込みに関してスレッドセーフ(MTセーフ)ではない(HTableクラスのJavadocに書いてある)ので、インスタンスを複数スレッドで使い回すことは出来ない。
HTablePoolのJavadocにはスレッドセーフであるとか書かれていないが、使用目的と実装内容からして、スレッドセーフであると思われる。
|
testテーブルに対し、10個のスレッドで100件ずつのデータをputする例。
import java.io.IOException; import java.util.Date; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes;
public class HTablePoolSample {
public static void main(String[] args) {
HBaseConfiguration conf = new HBaseConfiguration();
HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE);
// confがデフォルトでいい場合は、HTablePool pool = new HTablePool();でよい
// スレッドで実行
Thread[] ts = new Thread[10];
int start = 0;
for (int i = 0; i < ts.length; i++) {
int end = start + 100;
ts[i] = new SampleThread(pool, "test", start, end);
ts[i].start();
start = end;
}
// スレッドの終了待ち
for (Thread t : ts) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class SampleThread extends Thread {
private HTablePool pool;
private String tableName;
private int start;
private int end;
public SampleThread(HTablePool pool, String tableName, int start, int end) {
this.pool = pool;
this.tableName = tableName;
this.start = start;
this.end = end;
}
@Override
public void run() {
HTable table = pool.getTable(tableName);
try {
for (int i = start; i < end; i++) {
Put p = new Put(Bytes.toBytes("key" + i));
p.add( Bytes.toBytes("data1"),
Bytes.toBytes("pool"),
Bytes.toBytes("poolSample " + new Date()));
table.put(p);
}
table.flushCommits();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
pool.putTable(table);
}
}
}
}
HTablePoolから、getTable()でHTableインスタンスを取得する。
使い終わったら、putTable()でインスタンスをプールに戻す。
本来であれば、一番最後(プログラムの終了時)でプール内に格納されているHTableをclose()した方がいいと思うのだが、HBase0.20のHTablePoolにはそういった機能は無い。
仮にHTable以外のインスタンス(TransactionalTableとか)を使いたい場合、HTablePool#newHTable()をオーバーライドした新しいクラスを作り、個別のHTableインスタンスを生成するようにする。
testテーブルに対し、10個のスレッドで100件ずつのデータをputする例。
import java.io.IOException; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes;
public class HTablePoolSample {
public static void main(String[] args) {
Configuration conf = HBaseConfiguration.create();
HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE);
// confがデフォルトでいい場合は、HTablePool pool = new HTablePool();でよい
String tableName = "test";
try {
// スレッドの実行
Thread[] ts = new Thread[10];
int start = 0;
for (int i = 0; i < ts.length; i++) {
int end = start + 100;
ts[i] = new SampleThread(pool, tableName, start, end);
ts[i].start();
start = end;
}
// スレッドの終了待ち
for (Thread t : ts) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
pool.closeTablePool(tableName);
}
}
static class SampleThread extends Thread {
private HTablePool pool;
private String tableName;
private int start;
private int end;
public SampleThread(HTablePool pool, String tableName, int start, int end) {
this.pool = pool;
this.tableName = tableName;
this.start = start;
this.end = end;
}
@Override
public void run() {
HTableInterface table = pool.getTable(tableName);
try {
for (int i = start; i < end; i++) {
Put p = new Put(Bytes.toBytes("key" + i));
p.add( Bytes.toBytes("data1"),
Bytes.toBytes("pool"),
Bytes.toBytes("poolSample " + new Date()));
table.put(p);
}
table.flushCommits();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
pool.putTable(table);
}
}
}
}
HTablePoolに渡すconfがConfigurationクラスになっている。
また、getTable()はHTableを返すのではなく、HTableInterfaceを返すようになっている。
(いずれもHBase0.89の全般的な変更)
HBase0.20と0.89の大きな違いは、プールを使い終わった後にHTableをクローズするcloseTablePool()が加わったこと。
ただしプール内で保持しているインスタンスをクローズするだけなので、get中のインスタンスは対象外。
したがって、closeTablePool()を呼ぶタイミングは、全インスタンスが回収された後でないといけないだろう。
HTablePoolのコンストラクターにHTableInterfaceFactoryが渡せるようにもなっているので、HTable以外のインスタンス(TransactionalTableとか)を使いたい場合は、それを返すFactoryを用意し、HTablePoolに渡す。
HTableInterfaceFactory factory = new HTableFactory() {
@Override
public HTableInterface createHTableInterface(Configuration config, byte[] tableName) {
try {
return new HTableインスタンス生成;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE, factory);
HBase0.89.20100621のHTablePoolがどういう実装になっているかをちょっと見てみた。
| ソース | 感想 |
|---|---|
private final Map<String, LinkedList<HTableInterface>> tables = |
HTableインスタンスをプールする為のマップ。 Collections.synchronizedMap()を使っているが、HBaseはJDK1.6以降なんだし、ConcurrentHashMapでいいんじゃないかな。 あと、型引数にLinkedListという具象クラス名をそのまま使うかねぇ。 ↓ private final Map<String, Queue<HTableInterface>> tables = new
ConcurrentHashMap<String, Queue<HTableInterface>>(); |
private HTableInterfaceFactory tableFactory = new
HTableFactory(); |
factoryを外部から受け渡すコンストラクターを使う場合、この初期化は無駄なのではないかと思う。 |
public HTableInterface getTable(String tableName) { |
getTable()が初めて呼ばれる際に複数スレッドから同時に呼ばれた場合、 tablesに別々にLinkedListインスタンスを入れることになるので、競合しそうに見える。 が、生成したLinkedListの中に何かを入れているわけではないので、 他のスレッドで作ったLinkedListインスタンスが無駄になるだけで、それ以外の害は無い。 あと、LinkedListでなくConcurrentLinkedQueueを使えば、synchronizedする必要が無い…かな? (削除時の同期化まで考えると、結局synchronizedが要るかも) 個人的にはLinkedListはArrayListよりメモリーを喰うから好きではないので、 synchronized有りのQueueで使うならArrayDequeの方がいいかも? |
public void putTable(HTableInterface table) { |
使わなくなったHTableインスタンスをキューに戻す処理だが。 キューの最大サイズ(maxSize)を超えた場合、何もせずにreturnしている。 しかし table.close()くらいは呼んだ方がいいんじゃないかと思う。まぁ、実際には最大サイズを超えることは無い気もするし^^;、 クローズもflushCommits()を呼んでいるだけなので、明示的にフラッシュしていれば特に何もする必要は無いし。 んー、それともフラッシュ(コミット)だから、あえて呼んでいないのかな?DBに反映させない為に…。 (自動コミットがオンだったら、いずれにしても適当な時点でコミットされる?) |
public void closeTablePool(final String tableName) { |
closeTablePool()は、テーブル名を渡すようになっている。 しかし最後にまとめてclose()することを考えるなら、 テーブル名に関わらず、保持している全てのHTableをクローズするメソッドがあった方が便利じゃないかと思う。 同期化することを考えると面倒かもしれないが。 少なくとも保持しているテーブル名一覧を取得するメソッドくらいはあってもいいと思う。 |
しかしまぁ、プールのget/putが呼ばれる頻度はそんなに高くはならないだろうから、あまり同期化の部分のスピードにはこだわらなくてもいいのかも(苦笑)