PigでWordCountを作ってみる。
|
|
と言ってもnokunoさんのPig, Hiveなど8種類のMapReduce言語についての比較にずばりPigのWordCountの例が載っていて、ほぼ改造のしようも無い^^;
a = load '/cygwin/tmp/hello.txt' as (text:chararray); b = foreach a generate flatten(TOKENIZE(text)) as word; c = group b by word; d = foreach c generate group as word, COUNT(b) as count; store d into '/cygwin/tmp/pig/wordcount/output';
aのas以降で、ファイルから読み込んだデータに項目名を付けている。
「text」が項目名で、「chararray」は文字列(String)を意味するデータ型。
TOKENIZEで単語毎に分割し、groupで集約キーを指定してCOUNTで件数を数えている。
bは二行に分けて書くことも出来る。
b0 = foreach a generate TOKENIZE(text) as words; b = foreach b0 generate flatten(words) as word;
grunt> cat /cygwin/tmp/hello.txt Hello World Hello Hive Hello Pig
grunt> dump d 〜 (Pig,1) (Hive,1) (Hello,3) (World,1)
grunt> ls /cygwin/tmp/pig/wordcount/output file:/cygwin/tmp/pig/wordcount/output/part-r-00000<r 1> 29 grunt> cat /cygwin/tmp/pig/wordcount/output/part-r-00000 Pig 1 Hive 1 Hello 3 World 1
dのCOUNT()の中がなぜbなのかはちょっと引っかかるかも。
これは、dの入力データであるcの定義を見てみると分かる。
grunt> describe c c: {group: chararray,b: {word: chararray}}
groupという項目とbという項目があることが分かる。
group演算においては、集約キーはgroupという名前になる。
集約された(グループ化された)データの名前は入力データのエイリアス名になる。したがってここではbになっている。
bの型は「{word: chararray}
」となっている。波括弧で囲まれているのはbag(かばん)を表す。
bagは複数個のデータ、いわば配列やListのようなもの。
ここでは“(chararray型の)wordという項目が複数入る”という意味になる。これもcの実行結果を見てみると分かりやすい。
grunt> dump c 〜 (Pig,{(Pig)}) (Hive,{(Hive)}) (Hello,{(Hello),(Hello),(Hello)}) (World,{(World)})
集約キー毎にデータの一覧が出来ているのが分かる。COUNTはこの一覧の個数を数える。
他人のプログラムをそのまま実行しただけでは芸が無いので、関数型言語のScalaと比較をしてみたい。
上記のWordCoutに、“「#」から始まる行はスキップする”という条件も加えてみる。
Pig | Scala | 備考 |
---|---|---|
a = load '/cygwin/tmp/hello.txt'; |
val a = scala.io.Source.fromFile("/cygwin/tmp/hello.txt"). getLines.toList |
ファイルからの読み込み。 |
(#comment) |
a: List[String] = |
|
b = filter a by SUBSTRING($0, 0, 1) != '#'; |
val b = a.filter(_.substring(0, 1) != "#") val b = a.filter( ! _.startsWith("#")) val b = a.filterNot(_.startsWith("#")) |
条件指定による抽出。 |
(Hello World) |
b: List[String] = |
|
c = foreach b generate TOKENIZE($0); |
val c = b.map(_.split(" ")) |
各データの変換(単語の分割)。 |
({(Hello),(World)}) |
c: List[Array[java.lang.String]] = |
|
d = foreach c generate flatten($0); d = foreach b generate flatten(TOKENIZE($0)); |
val d = c.flatMap(a => a) val d = b.flatMap(_.split(" ")) |
コレクションデータ(Pigではバッグ、Scalaでは配列)のフラット化。 |
(Hello) |
d: List[java.lang.String] = |
|
e = group d by $0; |
val e = d.groupBy(w => w) |
指定されたキー毎にリストを作るところが PigもScalaもそっくり。 |
(Pig,{(Pig)}) |
e:
scala.collection.immutable.Map[java.lang.String,List[java.lang.String]]
= |
|
f = foreach e generate group, COUNT(d); |
val f = e.mapValues(_.size) val f = e.mapValues{ list => list.size } val f = e.map{ case (group, list) => (group, list.size) } |
“キーとリスト”というデータに対し リスト部分を要素数に変換している。 |
(Pig,1) |
f:
scala.collection.immutable.Map[java.lang.String,Int] = |
Pigでは、名前の付いていない項目を$0,$1,$2,…で指定できる。
Scalaでは「_」で無名の引数を指定できる。
見ての通り、PigとScalaではデータを加工していく考え方が非常に似ている。
特にフラット化(flatMap)やグループ化(groupBy)は、Scalaのコレクション操作で慣れているとPigの動きも理解しやすい。
ちなみにScalaのListを使った操作では一行実行する度に処理を行って結果が返ってくるが、
Pigでは最後にdumpやstoreを実行するまで実際の処理は行われない。
(処理の定義だけ溜めておいて、まとめて実行する)
ScalaでもIteratorやStream等の遅延評価をするコレクションを使えば、同様の状態になる。