S-JIS[2014-09-15/2015-01-15] 変更履歴

Spark Kryo serialization

Apache Sparkのシリアライズ方法であるKryo(クライオ)のメモ。

  • Encoder(Spark2系のシリアライズ方法)

概要

Spark(1系)のデフォルトのシリアライズ方法は、Java標準のシリアライズである。
つまり、RDDで保持する対象となるクラスはjava.io.Serializable(またはjava.io.Externalizable)をミックスイン(実装)している必要がある。
(Scalaのケースクラスは自動的にSerializableがミックスインされるので、ケースクラスを使う場合は気にしなくて良い)

それとは別に、別途シリアライザーを定義しておき、それを使う方法がある。
デフォルトではKryo serializationを使う方法が用意されている。
Kryoを「クリョ」と読んで「言いづらいなぁ」と思っていたのだけれど、「クライオ」と読むらしい^^;[/2014-09-16]
Kryoはオブジェクトのグラフ(つまりコレクション?)を効率よくシリアライズ/クローンする為のJavaフレームワークで、Java標準のシリアライズより高速・高圧縮で、場合によっては10倍くらい効率が良いらしい。
実際にJavaSerializerをKryoSerializerに替えてみたら、CSVファイルを読み込んで(Long,Double,String)のタプルに変換して出力する処理が2倍弱速くなった(96秒→50秒)。[2015-01-15]
(Kryoを使う場合はRDDで保持するクラスにSerializableを実装する必要は無い)


package example
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.serializer.KryoRegistrator

import com.esotericsoftware.kryo.Kryo
// RDDで保持するクラス
class MyData(var n: Int) {

  override def toString(): String = "MyData(" + n + ")"
}
// シリアライズ対象クラスを定義するクラス
class MyKryoRegistrator extends KryoRegistrator {

  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyData])
  }
}

Kryoを使う場合、KryoRegistratorを継承したクラスを用意する。
その中で、シリアライズする対象クラス(今回の例ではMyDataクラス)を登録する。

このKryoRegistratorを継承したクラス(上記のMyKryoRegistrator)を、SparkConfの「spark.kryo.registrator」に指定する。
また、「spark.serializer」にKryoSerializerクラスを指定する。
(しかしデフォルトでは、registratorを用意しなくても、KryoSerializerだけ指定しておけば自動的にシリアライズしてくれるっぽい)

あとは通常のSparkプログラムと同じ。

object KryoExample {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("kryo-example")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", "example.MyKryoRegistrator")
    val sc = new SparkContext(conf)

    try {
      val rdd1 = sc.makeRDD(Seq(new MyData(123), new MyData(456), new MyData(789)))
      val rdd2 = rdd1.map { d => d.n += 1; d }
      val rdd3 = rdd2.map { d => d.n *= 2; d }

      rdd3.collect.foreach(println)
      println("----")
      rdd2.collect.foreach(println)
      println("----")
      rdd1.collect.foreach(println)
    } finally {
      sc.stop()
    }
  }
}

↓実行結果

MyData(248)
MyData(914)
MyData(1580)
----
MyData(124)
MyData(457)
MyData(790)
----
MyData(123)
MyData(456)
MyData(789)

この例では、敢えてMyDataを可変オブジェクト(フィールドの値を変更できる)にしてみたが、mapメソッドによって中身を書き換えても、元のRDDには影響していない。
メモリー上に保持するときもシリアライズして保持しているのだろうか。

少なくとも、makeRDDparallelize)メソッドを使った場合はSeqのデータを各executor(ワーカーノード)に配布する必要があるので、最初のデータのシリアライズはしているらしい。[2014-09-16]


Spark目次へ戻る / Scalaへ戻る / 技術メモへ戻る
メールの送信先:ひしだま