Asakusa Frameworkで出力したデータをApache Pigで読み込んでみる。
Pigでは独自Writableのシーケンスファイルが扱えるので、AsakusaFWで出力したシーケンスファイルも読める。
例として作ったWordCountModelを書き込んだシーケンスファイルを扱うことを考えてみる。
jarファイル | Eclipseのソースの添付 |
---|---|
PIG_HOME/pig-0.8.1-core.jar | C:/cygwin/usr/local/pig-0.8.1/src |
WordCountModel用のPigのStorageクラスを作成する。
AsakusaFWのシーケンスファイルはキーがNullWritableでデータがモデル(WordCountModel)になる。
(→AsakusaFW用Storageクラスを生成するツール)
なお、パッケージ名はAsakusaFWプロジェクト内で使用しているパッケージ名(afw-wordcountでは「sample」)を使うのが(統一が取れて)良いと思う。
のだが、「sample」はどうもPigのキーワードらしく、load命令でクラスを指定するときにエラーになってしまう。
仕方が無いのでここでは「sample1」とした。
package sample1.afwpig;
import java.io.IOException; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.pig.LoadFunc; import org.apache.pig.ResourceSchema; import org.apache.pig.StoreFunc; import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import sample.modelgen.dmdl.model.WordCountModel;
public class WordCountModelStorage extends LoadFunc implements StoreFuncInterface { private final TupleFactory tupleFactory = TupleFactory.getInstance();
//ここからLoadFunc
@SuppressWarnings("rawtypes") @Override public InputFormat getInputFormat() throws IOException { return new SequenceFileInputFormat(); }
@Override public void setLocation(String location, Job job) throws IOException { Path path = new Path(location); FileInputFormat.setInputPaths(job, path); }
private RecordReader<? extends Writable, WordCountModel> reader; private List<Object> cachedList; @SuppressWarnings("unchecked") @Override public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split) throws IOException { this.reader = reader; this.cachedList = new ArrayList<Object>(); }
@Override public Tuple getNext() throws IOException { try { if (reader.nextKeyValue()) { WordCountModel val = reader.getCurrentValue(); cachedList.clear(); cachedList.add(val.getWordAsString()); cachedList.add(val.getCount()); return tupleFactory.newTuple(cachedList); } else { return null; } } catch (InterruptedException e) { throw new IOException(e); } }
//ここからStoreFunc
@SuppressWarnings("rawtypes") @Override public OutputFormat getOutputFormat() throws IOException { return new SequenceFileOutputFormat(); }
@Override public void setStoreLocation(String location, Job job) throws IOException { job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(WordCountModel.class); Path path = new Path(location); FileOutputFormat.setOutputPath(job, path); }
private RecordWriter<NullWritable, WordCountModel> writer; private WordCountModel val; @SuppressWarnings("unchecked") @Override public void prepareToWrite(@SuppressWarnings("rawtypes") RecordWriter writer) throws IOException { this.writer = writer; this.val = new WordCountModel(); }
@Override public void putNext(Tuple t) throws IOException { val.setWordAsString((String) t.get(0)); val.setCount((Integer) t.get(1)); try { writer.write(NullWritable.get(), val); } catch (InterruptedException e) { throw new IOException(e); } }
@Override public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { return LoadFunc.getAbsolutePath(location, curDir); } @Override public void checkSchema(ResourceSchema s) throws IOException { } @Override public void setStoreFuncUDFContextSignature(String signature) { } @Override public void cleanupOnFailure(String location, Job job) throws IOException { StoreFunc.cleanupOnFailureImpl(location, job); } }
Pigで読み込む為のjarファイルを作成する。
作成したWordCountModelStorageの他に、対象となるモデルクラス(WordCountModel)も含んでおく。
(実際にファイルをロードするときに必要になる為)
ちなみに、AsakusaFWの流儀に従うならMavenを使うんだろうけど、自分は知らないので旧来通りのAntで…。
<?xml version="1.0" encoding="UTF-8"?> <project name="afw-ex" default="jar" basedir="."> <property name="src" location="../src_ex" /> <property name="classes" location="../classes_ex" /> <property name="model_src" location="../target/generated-sources/modelgen" /> <property name="model_classes" location="../target/classes" /> <target name="jar"> <jar jarfile="C:/cygwin/tmp/afwpig.jar"> <fileset dir="${classes}" includes="**/*.class" /> <fileset dir="${src}" includes="**/*.java" /> <fileset dir="${model_classes}" includes="**/model/*.class" /> <fileset dir="${model_src}" includes="**/model/*.java" /> </jar> </target> </project>
Pigから独自に作ったクラスを使用する場合、そのクラスの入ったjarファイルをregister命令で読み込む必要がある。
今回の場合はafwpig.jarを読み込めばいいのだが、これだけでは足りない。
java.lang.NoClassDefFoundError: com/asakusafw/runtime/model/DataModel
WordCountModelがAsakusaFWのDataModelインターフェースを実装しているので、これが入っているjarファイルも必要になる。
DataModelが入っているのはM2_REPO/com/asakusafw/asakusa-runtime/0.2.1/asakusa-runtime-0.2.1.jarなので、これもregisterすればよい。
ところがWindowsの場合、M2_REPOが実際に指しているのは「C:/Documents and Settings/ユーザー/.m2/repository
」。つまり空白文字入りのディレクトリー。
register命令は空白文字の入ったパスには対応していないので、読めん!またこのトラップか…!
対処方法は以下のようなものが考えられる。
$ cp -p "/cygdrive/c/Documents and Settings/hishidama/.m2/repository/com/asakusafw/asakusa-runtime/0.2.1/asakusa-runtime-0.2.1.jar" /tmp/ $ pig -x local grunt> register /cygwin/tmp/asakusa-runtime-0.2.1.jar
$ export PIG_CLASSPATH="/cygdrive/c/Documents and Settings/hishidama/.m2/repository/com/asakusafw/asakusa-runtime/0.2.1/asakusa-runtime-0.2.1.jar" $ pig -x local
$ ln -s "/cygdrive/c/Documents and Settings/hishidama/.m2/repository/com/asakusafw/asakusa-runtime/0.2.1/asakusa-runtime-0.2.1.jar" /usr/local/pig-0.8.1/lib/ $ pig -x local
では本体を実行してみよう。
「result/wc*」がAsakusaFWのWordCount(のジョブフローのテスト)によって出力されたファイル。
grunt> register /cygwin/tmp/afwpig.jar grunt> a = load '/cygwin/home/hishidama/result/wc*' using sample1.afwpig.WordCountModelStorage() as (word:chararray, count:int); grunt> dump a 〜 (Hello,2) (Asakusa,1) (World,1)
ちゃんと読めてる^^