S-JIS[2014-08-25/2014-09-02] 変更履歴
Apache SparkをSQLで操作するSpark SQLについて。
|
|
Spark SQLは、Spark(RDD)の操作をSQL(SELECT文)で行う為のクラス群。
Sparkをインストールすればそのまま使える。(他に特に何かをインストールする必要は無い)
Spark SQLを使う場合、SQLContextとSchemaRDDクラスを使う。
import org.apache.spark._ import org.apache.spark.sql._
val sc: SparkContext = 〜 val sqlContext = new SQLContext(sc) import sqlContext._
SQLContextインスタンスはSparkContextを使って生成する。
SQLContextには通常のRDDからSchemaRDDへ変換する暗黙変換メソッドが定義されているので、インポートしておく。
SchemaRDDはRDD[Row]、すなわちRowの一覧を保持するRDDである。
Spark SQLは、通常のSpark(RDD)と違って、細かい最適化を行ってくれる。[2014-09-02]
例えば結果が常に一定になる条件判定は除去されるとか。
→org.apache.spark.sql.catalyst.optimizer.Optimizerを参照(@ueshinさんがコントリビュートしている!)
import org.apache.spark._ import org.apache.spark.sql._
case class Person( name: String, age: Int )
object SparkSQLExample { val sc = new SparkContext("local[2]", "sql-example") val sqlContext = new SQLContext(sc) import sqlContext._
def main(args: Array[String]): Unit = { val rdd = sc.makeRDD(Seq(Person("a", 16), Person("b", 17), Person("c", 18))) rdd.registerAsTable("person") // RDDにテーブル名を付ける(RDDをテーブルとして登録する) val rdd2 = sql("select name, age from person where age>=17") // SchemaRDD println(rdd2.collect().toSeq) } }
通常のRDDに対し、registerAsTableメソッドでテーブル名を付ける。(上記の例では「person」というテーブル名)
これで、sqlContext.sql
メソッドを使ってSQL文が発行できる。
上記の例では「import sqlContext._
」でsqlContextのメソッドをインポートしているので、sqlContext変数を省略して「sql」だけでメソッドが呼び出せる。
sqlメソッドを呼び出すとSchemaRDDが返ってくる。SchemaRDDはRDD[Row]でもあるので、Rowというクラスで個々のデータを扱うことになる。
Rowから別の型(クラス)へ変換したい場合は、自前で変換ロジックを書く必要がある。
Rowからはカラム名で値を取得する方法は無く、SELECT文で抽出対象として並べたカラムの順序によるインデックス(番号)で指定する。
// RowからPersonへの変換 val rdd3 = rdd2.map(row => Person(row.getString(0), row.getInt(1)))
別の方法として、以下のように書くことも出来る。
val rdd3 = rdd2.map { case Row(name: String, age: Int) => Person(name, age) }
スキーマ(カラムの順序やデータ型)を確認するには、printSchemaメソッドが便利。
rdd2.printSchema()
↓実行結果
root |-- name: StringType |-- age: IntegerType
SQLContextは、Spark SQL用の基本的なクラス。
SQLContextインスタンスは、SparkContextインスタンスを使って生成する。
インスタンス生成後にSQLContextのメソッドをインポートしておくと便利。(createSchemaRDDメソッド(RDDからSchemaRDDへの暗黙変換メソッド)等が使えるようになる)
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc) import sqlContext._
SQLContextの主なメソッド。
メソッド | 説明 | 例 | |
---|---|---|---|
コーディング | 実行結果 | ||
createSchemaRDD[A <: Product](rdd: RDD[A]): SchemaRDD |
RDDからSchemaRDDへの暗黙変換。 A(RDDが保持するデータの型)はProductを継承している必要がある。 (ケースクラスはProductを継承している) |
val rdd2 = sqlContext.createSchemaRDD(rdd) |
|
parquetFile(path: String): SchemaRDD |
Parquetファイルを読み込む。 →SchemaRdd.saveAsParquetFile |
val rdd = sqlContext.parquetFile("/tmp/parquet1") |
|
jsonFile(path:
String): SchemaRDD |
Jsonファイルを読み込む。 | ||
jsonRDD(json:
RDD[String]): SchemaRDD |
Json文字列の入ったRDDからSchemaRDDを生成する。 | ||
registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit |
SchemaRDDをテーブルとして登録する。 (普通はSchemaRDDのregisterAsTableを使う) |
||
sql(sqlText:
String): SchemaRDD |
SQL文からSchemaRDDを生成する。 | val rdd = sqlContext.sql("select * from hoge") |
|
table(tableName:
String): SchemaRDD |
テーブル名を指定し、登録されているRDDを取得する。 | val rdd = sqlContext.table("hoge") |
|
cacheTable(tableName:
String): Unit |
テーブル名を指定し、登録されているRDDをメモリーにキャッシュする。 | sqlContext.cacheTable("hoge") |
|
uncacheTable(tableName: String): Unit |
キャッシュを解除する。 | sqlContext.uncacheTable("hoge") |
|
isCached(tableName:
String): Boolean |
テーブルがキャッシュされているかどうかを返す。 | println(sqlContext.isCached("hoge")) |
false |
Rowは、Spark SQLで一行分のデータを保持するのに使われるトレイト。
(SchemaRDDはRDD[Row]を継承したクラス、すなわちRowを保持するRDDである)
import org.apache.spark.sql.Row
このRowはorg.apache.spark.sql.Rowというtypeとして定義されているが、実体はorg.apache.spark.sql.catalyst.expressions.Rowトレイトである。
(Scalaのソースとしては、org.apache.sparkパッケージのsqlパッケージオブジェクトにRowというtypeが定義されている)
Rowはカラム名などは持っておらず、SELECT文で抽出対象として記述されたカラムの順序に従って値を保持している。
(SchemaRDDのqueryExecutor経由でカラム名一覧を取得することは出来る)
したがって、Rowに“カラムのインデックス(番号)”を指定して各カラムの値を取得する。
case class Person(name: String, age: Int) val rdd1 = sc.makeRDD(Seq(Person("a", 16), Person("b", 17), Person("c", 18))) rdd1.registerAsTable("person") val rdd2 = sqlContext.sql("select age, name from person") val rdd3 = rdd2.map(row => "%d:%s".format(row.getInt(0), row.getString(1)))
また、RowトレイトのコンパニオンオブジェクトであるRowオブジェクトにunapplySeqメソッドが定義されているので、Scalaのパターンマッチ機能(PartialFunction)を使って以下のように書くことも出来る。
(ついでに言うと、unapplySeqはSeqを返す必要があるメソッドだが、RowはSeq[Any]を継承しているので、unapplySeq内でSeq[Any]へ変換するコストは無い)
val rdd3 = rdd2.map { case Row(age: Int, name: String) => "%d:%s".format(age, name) }
このパターンマッチは、Rowに対し、0番がInt、1番がStringのときだけマッチする。(マッチしなかった場合は例外が発生する)
あくまで順番に依存し、その順番の値の型がマッチすることをチェックしている。
つまり、ここで指定する変数名は、カラム名とは無関係である。(コーディング規約的には、カラム名と合わせておいた方が分かり易いと思うけど)
val rdd3 = rdd2.map { case Row(a: Int, n: String) => "%d:%s".format(a, n) } // 変数名がカラム名と合っている必要は無い
なお、SELECT文で抽出カラムを増減させた場合は、パターンマッチの引数も合わせて増減させる必要がある。
メソッド | 備考 | 例 | ||
---|---|---|---|---|
名称 | 引数 | 戻り型 | コーディング | |
apply |
i: Int |
Any |
Row内のiの位置の値を返す。 (applyメソッドなので、呼び出し時に「apply」を省略できる) |
row.apply(0) |
isNullAt |
i: Int |
Boolean |
Row内のiの位置がnullかどうかを返す。 | row.isNullAt(0) |
getInt |
i: Int |
Int |
Row内のiの位置の値を返す。 | row.getInt(0) |
getLong |
i: Int |
Long |
Row内のiの位置の値を返す。 | row.getLong(0) |
getFloat |
i: Int |
Float |
Row内のiの位置の値を返す。 | row.getFloat(0) |
getDouble |
i: Int |
Double |
Row内のiの位置の値を返す。 | row.getDouble(0) |
getBoolean |
i: Int |
Boolean |
Row内のiの位置の値を返す。 | row.getBoolean(0) |
getByte |
i: Int |
Byte |
Row内のiの位置の値を返す。 | row.getByte(0) |
getShort |
i: Int |
Short |
Row内のiの位置の値を返す。 | row.getShort(0) |
getString |
i: Int |
String |
Row内のiの位置の値を返す。 | row.getString(0) |
copy |
Row |
値をコピーしたRowを返す。 | row.copy() |
|
anyNull |
Boolean |
Row内のどれか1つでもnullだったらtrueを返す。 | row.anyNull |
SchemaRDDはSpark SQL用のRDD。
SchemaRDDはRDD[Row]を継承したクラス、すなわちRowを保持するRDDである。
SQLのクエリー(文字列でSQLを書く)の場合、SQLContextのsqlメソッドでSchemaRDDを生成する。
その他に、SchemaRDDのSQLっぽいメソッドでSchemaRDDを構築する方法がある。→統合言語クエリー
Apache Hiveを扱いたい場合は、HiveContextのhqlメソッドでSchemaRDDを生成する。[2014-09-01]
(統合言語クエリー以外の)SchemaRDDの主なメソッド。
メソッド | 備考 | 例 | ||||
---|---|---|---|---|---|---|
名称 | 型 | 引数 | 戻り型 | コーディング | 結果 | |
registerAsTable |
tableName: String |
Unit |
SchemaRDDをSparkのテーブルとして登録する。 これで、SQL文の中からテーブルとして扱えるようになる。 (内部ではSQLContextのregisterRDDAsTableメソッドを呼んでいる) |
rdd.registerAsTable("person") |
||
saveAsParquetFile |
path: String |
Unit |
Parquetファイルとして保存する。 指定するのはディレクトリーのパス。その中に複数のファイルが作られる。 →SQLContext.parquetFile |
rdd.saveAsParquetFile("/tmp/parquet1") |
||
saveAsTable |
tableName: String |
Unit |
Hiveテーブルとして保存する。 Hiveテーブルが既に存在しているとエラーになる。 HiveContextから作ったRDDのみ有効。[2014-09-01] |
val rdd = hiveContext.hql("select * from example1") |
||
insertInto |
tableName: String |
Unit |
Hiveテーブルにデータを追加する。 Hiveテーブルが存在していないとエラーになる。 overwriteがtrueだと既存データを削除してから追加する。 HiveContextから作ったRDDのみ有効。[2014-09-01] |
val rdd = hiveContext.hql("select * from example1") |
||
schemaString |
String |
スキーマの情報を文字列にして返す。 | case class Person(name: String, age: Int) |
root |
||
printSchema |
Unit |
スキーマの情報をコンソールに表示する。 | rdd2.printSchema() |
root |
||
queryExecution |
QueryExecution |
クエリー実行情報(?)を返す。 | val nameList = rdd2.queryExecution.analyzed.output.map(_.name) |
List(name, age) |
Spark SQLには、統合言語クエリー(Language Integrated Queries)という、SQLに似たメソッドでSchemaRDDを構築していく方法がある。
(.NETのLINQみたいなもの?)
import org.apache.spark._ import org.apache.spark.sql._
case class Person(name: String, age: Int)
object SparkSQLExample { val sc = new SparkContext("local[2]", "sql-example") val sqlContext = new SQLContext(sc) import sqlContext._
def main(args: Array[String]): Unit = { val rdd1 = sc.makeRDD(Seq(Person("a", 16), Person("b", 17), Person("c", 18))) // rdd1.registerAsTable("person") // val rdd2 = sql("select name, age from person where age>=17") val rdd2 = rdd1.where('age >= 17).select('name, 'age) println(rdd2.collect().toSeq) } }
要するに、filterメソッドやmapメソッドの代わりにwhereメソッドやselectメソッドを使うような感じ。
先に呼んだメソッドから処理されるので、通常のSQL文とは異なり、selectより先にwhereを書く。
例えば「rdd1.select('name).where('age >= 17)
」と書くと、selectでnameだけ抽出されるので、whereでageが見つからなくてエラーになる。
統合言語クエリー関連のメソッドのほとんどはExpressionを引数に取るが、Symbol(シングルクォートを先頭に付けた文字列)を使うとExpressionに暗黙変換してくれる。
(これらの暗黙変換メソッドはSQLContext(がミックスインしているExpressionConversionsトレイト)で定義されているので、sqlContextのメソッドをインポートしておく必要がある)
また、ダブルクォーテーションで囲った文字列にattrを付けるとExpression(のサブクラスであるAttribute)に変換してくれる。
(例:「"name".attr
」。ピリオドはSymbolに含められないので、ピリオドの入った名前(テーブル名.カラム名
)を指定したい場合に便利。まぁ、「Symbol("テーブル名.カラム名")
」でも大丈夫なんだけど^^;)
Expressionの演算メソッドも定義されており、四則演算+,-,*,/,%
や比較演算<,<=,>=,>,===,!==
や論理演算&&,||
等が用意されている。(等しいことを確認する演算子は「=」が3つであることに注意)
(例:「rdd1.where('age === 16).select('name, 'age + 1)
」)
これらのメソッドはorg.apache.spark.sql.catalystパッケージのdslパッケージオブジェクトで定義されている。
メソッド | 備考 | 例 | |||||
---|---|---|---|---|---|---|---|
クラス | 名称 | 型 | 引数 | 戻り型 | コーディング | 結果 | |
SchemaRDD | select |
exprs: Expression* |
SchemaRDD |
カラムを抽出する。 | rdd1.select('name) |
[a], [b], [c] |
|
SchemaRDD | where |
condition: Expression |
SchemaRDD |
条件で抽出する。 | rdd1.where('age === 17) |
[b,17] |
|
rdd1.where('name like "a%") |
[a,16] |
||||||
rdd1.where(('age <= 16) || ('age >= 18)) |
[a,16], [c,18] |
||||||
SchemaRDD | where |
[T1] |
arg1: Symbol |
SchemaRDD |
rdd1.where('age)((age: Int) => age == 17) |
[b,17] |
|
SchemaRDD | where |
dynamicUdf: (DynamicRow) => Boolean |
SchemaRDD |
rdd1.where(_.getInt(1) == 17) |
[b,17] |
||
SchemaRDD | join |
otherPlan: SchemaRDD |
SchemaRDD |
結合する。 | case class Age(age1: Int) |
[a,16,16], [c,18,18] |
|
SchemaRDD | orderBy |
sortExprs: SortOrder* |
SchemaRDD |
ソートする。 | rdd1.orderBy('name.desc) |
[c,18], [b,17], [a,16] |
|
SchemaRDD | limit |
limitNum: Int |
SchemaRDD |
指定件数だけ抽出する。 | rdd1.limit(2) |
[a,16], [b,17] |
|
SchemaRDD | limit |
limitExpr: Expression |
SchemaRDD |
1.1.0で非推奨。 | |||
SchemaRDD | groupBy |
groupingExprs: Expression* |
SchemaRDD |
グループ化する。 | (rdd1: SchemaRDD).groupBy('name)('name,
sum('age) as 'age) |
[a,16], [c,18], [b,17] |
|
SchemaRDD | aggregate |
aggregateExprs: Expression* |
SchemaRDD |
集約する。 グループキー無しのgroupByと同じ。 |
(rdd1: SchemaRDD).aggregate(sum('age) as
'age) |
[51] |
|
SchemaRDD | as |
alias: Symbol |
SchemaRDD |
テーブル名を付ける。 (ここで付けた名前は、SQL文として使う際のテーブル名とは無関係(SQL文で使うことは出来ない)) |
val rdd3 = rdd1.where('name
=== "a").as('x) |
[a,16,a,16] |
|
SchemaRDD | unionAll |
otherPlan: SchemaRDD |
SchemaRDD |
他のSchemaRDDと統合する。 | val rdd3 = sc.makeRDD(Seq(Person("d",
16))) |
[a,16], [b,17], [c,18], [d,16] |
|
SchemaRDD | sample |
withReplacement: Boolean = true |
SchemaRDD |
||||
SchemaRDD | generate |
generator: Generator |
SchemaRDD |