S-JIS[2011-07-23/2011-09-14] 変更履歴
|
ここではWindowsのHadoop単独環境を想定している。
使用したのはHadoop0.21.0。
Eclipseでのコンパイルに必要なのは、Hadoop本体のjarファイルと、HADOOP_HOME/lib直下のjarファイル全部。
まずはJavaで書いたWordCountをそのままScalaに変換してみる。
インポート部分はScalaの機能を使ってちょっと短縮できる。
import java.util.StringTokenizer import org.apache.hadoop.conf.{ Configuration, Configured } import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{ IntWritable, LongWritable, Text } import org.apache.hadoop.mapreduce.{ Job, Mapper, Reducer } import org.apache.hadoop.mapreduce.lib.input.{ FileInputFormat, TextInputFormat } import org.apache.hadoop.mapreduce.lib.output.{ FileOutputFormat, TextOutputFormat } import org.apache.hadoop.util.{ Tool, ToolRunner } import scala.collection.JavaConverters._
全体はobjectとする。
object WordCount extends Configured with Tool { def main(args: Array[String]) { exit(ToolRunner.run(this, args)) } 〜 }
Javaの場合、objectでなくclassとなる為、ToolRunner#run()の第1引数は「new WordCount()」のようになる。
Scalaのobjectは(シングルトン)インスタンスなので、newしなくてもthisでそのまま渡せる。
Mapクラスはobject WordCount{ }の中に書いても大丈夫。
何を気にしているかというと、MapはHadoopが引数なしのコンストラクターを使ってインスタンス生成する為。
(Javaの内部クラスは、コンストラクターが暗黙の引数として外側クラスのインスタンスを要求する。staticクラスにするとそれは無くなるので、JavaでWordCountクラス内にMapクラスを書くときはstaticクラスにする)
Scalaの場合、objectの中で定義したclassはJavaのstaticクラスになるので問題ない。
class Map extends Mapper[LongWritable, Text, Text, IntWritable] { val word = new Text val one = new IntWritable(1) override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, Text, IntWritable]#Context) { val line = value.toString val tokenizer = new StringTokenizer(line) while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()) context.write(word, one) } } }
mapメソッドの第3引数の型が、Javaだと「Context」のみで済むのに、Scalaだと親クラス(と型)を明記しなければならない。
これはScalaの内部クラスの仕様がJavaとは異なる為なので、仕方ない。
ただ、Scalaではtypeを使って別名を付けられるので、多少は書きやすくなる。
別名利用例 | 備考 |
---|---|
class Map extends Mapper[LongWritable, Text, Text, IntWritable] { type Context = Mapper[LongWritable, Text, Text, IntWritable]#Context override def map(〜, context: Context) { 〜 } } |
|
type M = Mapper[LongWritable, Text, Text, IntWritable] class Map extends M { override def map(〜, context: M#Context) { 〜 } } |
ただし、typeはトップレベルでは記述できない。 左記の例はobject WordCount{ }内でないとダメ。 |
また、StringTokenizerを使うとwhileを使う必要が出てきてしまうので、別の方法を考えたい。
例 | 備考 |
---|---|
line.split("[ \t]+").foreach { token => word.set(token) context.write(word, one) } |
String#split()で分割する方法。 (でもこれを使うなら、最初からJavaのサンプルもこれを使ってると思うんだよね) |
val tokenizer = new StringTokenizer(line) tokenizer.asScala.foreach { obj => word.set(obj.asInstanceOf[String]) context.write(word, one) } |
StringTokenizerを生かす方向なら、JavaConvertersのasScalaを使ってコレクションに変換する方法が考えられる。 ただ、StringTokenizerの場合は型がStringでなくObjectになってしまうようなので、Stringに変換(キャスト)してやる必要がある。 (toStringでもいいけどさ^^;) |
val tokenizer = new StringTokenizer(line) tokenizer.asScala.foreach { case token: String => word.set(token) context.write(word, one) } |
asInstanceOfはあまり使いたくない(Scalaで使うと敗北っぽく感じる(爆))ので、match〜caseを使ってみる。 (でもこれも型チェックが入るからあまり使いたくないけどね(苦笑)) |
Reduceクラスも、書き方の注意点はMapクラスと同じ。
class Reduce extends Reducer[Text, IntWritable, Text, IntWritable] { type Context = Reducer[Text, IntWritable, Text, IntWritable]#Context override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: Context) { val sum = values.asScala.map(_.get).sum context.write(key, new IntWritable(sum)) } }
reduceメソッドの第2引数はJavaのIterableだが、ScalaにもIterableがあって暗黙にインポートされているので、FQCNで指定する必要がある。
集計は、一番分かりやすい書き方にしてみた。→集計方法いろいろ
map関数内の「_.get
」は、「value => value.get()
」と同じ。(プレースホルダー構文)
ジョブ実行部分。
def run(args: Array[String]): Int = { val conf = getConf() val job = new Job(conf, "WordCount") job.setJarByClass(getClass) job.setOutputKeyClass(classOf[Text]) job.setOutputValueClass(classOf[IntWritable]) job.setMapperClass(classOf[Map]) job.setCombinerClass(classOf[Reduce]) job.setReducerClass(classOf[Reduce]) job.setInputFormatClass(classOf[TextInputFormat]) job.setOutputFormatClass(classOf[TextOutputFormat[Text, Text]]) FileInputFormat.setInputPaths(job, new Path(args(0))) FileOutputFormat.setOutputPath(job, new Path(args(1))) val success = job.waitForCompletion(true) if (success) 0 else 1 }
JavaではクラスのClassを取得するのに「クラス.class
」と書くが、Scalaでは「classOf[クラス]
」。
Scala2.9.0では、job.setMapperClass(classOf[Map])
がコンパイルエラーになるバグがあるので、
classOf[]の後ろに「.asInstanceOf[Class[Nothing]]
」というのをくっつける。[/2011-09-14]
job.setMapperClass(classOf[Map].asInstanceOf[Class[Nothing]]) job.setCombinerClass(classOf[Reduce].asInstanceOf[Class[Nothing]]) job.setReducerClass(classOf[Reduce].asInstanceOf[Class[Nothing]]) job.setInputFormatClass(classOf[TextInputFormat].asInstanceOf[Class[Nothing]]) job.setOutputFormatClass(classOf[TextOutputFormat[Text, Text]].asInstanceOf[Class[Nothing]])
これが無いと、
type mismatch; found :
java.lang.Class[org.apache.hadoop.mapreduce.lib.input.TextInputFormat](classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat]) required: java.lang.Class[_ <: org.apache.hadoop.mapreduce.InputFormat]
というコンパイルエラーが発生する。
「Class[_ <: InputFormat]
が必要なのに、Class[TextInputFormat]
が指定されてるぜ」というエラー。
TextInputFormatはInputFormatのサブクラス(TextInputFormat <: InputFormat
)なので、正しいはずなんだけどね。
これはScala2.9.0のバグ(SI-4603)らしく、とりあえずの回避方法が、上記の様に「.asInstanceOf[Class[Nothing]]
」を付けるというもの。(by kmizuさんのツイート)
(Scala2.8では問題ないらしい)
このバグはScala2.9.1で解消されたので、「.asInstanceOf[Class[Nothing]]
」は無くても大丈夫。[2011-09-14]
Scalaで作っているので、Hadoop実行時にScalaのライブラリーをクラスパスに追加してやる必要がある。
HADOOP_CLASSPATH+=:$(cygpath -u 'C:\eclipse\configuration\org.eclipse.osgi\bundles\327\1\.cp\lib\scala-library.jar')
※Windows用の設定。EclipseのScalaプラグインで使っているscala-library.jarをそのまま指定してみた。
(おまけ→EclipseのAntからHadoopのプログラムを実行するbuild.xml)
さて、ここからWordCountを改造してみる。
Scalaでは型の暗黙変換を定義できるので、Int⇔IntWritable、String⇔Textの変換
をやってみる。
これにより、数値(Int)や文字列(String)を直接context.write()に出力する(ように見せかける)ことが出来る。
implicit def string2writable(s: String) = new Text(s) implicit def writable2string(w: Text) = w.toString() implicit def int2writable(n: Int) = new IntWritable(n) implicit def writable2int(w: IntWritable) = w.get()
//Map value.toString().split("[ \t]+").foreach { token => context.write(new Text(token), new IntWritable(1)) } ↓ value.split("[ \t]+").foreach { token => context.write(token, 1) } ↓ value.split("[ \t]+").foreach { context.write(_, 1) }
implicit object IntWritableNumeric extends Numeric[IntWritable] { type T = IntWritable def plus(x: T, y: T): T = x + y def minus(x: T, y: T): T = x - y def times(x: T, y: T): T = x * y def negate(x: T): T = 0 - x def fromInt(x: Int): T = x def toInt(x: T): Int = x def toLong(x: T): Long = x.get def toFloat(x: T): Float = x.get def toDouble(x: T): Double = x.get def compare(x: T, y: T): Int = x.compareTo(y) }
//Reduce context.write(key, new IntWritable(values.asScala.map(_.get).sum)) ↓ context.write(key, values.asScala.sum)
sumを呼び出すためには(暗黙の)Numericオブジェクトが必要になるので、IntWritableNumericを定義した。
この中で実装している各メソッドも、先に定義したInt⇔IntWritableの暗黙変換を利用している。
うん、見た目はかなりすっきりした(笑)
しかし実行してみたら、IntWritableNumericのplusメソッドがスタックオーバーフロー(爆)
暗黙変換はそれ自体インスタンスを作るからなぁ(苦笑)
明示的に演算を記述してやったら、ちゃんと動いた。
def plus(x: T, y: T): T = new IntWritable(x.get + y.get)
こういったHadoop用の暗黙変換をライブラリー化したら便利そう。と思ったら、既にある。
→Jonhnny WeslleyさんのSHadoop
IntWritableやText以外にも、PathやUTF8・Iteratorまで、色々な暗黙変換が定義されている。(しかしさすがにsum用は無いな(笑))
(なお、同じ場所にSHadoopを使ったWordCountのソースが置いてあるが、旧APIを使用したもののようだ。(例えばデータ出力がcontext.write()でなくoutput.collect()))
(便利そうとは言え、暗黙変換だけのライブラリーなので、ScalaでHadoopをやるのにSHadoopが必須というわけではない)
Hadoopでは「Mapperの出力の型とReducerの入力の型が一致している必要がある」といった制約があり、実行時にチェックを行っている。
Scalaではtypeによって型の別名が付けられるので、これを使ってコンパイル時にチェックできそう。
→Scalaで型が一致しているかどうかをチェックする方法
type MKEYIN = LongWritable type MVALIN = Text type MKEYOUT = Text type MVALOUT = IntWritable class Map extends Mapper[MKEYIN, MVALIN, MKEYOUT, MVALOUT] { type Context = Mapper[MKEYIN, MVALIN, MKEYOUT, MVALOUT]#Context override def map(key: MKEYIN, value: MVALIN, context: Context) { 〜 } }
type RKEYIN = Text type RVALIN = IntWritable type RKEYOUT = Text type RVALOUT = IntWritable class Reduce extends Reducer[RKEYIN, RVALIN, RKEYOUT, RVALOUT] { type Context = Reducer[RKEYIN, RVALIN, RKEYOUT, RVALOUT]#Context override def reduce(key: RKEYIN, values: java.lang.Iterable[RVALIN], context: Context) { 〜 } }
def run(args: Array[String]): Int = { 〜 job.setOutputKeyClass(classOf[MKEYOUT]) job.setOutputValueClass(classOf[MVALOUT]) 〜 }
/** 型が一致しているかどうかのチェックを記述している。このメソッドを呼び出す必要は無い */ private def typeCheck = { implicitly[MKEYOUT =:= RKEYIN] implicitly[MVALOUT =:= RVALIN] }
もし型が異なっていたら、ちゃんとコンパイルエラーになってくれる。
というか、型を定義できるのだから、同じになるべき型を指定してやれば(わざわざチェックしなくて)いいような気も?
type RKEYIN = MKEYOUT type RVALIN = MVALOUT type RKEYOUT = RKEYIN type RVALOUT = RVALIN class Reduce extends Reducer[RKEYIN, RVALIN, RKEYOUT, RVALOUT] {
いやー、しかしこうなってくると、実際に何の型が指定されているのかパッと分からなくて、逆に不便かも(汗)
MapやReduce内のContextの別名定義は、Scala用のラッパートレイトを作り、その中で定義してしまえばいいんじゃないだろうか。
trait ScalaMapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT] extends Mapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT] { type Context = Mapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT]#Context } trait ScalaReducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT] extends Reducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT] { type Context = Reducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT]#Context }
class Map extends ScalaMapper[LongWritable, Text, Text, IntWritable] { override def map(key: LongWritable, value: Text, context: Context) { 〜 } } class Reduce extends ScalaReducer[Text, IntWritable, Text, IntWritable] { override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: Context) { 〜 } }
ついでに、型を返すようにも出来た。
trait ScalaMapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT] extends Mapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT] {
type Context = Mapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT]#Context
type keyOutType = KEYOUT
type valOutType = VALUEOUT
}
job.setOutputKeyClass(classOf[Map#keyOutType]) job.setOutputValueClass(classOf[Map#valOutType])
さらについでに、reduceメソッドの引数をjava.lang.IterableからScalaのIterableにしておこう。
trait ScalaReducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT] extends Reducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT] { type Context = Reducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT]#Context override protected def reduce(key: KEYIN, values: java.lang.Iterable[VALUEIN], context: Context) { import scala.collection.JavaConverters._ reduce(key, values.asScala, context) } protected def reduce(key: KEYIN, values: Iterable[VALUEIN], context: Context) { values.foreach { value => context.write(key.asInstanceOf[KEYOUT], value.asInstanceOf[VALUEOUT]) } } }
class Reduce extends ScalaReducer[Text, IntWritable, Text, IntWritable] { override def reduce(key: Text, values: Iterable[IntWritable], context: Context) { val sum = values.map(_.get).sum context.write(key, new IntWritable(sum)) } }
こういったHadoopのScala用ラッパークラスは やっぱり既に作っている人がいる。
→bsdfishさんのProgrammer Land: ScalaHadoop(ソース(ScalaHadoop.scala))
(さらにhito_asaさんがforkして改造しているらしい→ソース)
ジョブを作る側でもMapReduceTaskChainのようなクラスを用意して、「-->」でつないでいくような構成になっている。
Mapper・Reducerのラッパーにもメソッドが追加されていて、オーバーライドした際の引数が少なくて済む。
また、SHadoop由来の暗黙変換も取り込まれている。
サンプルがExamples.scalaに書かれていて、WordCountはTokenizerMap1とSumReducer1。
object TokenizerMap1 extends TypedMapper[LongWritable, Text, Text, LongWritable] { override def doMap : Unit = v split " |\t" foreach ((word) => context.write(word, 1L)) }
objectを使って定義しているね。(objectはgetClassで$付きの方のクラスが取れるから、Hadoopがインスタンス化しても大丈夫って感じなのかな)
TypedMapperというのがMapperのラッパークラス。
doMapというメソッドをオーバーライドして処理を記述している。引数が全く無いが(苦笑)、kとvとcontextというフィールドが定義されているので、その変数を使ってアクセスできる。
(引数があると型を書かないといけないから、こうなっていた方がある意味楽なのか^^;)
Reducerも基本構造は同様。
object SumReducer1 extends TypedReducer[Text, LongWritable, Text, LongWritable] { override def doReduce :Unit = context.write(k, (0L /: v) ((total, next) => total+next)) }
vはjava.lang.Iterableだが、SHadoop由来の暗黙変換でそのままfoleLeft関数(/:
)が呼べるようだ。
def run(args: Array[String]) : Int = { val c = MapReduceTaskChain.init() --> IO.Text[LongWritable, Text](args(0)).input --> MapReduceTask.MapReduceTask(TokenizerMap1, SumReducer) --> MapReduceTask.MapReduceTask(FlipKeyValueMap, WordListReducer) --> IO.Text[LongWritable, Text](args(1)).output; c.execute(); return 0; }
WordCountのジョブ(MapReduceTask)を実行した後に何か別のジョブを実行している例。
jobのsetほにゃらら
を全く書かなくていいのは楽だなぁ。