PigのStorage(Load/Store)の作り方のメモ。
Pigで独自形式のファイルを扱いたい場合はStorageクラスを用意する。
loadやstoreは、デフォルトでPigStorageが使われている。
load 'パス' using PigStorage(); store エイリアス into 'パス' using PigStorage();
これと同様にStorageクラスを用意すればいい。
ロード(読み込み)用にはLoadFunc・ストア(保存)用にはStoreFuncというクラスが用意されており、それを継承したクラスを作成する。
PigStorageのようなStorageはその両方の機能を持つクラス。
コンパイルに必要となるjarファイルは通常のUDFと同じ。
PathやWritable・OutputFormatといったHadoopの基本となるクラスもPigのjarファイル内に含まれている。
シーケンスファイルでWordCountWritableを出力するStoreFuncを作ってみる。
package pigudf;
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.pig.StoreFunc; import org.apache.pig.data.Tuple; import sample.writable.WordCountWritable;
public class WordCountStore extends StoreFunc {
@SuppressWarnings("rawtypes")
@Override
public OutputFormat getOutputFormat() throws IOException {
return new SequenceFileOutputFormat();
}
|
OutputFormatを返す。 |
@Override
public void setStoreLocation(String location, Job job) throws IOException {
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(WordCountWritable.class);
Path path = new Path(location);
FileOutputFormat.setOutputPath(job, path);
}
|
出力先をJobに設定する。 Writableのクラスも指定する。 |
private RecordWriter<NullWritable, WordCountWritable> writer; private WordCountWritable wc; @SuppressWarnings("unchecked")
@Override
public void prepareToWrite(@SuppressWarnings("rawtypes") RecordWriter writer)
throws IOException {
this.writer = writer;
this.wc = new WordCountWritable();
}
|
書き込みの前準備。Writerを保持する。 |
@Override
public void putNext(Tuple t) throws IOException {
String word = (String) t.get(0);
int count = (Integer) t.get(1);
wc.setWord(word);
wc.setCount(count);
try {
writer.write(NullWritable.get(), wc);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
|
データを出力する。 今回は入力データの個数と型は決め打ちしている^^; |
コンパイルしたJavaクラスをjarファイル化して使用するのも通常のUDFと同じ。
register /cygwin/tmp/pigudf.jar a = load '/cygwin/tmp/pig/wordcount/output/' as (word:chararray, cnt:int); rm /cygwin/tmp/pig/store/output store a into '/cygwin/tmp/pig/store/output' using pigudf.WordCountStore();
WordCountサンプルで出力したテキストファイルを入力にしてみた。
出力先が存在しているとエラーになるので、事前にrmで消している。
これで作ったファイル、ちゃんとHiveから読めたよ!
add jar C:/cygwin/tmp/hiveserde.jar; create external table pigwc (word string, cnt int) row format serde 'sample.WordCountSerDe' stored as sequencefile location '/cygwin/tmp/pig/store/output/'; select * from pigwc;
WordCountWritableが入っているシーケンスファイルを読み込んでみる。
package pigudf;
import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.pig.LoadFunc; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import sample.writable.WordCountWritable;
public class WordCountLoad extends LoadFunc {
TupleFactory tupleFactory = TupleFactory.getInstance();
@SuppressWarnings("rawtypes")
@Override
public InputFormat getInputFormat() throws IOException {
return new SequenceFileInputFormat();
}
|
InputFormatを返す。 |
@Override
public void setLocation(String location, Job job) throws IOException {
Path path = new Path(location);
FileInputFormat.setInputPaths(job, path);
}
|
入力ファイル名をJobにセットする。 |
private RecordReader<? extends Writable, WordCountWritable> reader; private List<Object> cachedList; @SuppressWarnings("unchecked")
@Override
public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split)
throws IOException {
this.reader = reader;
this.cachedList = new ArrayList<Object>();
}
|
読み込みの前準備。Readerを保持する。 |
@Override
public Tuple getNext() throws IOException {
try {
if (reader.nextKeyValue()) {
WordCountWritable wc = reader.getCurrentValue();
cachedList.clear();
cachedList.add(wc.getWord());
cachedList.add(wc.getCount());
return tupleFactory.newTuple(cachedList);
} else {
return null;
}
} catch (InterruptedException e) {
throw new IOException(e);
}
}
|
データを1行分読み込み、タプルに入れて返す。 |
register /cygwin/tmp/pigudf.jar a = load '/cygwin/tmp/pig/store/output' using pigudf.WordCountLoad() as (word:chararray, count:int); dump a
StoreFuncとLoadFuncの両方を1つのクラスで持たせたStorageクラスを作ってみる。
StoreFuncとLoadFuncの両方を継承したクラスが作れれば簡単に解決するのだが、Javaでは多重継承が出来ない。(Scalaならトレイトですぐ解決するのだが)
そこで、親クラスはLoadFuncとし、StoreFuncInterfaceを実装する形となる。
package pigudf;
import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.pig.LoadFunc; import org.apache.pig.ResourceSchema; import org.apache.pig.StoreFunc; import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import sample.writable.WordCountWritable;
public class WordCountStorage extends LoadFunc implements StoreFuncInterface {
〜
}
ほとんどのメソッドはStoreFunc・LoadFuncと全く同じ実装でいい。
他にStoreFuncInterfaceで必要なメソッドは以下の通り。
@Override
public String relToAbsPathForStoreLocation(String location, Path curDir)
throws IOException {
return LoadFunc.getAbsolutePath(location, curDir);
}
|
@Override
public void checkSchema(ResourceSchema s) throws IOException {
}
|
@Override
public void setStoreFuncUDFContextSignature(String signature) {
}
|
@Override
public void cleanupOnFailure(String location, Job job) throws IOException {
StoreFunc.cleanupOnFailureImpl(location, job);
}
|
register /cygwin/tmp/pigudf.jar
a = load '/cygwin/tmp/pig/wordcount/output/' as (word:chararray, cnt:int); rm /cygwin/tmp/pig/store/output2 store a into '/cygwin/tmp/pig/store/output2' using pigudf.WordCountStorage();
b = load '/cygwin/tmp/pig/store/output2' using pigudf.WordCountStorage() as (word:chararray, count:int); dump b
ストアもロードも同じStorage指定で実行できる。