S-JIS[2017-01-14/2017-01-22] 変更履歴
Spark2.1のサンプル。
|
|
|
Sparkでのプログラミングは、Scalaのコレクションの関数の記述と似ている。
ScalaのコレクションではRangeやList等のインスタンスを作ってそれに対してmapやfilter関数を呼び出すが、
SparkではまずSpark用のオブジェクトを作り、そこからコレクションっぽいオブジェクトやファイルを読み込むイテレーターを生成する。
当初のSparkではRDDというクラスを使ってプログラミングしていたが、
Spark1.3からDataFrameというクラスが導入され、
Spark1.6からDatasetというクラスが導入された。Spark2.0ではDatasetが正式となっている。
RDDよりもDataFrameの方が(最適化が効いて)高速だが型情報が消えてしまう為、Datasetを使うのが良い。(DataFrameはDatasetに統合されている)
参考: yubessyさんのApache Sparkの3つのAPI: RDD, DataFrameからDatasetへ
RDDを使ってコーディングする場合、最初にSparkContextを生成する。
import org.apache.spark.SparkContext case class Person(name: String, age: Long) object RddExample { def main(args: Array[String]): Unit = { val sc = new SparkContext("local", "rdd-example") val rdd = sc.makeRDD(Seq(Person("hoge", 20))) rdd.foreach(println) } }
DataFrameを使ってコーディングする場合、最初にSparkSessionを生成する。
import org.apache.spark.sql.SparkSession case class Person(name: String, age: Long) object DataFrameExample { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local") .appName("dataframe-example") .getOrCreate() try { import spark.implicits._ val df = Seq(Person("hoge", 20)).toDF() // df.show() df.foreach(row => println(row)) } finally { spark.stop() } } }
変数dfの型は「DataFrame
」となる。つまり、「Personのデータである」という情報は抜け落ちる。
Spark2ではDataFrameは「Dataset[Row]
」の別名になっており、Rowのデータとして扱う。
Datasetを使ってコーディングする場合、最初にSparkSessionを生成する。
import org.apache.spark.sql.SparkSession case class Person(name: String, age: Long) object DatasetExample { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local") .appName("dataset-example") .getOrCreate() try { import spark.implicits._ val ds = Seq(Person("hoge", 20)).toDS() // ds.show() ds.foreach(person => println(person)) } finally { spark.stop() } } }
masterに「local
」を指定すると、ローカル(単独環境)での実行となる。
「local
」は「local[*]
」と等価で、括弧内の数値はスレッドプールの個数(並列性)を表している。
local[n]以外はクラスターでの実行用。→マスターURL
変数dsの型は「Dataset[Person]
」となる。
Sparkアプリケーションはシリアライズ可能である必要がある。[2017-01-22]
Sparkアプリケーション(自分で作ったプログラム)はシリアライズされてexecutorに渡される部分がある為、その部分はシリアライズ可能である必要がある。
例えば以下のようなプログラムは、実行すると例外が発生する。
case class Person(name: String, age: Long)
object KeyValueGroupedExample { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() try { execute(spark) } finally { spark.stop() } } def execute(spark: SparkSession): Unit = { import spark.implicits._ val ds = Seq[Person](Person("zzz", 20), Person("zzz", 19), Person("abc", 20)).toDS() val c = new java.util.Comparator[Person] { def compare(x: Person, y: Person): Int = { x.age.compareTo(y.age) } } val ds2 = ds.groupByKey(_.name).reduceGroups((p, r) => if (c.compare(p, r) <= 0) p else r) ds2.map(_._2).show() } }
↓実行すると例外発生
Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 〜 Caused by: java.io.NotSerializableException: com.example.KeyValueGroupedExample$$anon$1 Serialization stack: - object not serializable (class: com.example.KeyValueGroupedExample$$anon$1, value: com.example.KeyValueGroupedExample$$anon$1@33e434c8) - field (class: com.example.KeyValueGroupedExample$$anonfun$7, name: c$1, type: interface java.util.Comparator) 〜
「$anon$1」は関数に付けられているクラス名。これだけでは、どこの関数なのか分からない(苦笑)
この例では、(reduceGroupsに渡す関数内で使われている)Comparatorがシリアライズ可能ではない。
以下のようにSerializableをミックスインすれば、実行できるようになる。
val c = new java.util.Comparator[Person] with Serializable { def compare(x: Person, y: Person): Int = { x.age.compareTo(y.age) } }
同様に、関数内から他オブジェクトを(ユーティリティー的に)呼び出している場合でも、そのオブジェクト(クラス)をシリアライズ可能にしなければならない事があるので注意。