S-JIS[2017-01-14/2017-01-22] 変更履歴

Sparkサンプル

Spark2.1のサンプル。


概要

Sparkでのプログラミングは、Scalaのコレクションの関数の記述と似ている。

ScalaのコレクションではRangeやList等のインスタンスを作ってそれに対してmapやfilter関数を呼び出すが、
SparkではまずSpark用のオブジェクトを作り、そこからコレクションっぽいオブジェクトファイルを読み込むイテレーターを生成する。


当初のSparkではRDDというクラスを使ってプログラミングしていたが、
Spark1.3からDataFrameというクラスが導入され、
Spark1.6からDatasetというクラスが導入された。Spark2.0ではDatasetが正式となっている。

RDDよりもDataFrameの方が(最適化が効いて)高速だが型情報が消えてしまう為、Datasetを使うのが良い。(DataFrameはDatasetに統合されている)

参考: yubessyさんのApache Sparkの3つのAPI: RDD, DataFrameからDatasetへ


RDDのサンプル

RDDを使ってコーディングする場合、最初にSparkContextを生成する。

import org.apache.spark.SparkContext

case class Person(name: String, age: Long)

object RddExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "rdd-example")

    val rdd = sc.makeRDD(Seq(Person("hoge", 20)))
    rdd.foreach(println)
  }
}

その他のRDDのサンプル


DataFrameのサンプル

DataFrameを使ってコーディングする場合、最初にSparkSessionを生成する。

import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Long)

object DataFrameExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("dataframe-example")
      .getOrCreate()
    try {
      import spark.implicits._

      val df = Seq(Person("hoge", 20)).toDF()
//    df.show()
      df.foreach(row => println(row))
    } finally {
      spark.stop()
    }
  }
}

変数dfの型は「DataFrame」となる。つまり、「Personのデータである」という情報は抜け落ちる。
Spark2ではDataFrameは「Dataset[Row]」の別名になっており、Rowのデータとして扱う。


Datasetのサンプル

Datasetを使ってコーディングする場合、最初にSparkSessionを生成する。

import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Long)

object DatasetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("dataset-example")
      .getOrCreate()
    try {
      import spark.implicits._

      val ds = Seq(Person("hoge", 20)).toDS()
//    ds.show()
      ds.foreach(person => println(person))
    } finally {
      spark.stop()
    }
  }
}

masterに「local」を指定すると、ローカル(単独環境)での実行となる。
local」は「local[*]」と等価で、括弧内の数値はスレッドプールの個数(並列性)を表している。
local[n]以外はクラスターでの実行用。→マスターURL

変数dsの型は「Dataset[Person]」となる。


シリアライズの注意

Sparkアプリケーションはシリアライズ可能である必要がある。[2017-01-22]

Sparkアプリケーション(自分で作ったプログラム)はシリアライズされてexecutorに渡される部分がある為、その部分はシリアライズ可能である必要がある。


例えば以下のようなプログラムは、実行すると例外が発生する。

case class Person(name: String, age: Long)
object KeyValueGroupedExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    try {
      execute(spark)
    } finally {
      spark.stop()
    }
  }

  def execute(spark: SparkSession): Unit = {
    import spark.implicits._

    val ds = Seq[Person](Person("zzz", 20), Person("zzz", 19), Person("abc", 20)).toDS()

    val c = new java.util.Comparator[Person] {
      def compare(x: Person, y: Person): Int = {
        x.age.compareTo(y.age)
      }
    }
    val ds2 = ds.groupByKey(_.name).reduceGroups((p, r) => if (c.compare(p, r) <= 0) p else r)
    ds2.map(_._2).show()
  }
}

↓実行すると例外発生

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
〜
Caused by: java.io.NotSerializableException: com.example.KeyValueGroupedExample$$anon$1
Serialization stack:
	- object not serializable (class: com.example.KeyValueGroupedExample$$anon$1, value: com.example.KeyValueGroupedExample$$anon$1@33e434c8)
	- field (class: com.example.KeyValueGroupedExample$$anonfun$7, name: c$1, type: interface java.util.Comparator)
〜

「$anon$1」は関数に付けられているクラス名。これだけでは、どこの関数なのか分からない(苦笑)

この例では、(reduceGroupsに渡す関数内で使われている)Comparatorがシリアライズ可能ではない。
以下のようにSerializableをミックスインすれば、実行できるようになる。

    val c = new java.util.Comparator[Person] with Serializable {
      def compare(x: Person, y: Person): Int = {
        x.age.compareTo(y.age)
      }
    }

同様に、関数内から他オブジェクトを(ユーティリティー的に)呼び出している場合でも、そのオブジェクト(クラス)をシリアライズ可能にしなければならない事があるので注意。


Spark目次へ戻る / Scalaへ戻る / 技術メモへ戻る
メールの送信先:ひしだま