Pigのユーザー定義関数(User Defined Function:UDF)の作り方のメモ。
|
|
Pigで使う関数を作るには、org.apache.pig.EvalFuncを継承したクラスを作成し、exec()メソッドをオーバーライドする。
EvalFuncの型引数にはexec()の戻り型を指定する。
package pigudf;
import java.io.IOException; import org.apache.pig.EvalFunc; import org.apache.pig.data.Tuple;
public class SampleUDF extends EvalFunc<String> { @Override public String exec(Tuple input) throws IOException { if (input == null || input.size() == 0) { return null; } String s = (String) input.get(0); return s + s; } }
exec()の引数Tupleには、関数が呼び出された際の引数が入ってくる。
必要となるjarファイルは以下の通り。
jarファイル | 備考 | Eclipseのソースの添付 |
---|---|---|
PIG_HOME/pig-0.8.1-core.jar | EvalFunc等のクラス | C:/cygwin/usr/local/pig-0.8.1/src |
Pigにはjarファイルを読み込ませるので、作ったクラスをjarファイル化しておく。
<?xml version="1.0" encoding="Shift_JIS"?> <project name="pig0.8.1" default="jar" basedir="."> <property name="src" location="../src" /> <property name="classes" location="../classes" /> <target name="jar"> <jar jarfile="C:/cygwin/tmp/pigudf.jar"> <fileset dir="${classes}" includes="**/*.class" /> <fileset dir="${src}" includes="**/*.java" /> </jar> </target> </project>
Pigから自作のユーザー定義関数を使用するには、まずクラスの入ったjarファイルを読み込ませる。
grunt> register /cygwin/tmp/pigudf.jar
関数を使用する際は(パッケージ名付きで)クラス名を指定する。
a = load '/cygwin/tmp/hello.txt' as (text:chararray); b = foreach a generate pigudf.SampleUDF(text);
grunt> dump b 〜 (#comment#comment) (Hello WorldHello World) (Hello HiveHello Hive) (Hello PigHello Pig)
また、defineによって関数名を付けておくことも出来る。
define mul2 pigudf.SampleUDF(); c = foreach a generate mul2(text); dump c
複数行のデータ(複数のレコード)を返すには、exec()の戻り値をバッグにする。
package pigudf;
import java.io.IOException; import java.util.StringTokenizer; import org.apache.pig.EvalFunc; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory;
public class SplitUDF extends EvalFunc<DataBag> { BagFactory bagFactory = BagFactory.getInstance(); TupleFactory tupleFactory = TupleFactory.getInstance(); @Override public DataBag exec(Tuple input) throws IOException { if (input == null || input.size() == 0) { return null; } DataBag bag = bagFactory.newDefaultBag(); String s = (String) input.get(0); StringTokenizer tokenizer = new StringTokenizer(s); while (tokenizer.hasMoreTokens()) { String token = tokenizer.nextToken(); Tuple tuple = tupleFactory.newTuple(token); bag.add(tuple); } return bag; } }
bagにデータをaddしていき、そのbagを返す。
bagにはTupleを介してデータを入れる。
これで動作はするのだが、ただし、describeで見るとデータ型がnullになってしまう。
register /cygwin/tmp/pigudf.jar a = load '/cygwin/tmp/hello.txt' as (text:chararray); b = foreach a generate pigudf.SplitUDF(text) as words; c = foreach b generate flatten(words) as word;
grunt> describe b b: {words: {null}} grunt> describe c c: {word: bytearray}
そこで、データ型を定義してやる。
関数の戻り値のデータ型を定義するには、outputSchema()というメソッドをオーバーライドする。
import org.apache.pig.data.DataType; import org.apache.pig.impl.logicalLayer.schema.Schema;
@Override public Schema outputSchema(Schema input) { try { Schema bagSchema = new Schema(); // バッグ内の項目の定義(項目名とデータ型) bagSchema.add(new Schema.FieldSchema("token", DataType.CHARARRAY)); return new Schema(new Schema.FieldSchema( getSchemaName(getClass().getName().toLowerCase(), input), bagSchema, DataType.BAG)); } catch (Exception e) { return null; } }
ちなみに、outputSchema()の引数には、Pigからの関数呼び出しの引数のデータ型が入ってくる。
直接returnするのはバッグの情報。
getSchemaName()はクラス名と入力データを利用した名前を返す。ここではデフォルトのバッグ名として使用している。
outputSchema()メソッドは、describeでも呼ばれる(データ型を表示するんだから当然)。
実行例 | 備考 |
---|---|
a = load '/cygwin/tmp/hello.txt' as (text:chararray); b = foreach a generate pigudf.SplitUDF(text) as words; c = foreach b generate flatten(words) as word; grunt> describe b b: {words: {token: chararray}} grunt> describe c c: {word: chararray} |
プログラム内で指定したデータ型とフィールド名が使われている。 |
a = load '/cygwin/tmp/hello.txt' as (text:chararray); b = foreach a generate pigudf.SplitUDF(text); c = foreach b generate flatten($0); grunt> describe b b: {pigudf.splitudf_text_9: {token: chararray}} grunt> describe c c: {pigudf.splitudf_text_12::token: chararray} |
foreachでフィールド名を付けなかった場合、 プログラム内で付けたバッグのデフォルト名が使われる。 末尾の数値はユニークIDらしいが describeする度に変わっていくので名前としては使えないような…。 サンプルで使われているからgetSchemaName()を使ってみたが、 getSchemaName()を使わずに固定でデフォルト名を付けるのが良さそう。 (むしろTOKENIZEはそうなっている) |
複数の項目(カラム・列)のデータを返すには、exec()の戻り値をタプルにする。[2011-09-22]
package pigudf;
import java.io.IOException; import org.apache.pig.EvalFunc; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
public class ColsUDF extends EvalFunc<Tuple> { TupleFactory tupleFactory = TupleFactory.getInstance();
// 関数の1個目の引数に対し、その文字列そのもの・大文字変換・小文字変換した文字列を返す @Override public Tuple exec(Tuple input) throws IOException { if (input == null || input.size() == 0) { return null; } Tuple r = tupleFactory.newTuple(); Object obj = input.get(0); String s = (obj == null) ? "" : obj.toString(); r.append(s); r.append(s.toUpperCase()); r.append(s.toLowerCase()); return r; }
@Override public Schema outputSchema(Schema input) { // タプル名 String name = ""; try { FieldSchema fs = input.getField(0); name = fs.alias; } catch (Exception e) { } try { Schema schema = new Schema(); // タプル内の項目の定義(項目名とデータ型) schema.add(new Schema.FieldSchema("src", DataType.CHARARRAY)); schema.add(new Schema.FieldSchema("upper", DataType.CHARARRAY)); schema.add(new Schema.FieldSchema("lower", DataType.CHARARRAY)); return new Schema(new Schema.FieldSchema(name, schema, DataType.TUPLE)); } catch (Exception e) { return null; } } }
実行例 | 備考 |
---|---|
a = load '/cygwin/tmp/hello.txt' as (text:chararray); b = foreach a generate pigudf.ColsUDF(text); c = foreach b generate text.src, text.upper, text.lower; grunt> describe b b: {text: (src: chararray,upper: chararray,lower: chararray)} grunt> describe c c: {src: chararray,upper: chararray,lower: chararray} |
このoutputSchemaでは、 引数のフィールドに名前(左記のtext)が付いている場合、 それをタプル名に使うようになっている。(fs.aliasの部分) |
a = load '/cygwin/tmp/hello.txt'; b = foreach a generate pigudf.ColsUDF($0); c = foreach b generate $0.src, $0.upper, $0.lower; grunt> describe b b: {(src: chararray,upper: chararray,lower: chararray)} grunt> describe c c: {src: chararray,upper: chararray,lower: chararray} |
outputSchema内でタプル名を空文字列にした場合、 describeで見たときに名前が付いていない。 「$0」等の番号で指定する。 |
a = load '/cygwin/tmp/hello.txt' as (text:chararray); b = foreach a generate flatten(pigudf.ColsUDF(text)); c = foreach b generate text::src, text::upper, text::lower; c2= foreach b generate src, upper, lower; grunt> describe b b: {text::src: chararray,text::upper: chararray,text::lower: chararray} grunt> describe c c: {text::src: chararray,text::upper: chararray,text::lower: chararray} grunt> describe c2 c2: {text::src: chararray,text::upper: chararray,text::lower: chararray} |
flattenを使うと、フィールド名が「タプル名:: 名前」になる。この場合、「タプル名 :: 」を省略して使うことが出来るが、実際にはタプル名が付いて回るようだ。 |
a = load '/cygwin/tmp/hello.txt'; b = foreach a generate flatten(pigudf.ColsUDF($0)); c = foreach b generate src, upper, lower; grunt> describe b b: {src: chararray,upper: chararray,lower: chararray} grunt> describe c c: {src: chararray,upper: chararray,lower: chararray} |
outputSchema内でタプル名を空文字列にしている場合、 flattenするとタプル名が消える。 |
フィルター(抽出条件)で使用する関数のクラスは、org.apache.pig.FilterFuncを継承する。
(実質的には、EvalFuncの戻り値がBooleanになっているだけ)
package pigudf;
import java.io.IOException; import org.apache.pig.FilterFunc; import org.apache.pig.data.Tuple;
public class StartsWith extends FilterFunc { @Override public Boolean exec(Tuple input) throws IOException { if (input == null || input.size() < 2) { return null; } String s = (String) input.get(0); String t = (String) input.get(1); return s.startsWith(t); } }
register /cygwin/tmp/pigudf.jar a = load '/cygwin/tmp/hello.txt' as (text:chararray); b = filter a by not pigudf.StartsWith(text, '#');
grunt> dump b 〜 (Hello World) (Hello Hive) (Hello Pig)