S-JIS[2017-01-14] 変更履歴

SparkSession

Apache Sparkで最初に使うSparkSessionクラスについて。


概要

Dataset(Spark1.6以降)(あるいはDataFrame(Spark1.3〜1.5))を使う為には、最初にSparkSessionインスタンスを生成する必要がある。

import org.apache.spark.sql.SparkSession

パッケージがspark.sqlなのがちょっと気になるけど^^;、元々SQLを扱う目的で始まったらしい。


SparkSessionの生成

SparkSessionインスタンスの生成には、SparkSessionのビルダーを使用する。

    val spark = SparkSession.builder().getOrCreate()

(SparkSessionの内部ではSparkContext(やSparkConf)が生成されて保持される)

この場合、マスターURLやアプリケーション名はspark-submitによって指定されたものが使われる。


マスターURLやアプリケーション名等をプログラム内で指定するには以下のようにする。

    val spark = SparkSession.builder()
      .master("local")
      .appName("example")
      .config("キー", "値")
      .getOrCreate()

SparkContextに何かセットしたい場合は、SparkSessionからSparkContextを取得する。

    val sc = spark.sparkContext
    sc.setCheckpointDir("/tmp/spark/checkpoint")

SparkSessionインスタンスを生成すると、HTTPサーバーとしての機能が起動する。
ので、終了するときはstopメソッドを呼び出す。

    val spark = 〜
    try {
      〜
    } finally {
      spark.stop()
    }

stopすることで、各executorも終了する。

このstopメソッドは、内部でSparkContextのstopを呼び出している。
Spark2.1ではcloseメソッドもあるが、stopを呼び出しているだけ。


SparkSessionの暗黙変換メソッド

SparkSessionを利用する暗黙変換メソッドが色々あるので、インポートしておくと便利。

     val spark = SparkSession.builder().〜.getOrCreate()
     import spark.implicits._

Scalaでは、インスタンス内のobjectのメソッドをインポートすることが出来る。上記の「spark.implicits」は、spark変数(で保持されているインスタンス)のimplicitsオブジェクトを指している。

主に以下のことが出来るようになる。


implicitsオブジェクトはSpark2.0.0ではSQLImplicitsクラスを継承しており、実際の暗黙変換メソッドはそちらで定義されている。

class SparkSession {
〜
  object implicits extends SQLImplicits with Serializable {
    protected override def _sqlContext: SQLContext = SparkSession.this.sqlContext
  }
〜
}

SparkSessionのメソッド

SparkSessionの主なメソッド。

設定系

メソッド ver 説明 実行結果
builder(): Builder 2.0.0 SparkSessionインスタンスを生成する為のビルダーを取得する。 val spark = SparkSession.builder().getOrCreate()  
implicits 2.0.0 暗黙変換メソッドが定義されているオブジェクト。 使用例  
version: String 2.0.0 バージョンを取得する。 println(spark.version) 2.1.0
sqlContext: SQLContext 2.0.0 SQLContextを取得する。    
conf: RuntimeConfig 2.0.0      
udf: UDFRegistration 2.0.0      
catalog: Catalog 2.0.0      
newSession(): SparkSession 2.0.0 新しいSparkSessionを生成する。    

実行系

メソッド ver 説明
stop(): Unit 2.0.0 Sparkアプリケーションの実行を終了する。
各ワーカーノード上に起動されているexecutorも終了する。
spark.stop()
close(): Unit 2.1.0 stopと同じ。(JavaのCloseableの実装)  
time(f: => T): T 2.1.0 処理を実行し、その実行時間を表示する。(テストやデバッグ用)  

DataFrame系

メソッド ver 説明
コーディング 実行結果
emptyDataFrame: DataFrame 2.0.0 空のDataFrameを生成する。 val df = spark.emptyDataFrame  
createDataFrame(data: Seq[A]): DataFrame 2.0.0 SeqからDataFrameを生成する。
SeqのtoDF()
val df = spark.createDataFrame(Seq(Person("hoge", 20)))  
table(tableName: String): DataFrame 2.0.0      
sql(sqlText: String): DataFrame 2.0.0      
read: DataFrameReader 2.0.0 ファイルからデータを読むのに使う。
ファイルを読み込む例
   
readStream: DataStreamReader 2.0.0    

Dataset系

メソッド ver 説明
コーディング 実行結果
emptyDataset: Dataset[T] 2.0.0 空のDatasetを生成する。
(TはEncoderに対応している必要がある。spark.implicitsをインポートしてあれば、case classはEncoderとして扱える)
val ds = spark.emptyDataset[Person]  
createDataset(data: Seq[T]): Dataset[T] 2.0.0 SeqからDatasetを生成する。
(TTはEncoderに対応している必要がある。spark.implicitsをインポートしてあれば、case classはEncoderとして扱える)
SeqのtoDS()
val ds = spark.createDataset(Seq(Person("hoge", 20)))  
range(end: Long): Dataset[Long] 2.0.0 0〜end-1の数値列を生成する。 val ds = spark.range(4) 0 1 2 3
range(start: Long, end: Long): Dataset[Long] 2.0.0 start〜end-1の数値列を生成する。 val ds = spark.range(1, 4) 1 2 3
range(start: Long, end: Long, step: Long): Dataset[Long] 2.0.0 start〜end-1(step刻み)の数値列を生成する。 val ds = spark.range(1, 5, 2) 1 3
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Long] 2.0.0      

Spark目次へ戻る / Scalaへ戻る / 技術メモへ戻る
メールの送信先:ひしだま