S-JIS[2010-07-03/2010-07-31] 変更履歴

Cassandra Hadoopサンプル

HadoopのMap/Reduceを使ってCassandraで集計するサンプル。


WordCountサンプルの概要

Cassandraは0.6でHadoopに対応したらしい。
Map/Reduce用のInputFormat(およびそれに関するReader等)が提供されている。
(OutputFormat系は未対応)

そして、apache-cassandra-0.6.x-src/contribの中にword_countが提供されている。
これがCassandraを使ってWordCountを行うサンプルなので、これを実行してみる。


このword_countには、WordCountとWordCountSetupという2つのクラス(Javaのソースファイル)がある。
WordCountSetupでCassandraのカラムファミリー(テーブル)にデータを作成し、WordCountでそれを読み込んで文字列の個数をカウントする。

なので、まずこの2つのソースをEclipseの適当なプロジェクトにコピーしておく。
(WordCountSetupはWordCount内の定数を参照しているので、WordCountSetupのみをコピーしても駄目)
コンパイル時に使用するライブラリーは、apache-cassandra-0.6.x./lib内のjarファイルを全て入れておく。(たぶん全部は使わないだろうけど、面倒なのでw)
ただしHadoopのjarファイル(hadoop-core-0.20.1.jar)も入っているので、もし実際の実行に異なるバージョンのHadoopを使っているなら、そのバージョンのjarファイルに置き換えた方がいいと思う。


使用するカラムファミリー(テーブル)はKeyspace1のStandard1。(つまりCassandraデフォルトのstorage-conf.xmlで用意されているカラムファミリー)
ここにtext1,text2,text3というカラム名でデータを用意し、その中の単語の個数をカウントする。(そのカラム名を持つ全ての行が読み込まれる)

カウント結果の出力先はファイル。(まだOutputFormatが提供されていないから、Cassandra上に出力されるようにはなっていない)


WordCountSetup

WordCountSetupはカラムファミリー(テーブル)にWordCount用のデータを作成(登録)するプログラム。
これは特にHadoop(Map/Reduce)は使わず、直接Cassandraにアクセスしている。
main()で始まる普通のJavaクラスなので、そのままEclipseから実行すればいい。

すると、まずはstorage-conf.xmlが見つからないという例外が出る。

Exception in thread "main" java.lang.ExceptionInInitializerError
	at org.apache.cassandra.service.StorageService.<clinit>(StorageService.java:113)
	at WordCountSetup.main(WordCountSetup.java:36)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Cannot locate storage-conf.xml via storage-config system property or classpath lookup.
	at org.apache.cassandra.config.DatabaseDescriptor.<clinit>(DatabaseDescriptor.java:539)
	... 2 more
Caused by: java.lang.RuntimeException: Cannot locate storage-conf.xml via storage-config system property or classpath lookup.
	at org.apache.cassandra.config.DatabaseDescriptor.getStorageConfigPath(DatabaseDescriptor.java:155)
	at org.apache.cassandra.config.DatabaseDescriptor.<clinit>(DatabaseDescriptor.java:164)
	... 2 more

なので、実行時のクラスパスにstorage-conf.xmlの場所を加えてやる。(C:\apache-cassandra-0.6.3\conf)

が、今後は別の例外が…。

Exception in thread "main" UnavailableException()
	at org.apache.cassandra.service.StorageProxy.assureSufficientLiveNodes(StorageProxy.java:298)
	at org.apache.cassandra.service.StorageProxy.mutateBlocking(StorageProxy.java:208)
	at WordCountSetup.main(WordCountSetup.java:53)

どうも生きているノードが見つからないという事らしいのだが…CLIからだとちゃんとつながるので、原因不明…。
そもそもStorageService#getNaturalEndpoints()もAbstractReplicationStrategy#getWriteEndpoints()・getHintedEndpoints()も空リストを返しているので、設定が何かおかしいんだろう。


とりあえず原因究明は保留して、データはcassandra CLIから手動で入れることにする(苦笑)

set Keyspace1.Standard1['Key0']['text1'] = 'word1'

set Keyspace1.Standard1['Key0']['text2'] = 'word1 word2'

set Keyspace1.Standard1['Key0']['text3'] = 'word1'
set Keyspace1.Standard1['Key1']['text3'] = 'word1'
set Keyspace1.Standard1['Key2']['text3'] = 'word1'

