Pigのユーザー定義関数(User Defined Function:UDF)の作り方のメモ。
|
|
Pigで使う関数を作るには、org.apache.pig.EvalFuncを継承したクラスを作成し、exec()メソッドをオーバーライドする。
EvalFuncの型引数にはexec()の戻り型を指定する。
package pigudf;
import java.io.IOException; import org.apache.pig.EvalFunc; import org.apache.pig.data.Tuple;
public class SampleUDF extends EvalFunc<String> {
@Override
public String exec(Tuple input) throws IOException {
if (input == null || input.size() == 0) {
return null;
}
String s = (String) input.get(0);
return s + s;
}
}
exec()の引数Tupleには、関数が呼び出された際の引数が入ってくる。
必要となるjarファイルは以下の通り。
| jarファイル | 備考 | Eclipseのソースの添付 |
|---|---|---|
| PIG_HOME/pig-0.8.1-core.jar | EvalFunc等のクラス | C:/cygwin/usr/local/pig-0.8.1/src |
Pigにはjarファイルを読み込ませるので、作ったクラスをjarファイル化しておく。
<?xml version="1.0" encoding="Shift_JIS"?>
<project name="pig0.8.1" default="jar" basedir=".">
<property name="src" location="../src" />
<property name="classes" location="../classes" />
<target name="jar">
<jar jarfile="C:/cygwin/tmp/pigudf.jar">
<fileset dir="${classes}" includes="**/*.class" />
<fileset dir="${src}" includes="**/*.java" />
</jar>
</target>
</project>
Pigから自作のユーザー定義関数を使用するには、まずクラスの入ったjarファイルを読み込ませる。
grunt> register /cygwin/tmp/pigudf.jar
関数を使用する際は(パッケージ名付きで)クラス名を指定する。
a = load '/cygwin/tmp/hello.txt' as (text:chararray); b = foreach a generate pigudf.SampleUDF(text);
grunt> dump b 〜 (#comment#comment) (Hello WorldHello World) (Hello HiveHello Hive) (Hello PigHello Pig)
また、defineによって関数名を付けておくことも出来る。
define mul2 pigudf.SampleUDF(); c = foreach a generate mul2(text); dump c
複数行のデータ(複数のレコード)を返すには、exec()の戻り値をバッグにする。
package pigudf;
import java.io.IOException; import java.util.StringTokenizer; import org.apache.pig.EvalFunc; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory;
public class SplitUDF extends EvalFunc<DataBag> {
BagFactory bagFactory = BagFactory.getInstance();
TupleFactory tupleFactory = TupleFactory.getInstance();
@Override
public DataBag exec(Tuple input) throws IOException {
if (input == null || input.size() == 0) {
return null;
}
DataBag bag = bagFactory.newDefaultBag();
String s = (String) input.get(0);
StringTokenizer tokenizer = new StringTokenizer(s);
while (tokenizer.hasMoreTokens()) {
String token = tokenizer.nextToken();
Tuple tuple = tupleFactory.newTuple(token);
bag.add(tuple);
}
return bag;
}
}
bagにデータをaddしていき、そのbagを返す。
bagにはTupleを介してデータを入れる。
これで動作はするのだが、ただし、describeで見るとデータ型がnullになってしまう。
register /cygwin/tmp/pigudf.jar a = load '/cygwin/tmp/hello.txt' as (text:chararray); b = foreach a generate pigudf.SplitUDF(text) as words; c = foreach b generate flatten(words) as word;
grunt> describe b
b: {words: {null}}
grunt> describe c
c: {word: bytearray}
そこで、データ型を定義してやる。
関数の戻り値のデータ型を定義するには、outputSchema()というメソッドをオーバーライドする。
import org.apache.pig.data.DataType; import org.apache.pig.impl.logicalLayer.schema.Schema;
@Override
public Schema outputSchema(Schema input) {
try {
Schema bagSchema = new Schema();
// バッグ内の項目の定義(項目名とデータ型)
bagSchema.add(new Schema.FieldSchema("token", DataType.CHARARRAY));
return new Schema(new Schema.FieldSchema(
getSchemaName(getClass().getName().toLowerCase(), input),
bagSchema,
DataType.BAG));
} catch (Exception e) {
return null;
}
}
ちなみに、outputSchema()の引数には、Pigからの関数呼び出しの引数のデータ型が入ってくる。
直接returnするのはバッグの情報。
getSchemaName()はクラス名と入力データを利用した名前を返す。ここではデフォルトのバッグ名として使用している。
outputSchema()メソッドは、describeでも呼ばれる(データ型を表示するんだから当然)。
| 実行例 | 備考 |
|---|---|
a = load '/cygwin/tmp/hello.txt' as (text:chararray); b = foreach a generate pigudf.SplitUDF(text) as words; c = foreach b generate flatten(words) as word; grunt> describe b
b: {words: {token: chararray}}
grunt> describe c
c: {word: chararray}
|
プログラム内で指定したデータ型とフィールド名が使われている。 |
a = load '/cygwin/tmp/hello.txt' as (text:chararray); b = foreach a generate pigudf.SplitUDF(text); c = foreach b generate flatten($0); grunt> describe b
b: {pigudf.splitudf_text_9: {token: chararray}}
grunt> describe c
c: {pigudf.splitudf_text_12::token: chararray}
|
foreachでフィールド名を付けなかった場合、 プログラム内で付けたバッグのデフォルト名が使われる。 末尾の数値はユニークIDらしいが describeする度に変わっていくので名前としては使えないような…。 サンプルで使われているからgetSchemaName()を使ってみたが、 getSchemaName()を使わずに固定でデフォルト名を付けるのが良さそう。 (むしろTOKENIZEはそうなっている) |
複数の項目(カラム・列)のデータを返すには、exec()の戻り値をタプルにする。[2011-09-22]
package pigudf;
import java.io.IOException; import org.apache.pig.EvalFunc; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
public class ColsUDF extends EvalFunc<Tuple> {
TupleFactory tupleFactory = TupleFactory.getInstance();
// 関数の1個目の引数に対し、その文字列そのもの・大文字変換・小文字変換した文字列を返す
@Override
public Tuple exec(Tuple input) throws IOException {
if (input == null || input.size() == 0) {
return null;
}
Tuple r = tupleFactory.newTuple();
Object obj = input.get(0);
String s = (obj == null) ? "" : obj.toString();
r.append(s);
r.append(s.toUpperCase());
r.append(s.toLowerCase());
return r;
}
@Override
public Schema outputSchema(Schema input) {
// タプル名
String name = "";
try {
FieldSchema fs = input.getField(0);
name = fs.alias;
} catch (Exception e) { }
try {
Schema schema = new Schema();
// タプル内の項目の定義(項目名とデータ型)
schema.add(new Schema.FieldSchema("src", DataType.CHARARRAY));
schema.add(new Schema.FieldSchema("upper", DataType.CHARARRAY));
schema.add(new Schema.FieldSchema("lower", DataType.CHARARRAY));
return new Schema(new Schema.FieldSchema(name, schema, DataType.TUPLE));
} catch (Exception e) {
return null;
}
}
}
| 実行例 | 備考 |
|---|---|
a = load '/cygwin/tmp/hello.txt' as (text:chararray); b = foreach a generate pigudf.ColsUDF(text); c = foreach b generate text.src, text.upper, text.lower; grunt> describe b
b: {text: (src: chararray,upper: chararray,lower: chararray)}
grunt> describe c
c: {src: chararray,upper: chararray,lower: chararray}
|
このoutputSchemaでは、 引数のフィールドに名前(左記のtext)が付いている場合、 それをタプル名に使うようになっている。(fs.aliasの部分) |
a = load '/cygwin/tmp/hello.txt'; b = foreach a generate pigudf.ColsUDF($0); c = foreach b generate $0.src, $0.upper, $0.lower; grunt> describe b
b: {(src: chararray,upper: chararray,lower: chararray)}
grunt> describe c
c: {src: chararray,upper: chararray,lower: chararray}
|
outputSchema内でタプル名を空文字列にした場合、 describeで見たときに名前が付いていない。 「$0」等の番号で指定する。 |
a = load '/cygwin/tmp/hello.txt' as (text:chararray); b = foreach a generate flatten(pigudf.ColsUDF(text)); c = foreach b generate text::src, text::upper, text::lower; c2= foreach b generate src, upper, lower; grunt> describe b
b: {text::src: chararray,text::upper: chararray,text::lower: chararray}
grunt> describe c
c: {text::src: chararray,text::upper: chararray,text::lower: chararray}
grunt> describe c2
c2: {text::src: chararray,text::upper: chararray,text::lower: chararray}
|
flattenを使うと、フィールド名が「タプル名::名前」になる。この場合、「タプル名 ::」を省略して使うことが出来るが、実際にはタプル名が付いて回るようだ。 |
a = load '/cygwin/tmp/hello.txt'; b = foreach a generate flatten(pigudf.ColsUDF($0)); c = foreach b generate src, upper, lower; grunt> describe b
b: {src: chararray,upper: chararray,lower: chararray}
grunt> describe c
c: {src: chararray,upper: chararray,lower: chararray}
|
outputSchema内でタプル名を空文字列にしている場合、 flattenするとタプル名が消える。 |
フィルター(抽出条件)で使用する関数のクラスは、org.apache.pig.FilterFuncを継承する。
(実質的には、EvalFuncの戻り値がBooleanになっているだけ)
package pigudf;
import java.io.IOException; import org.apache.pig.FilterFunc; import org.apache.pig.data.Tuple;
public class StartsWith extends FilterFunc {
@Override
public Boolean exec(Tuple input) throws IOException {
if (input == null || input.size() < 2) {
return null;
}
String s = (String) input.get(0);
String t = (String) input.get(1);
return s.startsWith(t);
}
}
register /cygwin/tmp/pigudf.jar a = load '/cygwin/tmp/hello.txt' as (text:chararray); b = filter a by not pigudf.StartsWith(text, '#');
grunt> dump b 〜 (Hello World) (Hello Hive) (Hello Pig)