S-JIS[2011-07-21/2011-07-26] 変更履歴

Spark0.3サンプル

Spark0.3(RDD)のサンプル。


概要

Sparkでのプログラミングは、Scalaのコレクションの関数の記述と似ている。

ScalaのコレクションではRangeやList等のインスタンスを作ってそれに対してmapやfilter関数を呼び出すが、
SparkではまずSparkContextというオブジェクトを作り、そこからコレクションっぽいオブジェクトやファイルを読み込むイテレーターを生成する。これらはRDDという名前のクラスで表されており、RDDにmapやfilterといった関数が定義されている。


import spark._
import SparkContext._

SparkContextは「spark.SparkContext」なので、spark配下のクラスをインポートする。
RDD関連は暗黙変換を利用しているので、SparkContextオブジェクトのメソッドもインポートしておく必要がある。


object Sample {
  def main(args: Array[String]) {
    val master = "local"
    val name = "sample"
    val spark = new SparkContext(master ,name)

    println("並列性:" + spark.defaultParallelism);
〜
  }
}

master(SparkContextのコンストラクターの第1引数)に「local」を指定すると、ローカル(単独環境)での実行となる。
local」は「local[*]」と等価で、括弧内の数値はスレッドプールの個数(並列性)を表している。
local[n]以外を指定した場合はMesosのマスターの指定という扱いになる。この場合はMesosのライブラリー(JNI)が必要となるので、Windowsでは動かせない。


サンプル

Spark0.3のプログラムをローカルの単独環境(Windows)で実行してみたサンプル。

備考
import scala.math.random
val master = "local[2]"
val slices = 2
val range = 100000

val spark = new SparkContext(master, "SparkPi")

var count = spark.accumulator(0)
for (i <- spark.parallelize(1 to range, slices)) {
  val x = random * 2 - 1
  val y = random * 2 - 1
  if (x * x + y * y < 1) count += 1
}
println("Pi is roughly " + 4d * count.value / range)
Sparkのexamplesの中にある円周率算出プログラムをちょっと変えたもの。
グローバル変数に当たるcountを、spark.accumulator(0)によって作り出している。

parallelizeはコレクションを並列処理させる為のメソッドらしい。
第1引数にコレクション、第2引数に分割数を指定する。
masterに「local[2]」を指定しているのでスレッドプール数は2、slicesが2なのでスレッド2つで並列実行しそうな感じがするが
タスクマネージャで見てみると2つのCPUを使ってCPU使用率100%にはなるものの、実行速度は1スレッドの場合とほとんど差が無い…。
val count = spark.parallelize(1 to range, slices).map { _ =>
  val x = random * 2 - 1
  val y = random * 2 - 1
  if (x * x + y * y < 1) 1 else 0
}.reduce { _ + _ }
println("Pi is roughly " + 4d * count / range)
Spark Examplesで公開されている円周率算出プログラムとほぼ同じ。
グローバル変数が無くなったので、通常のコレクションを使ったプログラミングとそっくり。
今のところRDDにはsumが無いようなので、reduceで合算処理をしている。
val count = spark.parallelize(1 to range, slices).flatMap { _ =>
  val x = random * 2 - 1
  val y = random * 2 - 1
  if (x * x + y * y < 1) Some(1) else None
}.count
println("Pi is roughly " + 4d * count / range)
flatMapを使って円の範囲内になった時の個数をカウントする方法にしてみたもの。
ただ、この方法の方が実行時間は遅かった(苦笑)
val count = spark.parallelize(1 to range, slices).filter { _ =>
  val x = random * 2 - 1
  val y = random * 2 - 1
  x * x + y * y < 1
}.count
println("Pi is roughly " + 4d * count / range)
よく考えたら、flatMapで1個か0個の値を返してカウントするなら、filterを使えばよかった(爆)
val master = "local[2]"

val spark = new SparkContext(master, "SparkWordCount")

val file = spark.textFile("C:/temp/a.txt")
val counts = file.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

counts.foreach { t =>
  val (word, count) = t
  println(word + "\t" + count)
}
Spark Examplesで公開されているWordCount。

spark.textFile()で読み込むファイルを指定する。(UTF-8として読み込むっぽい)
これは内部でHadoopFileInputFormat.setInputPaths()を呼び出してパス指定しているので、hdfsや「file://」も指定可能。
また、ワイルドカードで複数ファイルを指定することも出来る。例:「file://C:/temp/a*.txt
val master = "local[2]"
〜
counts.saveAsTextFile("C:/temp/wc")
saveAsTextFile()で結果をファイルに出力できる。[/2011-07-26]
ただし指定するのはファイルでなくディレクトリーで、スレッドプール数に応じた個数のファイルが作られる。
saveAsTextFileの詳細

WindowsでsaveAsTextFile

RDDのsaveAsTextFile()で結果をファイルに出力できる。[2011-07-26]
しかしWindowsでそのまま実行すると、内部で(Hadoopが)chmodを呼んでいるので(そんなものはWindowsには無いから)落ちる。

val master = "local[2]"

val spark = new SparkContext(master, "SparkWordCount")

val file = spark.textFile("C:/temp/a.txt")
val counts = file.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

counts.saveAsTextFile("C:/temp/wc")

この場合、Eclipseの実行時の環境でCygwinのパスを指定してやるとよい。つまりCygwinのchmodが呼ばれるようになる。

  1. Eclipseのメニューバーの「Run」→「Run Configrations」で実行用の設定ダイアログを開く。
  2. 実行する対象を選ぶ。
  3. Environmentタブを選択する。
  4. 「New」ボタンで環境変数を登録する。
    Name PATH  
    Value C:\cygwin\bin Cygwinをインストールした場所。C:\cygwin\binの下にchmod.exeがあるはず。

※もしくは、普通にWindowsの環境変数PATHにC:\cygwin\binが含まれていればよい。


ちなみに、saveAsTextFile()で指定するのはファイル名ではなくディレクトリー名となる。
このディレクトリーの下に、part-00000・part-00001…といったファイルがいくつか作られる。
このファイル数は、分散処理した並列数、すなわちローカル環境においては「local[n]」で指定したスレッドプールの個数になるようだ。
(要するにMapReduceのMapperやReducerの個数だけファイルが作られるのと同じことだろうから、処理の内容によってはファイル数は減るかも)

なので、データ数が少なければ(処理対象データの無いスレッドがあって)空のファイルが作られることになる。

なお、出力の際に以前のファイルが消されることは無い。
つまり並列数を減らして実行すると、(以前の)余分なファイルが残ることになる。


Spark目次へ戻る / Scalaへ戻る / 技術メモへ戻る
メールの送信先:ひしだま