S-JIS[2010-05-16/2010-05-17] 変更履歴

HBaseトランザクション

HBaseではトランザクションを扱うことが出来る。
このトランザクションの排他制御方式は「OCC(Optimistic Concurrency Control:楽観的並行性制御)」。


環境設定

HBase0.20でトランザクションを使うためには、いくつか事前設定しておく必要がある。

  作業内容 設定内容・実施コマンド 備考
1 HBASE_HOME\conf\hbase-site.xml
を編集する。
  <property>
    <name>hbase.regionserver.class</name>
    <value>org.apache.hadoop.hbase.ipc.TransactionalRegionInterface</value>
  </property>
  <property>
    <name>hbase.regionserver.impl</name>
    <value>org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer</value>
  </property>
  <property>
    <name>hbase.regionserver.hlog.keyclass</name>
    <value>org.apache.hadoop.hbase.regionserver.transactional.THLogKey</value>
  </property>
トランザクション用の設定。

下2つのプロパティーは
hbase-default.xmlには入っていないので
追加する。
2 hbase-0.20.x-transactional.jar
をlibにコピーする。
cp -p $HBASE_HOME/contrib/transactional/hbase-*-transactional.jar $HBASE_HOME/lib/
 

この状態でHBaseを開始する。

クラスの指定が変わるだけなので、今までのDB内のデータはそのまま使える。


トランザクションのコーディング

コンパイルに必要なので、Eclipseのビルドパスhbase-0.20.x-transactional.jarを追加しておく。
ソースの添付は「HBASE_HOME/src/contrib/transactional/src/java」フォルダー。

トランザクションを使用するコーディングは、通常のJavaのデータアクセスのコーディングとほぼ同じ。
違いは以下の通り。

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
import org.apache.hadoop.hbase.client.transactional.TransactionManager;
import org.apache.hadoop.hbase.client.transactional.TransactionState;
import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.PropertyConfigurator;
	public static void main(String[] args) throws Exception {
		PropertyConfigurator.configure("C:/cygwin/usr/local/hbase/conf/log4j.properties");

		// 設定ファイルを用いてHbaseへ接続
		HBaseConfiguration conf = new HBaseConfiguration();
		if (true) {
			// 実行時のクラスパスに「HBASE_HOME/conf」を入れていれば、このaddResource()は不要
			conf.addResource(new Path("file:///C:/cygwin/usr/local/hbase/conf/hbase-site.xml"));
		}

		// トランザクションマネージャーの準備
		TransactionManager tm = new TransactionManager(conf);

		// テーブルを指定
		TransactionalTable table = new TransactionalTable(conf, Bytes.toBytes("test"));
//		table.setAutoFlush(true);

		try {
			put(table, tm);
//			putNormal(table);
//			put2transaction(table, tm);
//			putThread(table, tm);
		} finally {
			table.close();
		}
	}
	static void put(TransactionalTable table, TransactionManager tm) throws IOException, CommitUnsuccessfulException {

		byte[] family    = Bytes.toBytes("data");
		byte[] qualifier = new byte[0];

		TransactionState transactionState = tm.beginTransaction();
		boolean commit = false;
		try {
			Put put1 = new Put(Bytes.toBytes("key1"));
			put1.add(family, qualifier, Bytes.toBytes("tran1"));
			table.put(transactionState, put1);

			Put put2 = new Put(Bytes.toBytes("key2"));
			put2.add(family, qualifier, Bytes.toBytes("tran2"));
			table.put(transactionState, put2);

			Delete del3 = new Delete(Bytes.toBytes("key3"));
			del3.deleteColumn(family, qualifier);
			table.delete(transactionState, del3);

//			table.flushCommits();

			commit = true;
		} finally {
			if (commit) {
				tm.tryCommit(transactionState);	//コミット
			} else {
				tm.abort(transactionState);	//ロールバック
			}
		}
	}

put1のtable.put()以降で例外が発生しても、ロールバックされる(abort()が呼ばれる)のでput1の更新は実際のDBには反映されない。
tryCommit()doCommit()が実行されるまで、他のアプリから更新内容を照会することは出来ない。

HTableに対するsetAutoFlush()およびflushCommits()は、自動でも手動でもどちらでも動くようだ。
一応、TransactionalTable#put()のJavadocには「オートフラッシュをオフにすると更新がバッファリングされる」と書いてあるので、何かしら違いはあるのだと思うが。

tryCommit()(やprepareCommit()doCommit())の中では、例外が発生するとabort()が呼ばれるようになっているので、自分で呼ぶ必要は無さそう。
(ログ出力とかで例外が発生するとその限りではないようなので、厳密には自分で呼ぶようにしておいた方がいいかも)

コミットするまでの間に別のアプリから(あるいは同じアプリ内でもトランザクションを使わずに)get()すると、更新前の値が取得される。[2010-05-17]
トランザクション付きのget()だと、更新中の値を取得できる。

	byte[] row1 = Bytes.toBytes("key1");
	Put put1 = new Put(row1);
	put1.add(family, qualifier, Bytes.toBytes("tran1-" + new Date()));
	table.put(transactionState, put1);

	Get get1 = new Get(row1);
	get1.addColumn(family, qualifier);

	//トランザクションのget()
	Result r = table.get(transactionState, get1);
	System.out.println("getTrans :" + Bytes.toString(r.getValue(family, qualifier)));

	//通常のget()
	r = table.get(get1);
	System.out.println("getNormal:" + Bytes.toString(r.getValue(family, qualifier)));

コミットしたデータのタイムスタンプは、コミット時点ではなく、トランザクション開始時点の時刻になる模様。[2010-05-17]
(更新した全データが同じタイムスタンプになる)
すなわち、トランザクションを開始してからコミットするまでの間に別アプリから(トランザクションとは無関係に)put()すると、
別アプリ側のデータの方が新しいタイムスタンプになるので、そちらの値が有効になる。


排他のコーディング

このトランザクションの排他制御方式は、「OCC(Optimistic Concurrency Control:楽観的並行性制御)」。[2010-05-17]
つまり、トランザクション開始時点でロックして他のトランザクションのデータ更新を一時停止する(排他する)のではなく、
コミット時点で更新対象データが自分以外によって更新されているかどうかチェックし、更新されていなければ自分のデータを反映させて正常終了、
他者に更新されていたらコミット失敗となる(CommitUnsuccessfulExceptionが発生する)。

このデータ更新チェックの為には、データ取得もトランザクション付きで行う必要がある。
また、排他対象の別アプリでもトランザクション付きでデータ操作する必要がある。
トランザクション無しの通常のget()put()は、データ更新チェックの対象にならない。
(つまりHBase Shellからデータ更新をしても、ちゃんと排他されたかどうかの確認は出来ない。HBase Shellはトランザクションを使っていないから)


排他エラーを発生させる例

	static void put2transaction(TransactionalTable table, TransactionManager tm) throws IOException, CommitUnsuccessfulException {
		byte[] family = Bytes.toBytes("data");
		byte[] qualifier = new byte[0];

		byte[] row1 = Bytes.toBytes("key1");
		byte[] row2 = Bytes.toBytes("key2");

		// データ準備
		if (true) {
			Put put1 = new Put(row1);
			put1.add(family, qualifier, Bytes.toBytes("5000"));
			table.put(put1);

			Put put2 = new Put(row2);
			put2.add(family, qualifier, Bytes.toBytes("3000"));
			table.put(put2);

			table.flushCommits();
		}

		int deltaA = 500;
		int deltaB = 10;

		// トランザクションを2つ用意
		TransactionState tranA = tm.beginTransaction();
		TransactionState tranB = tm.beginTransaction();

		boolean commit = false;
		try {
			{ //トランザクションAでkey1から減算する
				Get getA1 = new Get(row1);
				getA1.addColumn(family, qualifier);
				Result r = table.get(tranA, getA1);
				int value = toInt(r.getValue(family, qualifier));
				value -= deltaA;

				Put putA1 = new Put(row1);
				putA1.add(family, qualifier, toBytes(value));
				table.put(tranA, putA1);
			}

			{ //トランザクションBでkey1から減算する
				Get getB1 = new Get(row1);
				getB1.addColumn(family, qualifier);
				Result r = table.get(tranB, getB1);
				int value = toInt(r.getValue(family, qualifier));
				value -= deltaB;

				Put putB1 = new Put(row1);
				putB1.add(family, qualifier, toBytes(value));
				table.put(tranB, putB1);
			}

			{ //トランザクションAでkey2に加算する
				Get getA2 = new Get(row2);
				getA2.addColumn(family, qualifier);
				Result r = table.get(tranA, getA2);
				int value = toInt(r.getValue(family, qualifier));
				value += deltaA;

				Put putA2 = new Put(row2);
				putA2.add(family, qualifier, toBytes(value));
				table.put(tranA, putA2);
			}

			{ //トランザクションBでkey2に加算する
				Get getB2 = new Get(row2);
				getB2.addColumn(family, qualifier);
				Result r = table.get(tranB, getB2);
				int value = toInt(r.getValue(family, qualifier));
				value += deltaB;

				Put putB2 = new Put(row2);
				putB2.add(family, qualifier, toBytes(value));
				table.put(tranB, putB2);
			}

			commit = true;
		} finally {
			if (commit) {
				tm.tryCommit(tranA);
				tm.tryCommit(tranB); //ここでCommitUnsuccessfulExceptionが発生する!
			} else {
				tm.abort(tranA);
				tm.abort(tranB);
			}
		}
	}
	public static int toInt(byte[] value) {
		return Integer.parseInt(Bytes.toString(value));
	}

	public static byte[] toBytes(int value) {
		return Bytes.toBytes(Integer.toString(value));
	}

