S-JIS[2011-07-23/2011-09-14] 変更履歴

ScalaでHadoopのWordCount

ScalaHadoopWordCountを書いてみる。


Eclipseの環境設定

ここではWindowsのHadoop単独環境を想定している。

使用したのはHadoop0.21.0
Eclipseでのコンパイルに必要なのは、Hadoop本体のjarファイルと、HADOOP_HOME/lib直下のjarファイル全部。


JavaのWordCountそのまま

まずはJavaで書いたWordCountをそのままScalaに変換してみる。

ソース全体(WordCount0.scala)


インポート

インポート部分はScalaの機能を使ってちょっと短縮できる。

import java.util.StringTokenizer

import org.apache.hadoop.conf.{ Configuration, Configured }
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ IntWritable, LongWritable, Text }
import org.apache.hadoop.mapreduce.{ Job, Mapper, Reducer }
import org.apache.hadoop.mapreduce.lib.input.{ FileInputFormat, TextInputFormat }
import org.apache.hadoop.mapreduce.lib.output.{ FileOutputFormat, TextOutputFormat }
import org.apache.hadoop.util.{ Tool, ToolRunner }

import scala.collection.JavaConverters._

main

全体はobjectとする。

object WordCount extends Configured with Tool {

  def main(args: Array[String]) {
    exit(ToolRunner.run(this, args))
  }

〜
}

Javaの場合、objectでなくclassとなる為、ToolRunner#run()の第1引数は「new WordCount()」のようになる。
Scalaのobjectは(シングルトン)インスタンスなので、newしなくてもthisでそのまま渡せる。


Map

Mapクラスはobject WordCount{ }の中に書いても大丈夫。
何を気にしているかというと、MapはHadoopが引数なしのコンストラクターを使ってインスタンス生成する為。
Javaの内部クラスは、コンストラクターが暗黙の引数として外側クラスのインスタンスを要求する。staticクラスにするとそれは無くなるので、JavaでWordCountクラス内にMapクラスを書くときはstaticクラスにする)
Scalaの場合、objectの中で定義したclassJavaのstaticクラスになるので問題ない。

class Map extends Mapper[LongWritable, Text, Text, IntWritable] {
  val word = new Text
  val one = new IntWritable(1)

  override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, Text, IntWritable]#Context) {
    val line = value.toString
    val tokenizer = new StringTokenizer(line)
    while (tokenizer.hasMoreTokens()) {
      word.set(tokenizer.nextToken())
      context.write(word, one)
    }
  }
}

mapメソッドの第3引数の型が、Javaだと「Context」のみで済むのに、Scalaだと親クラス(と型)を明記しなければならない。
これはScalaの内部クラスの仕様がJavaとは異なる為なので、仕方ない。
ただ、Scalaではtypeを使って別名を付けられるので、多少は書きやすくなる。

別名利用例 備考
class Map extends Mapper[LongWritable, Text, Text, IntWritable] {
  type Context = Mapper[LongWritable, Text, Text, IntWritable]#Context

  override def map(〜, context: Context) {
〜
  }
}
 
type M = Mapper[LongWritable, Text, Text, IntWritable]
class Map extends M {

  override def map(〜, context: M#Context) {
〜
  }
}
ただし、typeはトップレベルでは記述できない。
左記の例はobject WordCount{ }内でないとダメ。

また、StringTokenizerを使うとwhileを使う必要が出てきてしまうので、別の方法を考えたい。

備考
line.split("[ \t]+").foreach { token =>
  word.set(token)
  context.write(word, one)
}
String#split()で分割する方法。
(でもこれを使うなら、最初からJavaのサンプルもこれを使ってると思うんだよね)
val tokenizer = new StringTokenizer(line)
tokenizer.asScala.foreach { obj =>
  word.set(obj.asInstanceOf[String])
  context.write(word, one)
}
StringTokenizerを生かす方向なら、JavaConvertersのasScalaを使ってコレクションに変換する方法が考えられる。
ただ、StringTokenizerの場合は型がStringでなくObjectになってしまうようなので、Stringに変換(キャスト)してやる必要がある。
(toStringでもいいけどさ^^;)
val tokenizer = new StringTokenizer(line)
tokenizer.asScala.foreach {
  case token: String =>
    word.set(token)
    context.write(word, one)
}
asInstanceOfはあまり使いたくない(Scalaで使うと敗北っぽく感じる(爆))ので、match〜caseを使ってみる。
(でもこれも型チェックが入るからあまり使いたくないけどね(苦笑))

Reduce

Reduceクラスも、書き方の注意点はMapクラスと同じ。

class Reduce extends Reducer[Text, IntWritable, Text, IntWritable] {
  type Context = Reducer[Text, IntWritable, Text, IntWritable]#Context

  override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: Context) {
    val sum = values.asScala.map(_.get).sum
    context.write(key, new IntWritable(sum))
  }
}

reduceメソッドの第2引数はJavaのIterableだが、ScalaにもIterableがあって暗黙にインポートされているので、FQCNで指定する必要がある。

集計は、一番分かりやすい書き方にしてみた。→集計方法いろいろ
map関数内の「_.get」は、「value => value.get()」と同じ。(プレースホルダー構文)


run

ジョブ実行部分。

def run(args: Array[String]): Int = {
  val conf = getConf()
  val job = new Job(conf, "WordCount")
  job.setJarByClass(getClass)
  job.setOutputKeyClass(classOf[Text])
  job.setOutputValueClass(classOf[IntWritable])

  job.setMapperClass(classOf[Map])
  job.setCombinerClass(classOf[Reduce])
  job.setReducerClass(classOf[Reduce])

  job.setInputFormatClass(classOf[TextInputFormat])
  job.setOutputFormatClass(classOf[TextOutputFormat[Text, Text]])

  FileInputFormat.setInputPaths(job, new Path(args(0)))
  FileOutputFormat.setOutputPath(job, new Path(args(1)))

  val success = job.waitForCompletion(true)
  if (success) 0 else 1
}

JavaではクラスのClassを取得するのに「クラス.class」と書くが、Scalaでは「classOf[クラス]」。


Scala2.9.0では、job.setMapperClass(classOf[Map])がコンパイルエラーになるバグがあるので、
classOf[]の後ろに「.asInstanceOf[Class[Nothing]]」というのをくっつける。[/2011-09-14]

  job.setMapperClass(classOf[Map].asInstanceOf[Class[Nothing]])
  job.setCombinerClass(classOf[Reduce].asInstanceOf[Class[Nothing]])
  job.setReducerClass(classOf[Reduce].asInstanceOf[Class[Nothing]])

  job.setInputFormatClass(classOf[TextInputFormat].asInstanceOf[Class[Nothing]])
  job.setOutputFormatClass(classOf[TextOutputFormat[Text, Text]].asInstanceOf[Class[Nothing]])

これが無いと、
type mismatch; found : java.lang.Class[org.apache.hadoop.mapreduce.lib.input.TextInputFormat](classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat]) required: java.lang.Class[_ <: org.apache.hadoop.mapreduce.InputFormat]
というコンパイルエラーが発生する。
Class[_ <: InputFormat]が必要なのに、Class[TextInputFormat]が指定されてるぜ」というエラー。
TextInputFormatはInputFormatのサブクラス(TextInputFormat <: InputFormat)なので、正しいはずなんだけどね。
これはScala2.9.0のバグ(SI-4603)らしく、とりあえずの回避方法が、上記の様に「.asInstanceOf[Class[Nothing]]」を付けるというもの。(by kmizuさんのツイート
(Scala2.8では問題ないらしい)

このバグはScala2.9.1で解消されたので、「.asInstanceOf[Class[Nothing]]」は無くても大丈夫。[2011-09-14]


実行方法

Scalaで作っているので、Hadoop実行時にScalaのライブラリーをクラスパスに追加してやる必要がある。

HADOOP_HOME/conf/hadoop-env.sh:

HADOOP_CLASSPATH+=:$(cygpath -u 'C:\eclipse\configuration\org.eclipse.osgi\bundles\327\1\.cp\lib\scala-library.jar')

※Windows用の設定。EclipseのScalaプラグインで使っているscala-library.jarをそのまま指定してみた。

(おまけ→EclipseのAntからHadoopのプログラムを実行するbuild.xml


暗黙変換

さて、ここからWordCountを改造してみる。

Scalaでは型の暗黙変換を定義できるので、Int⇔IntWritable、String⇔Textの変換 をやってみる。
これにより、数値(Int)や文字列(String)を直接context.write()に出力する(ように見せかける)ことが出来る。

implicit def string2writable(s: String) = new Text(s)
implicit def writable2string(w: Text) = w.toString()
implicit def int2writable(n: Int) = new IntWritable(n)
implicit def writable2int(w: IntWritable) = w.get()
//Map
    value.toString().split("[ \t]+").foreach { token => context.write(new Text(token), new IntWritable(1)) }
↓
    value.split("[ \t]+").foreach { token => context.write(token, 1) }
↓
    value.split("[ \t]+").foreach { context.write(_, 1) }

implicit object IntWritableNumeric extends Numeric[IntWritable] {
  type T = IntWritable

  def plus(x: T, y: T): T = x + y
  def minus(x: T, y: T): T = x - y
  def times(x: T, y: T): T = x * y
  def negate(x: T): T = 0 - x
  def fromInt(x: Int): T = x
  def toInt(x: T): Int = x
  def toLong(x: T): Long = x.get
  def toFloat(x: T): Float = x.get
  def toDouble(x: T): Double = x.get

  def compare(x: T, y: T): Int = x.compareTo(y)
}
//Reduce
    context.write(key, new IntWritable(values.asScala.map(_.get).sum))
↓
    context.write(key, values.asScala.sum)

sumを呼び出すためには(暗黙の)Numericオブジェクトが必要になるので、IntWritableNumericを定義した。
この中で実装している各メソッドも、先に定義したInt⇔IntWritableの暗黙変換を利用している。

うん、見た目はかなりすっきりした(笑)

しかし実行してみたら、IntWritableNumericのplusメソッドがスタックオーバーフロー(爆)
暗黙変換はそれ自体インスタンスを作るからなぁ(苦笑)
明示的に演算を記述してやったら、ちゃんと動いた。

  def plus(x: T, y: T): T = new IntWritable(x.get + y.get)

ソース全体(WordCount1.scala)


SHadoop

こういったHadoop用の暗黙変換をライブラリー化したら便利そう。と思ったら、既にある。
→Jonhnny WeslleyさんのSHadoop

IntWritableやText以外にも、PathやUTF8・Iteratorまで、色々な暗黙変換が定義されている。(しかしさすがにsum用は無いな(笑))
(なお、同じ場所にSHadoopを使ったWordCountのソースが置いてあるが、旧APIを使用したもののようだ。(例えばデータ出力がcontext.write()でなくoutput.collect()))

(便利そうとは言え、暗黙変換だけのライブラリーなので、ScalaでHadoopをやるのにSHadoopが必須というわけではない)


型チェック

Hadoopでは「Mapperの出力の型とReducerの入力の型が一致している必要がある」といった制約があり、実行時にチェックを行っている。

Scalaではtypeによって型の別名が付けられるので、これを使ってコンパイル時にチェックできそう。
Scalaで型が一致しているかどうかをチェックする方法

type MKEYIN = LongWritable
type MVALIN = Text
type MKEYOUT = Text
type MVALOUT = IntWritable
class Map extends Mapper[MKEYIN, MVALIN, MKEYOUT, MVALOUT] {
  type Context = Mapper[MKEYIN, MVALIN, MKEYOUT, MVALOUT]#Context

  override def map(key: MKEYIN, value: MVALIN, context: Context) {
    〜
  }
}
type RKEYIN = Text
type RVALIN = IntWritable
type RKEYOUT = Text
type RVALOUT = IntWritable
class Reduce extends Reducer[RKEYIN, RVALIN, RKEYOUT, RVALOUT] {
  type Context = Reducer[RKEYIN, RVALIN, RKEYOUT, RVALOUT]#Context

  override def reduce(key: RKEYIN, values: java.lang.Iterable[RVALIN], context: Context) {
    〜
  }
}
def run(args: Array[String]): Int = {
〜
  job.setOutputKeyClass(classOf[MKEYOUT])
  job.setOutputValueClass(classOf[MVALOUT])
〜
}
/** 型が一致しているかどうかのチェックを記述している。このメソッドを呼び出す必要は無い */
private def typeCheck = {
  implicitly[MKEYOUT =:= RKEYIN]
  implicitly[MVALOUT =:= RVALIN]
}

もし型が異なっていたら、ちゃんとコンパイルエラーになってくれる。

ソース全体(WordCount2.scala)


というか、型を定義できるのだから、同じになるべき型を指定してやれば(わざわざチェックしなくて)いいような気も?

type RKEYIN = MKEYOUT
type RVALIN = MVALOUT
type RKEYOUT = RKEYIN
type RVALOUT = RVALIN
class Reduce extends Reducer[RKEYIN, RVALIN, RKEYOUT, RVALOUT] {

いやー、しかしこうなってくると、実際に何の型が指定されているのかパッと分からなくて、逆に不便かも(汗)


Scala用Mapper・Reducer

MapやReduce内のContextの別名定義は、Scala用のラッパートレイトを作り、その中で定義してしまえばいいんじゃないだろうか。

trait ScalaMapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT] extends Mapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT] {
  type Context = Mapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT]#Context
}

trait ScalaReducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT] extends Reducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT] {
  type Context = Reducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT]#Context
}
class Map extends ScalaMapper[LongWritable, Text, Text, IntWritable] {

  override def map(key: LongWritable, value: Text, context: Context) {
    〜
  }
}

class Reduce extends ScalaReducer[Text, IntWritable, Text, IntWritable] {

  override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: Context) {
    〜
  }
}

ついでに、型を返すようにも出来た。

trait ScalaMapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT] extends Mapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT] {
  type Context = Mapper[KEYIN, VALUEIN, KEYOUT, VALUEOUT]#Context

  type keyOutType = KEYOUT
  type valOutType = VALUEOUT
}
  job.setOutputKeyClass(classOf[Map#keyOutType])
  job.setOutputValueClass(classOf[Map#valOutType])

さらについでに、reduceメソッドの引数をjava.lang.IterableからScalaのIterableにしておこう。

trait ScalaReducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT] extends Reducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT] {
  type Context = Reducer[KEYIN, VALUEIN, KEYOUT, VALUEOUT]#Context

  override protected def reduce(key: KEYIN, values: java.lang.Iterable[VALUEIN], context: Context) {
    import scala.collection.JavaConverters._
    reduce(key, values.asScala, context)
  }

  protected def reduce(key: KEYIN, values: Iterable[VALUEIN], context: Context) {
    values.foreach { value => context.write(key.asInstanceOf[KEYOUT], value.asInstanceOf[VALUEOUT]) }
  }
}
class Reduce extends ScalaReducer[Text, IntWritable, Text, IntWritable] {

  override def reduce(key: Text, values: Iterable[IntWritable], context: Context) {
    val sum = values.map(_.get).sum
    context.write(key, new IntWritable(sum))
  }
}

ソース全体(WordCount4.scala)


ScalaHadoop

こういったHadoopのScala用ラッパークラスは やっぱり既に作っている人がいる。
→bsdfishさんのProgrammer Land: ScalaHadoopソースScalaHadoop.scala))
(さらにhito_asaさんがforkして改造しているらしい→ソース

ジョブを作る側でもMapReduceTaskChainのようなクラスを用意して、「-->」でつないでいくような構成になっている。
Mapper・Reducerのラッパーにもメソッドが追加されていて、オーバーライドした際の引数が少なくて済む。
また、SHadoop由来の暗黙変換も取り込まれている。


サンプルがExamples.scalaに書かれていて、WordCountはTokenizerMap1とSumReducer1。

object TokenizerMap1 extends TypedMapper[LongWritable, Text, Text, LongWritable] {
  override def doMap : Unit = v split " |\t" foreach ((word) => context.write(word, 1L))
}

objectを使って定義しているね。(objectはgetClassで$付きの方のクラスが取れるから、Hadoopがインスタンス化しても大丈夫って感じなのかな)
TypedMapperというのがMapperのラッパークラス。
doMapというメソッドをオーバーライドして処理を記述している。引数が全く無いが(苦笑)、kとvとcontextというフィールドが定義されているので、その変数を使ってアクセスできる。
(引数があると型を書かないといけないから、こうなっていた方がある意味楽なのか^^;)


Reducerも基本構造は同様。

object SumReducer1 extends TypedReducer[Text, LongWritable, Text, LongWritable] {
  override def doReduce :Unit = context.write(k, (0L /: v) ((total, next) => total+next))
}

vはjava.lang.Iterableだが、SHadoop由来の暗黙変換でそのままfoleLeft関数(/:が呼べるようだ。


def run(args: Array[String]) : Int = {
  val c = MapReduceTaskChain.init() -->
  IO.Text[LongWritable, Text](args(0)).input                    -->
  MapReduceTask.MapReduceTask(TokenizerMap1, SumReducer)        -->
  MapReduceTask.MapReduceTask(FlipKeyValueMap, WordListReducer) -->
  IO.Text[LongWritable, Text](args(1)).output;
  c.execute();
  return 0;
}

WordCountのジョブ(MapReduceTask)を実行した後に何か別のジョブを実行している例。
jobのsetほにゃららを全く書かなくていいのは楽だなぁ。


Scala実験へ戻る / Scala目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま