S-JIS[2012-06-29] 変更履歴

Huahin Core

Huahin(ほあひん) Coreは、Huahin Frameworkの、HadoopのMapReduceをラップしたフレームワーク本体。


インストール

Huahin CoreのページやGitHubのhuahin-coreを見ると、pom.xmlにdependencyを記述してMavenでダウンロードするか、gitコマンドを使ってGitHubからソースコードを取ってくることになるようだ。

が、自分の(VMware上の)Linux開発環境ではgitコマンドが入っていないし、pom.xmlを一から書く知識も無いので(爆)、別の方法を採ることにする。

  1. GitHubhuahin-coreのページの上の方にある「ZIP」ボタンを押し、アーカイブファイル(huahin-huahin-core-huahin-core-0.1.0-2-g764eb91.zip)をダウンロードする。
  2. アーカイブファイルを適当な場所に解凍する。
    $ unzip huahin-huahin-core-huahin-core-0.1.0-2-g764eb91.zip
  3. 解凍した中にあるsrcディレクトリーをEclipseにインポートする。
  4. インポートした中の、src/main/javaをソースディレクトリーにする。
    1. パッケージエクスプローラー上でsrc/main/javaを右クリックし、「Build Path」→「Use as Source Folder」を選択する。
  5. 必要なjarファイルをビルドパスに追加する。
  6. AWS(Amazon Web Service)用の開発環境を持っていない場合はS3Utilsが(ライブラリーが無くて)コンパイルエラーになるので、思い切ってS3Utils.javaを削除する。

サンプルを見てみる

サンプルのソースを見てみる。

public class RankingJobTool extends SimpleJobTool {

	protected String setInputPath(String[] args) {
		return args[0];
	}

	protected String setOutputPath(String[] args) {
		return args[1];
	}

	protected void setup() throws Exception {
		final String[] labels = new String[] { "USER", "DATE", "REFERER", "URL" };

		// Labeled the first job
		SimpleJob job1 = addJob(labels, StringUtil.TAB);
		job1.setFilter(FirstFilter.class);
		job1.setSummaizer(FirstSummarizer.class);

		// second job
		SimpleJob job2 = addJob();
		job2.setSummaizer(SecondSummarizer.class);
	}
}

SimpleJobToolを継承したクラスを作る。
SimpleJobToolは、HadoopのTool・ToolRunnerの仕組みをラップしたクラスのようだ。

setInputPath()・setOutputPath()で入力・出力のディレクトリーを返すようにするらしい。argsには、mainメソッドの引数が入ってくるのだろう。
(setというメソッド名だが、セッターではない^^;)

addJob()を呼び出すと、Hadoopジョブが登録できるっぽい。
引数に(たぶん入力レコードの)フィールド名一覧と区切り文字を指定していると思われる。
2個目のaddJob()では特に何も指定していないが、たぶん自動的に1個目の出力レコードの情報を引き継いでくれるのだろう。シンプルで素晴らしい(笑)

FilterはMapperクラス、SummarizerはReducerクラスに相当するようだ。
(setSummaizer(サマイザー)って、つづりが間違っているような気がするが^^;)


WordCountを作ってみる

では、例によってWordCountを作ってみる。

Toolクラス

ToolRunnerで実行されるToolクラス相当。 (SimpleJobToolは、Toolインターフェースを実装し、Configuredを継承している)

package example;

import org.huahinframework.core.SimpleJob;
import org.huahinframework.core.SimpleJobTool;
public class WordCountTool extends SimpleJobTool {

	@Override
	protected String setInputPath(String[] args) {
		return args[0];
	}

	@Override
	protected String setOutputPath(String[] args) {
		return args[1];
	}
	@Override
	protected void setup() throws Exception {
		final String[] labels = new String[] { "TEXT" };

		SimpleJob job = addJob(labels, "\n");
		job.setFilter(WordFilter.class);
		job.setSummaizer(WordSummarizer.class);
	}
}

addJob()の第1引数でフィールド名(ラベル)を指定する。
第2引数にフィールド区切り文字を指定するようだが、WordCountの場合は区切る必要が無いので、データとしては有り得ない文字(改行コード)を指定してみた。

setup()では、Toolインターフェースのrunメソッドのうち、実行前の設定部分を記述するイメージ。
(実行自体は別途Jobクラスを用意し、そちらから行われる)


Filterクラス

Mapper相当のクラス。(FilterはMapperを継承している)

package example;

import java.io.IOException;
import java.util.StringTokenizer;

import org.huahinframework.core.Filter;
import org.huahinframework.core.Writer;
import org.huahinframework.core.io.Record;
public class WordFilter extends Filter {

	@Override
	public void filterSetup() {
	}

	@Override
	public void init() {
	}
	@Override
	public void filter(Record record, Writer writer) throws IOException, InterruptedException {

		String text = record.getValueString("TEXT");
		StringTokenizer st = new StringTokenizer(text);
		while (st.hasMoreTokens()) {
			String word = st.nextToken();

			Record emitRecord = new Record();
			emitRecord.addGrouping("WORD", word);
			emitRecord.addValue("COUNT", 1);

			writer.write(emitRecord);
		}
	}
}

