S-JIS[2014-08-17/2014-09-15] 変更履歴
Apache Sparkで最初に使うSparkContextクラスについて。
|
|
org.apache.spark.SparkContextは、Sparkで操作を行うための主たる入り口。
最初にdriverでSparkContextのインスタンスを作り、そこからRDDを生成することになる。
ちなみに、SparkContextインスタンスの変数名は、Sparkシェルだとsc
、Spark
Examplesだとspark
になっている。
SparkContextをインポートする際は、org.apache.sparkパッケージに他に色々なクラスがあるので、丸ごとインポートするのが良い。
また、SparkContextに定義されているメソッド(主に暗黙変換用)もインポートしておく。
import org.apache.spark._ import org.apache.spark.SparkContext._
SparkContextインスタンスを生成するには、SparkConfに設定値を書き、それを渡す。
val conf = new SparkConf().setMaster("local").setAppName("example") val sc = new SparkContext(conf)
最低限masterとappNameは必要だが、逆にそれ以外を設定しないのであれば、以下のように簡略化できる。
(内部ではSparkConfが生成される)
val sc = new SparkContext("local", "example")
SparkContextインスタンスを生成すると、HTTPサーバーとしての機能が起動する。
ので、終了するときはstopメソッドを呼び出す。
sc.stop
stopすることで、各executorも終了する。
SparkConfは、Sparkのコンフィグレーションを保持するクラス。
セッターメソッド(setで始まるメソッド)で値をセットする。
これらのメソッドは自分自身のインスタンスを返すので、メソッドチェーンで連続してメソッドを呼び出せる。
SparkConfの内部ではHashMap[String, String]でキーと値を保持している。
デフォルトでは、実行時のVM引数に「spark.
」で始まるプロパティーが定義されていれば、それを読み込む。
例えば実行時のVM引数に「-Dspark.master=local
」と書いておけば、setMaster("local")
を指定したのと同じになる。
val conf = new SparkConf() // new SparkConf(true)と同じ val conf = new SparkConf(true) // VM引数を読み込む val conf = new SparkConf(false) // VM引数を読み込まない
プロパティー名 | メソッド | 説明 | プロパティーの例 | メソッドの例 |
---|---|---|---|---|
spark.master |
setMaster(master: String) |
Sparkのマスターノードの接続URLを指定する。 →masterに指定できる値 |
-Dspark.master=local[2] |
setMaster("local[2]") |
spark.app.name |
setAppName(name: String) |
アプリケーション名を指定する。 Spark web UIから見るのに使われる。 |
-Dspark.app.name=example |
setAppName("example") |
spark.jars |
setJars(jars: Seq[String]) |
jarファイルを指定する。 | -Dspark.jars=a.jar,b.jar |
setJars(Seq("a.jar", "b.jar")) |
spark.executorEnv |
setExecutorEnv(variable: String, value: String) |
-Dspark.executorEnv.VAR1=value1 |
setExecutorEnv("VAR1", "value1") |
|
spark.home |
setSparkHome(home: String) |
SPARK_HOMEを指定する。 | -Dspark.home=D:/spark |
setSparkHome("D:/spark") |
spark.logConf |
trueにすると、SparkContext生成時にSparkConfの内容をログ出力する。 | -Dspark.logConf=true |
set("spark.logConf", "true") |
|
spark.files |
各executorのワークディレクトリーに配布されるファイル一覧。 | |||
spark.hadoop |
Hadoopの設定を指定する。 「spark.hadoop.VAR1」と指定すると、HadoopのコンフィグレーションのVAR1に設定される。 |
-Dspark.hadoop.mapred.reduce.tasks=1 |
||
set(key: String, value: String) |
値を設定する。 keyやvalueがnullだと例外が発生する。 |
|||
setAll(settings: Traversable[(String, String)]) |
値を設定する(上書きする)。 | |||
setIfMissing(key: String, value: String) |
まだ設定が無い場合だけ値を設定する。 | |||
remove(key: String) |
設定を削除する。 |
使用できるプロパティーは、Spark ConfigurationのAvailable Propertiesに載っている。[2014-09-15]
マスターURL(SparkConfのmasterに指定する値)により、Sparkアプリケーションをどの環境で動かすかが決まる。[2014-09-03]
(Spark1.0.2では、SparkContextオブジェクトのcreateTaskSchedulerメソッド内で値のチェックをしている)
値 | 説明 | 例 |
---|---|---|
local |
ローカルモードで稼動する。 スレッド数は1、最大失敗回数も1。 |
local |
local[スレッド数] |
ローカルモードで稼動する。 スレッド数に「*」を指定するとCPUのコア数になる。 最大失敗回数は1。 |
local[2] |
local[スレッド数, 最大失敗回数] |
ローカルモードで稼動する。 “タスク実行に失敗した回数”が“最大失敗回数”以上になるとジョブが異常終了する。 |
local[2, 3] |
local-cluster[スレーブ数, コア数, メモリーサイズ] |
ローカルクラスター(擬似分散モード)で稼動する。 ローカルクラスターは、単体テスト用にクラスターをエミュレートする目的で存在しているらしい。 ローカルクラスターで動かす場合は、SPARK_HOME(Sparkをインストールした場所)の下にworkディレクトリーが作れる権限が必要になる。 |
local-cluster[2, 2, 1024] |
spark://ホスト:ポート |
Spark Standalone Clusterで稼動する。→Spark
Standalone Mode ホスト名・ポート番号はマスターサーバーのものを指定する。 ポート番号のデフォルトは7077だが、省略不可。 |
spark://192.168.1.4:7077 |
yarn-cluster |
YARNで稼動する。→Running
Spark on YARN リソースマネージャーの場所はHadoopのconfigurationから取得される(ので、マスターURLにIPアドレス等は指定しない)。 driverはYARNに管理されたアプリケーションマスター内で実行される。 |
yarn-cluster |
yarn-standalone |
1.0で非推奨。→yarn-cluster | |
yarn-client |
YARNで稼動する。→Running
Spark on YARN リソースマネージャーの場所はHadoopのconfigurationから取得される(ので、マスターURLにIPアドレス等は指定しない)。 driverもクライアントプロセスとして実行される。(アプリケーションマスターはYARNからリソースを取得する為だけに使われる) |
yarn-client |
mesos://〜 |
Mesosで稼動する。→Running Spark on Mesos | mesos://host:5050 |
simr://〜 |
HadoopのMapReduceで稼動する。(Spark In MapReduce) Hadoop2系ならYARNで動かせるが、Hadoop1系で動かす為にあるらしい? |
SparkContextの主なメソッド。
メソッド | 説明 | 例 |
---|---|---|
getConf:
SparkConf |
SparkConfを取得する。 これはクローンなので、書き換えても元の設定には影響しない。 |
val conf = sc.getConf |
jars: Seq[String] |
SparkConfで指定されたjarファイル一覧を返す。 | |
files:
Seq[String] |
SparkConfで指定されたexetutorのワークディレクトリーに配布されるファイル一覧を返す。 | |
master: String |
SparkConfで指定されたマスター接続URLを返す。 | |
appName:
String |
SparkConfで指定されたアプリケーション名を返す。 | |
isLocal:
Boolean |
ローカルモード(masterがlocal)かどうかを返す。 | |
hadoopConfiguration: Configuration |
Hadoopのコンフィグレーションを返す。 | |
startTime:
Long |
Sparkの開始時刻(≒SparkContextを生成した時刻)を返す。 | |
sparkUser:
String |
Sparkを実行しているユーザーを返す。 (環境変数 SPARK_USER か、システムプロパティーuser.name の値。どちらも無い場合は「 <unknown> 」になる) |
|
setLocalProperty(key: String, value: String): Unit |
ローカルプロパティーを設定する。 | |
getLocalProperty(key: String): String |
ローカルプロパティーを取得する。無い場合はnullが返る。 | |
setJobGroup( |
||
clearJobGroup(): Unit |
||
version:
String |
Sparkのバージョンを返す。 | |
getExecutorMemoryStatus: Map[String, (Long, Long)] |
スレーブノードの使用可能メモリーサイズを返す。 | |
getSchedulingMode: SchedulingMode |
タスクスケジューラーのスケジューリングモードを返す。 | |
addJar(path:
String): Unit |
jarファイルを追加する。 | |
setCheckpointDir(directory: String) |
チェックポイントで使用するディレクトリー(HDFSのパス)を指定する。 | |
getCheckpointDir: Option[String] |
チェックポイントディレクトリーを返す。 | |
defaultParallelism: Int |
デフォルトの並列数を返す。 | |
defaultMinPartitions: Int |
デフォルトの最小分割数(デフォルト並列数と同じ、ただし最低は2)を返す。 |
メソッド | 説明 | 例 |
---|---|---|
stop() |
Sparkアプリケーションの実行を終了する。[2014-09-14] 各ワーカーノード上に起動されているexecutorも終了する。 |
sc.stop() |
submitJob[T,
U, R]( |
ジョブを登録する。[2014-09-14] RDDから使われる。 |
メソッド | 説明 | 例 | |
---|---|---|---|
コーディング | 実行結果 | ||
parallelize[T]( |
Scalaの普通のコレクションであるSeqからRDDを生成する。 要素の型Tはシリアライズできる必要がある。[2014-09-15] (ケースクラスはJava標準のシリアライズが可能なので使用できる) |
||
makeRDD[T]( |
parallelizeメソッドと同じ。 | val rdd = sc.makeRDD(Seq("a", "b", "c")) |
a |
makeRDD[T]( |
|||
textFile( |
テキストファイルの行毎のRDDを生成する。 Hadoopがサポートしているファイルシステム(HDFSやローカルファイル等)のパスを指定することが出来る。 ワイルドカードを使って複数ファイルを指定することも出来る。 (全ファイルの行数分のレコードが作られる) 実際の読み込みは各executorで行われるので、ローカルパスを指定する場合は、全ノードにファイルが置かれている必要がある。 →SparkContext.addFile() |
val lines = sc.textFile("/tmp/text/a.txt") |
a |
val lines = sc.textFile("/tmp/text/*.txt") |
a |
||
wholeTextFiles( |
ディレクトリーを指定し、その中の全ファイルのRDDを生成する。 このRDDの値は、ファイル名とファイル内容全体のペア(タプル)。 (ファイル数分のレコードが作られる) |
val files = sc.wholeTextFiles("/tmp/text/") |
(/tmp/text/a.txt,a |
hadoopFile[K,
V]( |
HadoopのInputFormat(旧API)を使ってファイルを読み込むRDDを生成する。 | ||
newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]( |
HadoopのInputFormat(新API)を使ってファイルを読み込むRDDを生成する。 | ||
sequenceFile[K,
V]( |
HadoopのSequenceFileを読み込むRDDを生成する。 | ||
objectFile[T]( |
キーがNullWritable、値がByteWritableであるHadoopのSequenceFileを読み込むRDDを生成する。 このRDDの値は、ByteWritableをTに変換したもの。 |
||
union[T](rdds:
Seq[RDD[T]]): RDD[T] |
複数のRDDを結合して1つのRDDにする。 →RDD.union |
val rdd1 = sc.makeRDD(Seq("a",
"b", "c")) |
a |
emptyRDD[T] |
空のRDDを生成する。 | val rdd = sc.emptyRDD[String] |
0 |
getPersistentRDDs: Map[Int, RDD[_]] |
cacheメソッドが呼ばれて永続化されたRDDの一覧を返す。 |
メソッド | 説明 | 例 |
---|---|---|
accumulator[T](initialValue:
T) |
アキュムレーターを生成する。 デフォルトではInt,Long,Float,Doubleが使用できる。 |
val n = sc.accumulator(0) //初期値0 |
accumulable[T,
R](initialValue: T) |
Accumulableを生成する。 AccumulableParamを自分で実装する必要がある。 |
|
accumulableCollection[R, T](initialValue: R) |
Growableトレイトをミックスインしているコレクション(ArrayBufferやListBuffer) に値を追加するAccumulableを生成する。 | val ac = sc.accumulableCollection(scala.collection.mutable.ArrayBuffer.empty[String]) |
broadcast[T](value:
T): Broadcast[T] |
ブロードキャストを生成する。 | val n = sc.broadcast(2) |
addFile(path:
String): Unit |
指定されたファイル(driverのマシン上にあるローカルファイルも可)を各ノードに転送し、各executorからローカルファイルとして読めるようにする。 addFile()にはフルパスでファイルを指定する。 読み込む為にはSparkFiles.get()でファイル名のみを指定し、転送されたパス(絶対パス)を取得する。 (SparkFiles.get()はFileクラスを使ってローカルの絶対パスを取得する。Windowsではドライブ名が入るが、いざファイルを読む段になるとスキーマ名に誤認されて例外が発生する) |
sc.addFile("/tmp/text/a.txt") |
HDFSからファイルを読み込む場合、ファイルのローカリティーが考慮される。[2014-09-04]
つまり、HDFSのファイルのブロックに応じたパーティションが作られる。→RDD.preferredLocations
また、そのブロックを読み込む処理も、(なるべく)そのブロックのあるマシン上で実行される。
Sparkでは、各executorに定数を転送したり、各executorで集計した値をdriverで受け取ったりする機能がある。[2014-08-21]
SparkはScalaの関数(クロージャー)を使って処理を記述するので、関数の外側で定義した変数を関数内で使うことは出来る(ようになっている)が、
実行はexecutor(分散した各ノード上)で行われるので、そこで変数に対して行われた変更はdriver側には反映されない。
当然、executor間で参照し合うことも出来ない。
そのため、driverとexecutorとの間で値を共有するための仕組みがSparkには用意されている。
ブロードキャスト変数(broadcast variables)は、driverで定義した定数(固定値)を各executorに転送する為の変数。[2014-08-21]
SparkはScalaの関数(クロージャー)を使って処理を記述するので、ブロードキャストを使わなくても定数を使用することは出来るのだが。
Scalaの定数を使った例 |
val rdd = sc.makeRDD(Seq(123, 456, 789)) val CONSTANT = 123 val filter = rdd.filter(_ != CONSTANT) filter.foreach(println) |
Sparkのブロードキャストを使った例 |
val rdd = sc.makeRDD(Seq(123, 456, 789)) val CONSTANT = sc.broadcast(123) val filter = rdd.filter(_ != CONSTANT.value) filter.foreach(println) |
Scalaの関数で定数を使用している場合、関数を転送する度に値の転送も発生することになる。
ブロードキャストを使うと、定数の内容は1度だけ各executorに転送される。
したがって、ある程度大きなサイズの定数(バイト列とかMapとか)を共有したい場合はブロードキャストを使う方が良い。
逆に小さいサイズの定数なら、ブロードキャストの方がオーバーヘッドが大きい可能性がある。
なお、ブロードキャストで転送する値は、ブロードキャスト生成後に変更してはならない。
例えば、後から新しいノードが追加になった場合に、そのノードに対してブロードキャストを転送することがある為。
アキュムレーター(accumulator)は、“追加”のみを行う変数。[2014-08-21]
driverでアキュムレーターを生成し、各executorでアキュムレーターに対して値の追加(加算・蓄積)を行い、driverでその結果(総計)を受け取ることが出来る。
アキュムレーターは、Hadoopのカウンターのようなもの。(Hadoopでは、各タスクでカウンターに値を加算していく)
ただし、Hadoopのカウンターの結果はアプリケーション内からは利用できない(Mapタスクで集計したカウンターをReduceタスクで読み込むことは出来ない)が、Sparkのアキュムレーターはアプリケーション内の後続処理に利用することが出来る。
val rdd = sc.makeRDD(Seq(1, 2, 3)) val sum = sc.accumulator(0) rdd.foreach(sum += _) println(sum.value)
アキュムレーターはvalueというフィールドを持っており、アキュムレーターの+=メソッドを使うと、valueに対して“追加(加算)”が実行される。
アキュムレーターはデフォルトではInt・Long・Float・Doubleでしか使えないが、自分でAccumulatorParamを実装すれば、どんな型でも扱える。
(Int・Long・Float・Doubleに関しては、暗黙のAccumulatorParamが用意されている)
import org.apache.spark.AccumulatorParam
object StringAccumulatorParam extends AccumulatorParam[String] { def zero(initialValue: String): String = "" def addInPlace(t1: String, t2: String): String = t1 + t2 }
val rdd = sc.makeRDD(Seq(1, 2, 3)) val s = sc.accumulator("0")(StringAccumulatorParam) rdd.foreach(s += _.toString) println(s.value)
※foreachメソッドでは処理順序は保証されないので、上記の結果が「0123」になるとは限らない。「0312」とかにもなりうる。
AccumulatorParamのオブジェクトをimplicit objectにしておけば、accumulatorメソッドでAccumulatorParamを指定する必要が無くなる。
implicit object StringAccumulatorParam extends AccumulatorParam[String] { def zero(initialValue: String): String = "" def addInPlace(t1: String, t2: String): String = t1 + t2 }
val rdd = sc.makeRDD(Seq(1, 2, 3)) val s = sc.accumulator("0")
Accumulableはアキュムレーターの汎用版で、保持する値の型と追加する値の型が異なっていてもよい。
(Accumulator[T]はAccumulable[T,T]である)
import org.apache.spark.AccumulableParam
object SeqAccumulableParam extends AccumulableParam[Seq[String], String] { def zero(initialValue: Seq[String]): Seq[String] = Seq.empty def addInPlace(r1: Seq[String], r2: Seq[String]): Seq[String] = r1 ++ r2 def addAccumulator(r: Seq[String], t: String): Seq[String] = r :+ t }
val rdd = sc.makeRDD(Seq("a", "b", "c")) val s = sc.accumulable(Seq.empty[String])(SeqAccumulableParam) rdd.foreach(s += _) println(s.value) //→List(b, c, a)
class MapAccumulableParam[K, V] extends AccumulableParam[Map[K, V], (K, V)] { def zero(initialValue: Map[K, V]): Map[K, V] = Map.empty def addInPlace(r1: Map[K, V], r2: Map[K, V]): Map[K, V] = r1 ++ r2 def addAccumulator(r: Map[K, V], t: (K, V)): Map[K, V] = r + t }
val rdd = sc.makeRDD(Seq("a", "bc", "def")) val m = sc.accumulable(Map.empty[String, Int])(new MapAccumulableParam[String, Int]) rdd.foreach(key => m += (key, key.length)) println(m.value) //→Map(a -> 1, bc -> 2, def -> 3)
object MapCounterAccumulableParam extends AccumulableParam[scala.collection.mutable.Map[String, Int], String] { def zero(initialValue: scala.collection.mutable.Map[String, Int]) = scala.collection.mutable.Map.empty[String, Int] def addInPlace(r1: scala.collection.mutable.Map[String, Int], r2: scala.collection.mutable.Map[String, Int]): scala.collection.mutable.Map[String, Int] = { r2.foreach{ kv => add(r1, kv._1, kv._2) } r1 } def addAccumulator(r: scala.collection.mutable.Map[String, Int], t: String): scala.collection.mutable.Map[String, Int] = { add(r, t, 1) r } private def add(r: scala.collection.mutable.Map[String, Int], key: String, value: Int): Unit = { r.put(key, r.getOrElse(key, 0) + value) } }
val rdd = sc.makeRDD(Seq("a", "b", "c", "a", "b", "a")) val m = sc.accumulable(scala.collection.mutable.Map.empty[String, Int])(MapCounterAccumulableParam) rdd.foreach(m += _) println(m.value) //→Map(b -> 2, a -> 3, c -> 1)
Growableトレイトをミックスインしているコレクション(mutableのArrayBufferやListBuffer)を使う場合はaccumulableCollectionメソッドが便利。
val rdd = sc.makeRDD(Seq("a", "b", "c")) val ac = sc.accumulableCollection(scala.collection.mutable.ArrayBuffer.empty[String]) rdd.foreach(ac += _) println(ac.value) //→ArrayBuffer(a, b, c)