S-JIS[2011-08-15] 変更履歴

Asakusa FrameworkのデータをPigで読み込む

Asakusa Frameworkで出力したデータをApache Pigで読み込んでみる。


概要

Pigでは独自Writableのシーケンスファイルが扱えるので、AsakusaFWで出力したシーケンスファイルも読める。

例として作ったWordCountModelを書き込んだシーケンスファイルを扱うことを考えてみる。

  1. afw-wordcountプロジェクト内に新しいソースフォルダーを作成する。(例:src_ex
  2. そのソースフォルダーの出力先を適当な新しいディレクトリーに変更する。(例:classes_ex
  3. 参照ライブラリー(ビルドパス)にPigのjarファイルを追加しておく。
    jarファイル Eclipseのソースの添付
    PIG_HOME/pig-0.8.1-core.jar C:/cygwin/usr/local/pig-0.8.1/src
  4. WordCountModel用のPigのStorageクラスを作成する。
  5. StorageクラスとWordCountModelを含んだjarファイルを作成する。
  6. Pigからそのjarファイルを読み込む
  7. 後は普通にload命令でAsakusaFWのシーケンスファイルを読み込める。

Storageクラス

WordCountModel用のPigのStorageクラスを作成する。
AsakusaFWのシーケンスファイルはキーがNullWritableでデータがモデル(WordCountModel)になる。
(→AsakusaFW用Storageクラスを生成するツール

なお、パッケージ名はAsakusaFWプロジェクト内で使用しているパッケージ名(afw-wordcountでは「sample」)を使うのが(統一が取れて)良いと思う。
のだが、「sample」はどうもPigのキーワードらしく、load命令でクラスを指定するときにエラーになってしまう。
仕方が無いのでここでは「sample1」とした。

package sample1.afwpig;
import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import sample.modelgen.dmdl.model.WordCountModel;
public class WordCountModelStorage extends LoadFunc implements StoreFuncInterface {
	private final TupleFactory tupleFactory = TupleFactory.getInstance();

//ここからLoadFunc

	@SuppressWarnings("rawtypes")
	@Override
	public InputFormat getInputFormat() throws IOException {
		return new SequenceFileInputFormat();
	}
	@Override
	public void setLocation(String location, Job job) throws IOException {
		Path path = new Path(location);
		FileInputFormat.setInputPaths(job, path);
	}
	private RecordReader<? extends Writable, WordCountModel> reader;
	private List<Object> cachedList;

	@SuppressWarnings("unchecked")
	@Override
	public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split) throws IOException {
		this.reader = reader;
		this.cachedList = new ArrayList<Object>();
	}
	@Override
	public Tuple getNext() throws IOException {
		try {
			if (reader.nextKeyValue()) {
				WordCountModel val = reader.getCurrentValue();

				cachedList.clear();
				cachedList.add(val.getWordAsString());
				cachedList.add(val.getCount());

				return tupleFactory.newTuple(cachedList);
			} else {
				return null;
			}
		} catch (InterruptedException e) {
			throw new IOException(e);
		}
	}

//ここからStoreFunc

	@SuppressWarnings("rawtypes")
	@Override
	public OutputFormat getOutputFormat() throws IOException {
		return new SequenceFileOutputFormat();
	}
	@Override
	public void setStoreLocation(String location, Job job) throws IOException {
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(WordCountModel.class);

		Path path = new Path(location);
		FileOutputFormat.setOutputPath(job, path);
	}
	private RecordWriter<NullWritable, WordCountModel> writer;
	private WordCountModel val;

	@SuppressWarnings("unchecked")
	@Override
	public void prepareToWrite(@SuppressWarnings("rawtypes") RecordWriter writer) throws IOException {
		this.writer = writer;
		this.val = new WordCountModel();
	}
	@Override
	public void putNext(Tuple t) throws IOException {
		val.setWordAsString((String) t.get(0));
		val.setCount((Integer) t.get(1));

		try {
			writer.write(NullWritable.get(), val);
		} catch (InterruptedException e) {
			throw new IOException(e);
		}
	}
	@Override
	public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
		return LoadFunc.getAbsolutePath(location, curDir);
	}

	@Override
	public void checkSchema(ResourceSchema s) throws IOException {
	}

	@Override
	public void setStoreFuncUDFContextSignature(String signature) {
	}

	@Override
	public void cleanupOnFailure(String location, Job job) throws IOException {
		StoreFunc.cleanupOnFailureImpl(location, job);
	}
}

jarファイル作成

Pigで読み込む為のjarファイルを作成する。

作成したWordCountModelStorageの他に、対象となるモデルクラス(WordCountModel)も含んでおく。
(実際にファイルをロードするときに必要になる為)

ちなみに、AsakusaFWの流儀に従うならMavenを使うんだろうけど、自分は知らないので旧来通りのAntで…。

afw-wordcountプロジェクト/bin/build.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project name="afw-ex" default="jar" basedir=".">
	<property name="src"     location="../src_ex" />
	<property name="classes" location="../classes_ex" />
	<property name="model_src"     location="../target/generated-sources/modelgen" />
	<property name="model_classes" location="../target/classes" />

	<target name="jar">
		<jar jarfile="C:/cygwin/tmp/afwpig.jar">
			<fileset dir="${classes}" includes="**/*.class" />
			<fileset dir="${src}"     includes="**/*.java" />
			<fileset dir="${model_classes}" includes="**/model/*.class" />
			<fileset dir="${model_src}"     includes="**/model/*.java" />
		</jar>
	</target>
</project>

実行例

Pigから独自に作ったクラスを使用する場合、そのクラスの入ったjarファイルをregister命令で読み込む必要がある。

今回の場合はafwpig.jarを読み込めばいいのだが、これだけでは足りない。

java.lang.NoClassDefFoundError: com/asakusafw/runtime/model/DataModel

WordCountModelがAsakusaFWのDataModelインターフェースを実装しているので、これが入っているjarファイルも必要になる。
DataModelが入っているのはM2_REPO/com/asakusafw/asakusa-runtime/0.2.1/asakusa-runtime-0.2.1.jarなので、これもregisterすればよい。
ところがWindowsの場合、M2_REPOが実際に指しているのは「C:/Documents and Settings/ユーザー/.m2/repository」。つまり空白文字入りのディレクトリー。
register命令は空白文字の入ったパスには対応していないので、読めん!またこのトラップか…!
対処方法は以下のようなものが考えられる。


では本体を実行してみよう。
「result/wc*」がAsakusaFWのWordCount(のジョブフローのテスト)によって出力されたファイル。

grunt> register /cygwin/tmp/afwpig.jar
grunt> a = load '/cygwin/home/hishidama/result/wc*' using sample1.afwpig.WordCountModelStorage() as (word:chararray, count:int);
grunt> dump a
〜
(Hello,2)
(Asakusa,1)
(World,1)

ちゃんと読めてる^^


AsakusaFW目次へ戻る / Pigへ戻る / 技術メモへ戻る
メールの送信先:ひしだま