S-JIS[2010-05-11/2010-08-29] 変更履歴

CassandraにJavaからアクセス

Apache CassandraにJavaからアクセスする例。

CassandraはThriftを使って作られているので、ThriftクライアントのAPIを使ってアクセスすることになる。
なお、このAPIはThriftで生成されたものなので、標準的なJavaの命名規約からは外れている。


ライブラリー

Cassandraクライアントプログラムのコンパイル・実行に必要なjarファイルは以下の通り。

jarファイル 備考
CASSANDRA_HOME\lib\ apache-cassandra-0.6.1.jar Cassandra本体
libthrift-r917130.jar Thriftのライブラリー
log4j-1.2.14.jar
slf4j-api-1.5.8.jar
slf4j-log4j12-1.5.8.jar
log4j(コンパイルには不要、実行には必要)

まぁ、後腐れなくCASSANDRA_HOME\lib\の下のjarファイルを全てEclipseのビルドパスに入れてしまえば楽(笑)


Eclipseへのソースの添付について

Eclipseのソースの添付機能で、apache-cassandra-0.6.1.jarにダウンロードしてきたソースを指定することが出来る。

ただし解凍したディレクトリーの「apache-cassandra-0.6.1-src/src/java」を指定しただけだと、Thrift部分のソースが含まれない。
Thriftで生成されたJavaソースは「apache-cassandra-0.6.1-src/interface/thrift/gen-java」にある。
したがって、ちょっと手間はかかるが、両方をマージしたディレクトリーを作り、apache-cassandra-0.6.1.jarにはそのディレクトリーを指定するのが良い。
(「interface/thrift/gen-java/org〜」を「src/java/」に追加コピーしてしまうのが楽かも?)


Cassandraアクセスのサンプル

Cassandra WikiThriftExamplesによると、Cassandra0.6でコーディング方法が変わったらしいが、自分は最初から0.6.1なので気にしない(笑)

import java.io.UnsupportedEncodingException;
import java.util.Date;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
public class CassandraSample {
	public static final String UTF8 = "UTF-8";

	public static void main(String[] args) throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException,
			TException, UnsupportedEncodingException {

		TTransport transport = new TSocket("localhost", 9160);
		TProtocol  protocol  = new TBinaryProtocol(transport);
		Cassandra.Client client = new Cassandra.Client(protocol);

		transport.open();
		try {
			String keyspace     = "Keyspace1";	//キースペース名
			String columnFamily = "Standard2";	//カラムファミリー名(テーブル名)
			String columnAge    = "age";	//カラム名
			String key          = "jsmith";	//キー

			// カラムを指定してデータを取得
			ColumnPath colPathAge = new ColumnPath(columnFamily).setColumn(columnAge.getBytes(UTF8));
			Column col = client.get(keyspace, key, colPathAge, ConsistencyLevel.ONE).getColumn();

			// 取ってきたデータを表示してみる
			System.out.println(col);
			System.out.println(new String(col.getName() , UTF8));
			System.out.println(new String(col.getValue(), UTF8));
			System.out.println(new Date(col.getTimestamp() / 1000));
		} finally {
			transport.close();
		}
	}
}

TTransportやTProtocol・TException等のTから始まっているクラス(パッケージorg.apache.thrift)は、Thrift本体。
Cassandra.ClientColumnPathColumn(パッケージorg.apache.cassandra.thrift)は、Thriftを使って生成されたCassandra本体。

変数colから データを取得するのに、ThriftExamplesでは「col.name」や「col.value」という書き方(フィールドを直接指定)をしている。
Columnクラスの使い方は“クラス”というより“構造体”であり、フィールドもpublicなので、それでもいいとは思うけれど。
Javaの標準的なコーディング規約に従うならば、ゲッターメソッドを使う方が良いと思われる。

あと、タイムスタンプの取得を「col.getTimestamp() / 1000」という様に1000で割っているのは、cassandra CLIから入力したデータはマイクロ秒単位で登録されているから。
Javaの中(System.currentTimeMillis()Dateクラス)ではミリ秒で扱うので、1000で割っている。

TTransportクラスはCloseableインターフェースを実装していない。
したがってTTransport#close()には「throws IOException」といった宣言が無いので、close()そのもののtry〜catchは不要。

実行結果の例

Column(name:61 67 65, value:34 32, timestamp:1273492960937000)
age
42
Mon May 10 21:02:40 JST 2010

データアクセスのコーディング

Cassandraのデータを操作する為のメソッド。(というか、CassandraのThriftクライアントのメソッド)


情報取得系のメソッド。

  コーディング例 備考
クラスター名の取得
show cluster name
String clusterName = client.describe_cluster_name();
Test Cluster」といった文字列が取れる。
キースペース一覧の取得
show keyspaces
Set<String> set = client.describe_keyspaces();
Setの実体はHashSet。つまりソートされていない。
カラムファミリー定義の取得
describe keyspace
Map<String, Map<String, String>> ksMap =
	client.describe_keyspace("Keyspace1");
Map<String, String> map = ksMap.get("Standard2");
mapの中身は以下のような感じ。
キー
CompareWith org.〜.UTF8Type
Desc Keyspace1.Standard2
Column Family Type: Standard
Columns Sorted By: org.〜.UTF8Type@883644
Type Standard
Descの値はなぜか文字列が改行区切りで入っている。
スーパーカラムだとCompareSubcolumnsWithもあったりする。
リング情報の取得
List<TokenRange> list =
	client.describe_ring("Keyspace1");
 
APIバージョンの取得
(show api version)
String ver = client.describe_version();
 

データ取得系のメソッド。

  コーディング例 備考
データ(カラム)の取得
String key = "jsmith";

ColumnPath colPath = new ColumnPath("Standard2");
colPath.setColumn("age".getBytes(UTF8));

Column col = client.get(
  "Keyspace1", key, colPath, ConsistencyLevel.ONE
).getColumn();
通常のカラムのカラムファミリーの場合、
ColumnPathにはカラムファミリー名を指定する。
そこにsetColumn()でカラム名を指定する。

SQLの「SELECT age FROM Standard2 WHERE key='jsmith'」に相当。
ColumnPath colPath = new ColumnPath("Super1");
colPath.setSuper_column("attr".getBytes(UTF8));
colPath.setColumn("age".getBytes(UTF8));
スーパーカラムのカラムを取得する場合、ColumnPathに
カラムファミリー名・スーパーカラム名・カラム名
を指定する。
スーパーカラムのデータの取得
String key = "jsmith";

ColumnPath colPath = new ColumnPath("Super1");
colPath.setSuper_column("name".getBytes(UTF8));

SuperColumn scol = client.get(
  "Keyspace1", key, colPath, ConsistencyLevel.ONE
).getSuper_column();

//スーパーカラム内の全カラム
List<Column> list = scol.getColumns();
スーパーカラムを取得する場合、ColumnPathに
カラムファミリー名・スーパーカラム名
だけを指定する。
スーパーカラム内の全カラムのデータが取得できる。
カラム数の取得
count
String key = "jsmith";

ColumnParent colParent = new ColumnParent("Standard2");

int count = client.get_count(
  "Keyspace1", key, colParent, ConsistencyLevel.ONE
);
指定されたレコード(キー)内のカラム数を取得する。
カラム数はレコードによって異なるので、
わざわざ取得方法が用意されているのだろう。
ColumnParent colParent = new ColumnParent("Super1");

int count = client.get_count(
  "Keyspace1", key, colParent, ConsistencyLevel.ONE
);
スーパーカラムの場合、
指定されたレコード(キー)内のスーパーカラム数の取得になる。[2010-05-12]
ColumnParent colParent = new ColumnParent("Super1");
colParent.setSuper_column("name".getBytes(UTF8));

int count = client.get_count(
  "Keyspace1", key, colParent, ConsistencyLevel.ONE
);
スーパーカラムでスーパーカラム名を指定した場合、
指定されたレコード(キー)のスーパーカラム内のカラム数の取得になる。[2010-05-12]
複数カラムのデータ取得
(通常カラムの場合)
String key = "jsmith";

ColumnParent colParent = new ColumnParent("Standard2");

SlicePredicate predicate = new SlicePredicate();
predicate.addToColumn_names("first".getBytes(UTF8));
predicate.addToColumn_names("last" .getBytes(UTF8));
predicate.addToColumn_names("age"  .getBytes(UTF8));

List<ColumnOrSuperColumn> list = client.get_slice(
  "Keyspace1", key, colParent, predicate, ConsistencyLevel.ONE
);

for (ColumnOrSuperColumn cs : list) {
  Column col = cs.getColumn();
  〜
}
複数カラムを同時に取得する場合、get_slice()を使う。
この場合、ColumnPathでなくColumnParentでカラムファミリーを指定する。
そしてSlicePredicateでカラム名を指定する。

取得されるカラムのリストの並び順は、
SlicePredicateに指定した順ではなく、
カラムファミリーで定義されている順番。

SQLの「SELECT first,last,age FROM Standard2 WHERE key='jsmith'」に相当。
SlicePredicate predicate = new SlicePredicate();
predicate.setColumn_names(Arrays.asList(
  "first".getBytes(UTF8),
  "last" .getBytes(UTF8),
  "age"  .getBytes(UTF8)
));
カラム名のリストで指定する方法。
SliceRange range = new SliceRange();
range.setStart ("age" .getBytes(UTF8));
range.setFinish("last".getBytes(UTF8));
range.setCount(3);

SlicePredicate predicate = new SlicePredicate();
predicate.setSlice_range(range);
SlicePredicateでは、カラム名を列挙する使い方の他に、
カラム名の範囲(開始と終了)を指定することが出来る。
カラム名に日付とか数値を使う場合には便利かも。
(カラム名のソート順は、たぶんCompareWithCompareSubcolumnsWithで指定する)

SliceRange#setCount()で、取得する最大カラム数を指定する。
デフォルトは100個。
SliceRange range = new SliceRange();
range.setStart (new byte[0]);
range.setFinish(new byte[0]);
range.setCount(count);

SlicePredicate predicate = new SlicePredicate();
predicate.setSlice_range(range);
SliceRangeのstart・finishに0バイトの配列を指定すると、
全カラムを指定した事になる。
(nullを指定すると例外が発生する)
カラム数の取得方法
複数スーパーカラムのデータ取得
String key = "jsmith";

ColumnParent colParent = new ColumnParent("Super1");

SlicePredicate predicate = new SlicePredicate();
predicate.addToColumn_names("name".getBytes(UTF8));
predicate.addToColumn_names("attr".getBytes(UTF8));

List<ColumnOrSuperColumn> list = client.get_slice(
  "Keyspace1", key, colParent, predicate, ConsistencyLevel.ONE
);

for (ColumnOrSuperColumn cs : list) {
  SuperColumn scol = cs.getSuper_column();
  〜
}
スーパーカラムのカラムファミリーの場合、
SlicePredicateに指定するのはスーパーカラム名。
複数カラムのデータ取得
(スーパーカラムの場合)
String key = "jsmith";

ColumnParent colParent = new ColumnParent("Super1");
colParent.setSuper_column("name".getBytes(UTF8));

SlicePredicate predicate = new SlicePredicate();
predicate.addToColumn_names("first".getBytes(UTF8));
predicate.addToColumn_names("last" .getBytes(UTF8));

List<ColumnOrSuperColumn> list = client.get_slice(
  "Keyspace1", key, colParent, predicate, ConsistencyLevel.ONE
);

for (ColumnOrSuperColumn cs : list) {
  Column col = cs.getColumn();
  〜
}
スーパーカラムのカラムファミリーの場合、
ColumnParentにsetSuper_column()でスーパーカラム名を指定すると
SlicePredicateにはその中のカラム名を指定することになる。
複数キーの指定
List<String> keys = new ArrayList<String>();
keys.add("jsmith");
keys.add("hishidama");

ColumnParent colParent = new ColumnParent("Standard2");

SlicePredicate predicate = new SlicePredicate();
predicate.addToColumn_names("first".getBytes(UTF8));
predicate.addToColumn_names("last" .getBytes(UTF8));

