S-JIS[2014-09-07] 変更履歴
Apache Sparkのパーティションのメモ。
|
RDDはScalaのコレクションのSeqのようなもので、データを順番に保持している。
RDDの内部はパーティションに分かれている。
パーティション毎に別々のサーバーでデータを保持する。したがって、分散処理する際にはパーティション毎に並列に処理される。
mapやfilter等の基本的な操作の場合、データの順序は変わらない。
処理によっては、データの順序が変わることがある。(データの順序が保持されない)
(例えば重複データの無いRDDであっても、distinctの後は順序が変わってしまう)
「パーティションをどう分けるか」を決めるPartitionerというクラスもあるが、RDDが必ずPartitionerを保持しているとも限らない。
Sparkのパーティション数は、100〜1万くらいにするのが良いそうだ。
実際の処理はパーティション単位で並列に実行されるので、各パーティションが各サーバーのメモリー内に収まるよう、パーティションサイズは小さめにするのが良い。その為には、パーティション数を多めにする。
Sparkのタスクはオーバーヘッドがほとんどかからないので、パーティション数を多くしても問題ないらしい。
とは言え、さすがに1万を超えるとオーバーヘッドが無視できなくなってくるらしい。
SparkContext.makeRDD(parallelize)でSeqからRDDを作ると、いくつかのパーティションに分割される。
import org.apache.spark._
val sc = new SparkContext("local[4]", "partition-example") val rdd = sc.makeRDD(1 to 100) // パーティショナーを表示 printf("partitioner=%s\n", rdd.partitioner) // パーティション毎に内容を表示 rdd.glom().mapPartitionsWithIndex((n, i) => i.map(a => "pno[%d]: %s".format(n, a.mkString(", "))), true).collect.foreach(println)
↓結果
partitioner=None pno[0]: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25 pno[1]: 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50 pno[2]: 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75 pno[3]: 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100
makeRDDメソッドでは、パーティション数は“Sparkが稼動するクラスター”のデフォルトの並列数になる。例えばマスターURLが「local[4]
」の場合、デフォルトのパーティション数は4になる。
パーティションが分かれていても、データの順序は元のSeqと同じになっている。
HDFS(Hadoop分散ファイルシステム)上のファイルはブロック単位で分割されて各サーバー上に配置されている。
Sparkのパーティションはそのブロックに対応する。
val rdd = sc.textFile("/tmp/text/a.txt")
// パーティショナーを表示
printf("partitioner=%s\n", rdd.partitioner)
// パーティション数を表示
printf("partition.size=%d\n", rdd.partitions.size)
↓結果
partitioner=None partition.size=2
textFileメソッドでは、パーティション数はブロック数になるらしいが、最低でも2になる。
実際にファイルを読み込むときも、(なるべく)ブロックの置かれているサーバー上でタスクが実行されるらしい。
パーティション数を変更するには、RDD.repartitionまたはcoalesceメソッドを使う。
また、演算することによってパーティション(内部の偏り)が変わりそうなメソッドでは、パーティション数やPartitionerが指定できるようになっている。
(パーティション数を指定するメソッドは、内部では大抵HashPartitionerを使うようになっている気がする)
repartitionメソッドは、内部でcoalesceを呼び出している。
// メソッド定義のイメージ def repartition(numPartitions: Int) = coalesce(numPartitions, true) def coalesce(numPartitions: Int, shuffle: Boolean = false) : RDD[T]
coalesceは、新しいパーティション数と、シャッフルを行うかどうかのフラグを引数に取る。
repartitionメソッドはシャッフルを行う。coalesceのデフォルト(フラグ省略時)はシャッフルを行わない。
シャッフルを行うと、各パーティションの要素数が均等になる。ただし、要素の並び順は変わってしまう。
シャッフルを行わない場合、パーティション数を増やすことは出来ない。要素の並び順が変わらない反面、要素数は偏ることがある。
import org.apache.spark._
val sc = new SparkContext("local[4]", "partition-example") val rdd = sc.makeRDD(1 to 20) // パーティション数は4 val rdd2 = rdd.repartition(N) // パーティション毎に内容を表示 rdd2.glom().mapPartitionsWithIndex((n, i) => i.map(a => "pno[%d]: %s".format(n, a.mkString(", "))), true).collect.foreach(println)
メソッド呼び出し | 実行結果 | 備考 |
---|---|---|
val rdd2 = rdd |
pno[0]: 1, 2, 3, 4, 5 |
元データ。 |
val rdd2 = rdd.repartition(3) |
pno[0]: 3, 8, 12, 15, 16, 19 |
repartitionでパーティション数を減らす場合。 要素数は均等だが、並び順が変わっている。 |
val rdd2 = rdd.repartition(5) |
pno[0]: 5, 10, 12, 16 |
repartitionでパーティション数を増やす場合。 要素数は均等だが、並び順が変わっている。 |
val rdd2 = rdd.coalesce(2) |
pno[0]: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 |
coalesceでパーティション数を減らす場合。 並び順は変わっていない。 |
val rdd2 = rdd.coalesce(3) |
pno[0]: 1, 2, 3, 4, 5 |
coalesceでパーティション数を減らす場合。 単純に繋げられたパーティションが出来ており(coalesceは合体とか合併するという意味だそうだ)、要素数が不均等になっている。 並び順は変わっていない。 |
val rdd2 = rdd.coalesce(5) |
pno[0]: 1, 2, 3, 4, 5 |
coalesceでパーティション数を増やす場合。 パーティションは変わらない。 |
filterメソッドなどを呼び出して空のパーティションが出来ている場合、その空パーティションを処理する為のタスクも作られるので、無駄である。
そういった場合にrepartitionすると良い。
val sc = new SparkContext("local[4]", "partition-example") val rdd = sc.makeRDD(1 to 100).filter(n => n<=10 || n>90) // パーティション数は4 val rdd2 = rdd.repartition(N) // パーティション毎に内容を表示 rdd2.glom().mapPartitionsWithIndex((n, i) => i.map(a => "pno[%d]: %s".format(n, a.mkString(", "))), true).collect.foreach(println)
メソッド呼び出し | 実行結果 | 備考 |
---|---|---|
val rdd2 = rdd |
pno[0]: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 |
元データ。 |
val rdd2 = rdd.repartition(3) |
pno[0]: 3, 6, 9, 91, 94, 97, 100 |
repartitionの場合、要素数が均等(に近くなる)になるよう再配置される。 |
val rdd2 = rdd.repartition(4) |
pno[0]: 2, 6, 10, 92, 96, 100 |
|
val rdd2 = rdd.repartition(5) |
pno[0]: 5, 10, 91, 96 |
|
val rdd2 = rdd.coalesce(2) |
pno[0]: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 |
coalesceの場合、パーティション数によっては空のパーティションが残ってしまう。 |
val rdd2 = rdd.coalesce(3) |
pno[0]: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 |
|
val rdd2 = rdd.coalesce(4) |
pno[0]: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 |
|
val rdd2 = rdd.coalesce(5) |
pno[0]: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 |