HadoopのMap/Reduceを使ってCassandraで集計するサンプル。
Cassandraは0.6でHadoopに対応したらしい。
Map/Reduce用のInputFormat(およびそれに関するReader等)が提供されている。
(OutputFormat系は未対応)
そして、apache-cassandra-0.6.x-src/contribの中にword_countが提供されている。
これがCassandraを使ってWordCountを行うサンプルなので、これを実行してみる。
このword_countには、WordCountとWordCountSetupという2つのクラス(Javaのソースファイル)がある。
WordCountSetupでCassandraのカラムファミリー(テーブル)にデータを作成し、WordCountでそれを読み込んで文字列の個数をカウントする。
なので、まずこの2つのソースをEclipseの適当なプロジェクトにコピーしておく。
(WordCountSetupはWordCount内の定数を参照しているので、WordCountSetupのみをコピーしても駄目)
コンパイル時に使用するライブラリーは、apache-cassandra-0.6.x./lib内のjarファイルを全て入れておく。(たぶん全部は使わないだろうけど、面倒なのでw)
ただしHadoopのjarファイル(hadoop-core-0.20.1.jar)も入っているので、もし実際の実行に異なるバージョンのHadoopを使っているなら、そのバージョンのjarファイルに置き換えた方がいいと思う。
使用するカラムファミリー(テーブル)はKeyspace1のStandard1。(つまりCassandraデフォルトのstorage-conf.xmlで用意されているカラムファミリー)
ここにtext1,text2,text3というカラム名でデータを用意し、その中の単語の個数をカウントする。(そのカラム名を持つ全ての行が読み込まれる)
カウント結果の出力先はファイル。(まだOutputFormatが提供されていないから、Cassandra上に出力されるようにはなっていない)
WordCountSetupはカラムファミリー(テーブル)にWordCount用のデータを作成(登録)するプログラム。
これは特にHadoop(Map/Reduce)は使わず、直接Cassandraにアクセスしている。
main()で始まる普通のJavaクラスなので、そのままEclipseから実行すればいい。
すると、まずはstorage-conf.xmlが見つからないという例外が出る。
Exception in thread "main" java.lang.ExceptionInInitializerError at org.apache.cassandra.service.StorageService.<clinit>(StorageService.java:113) at WordCountSetup.main(WordCountSetup.java:36) Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Cannot locate storage-conf.xml via storage-config system property or classpath lookup. at org.apache.cassandra.config.DatabaseDescriptor.<clinit>(DatabaseDescriptor.java:539) ... 2 more Caused by: java.lang.RuntimeException: Cannot locate storage-conf.xml via storage-config system property or classpath lookup. at org.apache.cassandra.config.DatabaseDescriptor.getStorageConfigPath(DatabaseDescriptor.java:155) at org.apache.cassandra.config.DatabaseDescriptor.<clinit>(DatabaseDescriptor.java:164) ... 2 more
なので、実行時のクラスパスにstorage-conf.xmlの場所を加えてやる。(C:\apache-cassandra-0.6.3\conf)
が、今後は別の例外が…。
Exception in thread "main" UnavailableException()
at org.apache.cassandra.service.StorageProxy.assureSufficientLiveNodes(StorageProxy.java:298)
at org.apache.cassandra.service.StorageProxy.mutateBlocking(StorageProxy.java:208)
at WordCountSetup.main(WordCountSetup.java:53)
どうも生きているノードが見つからないという事らしいのだが…CLIからだとちゃんとつながるので、原因不明…。
そもそもStorageService#getNaturalEndpoints()もAbstractReplicationStrategy#getWriteEndpoints()・getHintedEndpoints()も空リストを返しているので、設定が何かおかしいんだろう。
とりあえず原因究明は保留して、データはcassandra CLIから手動で入れることにする(苦笑)
set Keyspace1.Standard1['Key0']['text1'] = 'word1' set Keyspace1.Standard1['Key0']['text2'] = 'word1 word2' set Keyspace1.Standard1['Key0']['text3'] = 'word1' set Keyspace1.Standard1['Key1']['text3'] = 'word1' set Keyspace1.Standard1['Key2']['text3'] = 'word1'
text1・text2・text3がそれぞれ別々にカウントされることになる。
text1は最小限のパターン。
text2は1データ内に複数の単語というパターン。
text3は複数行にまたがるパターン。WordCountSetupだとtext3は1000件のデータを作っているのだが、手動だと面倒なので、適当な個数にしておく。
text0はカウントを行うが対象データが無いので0件になる、というパターン。なのでデータは登録しない。
WordCountがHadoopを使って単語を数えるプログラム本体。
とりあえずEclipseにソースをコピーすると、ソース上に警告が出ているのが気になるで、修正する。
public static class TokenizerMapper extends Mapper<String, SortedMap<byte[], IColumn>, Text, IntWritable> { 〜 protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) ↓ protected void setup(Context context)
このWordCountは、Keyspace1のStandard1のカラムtext0〜text3について、1カラムずつ4回のMap/Reduceを実行するようになっている。
出力先は「/tmp/word_count0」〜「/tmp/word_count3」というディレクトリー。
肝はColumnFamilyInputFormat。これがCassandra用のInputFormat。
また、読み込むキースペース名・カラムファミリー名・カラム名をセットする為にConfigHelperというクラスが用意されている。
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.thrift.SlicePredicate;
job.setInputFormatClass(ColumnFamilyInputFormat.class); ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY); SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes())); ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
これをWindowsで実行する一番お手軽な方法は、Hadoopの単独環境を使用すること。
WordCountに対するEclipseの実行構成で、以下の設定追加をしておく。
設定箇所 | 設定内容 | 設定例 | |
---|---|---|---|
クラスパス | Hadoopの単独環境用の設定ファイルのある場所 | C:\cygwin\usr\local\hadoop-0.20.2\standalone-conf | |
Hadoopのライブラリー(jarファイル) |
C:\cygwin\usr\local\hadoop-0.20.2\hadoop-0.20.2-core.jar C:\cygwin\usr\local\hadoop-0.20.2\lib\*.jar |
||
Cassandraの設定ファイルのある場所 | C:\apache-cassandra-0.6.3\conf | ||
Cassandraのライブラリー(jarファイル) | C:\apache-cassandra-0.6.3\lib\*.jar | ||
環境 | WindowsのHadoopはCygwinを使うので、その場所 | PATH | C:\cygwin\bin |
単独環境の場合はHadoopを起動しておく必要も無いので、Cassandraが起動している状態でそのまま実行すればいい。
Windowsの単独環境なので、出力先は「C:\tmp」の下になる。
text0〜text3の4つのカラムについて別々にカウントされるので、4つのディレクトリーが作られることになる。
なお、例によって、再実行する場合は「C:\tmp\word_count0〜3」を事前に消しておく必要がある。
出力ファイル | 内容 | 備考 |
---|---|---|
C:\tmp\word_count0\part-r-00000 | 0バイト | text0は1件もデータが無いので、出力ファイルは0バイトになる。 |
C:\tmp\word_count1\part-r-00000 |
word1 1 |
text1のカウント結果。 |
C:\tmp\word_count2\part-r-00000 |
word1 1 word2 1 |
text2のカウント結果。 |
C:\tmp\word_count3\part-r-00000 |
word1 5 |
←おかしい! |
さて、無事に出力されたなーと思ったら、パターン3における件数がおかしい。
データは3レコードしか作っていないので、3件になるはず。
色々試してみた結果分かったことは、複数行のデータを読み込む場合、1行を除いてその他の行全てが再度読み込まれてしまっており、それらに対して毎回Mapper#map()が呼ばれるので、その分だけ二重にカウントされてしまう。
これは実行するノードを決める為に呼ばれているCassandra$Client#describe_splits()というメソッドのバグ(CASSANDRA-1042)で、少数ノードの場合にこのような現象になってしまうらしい。(今回はWindowsマシン1台で試しており、カラムファミリーの複製数も1個)
このバグはバージョン0.6で見つかったようだが、0.6.4で修正された。[/2010-07-31]
出力ファイル | 内容 | 備考 |
---|---|---|
C:\tmp\word_count0\part-r-00000 | 0バイト | text0は1件もデータが無いので、出力ファイルは0バイトになる。 |
C:\tmp\word_count1\part-r-00000 |
word1 1 |
text1のカウント結果。 |
C:\tmp\word_count2\part-r-00000 |
word1 1 word2 1 |
text2のカウント結果。 |
C:\tmp\word_count3\part-r-00000 |
word1 3 |