S-JIS[2014-09-07] 変更履歴

Spark パーティション

Apache Sparkのパーティションのメモ。


概要

RDDはScalaのコレクションのSeqのようなもので、データを順番に保持している。

RDDの内部はパーティションに分かれている。
パーティション毎に別々のサーバーでデータを保持する。したがって、分散処理する際にはパーティション毎に並列に処理される。

mapfilter等の基本的な操作の場合、データの順序は変わらない。
処理によっては、データの順序が変わることがある。(データの順序が保持されない)
(例えば重複データの無いRDDであっても、distinctの後は順序が変わってしまう)

「パーティションをどう分けるか」を決めるPartitionerというクラスもあるが、RDDが必ずPartitionerを保持しているとも限らない。


Sparkのパーティション数は、100〜1万くらいにするのが良いそうだ。

実際の処理はパーティション単位で並列に実行されるので、各パーティションが各サーバーのメモリー内に収まるよう、パーティションサイズは小さめにするのが良い。その為には、パーティション数を多めにする。

Sparkのタスクはオーバーヘッドがほとんどかからないので、パーティション数を多くしても問題ないらしい。
とは言え、さすがに1万を超えるとオーバーヘッドが無視できなくなってくるらしい。


makeRDDのパーティション

SparkContext.makeRDDparallelize)でSeqからRDDを作ると、いくつかのパーティションに分割される。

import org.apache.spark._
val sc = new SparkContext("local[4]", "partition-example")
val rdd = sc.makeRDD(1 to 100)

// パーティショナーを表示
printf("partitioner=%s\n", rdd.partitioner)

// パーティション毎に内容を表示
rdd.glom().mapPartitionsWithIndex((n, i) => i.map(a => "pno[%d]: %s".format(n, a.mkString(", "))), true).collect.foreach(println)

↓結果

partitioner=None
pno[0]: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25
pno[1]: 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50
pno[2]: 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75
pno[3]: 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100

makeRDDメソッドでは、パーティション数は“Sparkが稼動するクラスター”のデフォルトの並列数になる。例えばマスターURLが「local[4]」の場合、デフォルトのパーティション数は4になる。
パーティションが分かれていても、データの順序は元のSeqと同じになっている。


HDFSのパーティション

HDFS(Hadoop分散ファイルシステム)上のファイルはブロック単位で分割されて各サーバー上に配置されている。
Sparkのパーティションはそのブロックに対応する。

val rdd = sc.textFile("/tmp/text/a.txt")

// パーティショナーを表示
printf("partitioner=%s\n", rdd.partitioner)

// パーティション数を表示
printf("partition.size=%d\n", rdd.partitions.size)

↓結果

partitioner=None
partition.size=2

textFileメソッドでは、パーティション数はブロック数になるらしいが、最低でも2になる。

実際にファイルを読み込むときも、(なるべく)ブロックの置かれているサーバー上でタスクが実行されるらしい。


パーティションの変更

パーティション数を変更するには、RDD.repartitionまたはcoalesceメソッドを使う。

また、演算することによってパーティション(内部の偏り)が変わりそうなメソッドでは、パーティション数やPartitionerが指定できるようになっている。
(パーティション数を指定するメソッドは、内部では大抵HashPartitionerを使うようになっている気がする)


パーティション数の変更

repartitionメソッドは、内部でcoalesceを呼び出している。

// メソッド定義のイメージ
def repartition(numPartitions: Int) = coalesce(numPartitions, true)

def coalesce(numPartitions: Int, shuffle: Boolean = false) : RDD[T]

coalesceは、新しいパーティション数と、シャッフルを行うかどうかのフラグを引数に取る。
repartitionメソッドはシャッフルを行う。coalesceのデフォルト(フラグ省略時)はシャッフルを行わない。

シャッフルを行うと、各パーティションの要素数が均等になる。ただし、要素の並び順は変わってしまう。

シャッフルを行わない場合、パーティション数を増やすことは出来ない。要素の並び順が変わらない反面、要素数は偏ることがある。

import org.apache.spark._
val sc = new SparkContext("local[4]", "partition-example")
val rdd = sc.makeRDD(1 to 20) // パーティション数は4

val rdd2 = rdd.repartition(N)

