S-JIS[2014-08-17/2014-09-15] 変更履歴

SparkContext

Apache Sparkで最初に使うSparkContextクラスについて。


概要

org.apache.spark.SparkContextは、Sparkで操作を行うための主たる入り口。
最初にdriverでSparkContextのインスタンスを作り、そこからRDDを生成することになる。
ちなみに、SparkContextインスタンスの変数名は、SparkシェルだとscSpark Examplesだとsparkになっている。

SparkContextをインポートする際は、org.apache.sparkパッケージに他に色々なクラスがあるので、丸ごとインポートするのが良い。
また、SparkContextに定義されているメソッド(主に暗黙変換用)もインポートしておく。

import org.apache.spark._
import org.apache.spark.SparkContext._

SparkContextの生成

SparkContextインスタンスを生成するには、SparkConfに設定値を書き、それを渡す。

  val conf = new SparkConf().setMaster("local").setAppName("example")
  val sc = new SparkContext(conf)

最低限masterとappNameは必要だが、逆にそれ以外を設定しないのであれば、以下のように簡略化できる。
(内部ではSparkConfが生成される)

  val sc = new SparkContext("local", "example")

SparkContextインスタンスを生成すると、HTTPサーバーとしての機能が起動する。
ので、終了するときはstopメソッドを呼び出す。

  sc.stop

stopすることで、各executorも終了する。


SparkConf

SparkConfは、Sparkのコンフィグレーションを保持するクラス。

セッターメソッド(setで始まるメソッド)で値をセットする。
これらのメソッドは自分自身のインスタンスを返すので、メソッドチェーンで連続してメソッドを呼び出せる。

SparkConfの内部ではHashMap[String, String]でキーと値を保持している。
デフォルトでは、実行時のVM引数に「spark.」で始まるプロパティーが定義されていれば、それを読み込む。
例えば実行時のVM引数に「-Dspark.master=local」と書いておけば、setMaster("local")を指定したのと同じになる。

  val conf = new SparkConf()	// new SparkConf(true)と同じ
  val conf = new SparkConf(true)	// VM引数を読み込む
  val conf = new SparkConf(false)	// VM引数を読み込まない