トランザクションAもトランザクションBも、key1の値を取得して一定値を減算し、key2に加算している。
同じデータに対して更新しようとしている為、後からコミットを実行した方はCommitUnsuccessfulExceptionが発生する。


送金処理の例

楽観的並行性制御(OCC)においては、コミットが失敗したら、更新をデータ取得からやり直すようなコーディングになるらしい。

以下、ジーク(key1)が弟のベルハルト(key2)に2万ガメルを少しずつ送金する例。(ただしお金が足りなくて途中で諦める(爆))

スレッドで処理しているのは、同時更新したら排他エラーが発生するパターンを試してみたいからであって、通常のアプリならこのスレッド内部のロジックだけコーディングすればいいだろう。

	private static final byte[] family = Bytes.toBytes("data");
	private static final byte[] qualifier = new byte[0];
	static void putThread(TransactionalTable table, TransactionManager tm) throws IOException, CommitUnsuccessfulException {

		byte[] row1 = Bytes.toBytes("key1"); //ジーク
		byte[] row2 = Bytes.toBytes("key2"); //ベルハルト

		// データ準備
		if (true) {
			Put put1 = new Put(row1);
			put1.add(family, qualifier, toBytes(5000)); //ジークの所持金
			table.put(put1);

			Put put2 = new Put(row2);
			put2.add(family, qualifier, toBytes(100000)); //ベルハルトの所持金
			table.put(put2);

			table.flushCommits();
		}

		//返金額
		final int total = 20000;
		int sum = 0;

		Thread[] ts = new Thread[30];
		for (int i = 0; i < ts.length; i++) {
			int amount = (i + 1) * 100; //1回当たりの返金額
			System.out.printf("[%2d]%5d%n", i, amount);

			ts[i] = new RemitThread(i, table, tm, row1, row2, amount);
			ts[i].start();	//送金開始

			sum += amount;
			if (sum >= total) {
				break;
			}
		}
		System.out.printf("送金合計:%d/%d%n", sum, total);

		for (int i = 0; i < ts.length; i++) {
			if (ts[i] != null) {
				try {
					ts[i].join();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	}
	/**
	 * row1からrow2へ送金するスレッド
	 */
	public static class RemitThread extends Thread {

		private int no;
		private TransactionalTable table;
		private TransactionManager tm;
		private byte[] row1;
		private byte[] row2;
		private int amount;

		// コンストラクター
		public RemitThread(int no, TransactionalTable table, TransactionManager tm, byte[] row1, byte[] row2, int amount) {
			this.no = no;
			this.table = table;
			this.tm = tm;
			this.row1 = row1;
			this.row2 = row2;
			this.amount = amount;
		}

		// トランザクション処理本体
		@Override
		public void run() {
			for (;;) {
				TransactionState transactionState = tm.beginTransaction();
				boolean rollback = true;
				try {
					execute(transactionState);

					rollback = false;
					try {
						tm.tryCommit(transactionState); //コミット
						// tryCommit()で例外が発生したら、自動的にabort()が呼ばれる
					} catch (CommitUnsuccessfulException e) {
						System.out.printf("[%2d] %s%n", no, e.getMessage());
						continue;	 //他者更新済みの場合は、最初からやり直す
					}

					break;
				} catch (IOException e) {
					throw new RuntimeException(e);
				} finally {
					if (rollback) {
						try {
							tm.abort(transactionState); //ロールバック
						} catch (IOException e) {
							e.printStackTrace();
						}
					}
				}
			}
		}

		// 送金処理本体
		protected void execute(TransactionState transactionState) throws IOException {

			Get get1 = new Get(row1);
			get1.addColumn(family, qualifier);
			Result r1 = table.get(transactionState, get1);
			int value1 = toInt(r1.getValue(family, qualifier));
			if (value1 < amount) {
				throw new RuntimeException("あ、ガメルが足りなかった。すまん。また今度な。" + value1 + "<" + amount);
			}
			value1 -= amount;

			Put put1 = new Put(row1);
			put1.add(family, qualifier, toBytes(value1));
			table.put(transactionState, put1);


			Get get2 = new Get(row2);
			get2.addColumn(family, qualifier);
			Result r2 = table.get(transactionState, get2);
			int value2 = toInt(r2.getValue(family, qualifier));
			value2 += amount;

			Put put2 = new Put(row2);
			put2.add(family, qualifier, toBytes(value2));
			table.put(transactionState, put2);
		}
	}

コミットまたはロールバックすると そのtransactionStateは無効になるので、再度取得し直す必要がある。


HBaseプログラミングへ戻る / HBaseへ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま