PigのStorage(Load/Store)の作り方のメモ。
Pigで独自形式のファイルを扱いたい場合はStorageクラスを用意する。
loadやstoreは、デフォルトでPigStorageが使われている。
load 'パス' using PigStorage(); store エイリアス into 'パス' using PigStorage();
これと同様にStorageクラスを用意すればいい。
ロード(読み込み)用にはLoadFunc・ストア(保存)用にはStoreFuncというクラスが用意されており、それを継承したクラスを作成する。
PigStorageのようなStorageはその両方の機能を持つクラス。
コンパイルに必要となるjarファイルは通常のUDFと同じ。
PathやWritable・OutputFormatといったHadoopの基本となるクラスもPigのjarファイル内に含まれている。
シーケンスファイルでWordCountWritableを出力するStoreFuncを作ってみる。
package pigudf;
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.pig.StoreFunc; import org.apache.pig.data.Tuple; import sample.writable.WordCountWritable;
public class WordCountStore extends StoreFunc {
@SuppressWarnings("rawtypes") @Override public OutputFormat getOutputFormat() throws IOException { return new SequenceFileOutputFormat(); } |
OutputFormatを返す。 |
@Override public void setStoreLocation(String location, Job job) throws IOException { job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(WordCountWritable.class); Path path = new Path(location); FileOutputFormat.setOutputPath(job, path); } |
出力先をJobに設定する。 Writableのクラスも指定する。 |
private RecordWriter<NullWritable, WordCountWritable> writer; private WordCountWritable wc; @SuppressWarnings("unchecked") @Override public void prepareToWrite(@SuppressWarnings("rawtypes") RecordWriter writer) throws IOException { this.writer = writer; this.wc = new WordCountWritable(); } |
書き込みの前準備。Writerを保持する。 |
@Override public void putNext(Tuple t) throws IOException { String word = (String) t.get(0); int count = (Integer) t.get(1); wc.setWord(word); wc.setCount(count); try { writer.write(NullWritable.get(), wc); } catch (InterruptedException e) { throw new IOException(e); } } |
データを出力する。 今回は入力データの個数と型は決め打ちしている^^; |
コンパイルしたJavaクラスをjarファイル化して使用するのも通常のUDFと同じ。
register /cygwin/tmp/pigudf.jar a = load '/cygwin/tmp/pig/wordcount/output/' as (word:chararray, cnt:int); rm /cygwin/tmp/pig/store/output store a into '/cygwin/tmp/pig/store/output' using pigudf.WordCountStore();
WordCountサンプルで出力したテキストファイルを入力にしてみた。
出力先が存在しているとエラーになるので、事前にrmで消している。
これで作ったファイル、ちゃんとHiveから読めたよ!
add jar C:/cygwin/tmp/hiveserde.jar; create external table pigwc (word string, cnt int) row format serde 'sample.WordCountSerDe' stored as sequencefile location '/cygwin/tmp/pig/store/output/'; select * from pigwc;
WordCountWritableが入っているシーケンスファイルを読み込んでみる。
package pigudf;
import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.pig.LoadFunc; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import sample.writable.WordCountWritable;
public class WordCountLoad extends LoadFunc { TupleFactory tupleFactory = TupleFactory.getInstance();
@SuppressWarnings("rawtypes") @Override public InputFormat getInputFormat() throws IOException { return new SequenceFileInputFormat(); } |
InputFormatを返す。 |
@Override public void setLocation(String location, Job job) throws IOException { Path path = new Path(location); FileInputFormat.setInputPaths(job, path); } |
入力ファイル名をJobにセットする。 |
private RecordReader<? extends Writable, WordCountWritable> reader; private List<Object> cachedList; @SuppressWarnings("unchecked") @Override public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split) throws IOException { this.reader = reader; this.cachedList = new ArrayList<Object>(); } |
読み込みの前準備。Readerを保持する。 |
@Override public Tuple getNext() throws IOException { try { if (reader.nextKeyValue()) { WordCountWritable wc = reader.getCurrentValue(); cachedList.clear(); cachedList.add(wc.getWord()); cachedList.add(wc.getCount()); return tupleFactory.newTuple(cachedList); } else { return null; } } catch (InterruptedException e) { throw new IOException(e); } } |
データを1行分読み込み、タプルに入れて返す。 |
register /cygwin/tmp/pigudf.jar a = load '/cygwin/tmp/pig/store/output' using pigudf.WordCountLoad() as (word:chararray, count:int); dump a
StoreFuncとLoadFuncの両方を1つのクラスで持たせたStorageクラスを作ってみる。
StoreFuncとLoadFuncの両方を継承したクラスが作れれば簡単に解決するのだが、Javaでは多重継承が出来ない。(Scalaならトレイトですぐ解決するのだが)
そこで、親クラスはLoadFuncとし、StoreFuncInterfaceを実装する形となる。
package pigudf;
import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.pig.LoadFunc; import org.apache.pig.ResourceSchema; import org.apache.pig.StoreFunc; import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import sample.writable.WordCountWritable;
public class WordCountStorage extends LoadFunc implements StoreFuncInterface { 〜 }
ほとんどのメソッドはStoreFunc・LoadFuncと全く同じ実装でいい。
他にStoreFuncInterfaceで必要なメソッドは以下の通り。
@Override public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { return LoadFunc.getAbsolutePath(location, curDir); } |
@Override public void checkSchema(ResourceSchema s) throws IOException { } |
@Override public void setStoreFuncUDFContextSignature(String signature) { } |
@Override public void cleanupOnFailure(String location, Job job) throws IOException { StoreFunc.cleanupOnFailureImpl(location, job); } |
register /cygwin/tmp/pigudf.jar
a = load '/cygwin/tmp/pig/wordcount/output/' as (word:chararray, cnt:int); rm /cygwin/tmp/pig/store/output2 store a into '/cygwin/tmp/pig/store/output2' using pigudf.WordCountStorage();
b = load '/cygwin/tmp/pig/store/output2' using pigudf.WordCountStorage() as (word:chararray, count:int); dump b
ストアもロードも同じStorage指定で実行できる。