S-JIS[2011-07-23/2011-09-14] 変更履歴
|
ここではWindowsのHadoop単独環境を想定している。
使用したのはHadoop0.21.0。
Eclipseでのコンパイルに必要なのは、Hadoop本体のjarファイルと、HADOOP_HOME/lib直下のjarファイル全部。
まずはJavaで書いたWordCountをそのままScalaに変換してみる。
インポート部分はScalaの機能を使ってちょっと短縮できる。
import java.util.StringTokenizer
import org.apache.hadoop.conf.{ Configuration, Configured }
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ IntWritable, LongWritable, Text }
import org.apache.hadoop.mapreduce.{ Job, Mapper, Reducer }
import org.apache.hadoop.mapreduce.lib.input.{ FileInputFormat, TextInputFormat }
import org.apache.hadoop.mapreduce.lib.output.{ FileOutputFormat, TextOutputFormat }
import org.apache.hadoop.util.{ Tool, ToolRunner }
import scala.collection.JavaConverters._
全体はobjectとする。
object WordCount extends Configured with Tool {
def main(args: Array[String]) {
exit(ToolRunner.run(this, args))
}
〜
}
Javaの場合、objectでなくclassとなる為、ToolRunner#run()の第1引数は「new WordCount()」のようになる。
Scalaのobjectは(シングルトン)インスタンスなので、newしなくてもthisでそのまま渡せる。
Mapクラスはobject WordCount{ }の中に書いても大丈夫。
何を気にしているかというと、MapはHadoopが引数なしのコンストラクターを使ってインスタンス生成する為。
(Javaの内部クラスは、コンストラクターが暗黙の引数として外側クラスのインスタンスを要求する。staticクラスにするとそれは無くなるので、JavaでWordCountクラス内にMapクラスを書くときはstaticクラスにする)
Scalaの場合、objectの中で定義したclassはJavaのstaticクラスになるので問題ない。
class Map extends Mapper[LongWritable, Text, Text, IntWritable] {
val word = new Text
val one = new IntWritable(1)
override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, Text, IntWritable]#Context) {
val line = value.toString
val tokenizer = new StringTokenizer(line)
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken())
context.write(word, one)
}
}
}
mapメソッドの第3引数の型が、Javaだと「Context」のみで済むのに、Scalaだと親クラス(と型)を明記しなければならない。
これはScalaの内部クラスの仕様がJavaとは異なる為なので、仕方ない。
ただ、Scalaではtypeを使って別名を付けられるので、多少は書きやすくなる。
| 別名利用例 | 備考 |
|---|---|
class Map extends Mapper[LongWritable, Text, Text, IntWritable] {
type Context = Mapper[LongWritable, Text, Text, IntWritable]#Context
override def map(〜, context: Context) {
〜
}
}
|
|
type M = Mapper[LongWritable, Text, Text, IntWritable] class Map extends M { override def map(〜, context: M#Context) { 〜 } } |
ただし、typeはトップレベルでは記述できない。 左記の例はobject WordCount{ }内でないとダメ。 |
また、StringTokenizerを使うとwhileを使う必要が出てきてしまうので、別の方法を考えたい。
| 例 | 備考 |
|---|---|
line.split("[ \t]+").foreach { token =>
word.set(token)
context.write(word, one)
}
|
String#split()で分割する方法。 (でもこれを使うなら、最初からJavaのサンプルもこれを使ってると思うんだよね) |
val tokenizer = new StringTokenizer(line)
tokenizer.asScala.foreach { obj =>
word.set(obj.asInstanceOf[String])
context.write(word, one)
}
|
StringTokenizerを生かす方向なら、JavaConvertersのasScalaを使ってコレクションに変換する方法が考えられる。 ただ、StringTokenizerの場合は型がStringでなくObjectになってしまうようなので、Stringに変換(キャスト)してやる必要がある。 (toStringでもいいけどさ^^;) |
val tokenizer = new StringTokenizer(line)
tokenizer.asScala.foreach {
case token: String =>
word.set(token)
context.write(word, one)
}
|
asInstanceOfはあまり使いたくない(Scalaで使うと敗北っぽく感じる(爆))ので、match〜caseを使ってみる。 (でもこれも型チェックが入るからあまり使いたくないけどね(苦笑)) |
Reduceクラスも、書き方の注意点はMapクラスと同じ。
class Reduce extends Reducer[Text, IntWritable, Text, IntWritable] {
type Context = Reducer[Text, IntWritable, Text, IntWritable]#Context
override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: Context) {
val sum = values.asScala.map(_.get).sum
context.write(key, new IntWritable(sum))
}
}
reduceメソッドの第2引数はJavaのIterableだが、ScalaにもIterableがあって暗黙にインポートされているので、FQCNで指定する必要がある。
集計は、一番分かりやすい書き方にしてみた。→集計方法いろいろ
map関数内の「_.get」は、「value => value.get()」と同じ。(プレースホルダー構文)
ジョブ実行部分。
def run(args: Array[String]): Int = {
val conf = getConf()
val job = new Job(conf, "WordCount")
job.setJarByClass(getClass)
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[IntWritable])
job.setMapperClass(classOf[Map])
job.setCombinerClass(classOf[Reduce])
job.setReducerClass(classOf[Reduce])
job.setInputFormatClass(classOf[TextInputFormat])
job.setOutputFormatClass(classOf[TextOutputFormat[Text, Text]])
FileInputFormat.setInputPaths(job, new Path(args(0)))
FileOutputFormat.setOutputPath(job, new Path(args(1)))
val success = job.waitForCompletion(true)
if (success) 0 else 1
}
JavaではクラスのClassを取得するのに「クラス.class」と書くが、Scalaでは「classOf[クラス]」。
Scala2.9.0では、job.setMapperClass(classOf[Map])がコンパイルエラーになるバグがあるので、
classOf[]の後ろに「.asInstanceOf[Class[Nothing]]」というのをくっつける。[/2011-09-14]
job.setMapperClass(classOf[Map].asInstanceOf[Class[Nothing]]) job.setCombinerClass(classOf[Reduce].asInstanceOf[Class[Nothing]]) job.setReducerClass(classOf[Reduce].asInstanceOf[Class[Nothing]]) job.setInputFormatClass(classOf[TextInputFormat].asInstanceOf[Class[Nothing]]) job.setOutputFormatClass(classOf[TextOutputFormat[Text, Text]].asInstanceOf[Class[Nothing]])
これが無いと、
type mismatch; found :
java.lang.Class[org.apache.hadoop.mapreduce.lib.input.TextInputFormat](classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat]) required: java.lang.Class[_ <: org.apache.hadoop.mapreduce.InputFormat]
というコンパイルエラーが発生する。
「Class[_ <: InputFormat]が必要なのに、Class[TextInputFormat]が指定されてるぜ」というエラー。
TextInputFormatはInputFormatのサブクラス(TextInputFormat <: InputFormat)なので、正しいはずなんだけどね。
これはScala2.9.0のバグ(SI-4603)らしく、とりあえずの回避方法が、上記の様に「.asInstanceOf[Class[Nothing]]」を付けるというもの。(by kmizuさんのツイート)
(Scala2.8では問題ないらしい)
このバグはScala2.9.1で解消されたので、「.asInstanceOf[Class[Nothing]]」は無くても大丈夫。[2011-09-14]
Scalaで作っているので、Hadoop実行時にScalaのライブラリーをクラスパスに追加してやる必要がある。
HADOOP_CLASSPATH+=:$(cygpath -u 'C:\eclipse\configuration\org.eclipse.osgi\bundles\327\1\.cp\lib\scala-library.jar')
※Windows用の設定。EclipseのScalaプラグインで使っているscala-library.jarをそのまま指定してみた。
(おまけ→EclipseのAntからHadoopのプログラムを実行するbuild.xml)
さて、ここからWordCountを改造してみる。
Scalaでは型の暗黙変換を定義できるので、Int⇔IntWritable、String⇔Textの変換
をやってみる。
これにより、数値(Int)や文字列(String)を直接context.write()に出力する(ように見せかける)ことが出来る。
implicit def string2writable(s: String) = new Text(s) implicit def writable2string(w: Text) = w.toString() implicit def int2writable(n: Int) = new IntWritable(n) implicit def writable2int(w: IntWritable) = w.get()
//Map
value.toString().split("[ \t]+").foreach { token => context.write(new Text(token), new IntWritable(1)) }
↓
value.split("[ \t]+").foreach { token => context.write(token, 1) }
↓
value.split("[ \t]+").foreach { context.write(_, 1) }
implicit object IntWritableNumeric extends Numeric[IntWritable] {
type T = IntWritable
def plus(x: T, y: T): T = x + y
def minus(x: T, y: T): T = x - y
def times(x: T, y: T): T = x * y
def negate(x: T): T = 0 - x
def fromInt(x: Int): T = x
def toInt(x: T): Int = x
def toLong(x: T): Long = x.get
def toFloat(x: T): Float = x.get
def toDouble(x: T): Double = x.get
def compare(x: T, y: T): Int = x.compareTo(y)
}
//Reduce
context.write(key, new IntWritable(values.asScala.map(_.get).sum))
↓
context.write(key, values.asScala.sum)
sumを呼び出すためには(暗黙の)Numericオブジェクトが必要になるので、IntWritableNumericを定義した。
この中で実装している各メソッドも、先に定義したInt⇔IntWritableの暗黙変換を利用している。
うん、見た目はかなりすっきりした(笑)
しかし実行してみたら、IntWritableNumericのplusメソッドがスタックオーバーフロー(爆)
暗黙変換はそれ自体インスタンスを作るからなぁ(苦笑)
明示的に演算を記述してやったら、ちゃんと動いた。
def plus(x: T, y: T): T = new IntWritable(x.get + y.get)
こういったHadoop用の暗黙変換をライブラリー化したら便利そう。と思ったら、既にある。
→Jonhnny WeslleyさんのSHadoop
IntWritableやText以外にも、PathやUTF8・Iteratorまで、色々な暗黙変換が定義されている。(しかしさすがにsum用は無いな(笑))
(なお、同じ場所にSHadoopを使ったWordCountのソースが置いてあるが、旧APIを使用したもののようだ。(例えばデータ出力がcontext.write()でなくoutput.collect()))
(便利そうとは言え、暗黙変換だけのライブラリーなので、ScalaでHadoopをやるのにSHadoopが必須というわけではない)
Hadoopでは「Mapperの出力の型とReducerの入力の型が一致している必要がある」といった制約があり、実行時にチェックを行っている。
Scalaではtypeによって型の別名が付けられるので、これを使ってコンパイル時にチェックできそう。
→Scalaで型が一致しているかどうかをチェックする方法
type MKEYIN = LongWritable
type MVALIN = Text
type MKEYOUT = Text
type MVALOUT = IntWritable
class Map extends Mapper[MKEYIN, MVALIN, MKEYOUT, MVALOUT] {
type Context = Mapper[MKEYIN, MVALIN, MKEYOUT, MVALOUT]#Context
override def map(key: MKEYIN, value: MVALIN, context: Context) {
〜
}
}
type RKEYIN = Text
type RVALIN = IntWritable
type RKEYOUT = Text
type RVALOUT = IntWritable
class Reduce extends Reducer[RKEYIN, RVALIN, RKEYOUT, RVALOUT] {
type Context = Reducer[RKEYIN, RVALIN, RKEYOUT, RVALOUT]#Context
override def reduce(key: RKEYIN, values: java.lang.Iterable[RVALIN], context: Context) {
〜
}
}
def run(args: Array[String]): Int = {
〜
job.setOutputKeyClass(classOf[MKEYOUT])
job.setOutputValueClass(classOf[MVALOUT])
〜
}
/** 型が一致しているかどうかのチェックを記述している。このメソッドを呼び出す必要は無い */
private def typeCheck = {
implicitly[MKEYOUT =:= RKEYIN]
implicitly[MVALOUT =:= RVALIN]
}
もし型が異なっていたら、ちゃんとコンパイルエラーになってくれる。
というか、型を定義できるのだから、同じになるべき型を指定してやれば(わざわざチェックしなくて)いいような気も?
type RKEYIN = MKEYOUT
type RVALIN = MVALOUT
type RKEYOUT = RKEYIN
type RVALOUT = RVALIN
class Reduce extends Reducer[RKEYIN, RVALIN, RKEYOUT, RVALOUT] {
いやー、しかしこうなってくると、実際に何の型が指定されているのかパッと分からなくて、逆に不便かも(汗)
MapやReduce内のContextの別名定義は、Scala用のラッパートレイトを作り、その中で定義してしまえばいいんじゃないだろうか。
trait ScalaMapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT] extends Mapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT] {
type Context = Mapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT]#Context
}
trait ScalaReducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT] extends Reducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT] {
type Context = Reducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT]#Context
}
class Map extends ScalaMapper[LongWritable, Text, Text, IntWritable] {
override def map(key: LongWritable, value: Text, context: Context) {
〜
}
}
class Reduce extends ScalaReducer[Text, IntWritable, Text, IntWritable] {
override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: Context) {
〜
}
}
ついでに、型を返すようにも出来た。
trait ScalaMapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT] extends Mapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT] {
type Context = Mapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT]#Context
type keyOutType = KEYOUT
type valOutType = VALUEOUT
}
job.setOutputKeyClass(classOf[Map#keyOutType]) job.setOutputValueClass(classOf[Map#valOutType])
さらについでに、reduceメソッドの引数をjava.lang.IterableからScalaのIterableにしておこう。
trait ScalaReducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT] extends Reducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT] {
type Context = Reducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT]#Context
override protected def reduce(key: KEYIN, values: java.lang.Iterable[VALUEIN], context: Context) {
import scala.collection.JavaConverters._
reduce(key, values.asScala, context)
}
protected def reduce(key: KEYIN, values: Iterable[VALUEIN], context: Context) {
values.foreach { value => context.write(key.asInstanceOf[KEYOUT], value.asInstanceOf[VALUEOUT]) }
}
}
class Reduce extends ScalaReducer[Text, IntWritable, Text, IntWritable] {
override def reduce(key: Text, values: Iterable[IntWritable], context: Context) {
val sum = values.map(_.get).sum
context.write(key, new IntWritable(sum))
}
}
こういったHadoopのScala用ラッパークラスは やっぱり既に作っている人がいる。
→bsdfishさんのProgrammer Land: ScalaHadoop(ソース(ScalaHadoop.scala))
(さらにhito_asaさんがforkして改造しているらしい→ソース)
ジョブを作る側でもMapReduceTaskChainのようなクラスを用意して、「-->」でつないでいくような構成になっている。
Mapper・Reducerのラッパーにもメソッドが追加されていて、オーバーライドした際の引数が少なくて済む。
また、SHadoop由来の暗黙変換も取り込まれている。
サンプルがExamples.scalaに書かれていて、WordCountはTokenizerMap1とSumReducer1。
object TokenizerMap1 extends TypedMapper[LongWritable, Text, Text, LongWritable] {
override def doMap : Unit = v split " |\t" foreach ((word) => context.write(word, 1L))
}
objectを使って定義しているね。(objectはgetClassで$付きの方のクラスが取れるから、Hadoopがインスタンス化しても大丈夫って感じなのかな)
TypedMapperというのがMapperのラッパークラス。
doMapというメソッドをオーバーライドして処理を記述している。引数が全く無いが(苦笑)、kとvとcontextというフィールドが定義されているので、その変数を使ってアクセスできる。
(引数があると型を書かないといけないから、こうなっていた方がある意味楽なのか^^;)
Reducerも基本構造は同様。
object SumReducer1 extends TypedReducer[Text, LongWritable, Text, LongWritable] {
override def doReduce :Unit = context.write(k, (0L /: v) ((total, next) => total+next))
}
vはjava.lang.Iterableだが、SHadoop由来の暗黙変換でそのままfoleLeft関数(/:)が呼べるようだ。
def run(args: Array[String]) : Int = {
val c = MapReduceTaskChain.init() -->
IO.Text[LongWritable, Text](args(0)).input -->
MapReduceTask.MapReduceTask(TokenizerMap1, SumReducer) -->
MapReduceTask.MapReduceTask(FlipKeyValueMap, WordListReducer) -->
IO.Text[LongWritable, Text](args(1)).output;
c.execute();
return 0;
}
WordCountのジョブ(MapReduceTask)を実行した後に何か別のジョブを実行している例。
jobのsetほにゃららを全く書かなくていいのは楽だなぁ。