S-JIS[2014-09-01] 変更履歴

Spark Hive Tables

Apache SparkでHiveQLを実行する方法のメモ。


概要

Spark(Spark SQL)では、Apache Hiveを操作するHiveQL(HQL)を実行することが出来る。

Sparkをインストールすればそのまま使える。(他に特に何かをインストールする必要は無い)
(自分でビルドしてインストールする場合は、Hiveを有効にする必要があるっぽい)


Hiveを操作する場合、HiveContextSchemaRDDクラスを使う。
(SchemaRDDはHive専用のRDDではなく、Spark SQLで使っているのと同じクラス)

import org.apache.spark._
import org.apache.spark.sql.hive._
  val sc: SparkContext = 
  val hiveContext = new HiveContext(sc)
  import hiveContext._

HiveContextインスタンスはSparkContextを使って生成する。
HiveContextには通常のRDDからSchemaRDDへ変換する暗黙変換メソッド 等が定義されているので、インポートしておく。


HiveQLの例

Hiveは「/user/hive」の下にテーブル定義(メタデータ)を作るので、事前にディレクトリーを作って書き込み権限を設定しておく必要がある。

$ su -
# mkdir -p /user/hive
# chmod 777 /user/hive
# exit

Spark ShellHiveQLを実行してみる。

import org.apache.spark._
import org.apache.spark.sql.hive._
val hiveContext = new HiveContext(sc)
hiveContext.hql("CREATE TABLE IF NOT EXISTS example1 (key INT, value STRING)")
hiveContext.hql("LOAD DATA LOCAL INPATH '/tmp/hive/kv.txt' INTO TABLE example1")

val rdd = hiveContext.hql("SELECT key, value FROM example1") // SchemaRDD
rdd.collect().foreach(println)

HiveContextインスタンスを作ると、そのhiveqlメソッド(もしくはhqlメソッド)を使ってHiveQLが実行できる。

LOAD DATA文でファイルを読み込んでHiveのテーブルに入れることが出来る。
ただしこのファイルはフィールドが0x01で区切られている必要があるので注意。(UNIX上なら「sed 's/\t/\x01/g' kv.tsv > kv.txt」でタブ文字から変換できる)

hqlメソッドを呼び出すとSchemaRDDが返ってくる。SchemaRDDはRDD[Row]でもあるので、Rowというクラスで個々のデータを扱うことになる。
SchemaRDDを扱う方法については、Spark SQLと同様。


SchemaRDDの統合言語クエリーも使用することが出来る。

val rdd = hiveContext.hql("select * from example1")

import hiveContext._

val rdd2 = rdd.where('key <= 500).select('value) // 統合言語クエリー
rdd2.collect().foreach(println)

SchemaRDDのスキーマ(カラムの順序やデータ型)を確認するには、printSchemaメソッドが便利。

    rdd.printSchema()

↓実行結果

root
 |-- key: IntegerType
 |-- value: StringType

HiveContext

HiveContextは、SparkでHiveを操作する為の基本的なクラス。

HiveContextインスタンスは、SparkContextインスタンスを使って生成する。
インスタンス生成後にHiveContextのメソッドをインポートしておくと便利。(createSchemaRDDメソッド(RDDからSchemaRDDへの暗黙変換メソッド)等が使えるようになる)

import org.apache.spark.sql.hive.HiveContext
  val hiveContext = new HiveContext(sc)
  import hiveContext._

HiveContextはSQLContextクラスを継承している。
また、HiveContextを継承したLocalHiveContextというクラスもある。


HiveContextのメソッド

HiveContextの主なメソッド。

メソッド 説明
コーディング 実行結果
def hiveql(hqlQuery: String): SchemaRDD HiveQLを実行する。 val rdd = hiveContext.hiveql("SELECT * FROM example1")  
def hql(hqlQuery: String): SchemaRDD hiveqlメソッドと同じ。    
createTable[A <: Product](
  tableName: String,
  allowExisting: Boolean = true
)
Productクラス(ケースクラスとか)を元にHiveテーブルを作成する。
CREATE TABLE文と同様)
case class Person(name: String, age: Int)
hiveContext.createTable[Person]("person")
 

HiveContextはSQLContextを継承しているので、SQLContextのメソッドも使用できる。


Spark目次へ戻る / Scalaへ戻る / 技術メモへ戻る
メールの送信先:ひしだま