Asakusa Frameworkで出力したデータをApache Pigで読み込んでみる。
Pigでは独自Writableのシーケンスファイルが扱えるので、AsakusaFWで出力したシーケンスファイルも読める。
例として作ったWordCountModelを書き込んだシーケンスファイルを扱うことを考えてみる。
| jarファイル | Eclipseのソースの添付 |
|---|---|
| PIG_HOME/pig-0.8.1-core.jar | C:/cygwin/usr/local/pig-0.8.1/src |
WordCountModel用のPigのStorageクラスを作成する。
AsakusaFWのシーケンスファイルはキーがNullWritableでデータがモデル(WordCountModel)になる。
(→AsakusaFW用Storageクラスを生成するツール)
なお、パッケージ名はAsakusaFWプロジェクト内で使用しているパッケージ名(afw-wordcountでは「sample」)を使うのが(統一が取れて)良いと思う。
のだが、「sample」はどうもPigのキーワードらしく、load命令でクラスを指定するときにエラーになってしまう。
仕方が無いのでここでは「sample1」とした。
package sample1.afwpig;
import java.io.IOException; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.pig.LoadFunc; import org.apache.pig.ResourceSchema; import org.apache.pig.StoreFunc; import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import sample.modelgen.dmdl.model.WordCountModel;
public class WordCountModelStorage extends LoadFunc implements StoreFuncInterface {
private final TupleFactory tupleFactory = TupleFactory.getInstance();
//ここからLoadFunc
@SuppressWarnings("rawtypes")
@Override
public InputFormat getInputFormat() throws IOException {
return new SequenceFileInputFormat();
}
@Override
public void setLocation(String location, Job job) throws IOException {
Path path = new Path(location);
FileInputFormat.setInputPaths(job, path);
}
private RecordReader<? extends Writable, WordCountModel> reader;
private List<Object> cachedList;
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split) throws IOException {
this.reader = reader;
this.cachedList = new ArrayList<Object>();
}
@Override
public Tuple getNext() throws IOException {
try {
if (reader.nextKeyValue()) {
WordCountModel val = reader.getCurrentValue();
cachedList.clear();
cachedList.add(val.getWordAsString());
cachedList.add(val.getCount());
return tupleFactory.newTuple(cachedList);
} else {
return null;
}
} catch (InterruptedException e) {
throw new IOException(e);
}
}
//ここからStoreFunc
@SuppressWarnings("rawtypes")
@Override
public OutputFormat getOutputFormat() throws IOException {
return new SequenceFileOutputFormat();
}
@Override
public void setStoreLocation(String location, Job job) throws IOException {
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(WordCountModel.class);
Path path = new Path(location);
FileOutputFormat.setOutputPath(job, path);
}
private RecordWriter<NullWritable, WordCountModel> writer;
private WordCountModel val;
@SuppressWarnings("unchecked")
@Override
public void prepareToWrite(@SuppressWarnings("rawtypes") RecordWriter writer) throws IOException {
this.writer = writer;
this.val = new WordCountModel();
}
@Override
public void putNext(Tuple t) throws IOException {
val.setWordAsString((String) t.get(0));
val.setCount((Integer) t.get(1));
try {
writer.write(NullWritable.get(), val);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@Override
public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
return LoadFunc.getAbsolutePath(location, curDir);
}
@Override
public void checkSchema(ResourceSchema s) throws IOException {
}
@Override
public void setStoreFuncUDFContextSignature(String signature) {
}
@Override
public void cleanupOnFailure(String location, Job job) throws IOException {
StoreFunc.cleanupOnFailureImpl(location, job);
}
}
Pigで読み込む為のjarファイルを作成する。
作成したWordCountModelStorageの他に、対象となるモデルクラス(WordCountModel)も含んでおく。
(実際にファイルをロードするときに必要になる為)
ちなみに、AsakusaFWの流儀に従うならMavenを使うんだろうけど、自分は知らないので旧来通りのAntで…。
<?xml version="1.0" encoding="UTF-8"?>
<project name="afw-ex" default="jar" basedir=".">
<property name="src" location="../src_ex" />
<property name="classes" location="../classes_ex" />
<property name="model_src" location="../target/generated-sources/modelgen" />
<property name="model_classes" location="../target/classes" />
<target name="jar">
<jar jarfile="C:/cygwin/tmp/afwpig.jar">
<fileset dir="${classes}" includes="**/*.class" />
<fileset dir="${src}" includes="**/*.java" />
<fileset dir="${model_classes}" includes="**/model/*.class" />
<fileset dir="${model_src}" includes="**/model/*.java" />
</jar>
</target>
</project>
Pigから独自に作ったクラスを使用する場合、そのクラスの入ったjarファイルをregister命令で読み込む必要がある。
今回の場合はafwpig.jarを読み込めばいいのだが、これだけでは足りない。
java.lang.NoClassDefFoundError: com/asakusafw/runtime/model/DataModel
WordCountModelがAsakusaFWのDataModelインターフェースを実装しているので、これが入っているjarファイルも必要になる。
DataModelが入っているのはM2_REPO/com/asakusafw/asakusa-runtime/0.2.1/asakusa-runtime-0.2.1.jarなので、これもregisterすればよい。
ところがWindowsの場合、M2_REPOが実際に指しているのは「C:/Documents and Settings/ユーザー/.m2/repository」。つまり空白文字入りのディレクトリー。
register命令は空白文字の入ったパスには対応していないので、読めん!またこのトラップか…!
対処方法は以下のようなものが考えられる。
$ cp -p "/cygdrive/c/Documents and Settings/hishidama/.m2/repository/com/asakusafw/asakusa-runtime/0.2.1/asakusa-runtime-0.2.1.jar" /tmp/ $ pig -x local grunt> register /cygwin/tmp/asakusa-runtime-0.2.1.jar
$ export PIG_CLASSPATH="/cygdrive/c/Documents and Settings/hishidama/.m2/repository/com/asakusafw/asakusa-runtime/0.2.1/asakusa-runtime-0.2.1.jar" $ pig -x local
$ ln -s "/cygdrive/c/Documents and Settings/hishidama/.m2/repository/com/asakusafw/asakusa-runtime/0.2.1/asakusa-runtime-0.2.1.jar" /usr/local/pig-0.8.1/lib/ $ pig -x local
では本体を実行してみよう。
「result/wc*」がAsakusaFWのWordCount(のジョブフローのテスト)によって出力されたファイル。
grunt> register /cygwin/tmp/afwpig.jar grunt> a = load '/cygwin/home/hishidama/result/wc*' using sample1.afwpig.WordCountModelStorage() as (word:chararray, count:int); grunt> dump a 〜 (Hello,2) (Asakusa,1) (World,1)
ちゃんと読めてる^^