S-JIS[2014-09-15/2015-01-15] 変更履歴
Apache Sparkのシリアライズ方法であるKryoのメモ。
|
|
|
Spark(1系)のデフォルトのシリアライズ方法は、Java標準のシリアライズである。
つまり、RDDで保持する対象となるクラスはjava.io.Serializable(またはjava.io.Externalizable)をミックスイン(実装)している必要がある。
(Scalaのケースクラスは自動的にSerializableがミックスインされるので、ケースクラスを使う場合は気にしなくて良い)
それとは別に、別途シリアライザーを定義しておき、それを使う方法がある。
デフォルトではKryo serializationを使う方法が用意されている。
Kryoを「クリョ」と読んで「言いづらいなぁ」と思っていたのだけれど、「クライオ」と読むらしい^^;[/2014-09-16]
Kryoはオブジェクトのグラフ(つまりコレクション?)を効率よくシリアライズ/クローンする為のJavaフレームワークで、Java標準のシリアライズより高速・高圧縮で、場合によっては10倍くらい効率が良いらしい。
実際にJavaSerializerをKryoSerializerに替えてみたら、CSVファイルを読み込んで(Long,Double,String)
のタプルに変換して出力する処理が2倍弱速くなった(96秒→50秒)。[2015-01-15]
(Kryoを使う場合はRDDで保持するクラスにSerializableを実装する必要は無い)
package example
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.serializer.KryoRegistrator import com.esotericsoftware.kryo.Kryo
// RDDで保持するクラス class MyData(var n: Int) { override def toString(): String = "MyData(" + n + ")" }
// シリアライズ対象クラスを定義するクラス class MyKryoRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.register(classOf[MyData]) } }
Kryoを使う場合、KryoRegistratorを継承したクラスを用意する。
その中で、シリアライズする対象クラス(今回の例ではMyDataクラス)を登録する。
このKryoRegistratorを継承したクラス(上記のMyKryoRegistrator)を、SparkConfの「spark.kryo.registrator
」に指定する。
また、「spark.serializer
」にKryoSerializerクラスを指定する。
(しかしデフォルトでは、registratorを用意しなくても、KryoSerializerだけ指定しておけば自動的にシリアライズしてくれるっぽい)
あとは通常のSparkプログラムと同じ。
object KryoExample { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("kryo-example") conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.kryo.registrator", "example.MyKryoRegistrator") val sc = new SparkContext(conf) try { val rdd1 = sc.makeRDD(Seq(new MyData(123), new MyData(456), new MyData(789))) val rdd2 = rdd1.map { d => d.n += 1; d } val rdd3 = rdd2.map { d => d.n *= 2; d } rdd3.collect.foreach(println) println("----") rdd2.collect.foreach(println) println("----") rdd1.collect.foreach(println) } finally { sc.stop() } } }
↓実行結果
MyData(248) MyData(914) MyData(1580) ---- MyData(124) MyData(457) MyData(790) ---- MyData(123) MyData(456) MyData(789)
この例では、敢えてMyDataを可変オブジェクト(フィールドの値を変更できる)にしてみたが、mapメソッドによって中身を書き換えても、元のRDDには影響していない。
メモリー上に保持するときもシリアライズして保持しているのだろうか。
少なくとも、makeRDD(parallelize)メソッドを使った場合はSeqのデータを各executor(ワーカーノード)に配布する必要があるので、最初のデータのシリアライズはしているらしい。[2014-09-16]