// パーティション毎に内容を表示
rdd2.glom().mapPartitionsWithIndex((n, i) => i.map(a => "pno[%d]: %s".format(n, a.mkString(", "))), true).collect.foreach(println)
メソッド呼び出し 実行結果 備考
val rdd2 = rdd pno[0]: 1, 2, 3, 4, 5
pno[1]: 6, 7, 8, 9, 10
pno[2]: 11, 12, 13, 14, 15
pno[3]: 16, 17, 18, 19, 20
元データ。
val rdd2 = rdd.repartition(3) pno[0]: 3, 8, 12, 15, 16, 19
pno[1]: 1, 4, 6, 9, 13, 17, 20
pno[2]: 2, 5, 7, 10, 11, 14, 18
repartitionでパーティション数を減らす場合。
要素数は均等だが、並び順が変わっている。
val rdd2 = rdd.repartition(5) pno[0]: 5, 10, 12, 16
pno[1]: 1, 6, 13, 17
pno[2]: 2, 7, 14, 18
pno[3]: 3, 8, 15, 19
pno[4]: 4, 9, 11, 20
repartitionでパーティション数を増やす場合。
要素数は均等だが、並び順が変わっている。
val rdd2 = rdd.coalesce(2) pno[0]: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
pno[1]: 11, 12, 13, 14, 15, 16, 17, 18, 19, 20
coalesceでパーティション数を減らす場合。
並び順は変わっていない。
val rdd2 = rdd.coalesce(3) pno[0]: 1, 2, 3, 4, 5
pno[1]: 6, 7, 8, 9, 10
pno[2]: 11, 12, 13, 14, 15, 16, 17, 18, 19, 20
coalesceでパーティション数を減らす場合。
単純に繋げられたパーティションが出来ており(coalesceは合体とか合併するという意味だそうだ)、要素数が不均等になっている。
並び順は変わっていない。
val rdd2 = rdd.coalesce(5) pno[0]: 1, 2, 3, 4, 5
pno[1]: 6, 7, 8, 9, 10
pno[2]: 11, 12, 13, 14, 15
pno[3]: 16, 17, 18, 19, 20
coalesceでパーティション数を増やす場合。
パーティションは変わらない。

filterメソッドなどを呼び出して空のパーティションが出来ている場合、その空パーティションを処理する為のタスクも作られるので、無駄である。
そういった場合にrepartitionすると良い。

val sc = new SparkContext("local[4]", "partition-example")
val rdd = sc.makeRDD(1 to 100).filter(n => n<=10 || n>90) // パーティション数は4

val rdd2 = rdd.repartition(N)

// パーティション毎に内容を表示
rdd2.glom().mapPartitionsWithIndex((n, i) => i.map(a => "pno[%d]: %s".format(n, a.mkString(", "))), true).collect.foreach(println)
メソッド呼び出し 実行結果 備考
val rdd2 = rdd pno[0]: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
pno[1]:
pno[2]:
pno[3]: 91, 92, 93, 94, 95, 96, 97, 98, 99, 100
元データ。
val rdd2 = rdd.repartition(3) pno[0]: 3, 6, 9, 91, 94, 97, 100
pno[1]: 1, 4, 7, 10, 92, 95, 98
pno[2]: 2, 5, 8, 93, 96, 99
repartitionの場合、要素数が均等(に近くなる)になるよう再配置される。
val rdd2 = rdd.repartition(4) pno[0]: 2, 6, 10, 92, 96, 100
pno[1]: 3, 7, 93, 97
pno[2]: 4, 8, 94, 98
pno[3]: 1, 5, 9, 91, 95, 99
val rdd2 = rdd.repartition(5) pno[0]: 5, 10, 91, 96
pno[1]: 1, 6, 92, 97
pno[2]: 2, 7, 93, 98
pno[3]: 3, 8, 94, 99
pno[4]: 4, 9, 95, 100
val rdd2 = rdd.coalesce(2) pno[0]: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
pno[1]: 91, 92, 93, 94, 95, 96, 97, 98, 99, 100
coalesceの場合、パーティション数によっては空のパーティションが残ってしまう。
val rdd2 = rdd.coalesce(3) pno[0]: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
pno[1]:
pno[2]: 91, 92, 93, 94, 95, 96, 97, 98, 99, 100
val rdd2 = rdd.coalesce(4) pno[0]: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
pno[1]:
pno[2]:
pno[3]: 91, 92, 93, 94, 95, 96, 97, 98, 99, 100
val rdd2 = rdd.coalesce(5) pno[0]: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
pno[1]:
pno[2]:
pno[3]: 91, 92, 93, 94, 95, 96, 97, 98, 99, 100

Spark目次へ戻る / Scalaへ戻る / 技術メモへ戻る
メールの送信先:ひしだま