text1・text2・text3がそれぞれ別々にカウントされることになる。

text1は最小限のパターン。
text2は1データ内に複数の単語というパターン。
text3は複数行にまたがるパターン。WordCountSetupだとtext3は1000件のデータを作っているのだが、手動だと面倒なので、適当な個数にしておく。
text0はカウントを行うが対象データが無いので0件になる、というパターン。なのでデータは登録しない。


WordCount

WordCountがHadoopを使って単語を数えるプログラム本体。

とりあえずEclipseにソースをコピーすると、ソース上に警告が出ているのが気になるで、修正する。

	public static class TokenizerMapper extends Mapper<String, SortedMap<byte[], IColumn>, Text, IntWritable>
	{
〜
		protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
↓
		protected void setup(Context context)

このWordCountは、Keyspace1のStandard1のカラムtext0〜text3について、1カラムずつ4回のMap/Reduceを実行するようになっている。
出力先は「/tmp/word_count0」〜「/tmp/word_count3」というディレクトリー。


肝はColumnFamilyInputFormat。これがCassandra用のInputFormat
また、読み込むキースペース名・カラムファミリー名・カラム名をセットする為にConfigHelperというクラスが用意されている。

import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
	job.setInputFormatClass(ColumnFamilyInputFormat.class);

	ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
	SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
	ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);

これをWindowsで実行する一番お手軽な方法は、Hadoopの単独環境を使用すること。
WordCountに対するEclipseの実行構成で、以下の設定追加をしておく。

設定箇所 設定内容 設定例
クラスパス Hadoopの単独環境用の設定ファイルのある場所 C:\cygwin\usr\local\hadoop-0.20.2\standalone-conf
Hadoopのライブラリー(jarファイル) C:\cygwin\usr\local\hadoop-0.20.2\hadoop-0.20.2-core.jar
C:\cygwin\usr\local\hadoop-0.20.2\lib\*.jar
Cassandraの設定ファイルのある場所 C:\apache-cassandra-0.6.3\conf
Cassandraのライブラリー(jarファイル) C:\apache-cassandra-0.6.3\lib\*.jar
環境 WindowsのHadoopはCygwinを使うので、その場所 PATH C:\cygwin\bin

単独環境の場合はHadoopを起動しておく必要も無いので、Cassandraが起動している状態でそのまま実行すればいい。

Windowsの単独環境なので、出力先は「C:\tmp」の下になる。
text0〜text3の4つのカラムについて別々にカウントされるので、4つのディレクトリーが作られることになる。
なお、例によって、再実行する場合は「C:\tmp\word_count0〜3」を事前に消しておく必要がある。

実行結果(0.6.3)
出力ファイル 内容 備考
C:\tmp\word_count0\part-r-00000 0バイト text0は1件もデータが無いので、出力ファイルは0バイトになる。
C:\tmp\word_count1\part-r-00000
word1	1
text1のカウント結果。
C:\tmp\word_count2\part-r-00000
word1	1
word2	1
text2のカウント結果。
C:\tmp\word_count3\part-r-00000
word1	5
おかしい!

さて、無事に出力されたなーと思ったら、パターン3における件数がおかしい。
データは3レコードしか作っていないので、3件になるはず。

色々試してみた結果分かったことは、複数行のデータを読み込む場合、1行を除いてその他の行全てが再度読み込まれてしまっており、それらに対して毎回Mapper#map()が呼ばれるので、その分だけ二重にカウントされてしまう。
これは実行するノードを決める為に呼ばれているCassandra$Client#describe_splits()というメソッドのバグ(CASSANDRA-1042)で、少数ノードの場合にこのような現象になってしまうらしい。(今回はWindowsマシン1台で試しており、カラムファミリーの複製数も1個)
このバグはバージョン0.6で見つかったようだが、0.6.4で修正された。[/2010-07-31]

実行結果(0.6.4)[2010-07-31]
出力ファイル 内容 備考
C:\tmp\word_count0\part-r-00000 0バイト text0は1件もデータが無いので、出力ファイルは0バイトになる。
C:\tmp\word_count1\part-r-00000
word1	1
text1のカウント結果。
C:\tmp\word_count2\part-r-00000
word1	1
word2	1
text2のカウント結果。
C:\tmp\word_count3\part-r-00000
word1	3
 

Cassandraへ戻る / HBaseへ行く / 技術メモへ戻る
メールの送信先:ひしだま