filterSetup()は、Mapper#setup()相当。
init()は、Mapper#map()の先頭で呼ばれる。
filter()はMapper#map()の末尾で呼ばれる。

filter()の引数recordが入力の1レコード、writerが出力先。

recordからは、getValue系メソッドで値を取り出す。
getValue系メソッドにはフィールド名を指定する。(ToolクラスでaddJob()する際にフィールド名(ラベル)を付けているので、それを指定する)

出力用レコードは、単純にnew Record()で作れる。
addGrouping()でHadoopのKEY相当のフィールドに値を入れる。(複数のフィールドが追加できる)
addValue()で、HadoopのVALUE相当のフィールドに値を入れる。(複数のフィールドが追加できる)

ちなみに、サンプルのDescSortJobToolでは、出力用レコードの変数名がemitRecodeになってますが^^;


Summarizerクラス

Reducer相当のクラス。(SummarizerはReducerを継承している)

package example;

import java.io.IOException;

import org.huahinframework.core.Summarizer;
import org.huahinframework.core.Writer;
import org.huahinframework.core.io.Record;
public class WordSummarizer extends Summarizer {

	@Override
	public void summarizerSetup() {
	}

	@Override
	public void init() {
	}
	@Override
	public void summarizer(Writer writer) throws IOException, InterruptedException {

		Record groupRecord = super.getGroupingRecord();
		String word = groupRecord.getGroupingString("WORD");

		int count = 0;
		while (super.hasNext()) {
			Record record = super.next(writer);
			count += record.getValueInteger("COUNT");
		}

		Record emitRecord = new Record();
		emitRecord.addGrouping("WORD", word);
		emitRecord.addValue("COUNT", count);
		writer.write(emitRecord);
	}
}

summarizerSetup()は、Reducer#setup()相当。
init()は、Reducer#reduce()の先頭で呼ばれる。
summarizer()はReducer#reduce()の末尾で呼ばれる。

summarizer()の中で親クラスのhasNext()・next()を呼び出し、グループ内のレコードを順次処理する。

キー(グループ)の値は、親クラスのgetGroupingRecord()でレコードを取得してそのgetGrouping系メソッドを呼び出すか、
またはnext()で返ってくるrecordのgetGrouping系メソッドで取得できるようだ。


Jobsクラス

最後に、mainメソッドのクラスを作る。

package example;

import org.huahinframework.core.Runner;
public class WordCountJob {

	public static void main(String[] args) {
		Runner runner = new Runner();
		runner.addJob("wordcount-huahin", WordCountTool.class);

		int status = runner.run("wordcount-huahin", args);
		System.exit(status);
	}
}

Runnerインスタンスを作り、Toolクラス名前を付けて登録(addJob)する。
runner.run()でその名前を指定すると、該当するToolクラスが実行される。

つまり、hadoop-exampleでpiやgrepといった名前を指定して色々なジョブが実行できるのと同様に、
Toolクラスを複数用意してaddJob()で全て登録しておき、runner.run()でどれかひとつの名前を指定して実行する想定なのだろう。
サンプル(org.huahinframework.examples.Jobs)では名前もmain()の引数で渡すようになっている。


実行方法

作ったソースをコンパイルしてjarファイル化し、hadoopシェルに指定して実行する。

$HOME/workspace/huahin-example/bin/build.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project name="huahin-jar" default="jar" basedir="." >
	<target name="jar">
		<jar basedir="../classes" jarfile="my-huahin.jar">
		</jar>
	</target>
</project>
$ cd $HOME/workspace/huahin-example/bin
$ mkdir input
$ vi input/hello.txt

$ export HADOOP_CLASSPATH=$HOME/workspace/huahin-example/bin/my-huahin.jar
$ hadoop jar my-huahin.jar example.WordCountJob input output

$ ls output
_SUCCESS  part-r-00000

huahin-coreのjarファイルをHADOOP_CLASSPATHに指定し、
自分が作ったクラス(今回はWordCountJob類)のjarファイルをhadoop jarで指定する。

今回の例では全部まとめてひとつのjarファイルにしているので、同じファイルを2箇所に指定する状態になっている。


HADOOP_CLASSPATHでhuahin-coreのjarファイルを指定しないと、以下の様なエラーになる。

java.lang.RuntimeException: java.io.IOException: can't find class: org.huahinframework.core.io.KeyDetail because org.huahinframework.core.io.KeyDetail
        at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:968)
        at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:70)
        at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:59)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1254)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1155)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:582)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:649)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
Caused by: java.io.IOException: can't find class: org.huahinframework.core.io.KeyDetail because org.huahinframework.core.io.KeyDetail
        at org.apache.hadoop.io.AbstractMapWritable.readFields(AbstractMapWritable.java:204)
        at org.apache.hadoop.io.SortedMapWritable.readFields(SortedMapWritable.java:165)
        at org.huahinframework.core.io.AbstractWritable.readFields(AbstractWritable.java:44)
        at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:122)
        ... 9 more

HuahinFW目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま