Huahin Coreは、Huahin Frameworkの、HadoopのMapReduceをラップしたフレームワーク本体。
Huahin CoreのページやGitHubのhuahin-coreを見ると、pom.xmlにdependencyを記述してMavenでダウンロードするか、gitコマンドを使ってGitHubからソースコードを取ってくることになるようだ。
が、自分の(VMware上の)Linux開発環境ではgitコマンドが入っていないし、pom.xmlを一から書く知識も無いので(爆)、別の方法を採ることにする。
$ unzip huahin-huahin-core-huahin-core-0.1.0-2-g764eb91.zip
サンプルのソースを見てみる。
public class RankingJobTool extends SimpleJobTool { protected String setInputPath(String[] args) { return args[0]; } protected String setOutputPath(String[] args) { return args[1]; } protected void setup() throws Exception { final String[] labels = new String[] { "USER", "DATE", "REFERER", "URL" }; // Labeled the first job SimpleJob job1 = addJob(labels, StringUtil.TAB); job1.setFilter(FirstFilter.class); job1.setSummaizer(FirstSummarizer.class); // second job SimpleJob job2 = addJob(); job2.setSummaizer(SecondSummarizer.class); } }
SimpleJobToolを継承したクラスを作る。
SimpleJobToolは、HadoopのTool・ToolRunnerの仕組みをラップしたクラスのようだ。
setInputPath()・setOutputPath()で入力・出力のディレクトリーを返すようにするらしい。argsには、mainメソッドの引数が入ってくるのだろう。
(setというメソッド名だが、セッターではない^^;)
addJob()を呼び出すと、Hadoopジョブが登録できるっぽい。
引数に(たぶん入力レコードの)フィールド名一覧と区切り文字を指定していると思われる。
2個目のaddJob()では特に何も指定していないが、たぶん自動的に1個目の出力レコードの情報を引き継いでくれるのだろう。シンプルで素晴らしい(笑)
FilterはMapperクラス、SummarizerはReducerクラスに相当するようだ。
(setSummaizer(サマイザー)って、つづりが間違っているような気がするが^^;)
では、例によってWordCountを作ってみる。
ToolRunnerで実行されるToolクラス相当。 (SimpleJobToolは、Toolインターフェースを実装し、Configuredを継承している)
package example; import org.huahinframework.core.SimpleJob; import org.huahinframework.core.SimpleJobTool;
public class WordCountTool extends SimpleJobTool { @Override protected String setInputPath(String[] args) { return args[0]; } @Override protected String setOutputPath(String[] args) { return args[1]; }
@Override protected void setup() throws Exception { final String[] labels = new String[] { "TEXT" }; SimpleJob job = addJob(labels, "\n"); job.setFilter(WordFilter.class); job.setSummaizer(WordSummarizer.class); } }
addJob()の第1引数でフィールド名(ラベル)を指定する。
第2引数にフィールド区切り文字を指定するようだが、WordCountの場合は区切る必要が無いので、データとしては有り得ない文字(改行コード)を指定してみた。
setup()では、Toolインターフェースのrunメソッドのうち、実行前の設定部分を記述するイメージ。
(実行自体は別途Jobクラスを用意し、そちらから行われる)
Mapper相当のクラス。(FilterはMapperを継承している)
package example; import java.io.IOException; import java.util.StringTokenizer; import org.huahinframework.core.Filter; import org.huahinframework.core.Writer; import org.huahinframework.core.io.Record;
public class WordFilter extends Filter { @Override public void filterSetup() { } @Override public void init() { }
@Override public void filter(Record record, Writer writer) throws IOException, InterruptedException { String text = record.getValueString("TEXT"); StringTokenizer st = new StringTokenizer(text); while (st.hasMoreTokens()) { String word = st.nextToken(); Record emitRecord = new Record(); emitRecord.addGrouping("WORD", word); emitRecord.addValue("COUNT", 1); writer.write(emitRecord); } } }
filterSetup()は、Mapper#setup()相当。
init()は、Mapper#map()の先頭で呼ばれる。
filter()はMapper#map()の末尾で呼ばれる。
filter()の引数recordが入力の1レコード、writerが出力先。
recordからは、getValue系メソッドで値を取り出す。
getValue系メソッドにはフィールド名を指定する。(ToolクラスでaddJob()する際にフィールド名(ラベル)を付けているので、それを指定する)
出力用レコードは、単純にnew Record()で作れる。
addGrouping()でHadoopのKEY相当のフィールドに値を入れる。(複数のフィールドが追加できる)
addValue()で、HadoopのVALUE相当のフィールドに値を入れる。(複数のフィールドが追加できる)
ちなみに、サンプルのDescSortJobToolでは、出力用レコードの変数名がemitRecodeになってますが^^;
Reducer相当のクラス。(SummarizerはReducerを継承している)
package example; import java.io.IOException; import org.huahinframework.core.Summarizer; import org.huahinframework.core.Writer; import org.huahinframework.core.io.Record;
public class WordSummarizer extends Summarizer { @Override public void summarizerSetup() { } @Override public void init() { }
@Override public void summarizer(Writer writer) throws IOException, InterruptedException { Record groupRecord = super.getGroupingRecord(); String word = groupRecord.getGroupingString("WORD"); int count = 0; while (super.hasNext()) { Record record = super.next(writer); count += record.getValueInteger("COUNT"); } Record emitRecord = new Record(); emitRecord.addGrouping("WORD", word); emitRecord.addValue("COUNT", count); writer.write(emitRecord); } }
summarizerSetup()は、Reducer#setup()相当。
init()は、Reducer#reduce()の先頭で呼ばれる。
summarizer()はReducer#reduce()の末尾で呼ばれる。
summarizer()の中で親クラスのhasNext()・next()を呼び出し、グループ内のレコードを順次処理する。
キー(グループ)の値は、親クラスのgetGroupingRecord()でレコードを取得してそのgetGrouping系メソッドを呼び出すか、
またはnext()で返ってくるrecordのgetGrouping系メソッドで取得できるようだ。
最後に、mainメソッドのクラスを作る。
package example; import org.huahinframework.core.Runner;
public class WordCountJob { public static void main(String[] args) { Runner runner = new Runner(); runner.addJob("wordcount-huahin", WordCountTool.class); int status = runner.run("wordcount-huahin", args); System.exit(status); } }
Runnerインスタンスを作り、Toolクラスに名前を付けて登録(addJob)する。
runner.run()でその名前を指定すると、該当するToolクラスが実行される。
つまり、hadoop-exampleでpiやgrepといった名前を指定して色々なジョブが実行できるのと同様に、
Toolクラスを複数用意してaddJob()で全て登録しておき、runner.run()でどれかひとつの名前を指定して実行する想定なのだろう。
サンプル(org.huahinframework.examples.Jobs)では名前もmain()の引数で渡すようになっている。
作ったソースをコンパイルしてjarファイル化し、hadoopシェルに指定して実行する。
<?xml version="1.0" encoding="UTF-8"?> <project name="huahin-jar" default="jar" basedir="." > <target name="jar"> <jar basedir="../classes" jarfile="my-huahin.jar"> </jar> </target> </project>
$ cd $HOME/workspace/huahin-example/bin $ mkdir input $ vi input/hello.txt $ export HADOOP_CLASSPATH=$HOME/workspace/huahin-example/bin/my-huahin.jar $ hadoop jar my-huahin.jar example.WordCountJob input output $ ls output _SUCCESS part-r-00000
huahin-coreのjarファイルをHADOOP_CLASSPATHに指定し、
自分が作ったクラス(今回はWordCountJob類)のjarファイルをhadoop jarで指定する。
今回の例では全部まとめてひとつのjarファイルにしているので、同じファイルを2箇所に指定する状態になっている。
HADOOP_CLASSPATHでhuahin-coreのjarファイルを指定しないと、以下の様なエラーになる。
java.lang.RuntimeException: java.io.IOException: can't find class: org.huahinframework.core.io.KeyDetail because org.huahinframework.core.io.KeyDetail at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:968) at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:70) at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:59) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1254) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1155) at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:582) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:649) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323) at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210) Caused by: java.io.IOException: can't find class: org.huahinframework.core.io.KeyDetail because org.huahinframework.core.io.KeyDetail at org.apache.hadoop.io.AbstractMapWritable.readFields(AbstractMapWritable.java:204) at org.apache.hadoop.io.SortedMapWritable.readFields(SortedMapWritable.java:165) at org.huahinframework.core.io.AbstractWritable.readFields(AbstractWritable.java:44) at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:122) ... 9 more