S-JIS[2014-09-01] 変更履歴
Apache SparkでHiveQLを実行する方法のメモ。
|
|
Spark(Spark SQL)では、Apache Hiveを操作するHiveQL(HQL)を実行することが出来る。
Sparkをインストールすればそのまま使える。(他に特に何かをインストールする必要は無い)
(自分でビルドしてインストールする場合は、Hiveを有効にする必要があるっぽい)
Hiveを操作する場合、HiveContextとSchemaRDDクラスを使う。
(SchemaRDDはHive専用のRDDではなく、Spark SQLで使っているのと同じクラス)
import org.apache.spark._ import org.apache.spark.sql.hive._
val sc: SparkContext = 〜 val hiveContext = new HiveContext(sc) import hiveContext._
HiveContextインスタンスはSparkContextを使って生成する。
HiveContextには通常のRDDからSchemaRDDへ変換する暗黙変換メソッド
等が定義されているので、インポートしておく。
Hiveは「/user/hive」の下にテーブル定義(メタデータ)を作るので、事前にディレクトリーを作って書き込み権限を設定しておく必要がある。
$ su - # mkdir -p /user/hive # chmod 777 /user/hive # exit
Spark ShellでHiveQLを実行してみる。
import org.apache.spark._ import org.apache.spark.sql.hive._
val hiveContext = new HiveContext(sc) hiveContext.hql("CREATE TABLE IF NOT EXISTS example1 (key INT, value STRING)") hiveContext.hql("LOAD DATA LOCAL INPATH '/tmp/hive/kv.txt' INTO TABLE example1") val rdd = hiveContext.hql("SELECT key, value FROM example1") // SchemaRDD rdd.collect().foreach(println)
HiveContextインスタンスを作ると、そのhiveqlメソッド(もしくはhqlメソッド)を使ってHiveQLが実行できる。
LOAD DATA文でファイルを読み込んでHiveのテーブルに入れることが出来る。
ただしこのファイルはフィールドが0x01で区切られている必要があるので注意。(UNIX上なら「sed 's/\t/\x01/g' kv.tsv >
kv.txt
」でタブ文字から変換できる)
hqlメソッドを呼び出すとSchemaRDDが返ってくる。SchemaRDDはRDD[Row]でもあるので、Rowというクラスで個々のデータを扱うことになる。
SchemaRDDを扱う方法については、Spark SQLと同様。
SchemaRDDの統合言語クエリーも使用することが出来る。
val rdd = hiveContext.hql("select * from example1") import hiveContext._ val rdd2 = rdd.where('key <= 500).select('value) // 統合言語クエリー rdd2.collect().foreach(println)
SchemaRDDのスキーマ(カラムの順序やデータ型)を確認するには、printSchemaメソッドが便利。
rdd.printSchema()
↓実行結果
root |-- key: IntegerType |-- value: StringType
HiveContextは、SparkでHiveを操作する為の基本的なクラス。
HiveContextインスタンスは、SparkContextインスタンスを使って生成する。
インスタンス生成後にHiveContextのメソッドをインポートしておくと便利。(createSchemaRDDメソッド(RDDからSchemaRDDへの暗黙変換メソッド)等が使えるようになる)
import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc) import hiveContext._
HiveContextはSQLContextクラスを継承している。
また、HiveContextを継承したLocalHiveContextというクラスもある。
HiveContextの主なメソッド。
メソッド | 説明 | 例 | |
---|---|---|---|
コーディング | 実行結果 | ||
def hiveql(hqlQuery:
String): SchemaRDD |
HiveQLを実行する。 | val rdd = hiveContext.hiveql("SELECT *
FROM example1") |
|
def hql(hqlQuery:
String): SchemaRDD |
hiveqlメソッドと同じ。 | ||
createTable[A <: Product]( |
Productクラス(ケースクラスとか)を元にHiveテーブルを作成する。 (CREATE TABLE文と同様) |
case class Person(name: String, age: Int) |
HiveContextはSQLContextを継承しているので、SQLContextのメソッドも使用できる。