S-JIS[2014-08-25/2014-09-02] 変更履歴

Spark SQL

Apache SparkをSQLで操作するSpark SQLについて。


概要

Spark SQLは、SparkRDD)の操作をSQL(SELECT文)で行う為のクラス群。
Sparkをインストールすればそのまま使える。(他に特に何かをインストールする必要は無い)


Spark SQLを使う場合、SQLContextSchemaRDDクラスを使う。

import org.apache.spark._
import org.apache.spark.sql._
  val sc: SparkContext = 
  val sqlContext = new SQLContext(sc)
  import sqlContext._

SQLContextインスタンスはSparkContextを使って生成する。
SQLContextには通常のRDDからSchemaRDDへ変換する暗黙変換メソッドが定義されているので、インポートしておく。

SchemaRDDはRDD[Row]、すなわちRowの一覧を保持するRDDである。


Spark SQLは、通常のSpark(RDD)と違って、細かい最適化を行ってくれる。[2014-09-02]
例えば結果が常に一定になる条件判定は除去されるとか。
org.apache.spark.sql.catalyst.optimizer.Optimizerを参照(@ueshinさんがコントリビュートしている!)


Spark SQLの例

import org.apache.spark._
import org.apache.spark.sql._
case class Person(
  name: String,
  age: Int
)
object SparkSQLExample {

  val sc = new SparkContext("local[2]", "sql-example")
  val sqlContext = new SQLContext(sc)
  import sqlContext._
  def main(args: Array[String]): Unit = {
    val rdd = sc.makeRDD(Seq(Person("a", 16), Person("b", 17), Person("c", 18)))
    rdd.registerAsTable("person") // RDDにテーブル名を付ける(RDDをテーブルとして登録する)

    val rdd2 = sql("select name, age from person where age>=17") // SchemaRDD

    println(rdd2.collect().toSeq)
  }
}

通常のRDDに対し、registerAsTableメソッドでテーブル名を付ける。(上記の例では「person」というテーブル名)
これで、sqlContext.sqlメソッドを使ってSQL文が発行できる。
上記の例では「import sqlContext._」でsqlContextのメソッドをインポートしているので、sqlContext変数を省略して「sql」だけでメソッドが呼び出せる。

sqlメソッドを呼び出すとSchemaRDDが返ってくる。SchemaRDDはRDD[Row]でもあるので、Rowというクラスで個々のデータを扱うことになる。


Rowから別の型(クラス)へ変換したい場合は、自前で変換ロジックを書く必要がある。

Rowからはカラム名で値を取得する方法は無く、SELECT文で抽出対象として並べたカラムの順序によるインデックス(番号)で指定する。

    // RowからPersonへの変換
    val rdd3 = rdd2.map(row => Person(row.getString(0), row.getInt(1)))

別の方法として、以下のように書くことも出来る。

    val rdd3 = rdd2.map { case Row(name: String, age: Int) => Person(name, age) }

スキーマ(カラムの順序やデータ型)を確認するには、printSchemaメソッドが便利。

    rdd2.printSchema()

↓実行結果

root
 |-- name: StringType
 |-- age: IntegerType

統合言語クエリー


SQLContext

SQLContextは、Spark SQL用の基本的なクラス。

SQLContextインスタンスは、SparkContextインスタンスを使って生成する。
インスタンス生成後にSQLContextのメソッドをインポートしておくと便利。(createSchemaRDDメソッド(RDDからSchemaRDDへの暗黙変換メソッド)等が使えるようになる)

import org.apache.spark.sql.SQLContext
  val sqlContext = new SQLContext(sc)
  import sqlContext._

SQLContextのメソッド

SQLContextの主なメソッド。

メソッド 説明
コーディング 実行結果
createSchemaRDD[A <: Product](rdd: RDD[A]): SchemaRDD RDDからSchemaRDDへの暗黙変換
A(RDDが保持するデータの型)はProductを継承している必要がある。
ケースクラスはProductを継承している)
val rdd2 = sqlContext.createSchemaRDD(rdd)  
parquetFile(path: String): SchemaRDD Parquetファイルを読み込む。
SchemaRdd.saveAsParquetFile
val rdd = sqlContext.parquetFile("/tmp/parquet1")  
jsonFile(path: String): SchemaRDD Jsonファイルを読み込む。    
jsonRDD(json: RDD[String]): SchemaRDD Json文字列の入ったRDDからSchemaRDDを生成する。    
registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit SchemaRDDをテーブルとして登録する。
(普通はSchemaRDDのregisterAsTableを使う)
   
sql(sqlText: String): SchemaRDD SQL文からSchemaRDDを生成する。 val rdd = sqlContext.sql("select * from hoge")  
table(tableName: String): SchemaRDD テーブル名を指定し、登録されているRDDを取得する。 val rdd = sqlContext.table("hoge")  
cacheTable(tableName: String): Unit テーブル名を指定し、登録されているRDDをメモリーにキャッシュする。 sqlContext.cacheTable("hoge")  
uncacheTable(tableName: String): Unit キャッシュを解除する。 sqlContext.uncacheTable("hoge")  
isCached(tableName: String): Boolean テーブルがキャッシュされているかどうかを返す。 println(sqlContext.isCached("hoge")) false

Row

Rowは、Spark SQLで一行分のデータを保持するのに使われるトレイト。
SchemaRDDはRDD[Row]を継承したクラス、すなわちRowを保持するRDDである)

import org.apache.spark.sql.Row

このRowはorg.apache.spark.sql.Rowというtypeとして定義されているが、実体はorg.apache.spark.sql.catalyst.expressions.Rowトレイトである。
(Scalaのソースとしては、org.apache.sparkパッケージのsqlパッケージオブジェクトにRowというtypeが定義されている)


Rowはカラム名などは持っておらず、SELECT文で抽出対象として記述されたカラムの順序に従って値を保持している。
(SchemaRDDのqueryExecutor経由でカラム名一覧を取得することは出来る)
したがって、Rowに“カラムのインデックス(番号)”を指定して各カラムの値を取得する。

    case class Person(name: String, age: Int)
    val rdd1 = sc.makeRDD(Seq(Person("a", 16), Person("b", 17), Person("c", 18)))
    rdd1.registerAsTable("person")
    val rdd2 = sqlContext.sql("select age, name from person")
    val rdd3 = rdd2.map(row => "%d:%s".format(row.getInt(0), row.getString(1)))

また、RowトレイトのコンパニオンオブジェクトであるRowオブジェクトにunapplySeqメソッドが定義されているので、Scalaのパターンマッチ機能(PartialFunction)を使って以下のように書くことも出来る。
(ついでに言うと、unapplySeqはSeqを返す必要があるメソッドだが、RowはSeq[Any]を継承しているので、unapplySeq内でSeq[Any]へ変換するコストは無い)

    val rdd3 = rdd2.map { case Row(age: Int, name: String) => "%d:%s".format(age, name) }

このパターンマッチは、Rowに対し、0番がInt、1番がStringのときだけマッチする。(マッチしなかった場合は例外が発生する)
あくまで順番に依存し、その順番の値の型がマッチすることをチェックしている。 つまり、ここで指定する変数名は、カラム名とは無関係である。(コーディング規約的には、カラム名と合わせておいた方が分かり易いと思うけど)

    val rdd3 = rdd2.map { case Row(a: Int, n: String) => "%d:%s".format(a, n) } // 変数名がカラム名と合っている必要は無い

なお、SELECT文で抽出カラムを増減させた場合は、パターンマッチの引数も合わせて増減させる必要がある。


Rowのメソッド

メソッド 備考
名称 引数 戻り型 コーディング
apply i: Int Any Row内のiの位置の値を返す。
(applyメソッドなので、呼び出し時に「apply」を省略できる
row.apply(0)
row(0)
isNullAt i: Int Boolean Row内のiの位置がnullかどうかを返す。 row.isNullAt(0)
getInt i: Int Int Row内のiの位置の値を返す。 row.getInt(0)
getLong i: Int Long Row内のiの位置の値を返す。 row.getLong(0)
getFloat i: Int Float Row内のiの位置の値を返す。 row.getFloat(0)
getDouble i: Int Double Row内のiの位置の値を返す。 row.getDouble(0)
getBoolean i: Int Boolean Row内のiの位置の値を返す。 row.getBoolean(0)
getByte i: Int Byte Row内のiの位置の値を返す。 row.getByte(0)
getShort i: Int Short Row内のiの位置の値を返す。 row.getShort(0)
getString i: Int String Row内のiの位置の値を返す。 row.getString(0)
copy   Row 値をコピーしたRowを返す。 row.copy()
anyNull   Boolean Row内のどれか1つでもnullだったらtrueを返す。 row.anyNull

SchemaRDD

SchemaRDDはSpark SQL用のRDD
SchemaRDDはRDD[Row]を継承したクラス、すなわちRowを保持するRDDである。

SQLのクエリー(文字列でSQLを書く)の場合、SQLContextsqlメソッドでSchemaRDDを生成する。
その他に、SchemaRDDのSQLっぽいメソッドでSchemaRDDを構築する方法がある。→統合言語クエリー

Apache Hiveを扱いたい場合は、HiveContexthqlメソッドでSchemaRDDを生成する。[2014-09-01]


SchemaRDDのメソッド

統合言語クエリー以外の)SchemaRDDの主なメソッド。

メソッド 備考
名称 引数 戻り型 コーディング 結果
registerAsTable   tableName: String Unit SchemaRDDをSparkのテーブルとして登録する。
これで、SQL文の中からテーブルとして扱えるようになる。
(内部ではSQLContextのregisterRDDAsTableメソッドを呼んでいる)
rdd.registerAsTable("person")  
saveAsParquetFile   path: String Unit Parquetファイルとして保存する。
指定するのはディレクトリーのパス。その中に複数のファイルが作られる。
SQLContext.parquetFile
rdd.saveAsParquetFile("/tmp/parquet1")  
saveAsTable   tableName: String Unit Hiveテーブルとして保存する。
Hiveテーブルが既に存在しているとエラーになる。
HiveContextから作ったRDDのみ有効。[2014-09-01]
val rdd = hiveContext.hql("select * from example1")
rdd.saveAsTable("example2")
 
insertInto   tableName: String
overwrite: Boolean = false
Unit Hiveテーブルにデータを追加する。
Hiveテーブルが存在していないとエラーになる。
overwriteがtrueだと既存データを削除してから追加する。
HiveContextから作ったRDDのみ有効。[2014-09-01]
val rdd = hiveContext.hql("select * from example1")
rdd.insertInto("example2")
 
schemaString     String スキーマの情報を文字列にして返す。 case class Person(name: String, age: Int)
val rdd = sc.makeRDD(Seq(Person("a", 16), Person("b", 17), Person("c", 18)))
rdd.registerAsTable("person")
val rdd2 = sqlContext.sql("select * from person")
val schema = rdd2.schemaString
root
 |-- name: StringType
 |-- age: IntegerType
printSchema     Unit スキーマの情報をコンソールに表示する。 rdd2.printSchema() root
 |-- name: StringType
 |-- age: IntegerType
queryExecution     QueryExecution クエリー実行情報(?)を返す。 val nameList = rdd2.queryExecution.analyzed.output.map(_.name)
val nameIndex = nameList.zipWithIndex.toMap
List(name, age)
Map(name -> 0, age -> 1)

統合言語クエリー

Spark SQLには、統合言語クエリー(Language Integrated Queries)という、SQLに似たメソッドでSchemaRDDを構築していく方法がある。
(.NETのLINQみたいなもの?)

import org.apache.spark._
import org.apache.spark.sql._
case class Person(name: String, age: Int)
object SparkSQLExample {

  val sc = new SparkContext("local[2]", "sql-example")
  val sqlContext = new SQLContext(sc)
  import sqlContext._
  def main(args: Array[String]): Unit = {
    val rdd1 = sc.makeRDD(Seq(Person("a", 16), Person("b", 17), Person("c", 18)))

//  rdd1.registerAsTable("person")
//  val rdd2 = sql("select name, age from person where age>=17")

    val rdd2 = rdd1.where('age >= 17).select('name, 'age)

    println(rdd2.collect().toSeq)
  }
}

要するに、filterメソッドmapメソッドの代わりにwhereメソッドやselectメソッドを使うような感じ。

先に呼んだメソッドから処理されるので、通常のSQL文とは異なり、selectより先にwhereを書く。
例えば「rdd1.select('name).where('age >= 17)」と書くと、selectでnameだけ抽出されるので、whereでageが見つからなくてエラーになる。

統合言語クエリー関連のメソッドのほとんどはExpressionを引数に取るが、Symbol(シングルクォートを先頭に付けた文字列)を使うとExpressionに暗黙変換してくれる。
(これらの暗黙変換メソッドはSQLContext(がミックスインしているExpressionConversionsトレイト)で定義されているので、sqlContextのメソッドをインポートしておく必要がある)
また、ダブルクォーテーションで囲った文字列にattrを付けるとExpression(のサブクラスであるAttribute)に変換してくれる。
(例:「"name".attr」。ピリオドはSymbolに含められないので、ピリオドの入った名前(テーブル名.カラム名)を指定したい場合に便利。まぁ、「Symbol("テーブル名.カラム名")」でも大丈夫なんだけど^^;)

Expressionの演算メソッドも定義されており、四則演算+,-,*,/,%や比較演算<,<=,>=,>,===,!==や論理演算&&,||等が用意されている。(等しいことを確認する演算子は「=」が3つであることに注意)
(例:「rdd1.where('age === 16).select('name, 'age + 1)」)
これらのメソッドはorg.apache.spark.sql.catalystパッケージのdslパッケージオブジェクトで定義されている。

メソッド 備考
クラス 名称 引数 戻り型 コーディング 結果
SchemaRDD select   exprs: Expression* SchemaRDD カラムを抽出する。 rdd1.select('name) [a], [b], [c]
SchemaRDD where   condition: Expression SchemaRDD 条件で抽出する。 rdd1.where('age === 17) [b,17]
rdd1.where('name like "a%") [a,16]
rdd1.where(('age <= 16) || ('age >= 18)) [a,16], [c,18]
SchemaRDD where [T1] arg1: Symbol
udf: (T1) => Boolean
SchemaRDD rdd1.where('age)((age: Int) => age == 17)
rdd1.where[Int]('age)(_ == 17)
[b,17]
SchemaRDD where   dynamicUdf: (DynamicRow) => Boolean SchemaRDD rdd1.where(_.getInt(1) == 17)
rdd1.where(_.selectDynamic("age").toInt == 17)
[b,17]
SchemaRDD join   otherPlan: SchemaRDD
joinType: JoinType = Inner
on: Option[Expression] = None
SchemaRDD 結合する。 case class Age(age1: Int)
val rdd3 = sc.makeRDD(Seq(Age(16), Age(18)))
rdd1.join(rdd3, on = Option('age === 'age1))
[a,16,16], [c,18,18]
SchemaRDD orderBy   sortExprs: SortOrder* SchemaRDD ソートする。 rdd1.orderBy('name.desc) [c,18], [b,17], [a,16]
SchemaRDD limit   limitNum: Int SchemaRDD 指定件数だけ抽出する。 rdd1.limit(2) [a,16], [b,17]
SchemaRDD limit   limitExpr: Expression SchemaRDD 1.1.0で非推奨。    
SchemaRDD groupBy   groupingExprs: Expression*
aggregateExprs: Expression*
SchemaRDD グループ化する。 (rdd1: SchemaRDD).groupBy('name)('name, sum('age) as 'age) [a,16], [c,18], [b,17]
SchemaRDD aggregate   aggregateExprs: Expression* SchemaRDD 集約する。
グループキー無しのgroupByと同じ。
(rdd1: SchemaRDD).aggregate(sum('age) as 'age) [51]
SchemaRDD as   alias: Symbol SchemaRDD テーブル名を付ける。
(ここで付けた名前は、SQL文として使う際のテーブル名とは無関係(SQL文で使うことは出来ない))
val rdd3 = rdd1.where('name === "a").as('x)
val rdd4 = rdd1.where('age === 16).as('y)
rdd3.join(rdd4, on = Option("x.name".attr === "y.name".attr))
[a,16,a,16]
SchemaRDD unionAll   otherPlan: SchemaRDD SchemaRDD 他のSchemaRDDと統合する。 val rdd3 = sc.makeRDD(Seq(Person("d", 16)))
rdd1.unionAll(rdd3)
[a,16], [b,17], [c,18], [d,16]
SchemaRDD sample   withReplacement: Boolean = true
fraction: Double
seed: Long
SchemaRDD      
SchemaRDD generate   generator: Generator
join: Boolean = false
outer: Boolean = false
alias: Option[String] = None
SchemaRDD      

Spark目次へ戻る / Scalaへ戻る / 技術メモへ戻る
メールの送信先:ひしだま