Map<String, List<ColumnOrSuperColumn>> map = client.multiget_slice(
  "Keyspace1", keys, colParent, predicate, ConsistencyLevel.ONE
);

for (String key : keys) {
  List<ColumnOrSuperColumn> list = map.get(key);
  for (ColumnOrSuperColumn cs : list) {
    Column col = cs.getColumn();
    〜
  }
}
multiget_slice()は、複数のキーを指定してデータを取得する。
get_slice()を並列(並行)で実行するらしい。

返り値のmapはHashMap
DB内に存在しないキーであっても、mapにはキーは入ってくる。
(値は空のリストになる)

SQLの「SELECT first,last FROM Standard2 WHERE key in ('jsmith', 'hishidama')」に相当。
キー範囲の指定
KeyRange keyRange = new KeyRange();
keyRange.setStart_key("jsmith");
keyRange.setEnd_key  ("jsmith99");
keyRange.setCount(100);

ColumnParent colParent = new ColumnParent("Standard2");

SlicePredicate predicate = new SlicePredicate();
predicate.addToColumn_names("first".getBytes(UTF8));
predicate.addToColumn_names("last" .getBytes(UTF8));

List<KeySlice> list = client.get_range_slices(
  "Keyspace1", colParent, predicate, keyRange, ConsistencyLevel.ONE
);

for (KeySlice slice : list) {
  String key = slice.getKey();
  List<ColumnOrSuperColumn> cols = slice.getColumns();
  for (ColumnOrSuperColumn cs : cols) {
    Column col = cs.getColumn();
    〜
  }
}
get_range_slices()は、キーの範囲を指定してデータを取得する。
start_keyおよびend_keyに空文字列("")を指定すると、全キーが対象になるようだ。(nullは不可)

KeyRange#setCount()で取得する最大件数を指定する。
デフォルトは100件。

SQLの「SELECT first,last FROM Standard2 WHERE key BETWEEN 'jsmith' AND 'jsmith99'」に相当。

HBaseだとScanやFilterを使ってデータ値自体を取得条件にすることが出来るが、Cassandraでは出来ないようだ。[2010-05-12]

HBaseだとデータ(Cell)の履歴(過去のタイムスタンプのデータ)を取得することが出来るが、Cassandraでは出来ないようだ。[2010-05-12]


データ更新系のメソッド。[2010-05-12]

  コーディング例 備考
データ(カラム)の登録
set
String key = "jsmith2";

ColumnPath colPath = new ColumnPath("Standard2");
colPath.setColumn("age".getBytes(UTF8));

byte[] value = "42".getBytes(UTF8);

long timestamp = System.currentTimeMillis() * 1000;

client.insert(
  "Keyspace1", key, colPath, value, timestamp, ConsistencyLevel.ONE
);
カラムにデータを登録する。
(既に有ったら上書き)

データが既存の場合、SQLの「UPDATE Standard2 SET age='42' WHERE key='jsmith'」に相当。
ColumnPath colPath = new ColumnPath("Super1");
colPath.setSuper_column("attr".getBytes(UTF8));
colPath.setColumn("age".getBytes(UTF8));
スーパーカラムの場合、スーパーカラム名も指定する。
データの一括登録 複数のデータを一括して登録する場合、batch_mutate()を使う。
ただしMutionクラスをマップで扱う必要があるので、マップを生成するメソッドを別途用意した方が分かりやすくなると思う。
Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();

long timestamp = System.currentTimeMillis() * 1000;

{
  String key = "jsmith3";

  Map<String, List<Mutation>> colMap	= new HashMap<String, List<Mutation>>();
  addMutationColumn(colMap, "Standard2", "first".getBytes(UTF8), "John3".getBytes(UTF8),  timestamp);
  addMutationColumn(colMap, "Standard2", "last".getBytes(UTF8),  "Smith3".getBytes(UTF8), timestamp);
  addMutationColumn(colMap, "Standard2", "age".getBytes(UTF8),   "43".getBytes(UTF8),     timestamp);

  mutationMap.put(key, colMap);
}

client.batch_mutate("Keyspace1", mutationMap, ConsistencyLevel.ONE);
public static void addMutationColumn(
	Map<String, List<Mutation>> map,
	String columnFamily,
	byte[] name, byte[] value, long timestamp) {

  Column col = new Column(name, value, timestamp);

  ColumnOrSuperColumn cs = new ColumnOrSuperColumn()	.setColumn(col);
  Mutation mutation = new Mutation().setColumn_or_supercolumn(cs);

  List<Mutation> list = map.get(columnFamily);
  if (list == null) {
    list = new ArrayList<Mutation>();
    map.put(columnFamily, list);
  }
  list.add(mutation);
}
Cassandra0.6.4までは、キーはtrim()して処理される。[2010-08-29]
Cassandra0.6.5で、trim()せずに処理されるようになった。
スーパーカラムのカラムファミリーにbatch_mutate()で登録する例。
Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();

long timestamp = System.currentTimeMillis() * 1000;

{
  String key = "jsmith4";

  Map<String, List<Mutation>> colMap = new HashMap<String, List<Mutation>>();
  addMutationSuperColumn(colMap, "Super1", "name".getBytes(UTF8), "first".getBytes(UTF8), "John4".getBytes(UTF8),  timestamp);
  addMutationSuperColumn(colMap, "Super1", "name".getBytes(UTF8), "last".getBytes(UTF8),  "Smith4".getBytes(UTF8), timestamp);
  addMutationSuperColumn(colMap, "Super1", "attr".getBytes(UTF8), "age".getBytes(UTF8),   "44".getBytes(UTF8),     timestamp);

  mutationMap.put(key, colMap);
}

client.batch_mutate("Keyspace1", mutationMap, ConsistencyLevel.ONE);
public static void addMutationSuperColumn(
	Map<String, List<Mutation>> map,
	String columnFamily,
	byte[] superName,
	byte[] name, byte[] value, long timestamp) {

  Column col = new Column(name, value, timestamp);
  SuperColumn scol = new SuperColumn().setName(superName);
  scol.addToColumns(col);

  ColumnOrSuperColumn cs = new ColumnOrSuperColumn().setSuper_column(scol);
  Mutation mutation = new Mutation().setColumn_or_supercolumn(cs);

  List<Mutation> list = map.get(columnFamily);
  if (list == null) {
    list = new ArrayList<Mutation>();
    map.put(columnFamily, list);
  }
  list.add(mutation);
}
レコード削除
String key = "jsmith2";

ColumnPath colPath = new ColumnPath("Standard2");

long timestamp = System.currentTimeMillis() * 1000;

client.remove(
  "Keyspace1", key, colPath, timestamp, ConsistencyLevel.ONE
);
ColumnPathにカラムファミリー名だけ指定すると、レコードの削除になる。

SQLの「DELETE FROM Standard2 WHERE key='jsmith2'」に相当。
複数レコードの削除
Map<String, Map<String, List<Mutation>>> mutationMap
	= new HashMap<String, Map<String, List<Mutation>>>();

long timestamp = System.currentTimeMillis() * 1000;

Map<String, List<Mutation>> colMap = new HashMap<String, List<Mutation>>();
addDeletion(colMap, "Standard2",
  Arrays.asList(
    "first".getBytes(UTF8), "last".getBytes(UTF8), "age".getBytes(UTF8)
  ),
  timestamp
);
mutationMap.put("jsmith2", colMap);
mutationMap.put("jsmith3", colMap);

client.batch_mutate("Keyspace1", mutationMap, ConsistencyLevel.ONE);
public static void addDeletion(
	Map<String, List<Mutation>> map,
	String columnFamily,
	List<byte[]> names,
	long timestamp) {

  SlicePredicate predicate = new SlicePredicate().setColumn_names(names);
  Deletion deletion = new Deletion(timestamp).setPredicate(predicate);
  Mutation mutation = new Mutation().setDeletion(deletion);

  List<Mutation> list = map.get(columnFamily);
  if (list == null) {
    list = new ArrayList<Mutation>();
    map.put(columnFamily, list);
  }
  list.add(mutation);
}
batch_mutate()を利用して、複数のデータをまとめて削除できる。

SlicePredicateにはカラム名を範囲で指定する機能があるのだが、
Deletionの場合には(まだ)対応していない。
したがってSlicePredicateを使う際にはカラム名を列挙する必要がある。
(行全体を削除したい場合は、SlicePredicateを指定しない(0.6.3以降)[2010-07-03]
Map<String, Map<String, List<Mutation>>> mutationMap
	= new HashMap<String, Map<String, List<Mutation>>>();

long timestamp = System.currentTimeMillis() * 1000;

Map<String, List<Mutation>> colMap = createDeletion(timestamp, "Standard2");
mutationMap.put("jsmith2", colMap);
mutationMap.put("jsmith3", colMap);

client.batch_mutate("Keyspace1", mutationMap, ConsistencyLevel.ONE);
public static Map<String, List<Mutation>> createDeletion(
	long timestamp,
	String... columnFamilies) {

  Deletion deletion = new Deletion(timestamp);
  Mutation mutation = new Mutation().setDeletion(deletion);

  List<Mutation> list = new ArrayList<Mutation>(1);
  list.add(mutation);

  Map<String, List<Mutation>> map
	= new HashMap<String, List<Mutation>>(columnFamilies.length);
  for (String columnFamily : columnFamilies) {
    map.put(columnFamily, list);
  }
  return map;
}
0.6.3で、カラム名指定なしで行削除が出来るようになった。[2010-07-03]
(DeletionにSlicePredicateを指定しない)

SQLの「DELETE FROM Standard2 WHERE key in ('jsmith2', 'jsmith3')」に相当。
Map<String, Map<String, List<Mutation>>> mutationMap
	= new HashMap<String, Map<String, List<Mutation>>>();

long timestamp = System.currentTimeMillis() * 1000;

Map<String, List<Mutation>> colMap = new HashMap<String, List<Mutation>>();
addDeletionSuper(colMap, "Super1", "name".getBytes(UTF8), timestamp);
addDeletionSuper(colMap, "Super1", "attr".getBytes(UTF8), timestamp);

mutationMap.put("jsmith2", colMap);
mutationMap.put("jsmith4", colMap);

client.batch_mutate("Keyspace1", mutationMap, ConsistencyLevel.ONE);
public static void addDeletionSuper(
	Map<String, List<Mutation>> map,
	String columnFamily,
	byte[] superName,
	long timestamp) {

  Deletion deletion = new Deletion(timestamp).setSuper_column(superName);
  Mutation mutation = new Mutation().setDeletion(deletion);

  List<Mutation> list = map.get(columnFamily);
  if (list == null) {
    list = new ArrayList<Mutation>();
    map.put(columnFamily, list);
  }
  list.add(mutation);
}
スーパーカラムの場合、Deletionにスーパーカラム名を指定することが出来る。

cassandra CLIのsetコマンドでは「System.currentTimeMillis() * 1000」によってタイムスタンプが設定されるので、自分のアプリから更新するときも同様にしておくのが無難だと思う。

データの登録(更新)では、タイムスタンプが新しい(タイムスタンプの値が大きい)データが(最終的に)優先される。
以下の例では、登録順序は"44"のデータの方が後から登録されているがタイムスタンプは古いので、DB上は前のデータの方が残る。

	value = "43".getBytes(UTF8);
	client.insert(KS_KEYSPACE1, key, colPath, value, timestamp,     ConsistencyLevel.ONE);

	value = "44".getBytes(UTF8);
	client.insert(KS_KEYSPACE1, key, colPath, value, timestamp - 1, ConsistencyLevel.ONE);

したがって、レコードを削除する時も、その時点のタイムスタンプ(日時)をセットする必要がある。
(次に登録したデータのタイムスタンプが削除時のタイムスタンプより古いと、削除の方が優先される)


HBaseにはレコードのロック(行ロック)の機能があるが、Cassandraには無い。


Cassandraへ戻る / HBaseへ行く / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま