S-JIS[2017-01-14] 変更履歴
Apache Sparkで最初に使うSparkSessionクラスについて。
|
|
Dataset(Spark1.6以降)(あるいはDataFrame(Spark1.3〜1.5))を使う為には、最初にSparkSessionインスタンスを生成する必要がある。
import org.apache.spark.sql.SparkSession
パッケージがspark.sqlなのがちょっと気になるけど^^;、元々SQLを扱う目的で始まったらしい。
SparkSessionインスタンスの生成には、SparkSessionのビルダーを使用する。
val spark = SparkSession.builder().getOrCreate()
(SparkSessionの内部ではSparkContext(やSparkConf)が生成されて保持される)
この場合、マスターURLやアプリケーション名はspark-submitによって指定されたものが使われる。
マスターURLやアプリケーション名等をプログラム内で指定するには以下のようにする。
val spark = SparkSession.builder() .master("local") .appName("example") .config("キー", "値") .getOrCreate()
SparkContextに何かセットしたい場合は、SparkSessionからSparkContextを取得する。
val sc = spark.sparkContext sc.setCheckpointDir("/tmp/spark/checkpoint")
SparkSessionインスタンスを生成すると、HTTPサーバーとしての機能が起動する。
ので、終了するときはstopメソッドを呼び出す。
val spark = 〜 try { 〜 } finally { spark.stop() }
stopすることで、各executorも終了する。
このstopメソッドは、内部でSparkContextのstopを呼び出している。
Spark2.1ではcloseメソッドもあるが、stopを呼び出しているだけ。
SparkSessionを利用する暗黙変換メソッドが色々あるので、インポートしておくと便利。
val spark = SparkSession.builder().〜.getOrCreate() import spark.implicits._
Scalaでは、インスタンス内のobjectのメソッドをインポートすることが出来る。上記の「spark.implicits
」は、spark変数(で保持されているインスタンス)のimplicitsオブジェクトを指している。
主に以下のことが出来るようになる。
implicitsオブジェクトはSpark2.0.0ではSQLImplicitsクラスを継承しており、実際の暗黙変換メソッドはそちらで定義されている。
class SparkSession { 〜 object implicits extends SQLImplicits with Serializable { protected override def _sqlContext: SQLContext = SparkSession.this.sqlContext } 〜 }
SparkSessionの主なメソッド。
メソッド | ver | 説明 | 例 | 実行結果 |
---|---|---|---|---|
builder ():
Builder |
2.0.0 | SparkSessionインスタンスを生成する為のビルダーを取得する。 | val spark = SparkSession.builder().getOrCreate() |
|
implicits |
2.0.0 | 暗黙変換メソッドが定義されているオブジェクト。 | →使用例 | |
version :
String |
2.0.0 | バージョンを取得する。 | println(spark.version) |
2.1.0 |
sqlContext :
SQLContext |
2.0.0 | SQLContextを取得する。 | ||
conf :
RuntimeConfig |
2.0.0 | |||
udf :
UDFRegistration |
2.0.0 | |||
catalog :
Catalog |
2.0.0 | |||
newSession ():
SparkSession |
2.0.0 | 新しいSparkSessionを生成する。 |
メソッド | ver | 説明 | 例 |
---|---|---|---|
stop(): Unit |
2.0.0 | Sparkアプリケーションの実行を終了する。 各ワーカーノード上に起動されているexecutorも終了する。 |
spark.stop() |
close (): Unit |
2.1.0 | stopと同じ。(JavaのCloseableの実装) | |
time(f: => T): T |
2.1.0 | 処理を実行し、その実行時間を表示する。(テストやデバッグ用) |
メソッド | ver | 説明 | 例 | |
---|---|---|---|---|
コーディング | 実行結果 | |||
emptyDataFrame :
DataFrame |
2.0.0 | 空のDataFrameを生成する。 | val df = spark.emptyDataFrame |
|
createDataFrame (data:
Seq[A]): DataFrame |
2.0.0 | SeqからDataFrameを生成する。 →SeqのtoDF() |
val df = spark.createDataFrame(Seq(Person("hoge",
20))) |
|
table(tableName: String):
DataFrame |
2.0.0 | |||
sql(sqlText: String): DataFrame |
2.0.0 | |||
read: DataFrameReader |
2.0.0 | ファイルからデータを読むのに使う。 →ファイルを読み込む例 |
||
readStream:
DataStreamReader |
2.0.0 |
メソッド | ver | 説明 | 例 | |
---|---|---|---|---|
コーディング | 実行結果 | |||
emptyDataset :
Dataset[T] |
2.0.0 | 空のDatasetを生成する。 (TはEncoderに対応している必要がある。spark.implicitsをインポートしてあれば、case classはEncoderとして扱える) |
val ds = spark.emptyDataset[Person] |
|
createDataset (data:
Seq[T]): Dataset[T] |
2.0.0 |
SeqからDatasetを生成する。 (TTはEncoderに対応している必要がある。spark.implicitsをインポートしてあれば、case classはEncoderとして扱える) →SeqのtoDS() |
val ds = spark.createDataset(Seq(Person("hoge",
20))) |
|
range (end:
Long): Dataset[Long] |
2.0.0 | 0〜end-1の数値列を生成する。 | val ds = spark.range(4) |
0 1 2 3 |
range(start: Long, end: Long): Dataset[Long] |
2.0.0 | start〜end-1の数値列を生成する。 | val ds = spark.range(1, 4) |
1 2 3 |
range(start: Long, end: Long, step: Long):
Dataset[Long] |
2.0.0 | start〜end-1(step刻み)の数値列を生成する。 | val ds = spark.range(1, 5, 2) |
1 3 |
range(start: Long, end: Long, step: Long,
numPartitions: Int): Dataset[Long] |
2.0.0 |