プロパティー名 メソッド 説明 プロパティーの例 メソッドの例
spark.master setMaster(master: String) Sparkのマスターノードの接続URLを指定する。
masterに指定できる値
-Dspark.master=local[2] setMaster("local[2]")
spark.app.name setAppName(name: String) アプリケーション名を指定する。
Spark web UIから見るのに使われる。
-Dspark.app.name=example setAppName("example")
spark.jars setJars(jars: Seq[String]) jarファイルを指定する。 -Dspark.jars=a.jar,b.jar setJars(Seq("a.jar", "b.jar"))
spark.executorEnv setExecutorEnv(variable: String, value: String)
setExecutorEnv(variables: Seq[(String, String)]
  -Dspark.executorEnv.VAR1=value1 setExecutorEnv("VAR1", "value1")
setExecutorEnv(Seq(("VAR1", "value1")))
spark.home setSparkHome(home: String) SPARK_HOMEを指定する。 -Dspark.home=D:/spark setSparkHome("D:/spark")
spark.logConf   trueにすると、SparkContext生成時にSparkConfの内容をログ出力する。 -Dspark.logConf=true set("spark.logConf", "true")
spark.files   executorのワークディレクトリーに配布されるファイル一覧。    
spark.hadoop   Hadoopの設定を指定する。
「spark.hadoop.VAR1」と指定すると、HadoopのコンフィグレーションのVAR1に設定される。
-Dspark.hadoop.mapred.reduce.tasks=1  
  set(key: String, value: String) 値を設定する。
keyやvalueがnullだと例外が発生する。
   
  setAll(settings: Traversable[(String, String)]) 値を設定する(上書きする)。    
  setIfMissing(key: String, value: String) まだ設定が無い場合だけ値を設定する。    
  remove(key: String) 設定を削除する。    

使用できるプロパティーは、Spark ConfigurationAvailable Propertiesに載っている。[2014-09-15]


マスターURL

マスターURL(SparkConfmasterに指定する値)により、Sparkアプリケーションをどの環境で動かすかが決まる。[2014-09-03]
(Spark1.0.2では、SparkContextオブジェクトのcreateTaskSchedulerメソッド内で値のチェックをしている)

説明
local ローカルモードで稼動する。
スレッド数は1、最大失敗回数も1。
local
local[スレッド数] ローカルモードで稼動する。
スレッド数に「*」を指定するとCPUのコア数になる。
最大失敗回数は1。
local[2]
local[*]
local[スレッド数, 最大失敗回数] ローカルモードで稼動する。
“タスク実行に失敗した回数”が“最大失敗回数”以上になるとジョブが異常終了する。
local[2, 3]
local-cluster[スレーブ数, コア数, メモリーサイズ]

ローカルクラスター(擬似分散モード)で稼動する。
コア数・メモリーサイズ[MB]は各スレーブ(executor)毎の値。

ローカルクラスターは、単体テスト用にクラスターをエミュレートする目的で存在しているらしい。

ローカルクラスターで動かす場合は、SPARK_HOME(Sparkをインストールした場所)の下にworkディレクトリーが作れる権限が必要になる。
(あらかじめworkディレクトリーを作っておいて、chmod 777やchownしておくのも可)

local-cluster[2, 2, 1024]
spark://ホスト:ポート Spark Standalone Clusterで稼動する。→Spark Standalone Mode
ホスト名・ポート番号はマスターサーバーのものを指定する。
ポート番号のデフォルトは7077だが、省略不可。
spark://192.168.1.4:7077
yarn-cluster YARNで稼動する。→Running Spark on YARN
リソースマネージャーの場所はHadoopのconfigurationから取得される(ので、マスターURLにIPアドレス等は指定しない)。
driverはYARNに管理されたアプリケーションマスター内で実行される。
yarn-cluster
yarn-standalone 1.0で非推奨。→yarn-cluster  
yarn-client YARNで稼動する。→Running Spark on YARN
リソースマネージャーの場所はHadoopのconfigurationから取得される(ので、マスターURLにIPアドレス等は指定しない)。
driverもクライアントプロセスとして実行される。(アプリケーションマスターはYARNからリソースを取得する為だけに使われる)
yarn-client
mesos://〜 Mesosで稼動する。→Running Spark on Mesos mesos://host:5050
simr://〜 HadoopのMapReduceで稼動する。(Spark In MapReduce)
Hadoop2系ならYARNで動かせるが、Hadoop1系で動かす為にあるらしい?
 

SparkContextのメソッド

SparkContextの主なメソッド。

設定系

メソッド 説明
getConf: SparkConf SparkConfを取得する。
これはクローンなので、書き換えても元の設定には影響しない。
val conf = sc.getConf
jars: Seq[String] SparkConfで指定されたjarファイル一覧を返す。  
files: Seq[String] SparkConfで指定されたexetutorのワークディレクトリーに配布されるファイル一覧を返す。  
master: String SparkConfで指定されたマスター接続URLを返す。  
appName: String SparkConfで指定されたアプリケーション名を返す。  
isLocal: Boolean ローカルモード(masterがlocal)かどうかを返す。  
hadoopConfiguration: Configuration Hadoopのコンフィグレーションを返す。  
startTime: Long Sparkの開始時刻(≒SparkContextを生成した時刻)を返す。  
sparkUser: String Sparkを実行しているユーザーを返す。
(環境変数SPARK_USERか、システムプロパティーuser.nameの値。
 どちらも無い場合は「<unknown>」になる)
 
setLocalProperty(key: String, value: String): Unit ローカルプロパティーを設定する。  
getLocalProperty(key: String): String ローカルプロパティーを取得する。無い場合はnullが返る。  
setJobGroup(
  groupId: String,
  description: String,
  interruptOnCancel: Boolean = false
): Unit
   
clearJobGroup(): Unit    
version: String Sparkのバージョンを返す。  
getExecutorMemoryStatus: Map[String, (Long, Long)] スレーブノードの使用可能メモリーサイズを返す。  
getSchedulingMode: SchedulingMode タスクスケジューラーのスケジューリングモードを返す。  
addJar(path: String): Unit jarファイルを追加する。  
setCheckpointDir(directory: String) チェックポイントで使用するディレクトリー(HDFSのパス)を指定する。  
getCheckpointDir: Option[String] チェックポイントディレクトリーを返す。  
defaultParallelism: Int デフォルトの並列数を返す。  
defaultMinPartitions: Int デフォルトの最小分割数(デフォルト並列数と同じ、ただし最低は2)を返す。  

実行系

メソッド 説明
stop() Sparkアプリケーションの実行を終了する。[2014-09-14]
各ワーカーノード上に起動されているexecutorも終了する。
sc.stop()
submitJob[T, U, R](
  rdd: RDD[T],
  processPartition: Iterator[T] => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit,
  resultFunc: => R
): SimpleFutureAction[R]
ジョブを登録する。[2014-09-14]
RDDから使われる。
 

RDD系

メソッド 説明
コーディング 実行結果
parallelize[T](
  seq: Seq[T],
  numSlices: Int = defaultParallelism
): RDD[T]
Scalaの普通のコレクションであるSeqからRDDを生成する。
要素の型Tはシリアライズできる必要がある。[2014-09-15]
ケースクラスJava標準のシリアライズが可能なので使用できる)
   
makeRDD[T](
  seq: Seq[T],
  numSlices: Int = defaultParallelism
): RDD[T]
parallelizeメソッドと同じ。 val rdd = sc.makeRDD(Seq("a", "b", "c"))
rdd.collect.foreach(println)
a
b
c
makeRDD[T](
  seq: Seq[(T, Seq[String])]
): RDD[T]
     
textFile(
  path: String,
  minPartitions: Int = defaultMinPartitions
): RDD[String]
テキストファイルの行毎のRDDを生成する。
Hadoopがサポートしているファイルシステム(HDFSやローカルファイル等)のパスを指定することが出来る。
ワイルドカードを使って複数ファイルを指定することも出来る。
(全ファイルの行数分のレコードが作られる)

実際の読み込みは各executorで行われるので、ローカルパスを指定する場合は、全ノードにファイルが置かれている必要がある。
SparkContext.addFile()
val lines = sc.textFile("/tmp/text/a.txt")
lines.collect.foreach(println)
a
aa
aaa
val lines = sc.textFile("/tmp/text/*.txt")
lines.collect.foreach(println)
a
aa
aaa
b
bb
bbb
wholeTextFiles(
  path: String,
  minPartitions: Int = defaultMinPartitions
): RDD[(String, String)]
ディレクトリーを指定し、その中の全ファイルのRDDを生成する。
このRDDの値は、ファイル名ファイル内容全体のペア(タプル)。
(ファイル数分のレコードが作られる)
val files = sc.wholeTextFiles("/tmp/text/")
files.collect.foreach(println)
(/tmp/text/a.txt,a
aa
aaa)
(/tmp/text/b.txt,b
bb
bbb)
hadoopFile[K, V](
  path: String,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
HadoopのInputFormat(旧API)を使ってファイルを読み込むRDDを生成する。    
newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
  path: String,
  fClass: Class[F],
  kClass: Class[K],
  vClass: Class[V],
conf: Configuration = hadoopConfiguration): RDD[(K, V)]
HadoopのInputFormat(新API)を使ってファイルを読み込むRDDを生成する。    
sequenceFile[K, V](
  path: String,
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
HadoopのSequenceFileを読み込むRDDを生成する。    
objectFile[T](
  path: String,
  minPartitions: Int = defaultMinPartitions
): RDD[T]
キーがNullWritable、値がByteWritableであるHadoopのSequenceFileを読み込むRDDを生成する。
このRDDの値は、ByteWritableをTに変換したもの。
   
union[T](rdds: Seq[RDD[T]]): RDD[T]
union[T](first: RDD[T], rest: RDD[T]*): RDD[T]
複数のRDDを結合して1つのRDDにする。
RDD.union
val rdd1 = sc.makeRDD(Seq("a", "b", "c"))
val rdd2 = sc.makeRDD(Seq("A", "B", "C"))
val rdd = sc.union(rdd1, rdd2)
rdd.collect.foreach(println)
a
b
c
A
B
C
emptyRDD[T] 空のRDDを生成する。 val rdd = sc.emptyRDD[String]
println(rdd.count)
0
getPersistentRDDs: Map[Int, RDD[_]] cacheメソッドが呼ばれて永続化されたRDDの一覧を返す。    

共有系

メソッド 説明
accumulator[T](initialValue: T)
(implicit param: AccumulatorParam[T])
: Accumulator[T]
アキュムレーターを生成する。
デフォルトではInt,Long,Float,Doubleが使用できる。
val n = sc.accumulator(0) //初期値0
val rdd = sc.makeRDD(Seq("a", "bc", "def"))
rdd.foreach {
  n += _.length  //n.valueに対する加算
}
println(n.value)
accumulable[T, R](initialValue: T)
(implicit param: AccumulableParam[T, R])
: Accumulable[T, R]
Accumulableを生成する。
AccumulableParamを自分で実装する必要がある。
 
accumulableCollection[R, T](initialValue: R)
: Accumulable[R, T]
Growableトレイトをミックスインしているコレクション(ArrayBufferやListBuffer) に値を追加するAccumulableを生成する。 val ac = sc.accumulableCollection(scala.collection.mutable.ArrayBuffer.empty[String])
val rdd = sc.makeRDD(Seq("a", "bc", "def"))
rdd.foreach(ac += _)
println(ac.value)
broadcast[T](value: T): Broadcast[T] ブロードキャストを生成する。 val n = sc.broadcast(2)
val rdd = sc.makeRDD(Seq("a", "bc", "def"))
rdd.filter(_.length >= n.value).foreach(println)
addFile(path: String): Unit 指定されたファイル(driverのマシン上にあるローカルファイルも可)を各ノードに転送し、各executorからローカルファイルとして読めるようにする。
addFile()にはフルパスでファイルを指定する。
読み込む為にはSparkFiles.get()でファイル名のみを指定し、転送されたパス(絶対パス)を取得する。
(SparkFiles.get()はFileクラスを使ってローカルの絶対パスを取得する。Windowsではドライブ名が入るが、いざファイルを読む段になるとスキーマ名に誤認されて例外が発生する)
sc.addFile("/tmp/text/a.txt")
// import org.apache.spark.SparkFiles
val rdd = sc.textFile(SparkFiles.get("a.txt"))
rdd.foreach(println)

HDFSのローカリティー

HDFSからファイルを読み込む場合、ファイルのローカリティーが考慮される。[2014-09-04]

つまり、HDFSのファイルのブロックに応じたパーティションが作られる。→RDD.preferredLocations
また、そのブロックを読み込む処理も、(なるべく)そのブロックのあるマシン上で実行される。


共有変数

Sparkでは、各executorに定数を転送したり、各executorで集計した値をdriverで受け取ったりする機能がある。[2014-08-21]

SparkはScalaの関数(クロージャー)を使って処理を記述するので、関数の外側で定義した変数を関数内で使うことは出来る(ようになっている)が、
実行はexecutor(分散した各ノード上)で行われるので、そこで変数に対して行われた変更はdriver側には反映されない。
当然、executor間で参照し合うことも出来ない。

そのため、driverとexecutorとの間で値を共有するための仕組みがSparkには用意されている。


ブロードキャスト

ブロードキャスト変数(broadcast variables)は、driverで定義した定数(固定値)を各executorに転送する為の変数。[2014-08-21]

SparkはScalaの関数(クロージャー)を使って処理を記述するので、ブロードキャストを使わなくても定数を使用することは出来るのだが。

Scalaの定数を使った例
val rdd = sc.makeRDD(Seq(123, 456, 789))
val CONSTANT = 123
val filter = rdd.filter(_ != CONSTANT)
filter.foreach(println)
Sparkのブロードキャストを使った例
val rdd = sc.makeRDD(Seq(123, 456, 789))
val CONSTANT = sc.broadcast(123)
val filter = rdd.filter(_ != CONSTANT.value)
filter.foreach(println)

Scalaの関数で定数を使用している場合、関数を転送する度に値の転送も発生することになる。
ブロードキャストを使うと、定数の内容は1度だけ各executorに転送される。

したがって、ある程度大きなサイズの定数(バイト列とかMapとか)を共有したい場合はブロードキャストを使う方が良い。
逆に小さいサイズの定数なら、ブロードキャストの方がオーバーヘッドが大きい可能性がある。

なお、ブロードキャストで転送する値は、ブロードキャスト生成後に変更してはならない。
例えば、後から新しいノードが追加になった場合に、そのノードに対してブロードキャストを転送することがある為。


アキュムレーター

アキュムレーター(accumulator)は、“追加”のみを行う変数。[2014-08-21]
driverでアキュムレーターを生成し、各executorでアキュムレーターに対して値の追加(加算・蓄積)を行い、driverでその結果(総計)を受け取ることが出来る。

アキュムレーターは、Hadoopのカウンターのようなもの。(Hadoopでは、各タスクでカウンターに値を加算していく)
ただし、Hadoopのカウンターの結果はアプリケーション内からは利用できない(Mapタスクで集計したカウンターをReduceタスクで読み込むことは出来ない)が、Sparkのアキュムレーターはアプリケーション内の後続処理に利用することが出来る。

  val rdd = sc.makeRDD(Seq(1, 2, 3))
  val sum = sc.accumulator(0)
  rdd.foreach(sum += _)
  println(sum.value)

アキュムレーターはvalueというフィールドを持っており、アキュムレーターの+=メソッドを使うと、valueに対して“追加(加算)”が実行される。


アキュムレーターはデフォルトではInt・Long・Float・Doubleでしか使えないが、自分でAccumulatorParamを実装すれば、どんな型でも扱える。
(Int・Long・Float・Doubleに関しては、暗黙のAccumulatorParamが用意されている)

import org.apache.spark.AccumulatorParam
object StringAccumulatorParam extends AccumulatorParam[String] {

  def zero(initialValue: String): String = ""

  def addInPlace(t1: String, t2: String): String = t1 + t2
}
  val rdd = sc.makeRDD(Seq(1, 2, 3))
  val s = sc.accumulator("0")(StringAccumulatorParam)
  rdd.foreach(s += _.toString)
  println(s.value)

※foreachメソッドでは処理順序は保証されないので、上記の結果が「0123」になるとは限らない。「0312」とかにもなりうる。

AccumulatorParamのオブジェクトをimplicit objectにしておけば、accumulatorメソッドでAccumulatorParamを指定する必要が無くなる。

implicit object StringAccumulatorParam extends AccumulatorParam[String] {

  def zero(initialValue: String): String = ""

  def addInPlace(t1: String, t2: String): String = t1 + t2
}
  val rdd = sc.makeRDD(Seq(1, 2, 3))
  val s = sc.accumulator("0")

Accumulableはアキュムレーターの汎用版で、保持する値の型と追加する値の型が異なっていてもよい。
(Accumulator[T]はAccumulable[T,T]である)

import org.apache.spark.AccumulableParam

Seq[String]の例

object SeqAccumulableParam extends AccumulableParam[Seq[String], String] {

  def zero(initialValue: Seq[String]): Seq[String] = Seq.empty

  def addInPlace(r1: Seq[String], r2: Seq[String]): Seq[String] = r1 ++ r2

  def addAccumulator(r: Seq[String], t: String): Seq[String] = r :+ t
}
  val rdd = sc.makeRDD(Seq("a", "b", "c"))
  val s = sc.accumulable(Seq.empty[String])(SeqAccumulableParam)
  rdd.foreach(s += _)
  println(s.value) //→List(b, c, a)

Map[K,V]の例

class MapAccumulableParam[K, V] extends AccumulableParam[Map[K, V], (K, V)] {

  def zero(initialValue: Map[K, V]): Map[K, V] = Map.empty

  def addInPlace(r1: Map[K, V], r2: Map[K, V]): Map[K, V] = r1 ++ r2

  def addAccumulator(r: Map[K, V], t: (K, V)): Map[K, V] = r + t
}
  val rdd = sc.makeRDD(Seq("a", "bc", "def"))
  val m = sc.accumulable(Map.empty[String, Int])(new MapAccumulableParam[String, Int])
  rdd.foreach(key => m += (key, key.length))
  println(m.value) //→Map(a -> 1, bc -> 2, def -> 3)

Mapでカウントする例

object MapCounterAccumulableParam extends AccumulableParam[scala.collection.mutable.Map[String, Int], String] {

  def zero(initialValue: scala.collection.mutable.Map[String, Int]) = scala.collection.mutable.Map.empty[String, Int]

  def addInPlace(r1: scala.collection.mutable.Map[String, Int], r2: scala.collection.mutable.Map[String, Int]): scala.collection.mutable.Map[String, Int] = {
    r2.foreach{ kv => add(r1, kv._1, kv._2) }
    r1
  }

  def addAccumulator(r: scala.collection.mutable.Map[String, Int], t: String): scala.collection.mutable.Map[String, Int] = {
    add(r, t, 1)
    r
  }

  private def add(r: scala.collection.mutable.Map[String, Int], key: String, value: Int): Unit = {
    r.put(key, r.getOrElse(key, 0) + value)
  }
}
  val rdd = sc.makeRDD(Seq("a", "b", "c", "a", "b", "a"))
  val m = sc.accumulable(scala.collection.mutable.Map.empty[String, Int])(MapCounterAccumulableParam)
  rdd.foreach(m += _)
  println(m.value) //→Map(b -> 2, a -> 3, c -> 1)

ArrayBuffer[String]の例

Growableトレイトをミックスインしているコレクション(mutableのArrayBufferやListBuffer)を使う場合はaccumulableCollectionメソッドが便利。

  val rdd = sc.makeRDD(Seq("a", "b", "c"))
  val ac = sc.accumulableCollection(scala.collection.mutable.ArrayBuffer.empty[String])
  rdd.foreach(ac += _)
  println(ac.value) //→ArrayBuffer(a, b, c)

Spark目次へ戻る / Scalaへ戻る / 技術メモへ戻る
メールの送信先:ひしだま