HadoopのMap/Reduceを使ってHBaseで集計するサンプル。
|
|
このページのサンプルの目的は、HadoopのMap/Reduceを使ってHBase上のデータの集計を行うこと。
→集計のコーディング例
集計を行う為には元となるデータを作らなければならないので、CSVファイルから読み込んでHBaseに書き込んでみる。
→インポートのコーディング例・サンプルテーブル
また、テーブルに書かれたデータを確認するにはHBase Shellから見ればいい
と思ったら、数値等のバイナリデータは数値の形式では表示されないので、テキストファイルへ出力するプログラムも作ってみる。
→エクスポートのコーディング例
HBaseとファイル間の入出力であれば HadoopのMap/Reduceを使わなくてもHBaseのデータアクセスだけ出来れば普通に行える事だが、大量データを分散してアクセスできるのがHadoopの売りでもあることだし、Hadoopを使ってコーディングしてみる。
(バイナリーファイルとの入出力であれば、HBase標準でサンプルが用意されている→Export・Importクラス)
Hadoopのファイル入出力で集計するサンプルで使ったような、学生の成績表(試験の点数)を集計する例を考えてみる。
#番号,名前, 数学,国語,理科,社会,英語 101,天才, 100, 100, 100, 100, 100 201,無気力, 5, 30, 10, 20, 15 102,ひしだま, 90, 85, 80, 50, 10 202,hoge, 10, 30, 25, 45, 20 103,foo, 60, 60, 60, 60, 25 204,xxx, 80, 77, 90, 40, 44 205,yyy, 65, 90, 55, 80, 65 104,zzz, 40, 60, 60, 70, 20
キーは学生番号で、情報として学生の名前とかを持たせた上で、各教科(数学とか国語とか)の点数と合計点数を入れる項目を用意する。
HBaseは同じ項目(family)に複数のデータを入れられるので、点数という1つのfamilyでqualifierに各教科を入れるという構造も考えられるが、試験ってのは学期毎とかに複数あるので、qualifierには試験名を入れることにする。
ROW (学生番号) |
個人 | 数学 | 国語 | 理科 | 社会 | 英語 | 合計 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
101 |
|
|
|
|
|
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
102 |
|
|
|
|
|
|
|
ROW (学生番号) |
個人 | 点数 | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
101 |
|
|
||||||||||||||||||||
102 |
|
|
HBaseのテーブルは、qualifierの中はデータ追加で自動的に増やせるが、family(列)はテーブルを一旦使用不可にして項目追加を行わないと増やせない。
したがって変化の無いものを列(family)にする方がいいんじゃないかと思う。
(試験名の方を可変にしておけば、模試とか補習(苦笑)とかにも対応できるし)
今回のサンプル用に、HBase Shellからcreateコマンドを実行するか、HBaseのテーブル作成APIを使ってテーブルを作成しておく。
hbase(main):001:0> create 'student','personal','suugaku','kokugo','rika','shakai','eigo','total5'
package jp.hishidama.hadoop.hbase; import org.apache.hadoop.hbase.util.Bytes; public class Student { public static final String TABLE_NAME = "student"; public static final byte[] TABLE_NAME_BYTES = Bytes.toBytes(TABLE_NAME); public static final byte[] 個人 = Bytes.toBytes("personal"); public static final byte[] 名前 = Bytes.toBytes("name"); public static final byte[] 数学 = Bytes.toBytes("suugaku"); public static final byte[] 国語 = Bytes.toBytes("kokugo"); public static final byte[] 理科 = Bytes.toBytes("rika"); public static final byte[] 社会 = Bytes.toBytes("shakai"); public static final byte[] 英語 = Bytes.toBytes("eigo"); public static final byte[] 合計 = Bytes.toBytes("total5"); }
import static jp.hishidama.hadoop.hbase.Student.*; 〜 static void createTable() throws IOException { // 設定ファイルを用いてHbaseへ接続 HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("C:/cygwin/usr/local/hbase-0.20.3/conf/hbase-site.xml")); HBaseAdmin admin = new HBaseAdmin(conf); // 既に存在していたら何もしない if (admin.tableExists(TABLE_NAME_BYTES)) { return; } // テーブルを作成 HTableDescriptor desc = new HTableDescriptor(TABLE_NAME_BYTES); desc.addFamily(new HColumnDescriptor(個人)); desc.addFamily(new HColumnDescriptor(数学)); desc.addFamily(new HColumnDescriptor(国語)); desc.addFamily(new HColumnDescriptor(理科)); desc.addFamily(new HColumnDescriptor(社会)); desc.addFamily(new HColumnDescriptor(英語)); desc.addFamily(new HColumnDescriptor(合計)); admin.createTable(desc); }
HadoopのMapperでCSVファイルを読み込み、HBaseのテーブルに書き込む例。
package jp.hishidama.hadoop.hbase;
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import static jp.hishidama.hadoop.hbase.Student.*;
public class HBaseImport { private static final String KEY_試験名 = HBaseImport.class + "_name";
public static class Map extends Mapper<LongWritable, Text, NullWritable, Put> { private static final byte[][] CSV = { 数学, 国語, 理科, 社会, 英語 }; private byte[] 試験名; @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); 試験名 = Bytes.toBytes(context.getConfiguration().get(KEY_試験名)); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); if (line.startsWith("#") || line.trim().isEmpty()) { return; } |
|
// CSVの複数項目を1回のPutで更新する例 Put put = null; StringTokenizer tokenizer = new StringTokenizer(line, ","); for (int i = 0; tokenizer.hasMoreTokens(); i++) { String s = tokenizer.nextToken(); switch (i) { case 0: // 学生番号 byte[] no = Bytes.toBytes(s); put = new Put(no); break; case 1: // 名前 put.add(個人, 名前, Bytes.toBytes(s)); break; default: byte[] family = CSV[i - 2]; int ten = Integer.parseInt(s.trim()); put.add(family, 試験名, Bytes.toBytes(ten)); break; } } context.write(NullWritable.get(), put); |
// CSVの項目毎にPutして更新する例 Put put = null; byte[] no = null; StringTokenizer tokenizer = new StringTokenizer(line, ","); for (int i = 0; tokenizer.hasMoreTokens(); i++) { String s = tokenizer.nextToken(); switch (i) { case 0: // 学生番号 no = Bytes.toBytes(s); break; case 1: // 名前 put = new Put(no); put.add(個人, 名前, Bytes.toBytes(s)); context.write(NullWritable.get(), put); break; default: put = new Put(no); byte[] family = CSV[i - 2]; int ten = Integer.parseInt(s.trim()); put.add(family, 試験名, Bytes.toBytes(ten)); context.write(NullWritable.get(), put); break; } } |
} } |
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME); //出力先テーブル名を指定 conf.set(KEY_試験名, args[0]); Job job = new Job(conf, "hbase import"); job.setJarByClass(HBaseImport.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Put.class); job.setMapperClass(Map.class); job.setNumReduceTasks(0); job.setInputFormatClass(TextInputFormat.class); // ファイルから入力 job.setOutputFormatClass(TableOutputFormat.class); // HBaseへ出力 FileInputFormat.setInputPaths(job, new Path(args[1])); boolean success = job.waitForCompletion(true); System.out.println(success); } }
ファイルから入力するので、InputFormatにはTextInputFormatを使う。(ファイル名はmain()のargs[1]で渡すものとする)
この辺りはHadoopでファイルを読み込むのと全く同じ。
HBaseへの出力にはTableOutputFormatを使用する。
このクラスを使う場合、出力先(更新対象)のテーブル名をconfにセットしておく。(パラメーター名TableOutputFormat.OUTPUT_TABLE)
TableOutputFormatへの入力型は、値はPut(またはDelete)で、キーは何でもよい。
したがってここではキーにはNullWritableを指定している。
ただ単にデータをテーブルへ登録するだけの場合は集計は無いので、Reducerは不要。なのでMapperの出力(Put)がそのままTableOutputFormatへ渡り、テーブルのデータが更新される。
→TableOutputFormatの危険性 [2012-04-28]
今回の例では、main()のargs[0]で試験名を渡すことにしている。
この試験名はテーブルの更新対象項目のqualifierに使いたいので、Mapperに渡す必要がある。
Mapper(やReducer)の引数contextからcontext.getConfiguration()でconfを取得できるので、main()でconfに値をセットしておけばよい。
ただ、Mapper#map()で毎回取り直してbyte[]に変換するのは無駄なので、Mapper#setup()で取り出してフィールドに保持しておけばいい。
Mapper#map()の中でのPutへの値のセットの仕方(context.write()のタイミング)は、2通り考えられる。
1つは(Putには同一のROWに対して複数の更新をadd()できるので、)まとめてadd()して1回だけcontext.write()する方法。
もう1つは更新毎に毎回context.write()を呼び出す方法。
RDBなら1レコード内の複数項目更新なら1回だけUPDATEする(前者の方法)のが常識だが、HBaseはレコード単位でデータを保持しているのではないので、もしかすると後者の方が効率がいいかもしれない。(未検証)
(TableOutputFormatのデフォルト状態では、最後のclose()時にまとめてコミットされる模様。という事は、Putのインスタンス数が減る前者の方がいいのかもしれない。
むしろ、コミッターを変更して毎回コミットするようにしたら後者の方がいい?
でも1レコードずつコミットしてたら負荷がかかるかも?)
※ちなみに、このMapperでは点数と一緒に学生名も登録しているが、あるべき設計の姿で言えば、マスター登録専用(学生の名前とか性別とかの情報を登録する)のプログラムが別にあるべきでしょうね。
HadoopのMapperでHBaseのテーブルからデータを読み込み、CSVファイルとして書き込む例。
package jp.hishidama.hadoop.hbase;
import java.io.IOException; import java.util.Arrays; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import static jp.hishidama.hadoop.hbase.Student.*;
public class HBaseExport { private static final TreeMap<byte[], String> CSV_MAP = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR); static { int n = 0; CSV_MAP.put(名前, Integer.toString(n++)); CSV_MAP.put(数学, Integer.toString(n++)); CSV_MAP.put(国語, Integer.toString(n++)); CSV_MAP.put(理科, Integer.toString(n++)); CSV_MAP.put(社会, Integer.toString(n++)); CSV_MAP.put(英語, Integer.toString(n++)); CSV_MAP.put(合計, Integer.toString(n++)); }
public static class Map extends TableMapper<Text, Text> { private Text key = new Text(); private Text word = new Text(); @Override protected void map(ImmutableBytesWritable row, Result result, Context context) throws IOException, InterruptedException { String no = Bytes.toString(row.get()); //学生番号 key.set(no); for (KeyValue kv : result.raw()) { //スキャンされた全項目をループ処理 byte[] k; String s; if (kv.matchingQualifier(名前)) { k = kv.getQualifier(); s = Bytes.toString(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); } else { k = kv.getFamily(); int n = Bytes.toInt(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); s = Integer.toString(n); } word.set(CSV_MAP.get(k) + "=" + s); context.write(key, word); } } }
public static class Reduce extends Reducer<Text, Text, Text, Text> { private Text word = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String[] csv = new String[CSV_MAP.size()]; for (Text value : values) { String[] ss = value.toString().split("="); csv[Integer.parseInt(ss[0])] = ss[1]; } // String s = StringUtils.arrayToString(csv); // word.set(Bytes.toBytes(s)); String s = Arrays.toString(csv); word.set(Bytes.toBytes(s.substring(1, s.length() - 1))); context.write(key, word); } }
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME); //入力元テーブル名を指定 // スキャンする項目を列挙 String name = args[0]; //試験名 StringBuilder sb = new StringBuilder(128); for (byte[] s : CSV_MAP.keySet()) { if (sb.length() > 0) { sb.append(' '); } if (Bytes.equals(s, 名前)) { sb.append(Bytes.toString(個人)).append(':').append(Bytes.toString(s)); } else { sb.append(Bytes.toString(s)) .append(':').append(name); } } conf.set(TableInputFormat.SCAN_COLUMNS, sb.toString()); //スキャンする項目を指定 Job job = new Job(conf, "hbase export"); job.setJarByClass(HBaseExport.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(TableInputFormat.class); // HBaseから入力 job.setOutputFormatClass(TextOutputFormat.class); // ファイルへ出力 FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean success = job.waitForCompletion(true); System.out.println(success); } }
101 天才, 100, 100, 100, 100, 100, null 102 ひしだま, 90, 85, 80, 50, 10, null 103 foo, 60, 60, 60, 60, 25, null 104 zzz, 40, 60, 60, 70, 20, null 201 無気力, 5, 30, 10, 20, 15, null 202 hoge, 10, 30, 25, 45, 20, null 204 xxx, 80, 77, 90, 40, 44, null 205 yyy, 65, 90, 55, 80, 65, null
ファイルへ出力するので、OutputFormatにはTextOutputFormatを使う。(ファイル名はmain()のargs[1]で渡すものとする)
この辺りはHadoopでファイルを書き出すのと全く同じ。
(キーと値の間はタブで区切られている。完全なCSVファイルにしたいならCommaTextOutputFormatを使う
とか)
HBaseからテーブルを読み込む(スキャンする)にはTableInputFormatを使用する。
このクラスを使う場合、読み込む対象のテーブル名をconfにセットしておく。(パラメーター名TableInputFormat.INPUT_TABLE)
TableInputFormatからの出力型(Mapperへの入力型)は、キーはImmutableBytesWritableでROWが入ってくる。値はResult。
通常のMapperにそれらの型を指定してもいいのだが、TableMapperというクラスが用意されているので、それを使うのがいいだろう。
テーブルからどの項目(family:
qualifier)をスキャンするかは、conf(パラメーター名TableInputFormat.SCAN_COLUMNS)に指定できる。
これは列名「family:
qualifier」をスペース区切りで複数指定できる。
今回のサンプルでは試験名をqualifierに使っているので、(インポートのサンプルと同様に)main()のargs[0]で試験名を渡している。
Mapper#map()には、スキャンされたデータがROW毎に入ってくる。
今回のサンプルでは、出力のキーは学生番号、値は「CSVの列番号=点数」という文字列にしている。
そうするとReducerには学生番号毎に全点数が入ってくるので、値を「=」で分離し、CSVの列番号の位置に点数を配置している。
(本来は、こういうのは「=」で区切った文字列にするのではなく、クラスにすべきだと思うけど…)
HBaseのテーブルの各行を集計してテーブルを更新する例。
package jp.hishidama.hadoop.hbase;
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer; import static jp.hishidama.hadoop.hbase.Student.*;
public class HBaseRecordSum { private static final String KEY_試験名 = HBaseRecordSum.class + "_name";
public static class Map extends TableMapper<ImmutableBytesWritable, IntWritable> { private IntWritable intw = new IntWritable(); @Override protected void map(ImmutableBytesWritable row, Result result, Context context) throws IOException, InterruptedException { for (KeyValue kv : result.raw()) { //スキャンされた全項目をループ処理 int n = Bytes.toInt(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); intw.set(n); context.write(row, intw); } } }
public static class Reduce extends TableReducer<ImmutableBytesWritable, IntWritable, NullWritable> { private byte[] 試験名; @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); 試験名 = Bytes.toBytes(context.getConfiguration().get(KEY_試験名)); } @Override protected void reduce(ImmutableBytesWritable row, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } Put put = new Put(row.get()); put.add(合計, 試験名, Bytes.toBytes(sum)); context.write(NullWritable.get(), put); } }
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME); conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME); String name = args[0]; conf.set(KEY_試験名, name); conf.set(TableInputFormat.SCAN_COLUMNS, Bytes.toString(数学) + ":" + name + " " + Bytes.toString(国語) + ":" + name + " " + Bytes.toString(理科) + ":" + name + " " + Bytes.toString(社会) + ":" + name + " " + Bytes.toString(英語) + ":" + name ); Job job = new Job(conf, "hbase personal sum"); job.setJarByClass(HBaseRecordSum.class); job.setMapOutputKeyClass (ImmutableBytesWritable.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass (NullWritable.class); job.setOutputValueClass(Put.class); job.setMapperClass (Map.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass (Reduce.class); job.setInputFormatClass (TableInputFormat.class); // HBaseから入力 job.setOutputFormatClass(TableOutputFormat.class); // HBaseへ出力 boolean success = job.waitForCompletion(true); System.out.println(success); } }
エクスポート(テーブルからのスキャン)とインポート(テーブルへの更新)のテーブル処理部分を抜き出して合わせただけ。
ただ、集計があるのでReducerを用意している。
そしてテーブルからの入力を受け取るTableMapperと同様に、テーブルを更新する為のTableReducerを使用している。
テーブルを扱う為の初期化処理もある程度共通なので、TableMapReduceUtilという初期化用のクラスが提供されている。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer; import static jp.hishidama.hadoop.hbase.Student.*;
public static void main(String[] args) throws Exception {
Job job = new Job();
job.setJobName("hbase personal sum");
job.setJarByClass(HBaseRecordSum2.class);
Configuration conf = job.getConfiguration();
String name = args[0];
conf.set(KEY_試験名, name);
byte[] qualifier = Bytes.toBytes(name);
Scan scan = new Scan();
scan.addColumn(数学, qualifier);
scan.addColumn(国語, qualifier);
scan.addColumn(理科, qualifier);
scan.addColumn(社会, qualifier);
scan.addColumn(英語, qualifier);
TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, Map.class, ImmutableBytesWritable.class, IntWritable.class, job);
TableMapReduceUtil.initTableReducerJob(TABLE_NAME, Reduce.class, job /* , HRegionPartitioner.class */);
job.setCombinerClass(IntSumReducer.class);
boolean success = job.waitForCompletion(true);
System.out.println(success);
}
initTableMapperJob()は、入力(TableInputFormatおよびMapper)の初期化を行う。
すなわち、入力テーブル名とMapperクラス、Mapperの出力型を初期化する。
(TableInputFormat.INPUT_TABLE・setMapperClass()・setMapOutputKeyClass()・setMapOutputValueClass()・setInputFormatClass())
また、テーブルからの取得条件はScanクラスで指定する。
→Scanの条件指定の例
initTableReducerJob()は、出力(ReducerおよびTableOutputFormat)の初期化を行う。
すなわち、更新テーブル名とReducerクラスを初期化する。必要であればPartitionerも指定できる。HRegionPartitionerはHBase用のクラスなので、効果が期待できるのかな?
(TableOutputFormat.OUTPUT_TABLE・setReducerClass()・setPartitionerClass()・setOutputFormatClass())
Combinerは指定できないので、必要であれば別途初期化する。
ちなみに、ConfigurationをJobのコンストラクターに渡すと、内部でクローンが作られるようだ。
つまり以下のようなコーディングでは、セットした値がMap/Reduce実行時には使用されない。
Configuration conf = new Configuration();
Job job = new Job(conf, "sample");
conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);
// confとjob.getConfiguration()はインスタンスが異なる。
上記サンプルを実行する為の(単独環境用)build.xml。
Eclipseのプロジェクトでclassesディレクトリーにコンパイルしたファイルを置き、binディレクトリーの下にこのbuild.xmlを置く想定。
また、入力ファイルの置いてあるディレクトリーはHadoopのファイルから読み込むサンプルの場所を想定している。
C:\workspace\HadoopSample
│ .classpath
│ .project
│
├─bin
│ build.xml
│
├─classes
│
└─src |
C:\cygwin\home\hadoop ├─rowsum │ │ rowsum.jar │ │ │ ├─input │ │ file01.utf8.txt │ │ │ └─output │ .part-m-00000.crc │ part-m-00000.sjis.txt │ └─student └─output .part-r-00000.crc part-r-00000.sjis.txt |
<?xml version="1.0" encoding="Windows-31J"?>
<project name="hadoop hbase" basedir=".">
<property environment="env" />
<property name="cygwin" location="C:\cygwin" />
<property name="hadoop.home" location="${cygwin}/usr/local/hadoop-0.20.1" />
<property name="hbase.home" location="${cygwin}/usr/local/hbase-0.20.3" />
<property name="target.home" location="${cygwin}/home/hadoop/student" />
<property name="target.input" location="${target.home}/../rowsum/input" />
<property name="target.output" location="${target.home}/output" />
<property name="main.class.import" value="jp.hishidama.hadoop.hbase.HBaseImport" />
<property name="main.class.export" value="jp.hishidama.hadoop.hbase.HBaseExport" />
<property name="main.class.sum5" value="jp.hishidama.hadoop.hbase.HBaseRecordSum" />
<property name="shiken.name" value="2010-1-1" />
<path id="class.path">
<pathelement location="../classes" />
<fileset dir="${hadoop.home}">
<include name="hadoop-*-core.jar" />
<include name="lib/**/*.jar" />
</fileset>
<fileset dir="${hbase.home}">
<include name="*.jar" />
<exclude name="*test.jar" />
<include name="lib/zookeeper*.jar" />
</fileset>
<pathelement location="${hbase.home}/conf" /> <!-- 設定ファイルを読み込ませる為[2010-03-30] -->
</path>
<target name="0.hbase_start"> <exec executable="${cygwin}/bin/cygpath.exe" outputproperty="start.sh"> <arg value="-u" /> <arg value="${hbase.home}/bin/start-hbase.sh" /> </exec> <exec executable="${cygwin}/bin/bash.exe"> <arg value="--login" /> <arg value="${start.sh}" /> </exec> </target> <target name="0.hbase_stop"> <exec executable="${cygwin}/bin/cygpath.exe" outputproperty="stop.sh"> <arg value="-u" /> <arg value="${hbase.home}/bin/stop-hbase.sh" /> </exec> <exec executable="${cygwin}/bin/bash.exe"> <arg value="--login" /> <arg value="${stop.sh}" /> </exec> </target>
<target name="import" depends="1.1.execute" /> <target name="1.1.execute" description="import"> <java classname="${main.class.import}" fork="true" maxmemory="1024m"> <arg value="${shiken.name}" /> <arg value="${target.input}" /> <classpath refid="class.path" /> <sysproperty key="hadoop.log.dir" path="${hadoop.home}/logs" /> <sysproperty key="hadoop.log.file" value="hadoop.log" /> <sysproperty key="hadoop.home.dir" path="${hadoop.home}/" /> <sysproperty key="hadoop.id.str" value="${env.COMPUTERNAME}" /> <sysproperty key="hadoop.root.logger" value="INFO,console" /> <sysproperty key="hadoop.policy.file" value="hadoop-policy.xml" /> <env key="PATH" path="${cygwin}/bin" /> </java> </target>
<target name="export" depends="2.1.rm_output,2.2.execute,2.3.output_encode,2.4.cat_output" /> <target name="2.1.rm_output" description="outputディレクトリーを削除する"> <delete dir="${target.output}" failonerror="true" /> </target> <target name="2.2.execute" description="hadoopを実行する"> <java classname="${main.class.export}" fork="true" maxmemory="1024m"> <arg value="${shiken.name}" /> <arg value="${target.output}" /> <classpath refid="class.path" /> <sysproperty key="hadoop.log.dir" path="${hadoop.home}/logs" /> <sysproperty key="hadoop.log.file" value="hadoop.log" /> <sysproperty key="hadoop.home.dir" path="${hadoop.home}/" /> <sysproperty key="hadoop.id.str" value="${env.COMPUTERNAME}" /> <sysproperty key="hadoop.root.logger" value="INFO,console" /> <sysproperty key="hadoop.policy.file" value="hadoop-policy.xml" /> <env key="PATH" path="${cygwin}/bin" /> </java> </target> <target name="2.3.output_encode" description="outputの文字コードをSJISに変換する"> <copy todir="${target.output}" encoding="UTF-8" outputencoding="Windows-31J"> <fileset dir="${target.output}"> <include name="part*" /> <exclude name="*.sjis.txt" /> </fileset> <mapper type="glob" from="*" to="*.sjis.txt" /> </copy> <delete> <fileset dir="${target.output}"> <include name="part*" /> <exclude name="*.sjis.txt" /> </fileset> </delete> </target> <target name="2.4.cat_output" description="outputの内容を表示する"> <exec executable="${cygwin}/bin/cat.exe" dir="${target.output}"> <arg value="*" /> </exec> </target>
<target name="sum" depends="3.1.execute" /> <target name="3.1.execute" description="sum"> <java classname="${main.class.sum5}" fork="true" maxmemory="1024m"> <arg value="${shiken.name}" /> <classpath refid="class.path" /> <sysproperty key="hadoop.log.dir" path="${hadoop.home}/logs" /> <sysproperty key="hadoop.log.file" value="hadoop.log" /> <sysproperty key="hadoop.home.dir" path="${hadoop.home}/" /> <sysproperty key="hadoop.id.str" value="${env.COMPUTERNAME}" /> <sysproperty key="hadoop.root.logger" value="INFO,console" /> <sysproperty key="hadoop.policy.file" value="hadoop-policy.xml" /> <env key="PATH" path="${cygwin}/bin" /> </java> </target> </project>
HBaseを使ったMap/ReduceのプログラムをHadoopのシェルから実行すると、HBaseのjarファイルが見つからなくて例外が発生する。[2010-03-01]
なので、HadoopのクラスパスにHBaseのjarファイルを追加してやる必要がある。
また、HBASE_HOME/confをクラスパスに加えておくと、hbase-default.xmlとhbase-site.xmlがデフォルトで読み込まれる。[2010-03-30]
# Extra Java CLASSPATH elements. Optional.
# export HADOOP_CLASSPATH=
HBASE_HOME=/usr/local/hbase-0.20.3
export HADOOP_CLASSPATH=$HBASE_HOME/hbase-0.20.3.jar:$HBASE_HOME/lib/zookeeper-3.2.2.jar:$HBASE_HOME/conf
$ cd /home/hadoop/student
$ ls
studnet.jar
$ /usr/local/hadoop/bin/hadoop jar studnet.jar jp.hishidama.hadoop.hbase.HBaseExport 2010-1-1 output
〜ログがいっぱい出力される〜
$ cat output/*
101 天才, 100, 100, 100, 100, 100, 500
102 ひしだま, 90, 85, 80, 50, 10, 315
103 foo, 60, 60, 60, 60, 25, 265
104 zzz, 40, 60, 60, 70, 20, 250
201 無気力, 5, 30, 10, 20, 15, 80
202 hoge, 10, 30, 25, 45, 20, 130
204 xxx, 80, 77, 90, 40, 44, 331
205 yyy, 65, 90, 55, 80, 65, 355
HBaseのHBASE_HOME/hbase-0.20.3.jarには、Map/Reduceのサンプルとなるクラスが入っている。[2010-03-01]
(実際はサンプルというか、ツールだな^^;)
これを実行するにはhadoopシェルにZooKeeperのjarファイルを追加しておく必要がある。
クラス名 | 内容 |
---|---|
org.apache.hadoop.hbase.mapreduce.RowCounter | テーブルのレコード数をカウントする。 第1引数にテーブル名、第2引数以降(省略可)には条件となるcolumnを指定する。 |
org.apache.hadoop.hbase.mapreduce.Export | テーブルをバイナリーファイル(シーケンスファイル)に出力する。 第1引数にテーブル名、第2引数に出力先ディレクトリーを指定する。 |
org.apache.hadoop.hbase.mapreduce.Import | エクスポートされたバイナリーファイル(シーケンスファイル)をテーブルにインポートする。 第1引数にテーブル名、第2引数に入力元ディレクトリーを指定する。 |
org.apache.hadoop.hbase.migration.nineteen.HStoreFileToStoreFile | バージョン0.19のHStoreFileを0.20のStoreFileに変換するものらしい。 |
org.apache.hadoop.hbase.mapreduce.Driver | 上記のクラスを実行するクラス。 HBASE_HOME/hbase-0.20.3.jarのMain-Classにはこのクラスが指定されている。 第1引数で実行するコマンド、第2引数以降にそのコマンド専用のオプションを指定する。 rowcounter …RowCounterexport …Exportimport …Importhsf2sf …HStoreFileToStoreFile |
$ /usr/local/hadoop/bin/hadoop jar $(cygpath -w /usr/local/hbase-0.20.3/hbase-0.20.3.jar) export student data
$ ls -l data/
total 8
-rwxrwxrwx 1 hishidama なし 4761 2010-03-01 23:56 part-m-00000
$ /usr/local/hadoop/bin/hadoop jar $(cygpath -w /usr/local/hbase-0.20.3/hbase-0.20.3.jar) import student data