S-JIS[2011-08-14] 変更履歴

Pig Storage(Load/Store)

PigのStorage(Load/Store)の作り方のメモ。


概要

Pigで独自形式のファイルを扱いたい場合はStorageクラスを用意する。

loadstoreは、デフォルトでPigStorageが使われている。

load  'パス' using PigStorage();
store エイリアス into 'パス' using PigStorage();

これと同様にStorageクラスを用意すればいい。

ロード(読み込み)用にはLoadFunc・ストア(保存)用にはStoreFuncというクラスが用意されており、それを継承したクラスを作成する。
PigStorageのようなStorageはその両方の機能を持つクラス。

コンパイルに必要となるjarファイルは通常のUDFと同じ。
PathやWritable・OutputFormatといったHadoopの基本となるクラスもPigのjarファイル内に含まれている。


StoreFunc

シーケンスファイルでWordCountWritableを出力するStoreFuncを作ってみる。

package pigudf;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;

import sample.writable.WordCountWritable;
public class WordCountStore extends StoreFunc {
@SuppressWarnings("rawtypes")
@Override
public OutputFormat getOutputFormat() throws IOException {
	return new SequenceFileOutputFormat();
}
OutputFormatを返す。
@Override
public void setStoreLocation(String location, Job job) throws IOException {
	job.setOutputKeyClass(NullWritable.class);
	job.setOutputValueClass(WordCountWritable.class);

	Path path = new Path(location);
	FileOutputFormat.setOutputPath(job, path);
}
出力先をJobに設定する。
Writableのクラスも指定する。
private RecordWriter<NullWritable, WordCountWritable> writer;
private WordCountWritable wc;
@SuppressWarnings("unchecked")
@Override
public void prepareToWrite(@SuppressWarnings("rawtypes") RecordWriter writer)
		throws IOException {
	this.writer = writer;
	this.wc     = new WordCountWritable();
}
書き込みの前準備。Writerを保持する。
@Override
public void putNext(Tuple t) throws IOException {
	String word = (String) t.get(0);
	int count  = (Integer) t.get(1);

	wc.setWord(word);
	wc.setCount(count);
	try {
		writer.write(NullWritable.get(), wc);
	} catch (InterruptedException e) {
		throw new IOException(e);
	}
}
データを出力する。
今回は入力データの個数と型は決め打ちしている^^;

コンパイルしたJavaクラスをjarファイル化して使用するのも通常のUDFと同じ。

register /cygwin/tmp/pigudf.jar
a = load '/cygwin/tmp/pig/wordcount/output/' as (word:chararray, cnt:int);
rm /cygwin/tmp/pig/store/output
store a into '/cygwin/tmp/pig/store/output' using pigudf.WordCountStore();

WordCountサンプルで出力したテキストファイルを入力にしてみた。
出力先が存在しているとエラーになるので、事前にrmで消している。

これで作ったファイル、ちゃんとHiveから読めたよ!

add jar C:/cygwin/tmp/hiveserde.jar;

create external table pigwc (word string, cnt int)
row format serde 'sample.WordCountSerDe' stored as sequencefile
location '/cygwin/tmp/pig/store/output/';

select * from pigwc;

LoadFunc

WordCountWritableが入っているシーケンスファイルを読み込んでみる。

package pigudf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import sample.writable.WordCountWritable;
public class WordCountLoad extends LoadFunc {
	TupleFactory tupleFactory = TupleFactory.getInstance();
@SuppressWarnings("rawtypes")
@Override
public InputFormat getInputFormat() throws IOException {
	return new SequenceFileInputFormat();
}
InputFormatを返す。
@Override
public void setLocation(String location, Job job) throws IOException {
	Path path = new Path(location);
	FileInputFormat.setInputPaths(job, path);
}
入力ファイル名をJobにセットする。
private RecordReader<? extends Writable, WordCountWritable> reader;
private List<Object> cachedList;
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split)
		throws IOException {
	this.reader     = reader;
	this.cachedList = new ArrayList<Object>();
}
読み込みの前準備。Readerを保持する。
@Override
public Tuple getNext() throws IOException {
	try {
		if (reader.nextKeyValue()) {
			WordCountWritable wc = reader.getCurrentValue();

			cachedList.clear();
			cachedList.add(wc.getWord());
			cachedList.add(wc.getCount());

			return tupleFactory.newTuple(cachedList);
		} else {
			return null;
		}
	} catch (InterruptedException e) {
		throw new IOException(e);
	}
}
データを1行分読み込み、タプルに入れて返す。

register /cygwin/tmp/pigudf.jar
a = load '/cygwin/tmp/pig/store/output' using pigudf.WordCountLoad() as (word:chararray, count:int);
dump a

Storage

StoreFuncLoadFuncの両方を1つのクラスで持たせたStorageクラスを作ってみる。

StoreFuncとLoadFuncの両方を継承したクラスが作れれば簡単に解決するのだが、Javaでは多重継承が出来ない。(Scalaならトレイトですぐ解決するのだが)
そこで、親クラスはLoadFuncとし、StoreFuncInterfaceを実装する形となる。

package pigudf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import sample.writable.WordCountWritable;
public class WordCountStorage extends LoadFunc implements StoreFuncInterface {
	〜
}

ほとんどのメソッドはStoreFuncLoadFuncと全く同じ実装でいい。
他にStoreFuncInterfaceで必要なメソッドは以下の通り。

@Override
public String relToAbsPathForStoreLocation(String location, Path curDir)
		throws IOException {
	return LoadFunc.getAbsolutePath(location, curDir);
}
@Override
public void checkSchema(ResourceSchema s) throws IOException {
}
@Override
public void setStoreFuncUDFContextSignature(String signature) {
}
@Override
public void cleanupOnFailure(String location, Job job) throws IOException {
	StoreFunc.cleanupOnFailureImpl(location, job);
}

register /cygwin/tmp/pigudf.jar
a = load '/cygwin/tmp/pig/wordcount/output/' as (word:chararray, cnt:int);
rm /cygwin/tmp/pig/store/output2
store a into '/cygwin/tmp/pig/store/output2' using pigudf.WordCountStorage();
b = load '/cygwin/tmp/pig/store/output2' using pigudf.WordCountStorage() as (word:chararray, count:int);
dump b

ストアもロードも同じStorage指定で実行できる。


Pig Latinへ戻る / Pig目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま