S-JIS[2011-08-14/2011-09-22] 変更履歴

Pigユーザー定義関数

Pigのユーザー定義関数(User Defined Function:UDF)の作り方のメモ。


クラス作成

Pigで使う関数を作るには、org.apache.pig.EvalFuncを継承したクラスを作成し、exec()メソッドをオーバーライドする。
EvalFuncの型引数にはexec()の戻り型を指定する。

package pigudf;
import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
public class SampleUDF extends EvalFunc<String> {

	@Override
	public String exec(Tuple input) throws IOException {
		if (input == null || input.size() == 0) {
			return null;
		}
		String s = (String) input.get(0);
		return s + s;
	}
}

exec()の引数Tupleには、関数が呼び出された際の引数が入ってくる。


必要となるjarファイルは以下の通り。

jarファイル 備考 Eclipseのソースの添付
PIG_HOME/pig-0.8.1-core.jar EvalFunc等のクラス C:/cygwin/usr/local/pig-0.8.1/src

jarファイル化

Pigにはjarファイルを読み込ませるので、作ったクラスをjarファイル化しておく。

プロジェクト/bin/build.xml:

<?xml version="1.0" encoding="Shift_JIS"?>
<project name="pig0.8.1" default="jar" basedir=".">
	<property name="src" location="../src" />
	<property name="classes" location="../classes" />

	<target name="jar">
		<jar jarfile="C:/cygwin/tmp/pigudf.jar">
			<fileset dir="${classes}" includes="**/*.class" />
			<fileset dir="${src}" includes="**/*.java" />
		</jar>
	</target>
</project>

ユーザー定義関数の使用

Pigから自作のユーザー定義関数を使用するには、まずクラスの入ったjarファイルを読み込ませる。

grunt> register /cygwin/tmp/pigudf.jar

関数を使用する際は(パッケージ名付きで)クラス名を指定する。

a = load '/cygwin/tmp/hello.txt' as (text:chararray);
b = foreach a generate pigudf.SampleUDF(text);
grunt> dump b
〜
(#comment#comment)
(Hello WorldHello World)
(Hello HiveHello Hive)
(Hello PigHello Pig)

また、defineによって関数名を付けておくことも出来る。

define mul2 pigudf.SampleUDF();
c = foreach a generate mul2(text);
dump c

複数レコードを返す関数

複数行のデータ(複数のレコード)を返すには、exec()の戻り値をバッグにする。

package pigudf;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
public class SplitUDF extends EvalFunc<DataBag> {
	BagFactory   bagFactory   = BagFactory.getInstance();
	TupleFactory tupleFactory = TupleFactory.getInstance();

	@Override
	public DataBag exec(Tuple input) throws IOException {
		if (input == null || input.size() == 0) {
			return null;
		}

		DataBag bag = bagFactory.newDefaultBag();

		String s = (String) input.get(0);
		StringTokenizer tokenizer = new StringTokenizer(s);
		while (tokenizer.hasMoreTokens()) {
			String token = tokenizer.nextToken();
			Tuple tuple = tupleFactory.newTuple(token);
			bag.add(tuple);
		}

		return bag;
	}
}

bagにデータをaddしていき、そのbagを返す。
bagにはTupleを介してデータを入れる。

これで動作はするのだが、ただし、describeで見るとデータ型がnullになってしまう。

register /cygwin/tmp/pigudf.jar
a = load '/cygwin/tmp/hello.txt' as (text:chararray);
b = foreach a generate pigudf.SplitUDF(text) as words;
c = foreach b generate flatten(words) as word;
grunt> describe b
b: {words: {null}}
grunt> describe c
c: {word: bytearray}

そこで、データ型を定義してやる。
関数の戻り値のデータ型を定義するには、outputSchema()というメソッドをオーバーライドする。

import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.schema.Schema;
	@Override
	public Schema outputSchema(Schema input) {
		try {
			Schema bagSchema = new Schema();

			// バッグ内の項目の定義(項目名とデータ型)
			bagSchema.add(new Schema.FieldSchema("token", DataType.CHARARRAY));

			return new Schema(new Schema.FieldSchema(
				getSchemaName(getClass().getName().toLowerCase(), input),
				bagSchema,
				DataType.BAG));
		} catch (Exception e) {
			return null;
		}
	}

ちなみに、outputSchema()の引数には、Pigからの関数呼び出しの引数のデータ型が入ってくる。

直接returnするのはバッグの情報。
getSchemaName()はクラス名と入力データを利用した名前を返す。ここではデフォルトのバッグ名として使用している。

outputSchema()メソッドは、describeでも呼ばれる(データ型を表示するんだから当然)。

実行例 備考
a = load '/cygwin/tmp/hello.txt' as (text:chararray);
b = foreach a generate pigudf.SplitUDF(text) as words;
c = foreach b generate flatten(words) as word;
grunt> describe b
b: {words: {token: chararray}}
grunt> describe c
c: {word: chararray}
プログラム内で指定したデータ型とフィールド名が使われている。
a = load '/cygwin/tmp/hello.txt' as (text:chararray);
b = foreach a generate pigudf.SplitUDF(text);
c = foreach b generate flatten($0);
grunt> describe b
b: {pigudf.splitudf_text_9: {token: chararray}}
grunt> describe c
c: {pigudf.splitudf_text_12::token: chararray}
foreachでフィールド名を付けなかった場合、
プログラム内で付けたバッグのデフォルト名が使われる。
末尾の数値はユニークIDらしいが
describeする度に変わっていくので名前としては使えないような…。
サンプルで使われているからgetSchemaName()を使ってみたが、
getSchemaName()を使わずに固定でデフォルト名を付けるのが良さそう。
(むしろTOKENIZEはそうなっている)

複数カラムを返す関数

複数の項目(カラム・列)のデータを返すには、exec()の戻り値をタプルにする。[2011-09-22]

package pigudf;
import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
public class ColsUDF extends EvalFunc<Tuple> {
	TupleFactory tupleFactory = TupleFactory.getInstance();
	// 関数の1個目の引数に対し、その文字列そのもの・大文字変換・小文字変換した文字列を返す
	@Override
	public Tuple exec(Tuple input) throws IOException {
		if (input == null || input.size() == 0) {
			return null;
		}

		Tuple r = tupleFactory.newTuple();
		Object obj = input.get(0);
		String s = (obj == null) ? "" : obj.toString();
		r.append(s);
		r.append(s.toUpperCase());
		r.append(s.toLowerCase());

		return r;
	}
	@Override
	public Schema outputSchema(Schema input) {
		// タプル名
		String name = "";
		try {
			FieldSchema fs = input.getField(0);
			name = fs.alias;
		} catch (Exception e) { }

		try {
			Schema schema = new Schema();

			// タプル内の項目の定義(項目名とデータ型)
			schema.add(new Schema.FieldSchema("src",   DataType.CHARARRAY));
			schema.add(new Schema.FieldSchema("upper", DataType.CHARARRAY));
			schema.add(new Schema.FieldSchema("lower", DataType.CHARARRAY));

			return new Schema(new Schema.FieldSchema(name, schema, DataType.TUPLE));
		} catch (Exception e) {
			return null;
		}
	}
}
実行例 備考
a = load '/cygwin/tmp/hello.txt' as (text:chararray);
b = foreach a generate pigudf.ColsUDF(text);
c = foreach b generate text.src, text.upper, text.lower;
grunt> describe b
b: {text: (src: chararray,upper: chararray,lower: chararray)}
grunt> describe c
c: {src: chararray,upper: chararray,lower: chararray}
このoutputSchemaでは、
引数のフィールドに名前(左記のtext)が付いている場合、
それをタプル名に使うようになっている。(fs.aliasの部分)
a = load '/cygwin/tmp/hello.txt';
b = foreach a generate pigudf.ColsUDF($0);
c = foreach b generate $0.src, $0.upper, $0.lower;
grunt> describe b
b: {(src: chararray,upper: chararray,lower: chararray)}
grunt> describe c
c: {src: chararray,upper: chararray,lower: chararray}
outputSchema内でタプル名を空文字列にした場合、
describeで見たときに名前が付いていない。
「$0」等の番号で指定する。
a = load '/cygwin/tmp/hello.txt' as (text:chararray);
b = foreach a generate flatten(pigudf.ColsUDF(text));
c = foreach b generate text::src, text::upper, text::lower;
c2= foreach b generate src, upper, lower;
grunt> describe b
b: {text::src: chararray,text::upper: chararray,text::lower: chararray}
grunt> describe c
c: {text::src: chararray,text::upper: chararray,text::lower: chararray}
grunt> describe c2
c2: {text::src: chararray,text::upper: chararray,text::lower: chararray}
flattenを使うと、フィールド名が「タプル名::名前」になる。
この場合、「タプル名::」を省略して使うことが出来るが、
実際にはタプル名が付いて回るようだ。
a = load '/cygwin/tmp/hello.txt';
b = foreach a generate flatten(pigudf.ColsUDF($0));
c = foreach b generate src, upper, lower;
grunt> describe b
b: {src: chararray,upper: chararray,lower: chararray}
grunt> describe c
c: {src: chararray,upper: chararray,lower: chararray}
outputSchema内でタプル名を空文字列にしている場合、
flattenするとタプル名が消える。

フィルター関数

フィルター(抽出条件)で使用する関数のクラスは、org.apache.pig.FilterFuncを継承する。
(実質的には、EvalFuncの戻り値がBooleanになっているだけ)

package pigudf;
import java.io.IOException;

import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;
public class StartsWith extends FilterFunc {

	@Override
	public Boolean exec(Tuple input) throws IOException {
		if (input == null || input.size() < 2) {
			return null;
		}
		String s = (String) input.get(0);
		String t = (String) input.get(1);
		return s.startsWith(t);
	}
}

register /cygwin/tmp/pigudf.jar
a = load '/cygwin/tmp/hello.txt' as (text:chararray);
b = filter a by not pigudf.StartsWith(text, '#');
grunt> dump b
〜
(Hello World)
(Hello Hive)
(Hello Pig)

Pig Latinへ戻る / Pig目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま