S-JIS[2014-08-23/2017-07-26] 変更履歴

RDD

Apache Sparkで主に使うRDDについて。


概要

RDD(Resilient Distributed Dataset)は、不変で分散処理可能なコレクション。


最初のRDDはSparkContextを使って生成する。
Scalaのコレクション(Seq)から作る方法と、ファイル等から読み込んで作る方法がある。

RDDクラスにはfilterやmapといったScalaのコレクションと同様のメソッドが用意されている。
他にも特定用途に特化したRDDクラスがあり、暗黙変換によって意識せずに変換できる。
この暗黙変換メソッドはSparkContextオブジェクトに定義されているので、一緒にインポートしておくのが良い。

import org.apache.spark._
import org.apache.spark.SparkContext._

RDDクラス自身はorg.apache.spark.rddパッケージだが、明示的にクラス名を記述するとき以外はインポートしておく必要は無い。

クラス RDDとしての定義 説明
RDD[T]   基本的なクラス。
PairRDDFunctions[K, V] RDD[(K, V)] キーと値のペアのRDD。キーによる結合を行いたい場合に使う。
SchemaRDD RDD[Row] Spark SQLで使用するRDD。[2014-08-25]

RDDの不変とは

RDDの不変(イミュータブル)とは、“依存RDD(系譜)”と“処理内容”が変わらないという意味であり、「“処理した結果のデータ(値)”が常に変わらない」という意味ではない。[2014-09-08]

「一度出来上がったデータ(値)は、正常な手段では変更されない」という意味では、データが不変と言えなくもないが。

Sparkでは、障害が起きてRDDの一部が消失すると、入力元の(依存している)RDDを使って再処理してデータを作り直す。
基本的には、依存RDDのデータ(値)が障害前と同じであれば、再処理した結果のデータも障害前と同じになる。

しかし、障害前と同じにならないケースもある。
破損していない入力データが見つかるまで再帰的に依存RDDを辿っていくことになるが、一番先頭まで辿り着いてしまうと、HDFS等の外部システムからデータを再度読み込むことになる。
もしそのデータの内容が変わってしまっていたら、再処理したRDDの内容は障害前と変わってしまう。

また、RDDの処理の中でタイムスタンプを使ったり乱数を使ったりしていると、処理結果は当然毎回変わってしまう。
さすがにSparkはプログラマーが記述した処理の内容まで把握して毎回同じ結果を出すようにすることは出来ない。

そういったケースでも本当にデータを不変にしたいのであれば、要所々々で(信頼できる)外部ストレージにデータを保存(永続化)するようプログラミングする必要がある。
(「信頼できる」というのは、耐障害性のある、といった意味)


スレッドセーフ

Sparkでは、Sparkクラスターの各スレーブノード上でexecutor(JavaVM)が動き、その中で複数のタスクを(マルチスレッドで)並列に処理するので、スレッドセーフになるようにプログラミングする必要がある。[2017-07-26]

ただし、RDDのmapfilter等に渡す関数(関数オブジェクト)はexecutorに渡す際にシリアライズされ、executorの各タスクで別々にデシリアライズされる。
つまりタスク毎に別インスタンスになるので、関数の中のインスタンスが共有されることはない。
(例えば関数の外でインスタンス化したSimpleDateFormat(SimpleDateFormatはスレッドセーフではない)を関数の中で使ってもよい)
(むしろ複数タスク間で共有したい場合は共有変数を使う必要がある)

参考: maropuさんのツイート


RDDのメソッド


変換処理

新しいRDDを返す処理。(JavaのStreamの「中間処理」に当たる)
これらのメソッドは新しいRDDを返すだけで、実際の処理は行わない(実行されない)。
アクションのメソッドを実行すると実際の処理が行われる。

ほとんどの処理では、処理結果(出力)の並び順は入力データの並び順が保持される。[2014-09-07]
パーティション数やParitionerを指定するメソッドでは、基本的に並び順は保持されない(Paritionerに応じて順序が入れ替わる)。

クラス メソッド 備考 出力順
名称 引数 戻り型 コーディング 結果
RDD[T] map [U] f: T => U RDD[U] 要素を別の型に変換する。 保持 val rdd = sc.makeRDD(Seq(1, 2, 3))
val rdd2 = rdd.map("a" + _)
a1, a2, a3
RDD[T] flatMap [U] f: T => TraversableOnce[U] RDD[U] 要素を別の型(要素数は複数可)に変換する。 保持 val rdd = sc.makeRDD(Seq(0, 1, 2))
val rdd2 = rdd.flatMap(n => Seq(n, 'A' + n))
0, 65, 1, 66, 2, 67
RDD[T] filter   f: T => Boolean RDD[T] 条件を満たす要素だけ抽出する。 保持 val rdd = sc.makeRDD(1 to 5)
val rdd2 = rdd.filter(_ % 2 == 1)
1, 3, 5
RDD[T] collect [U] f: PartialFunction[T, U] RDD[U] filtermapを合わせたようなもの。
caseにマッチした結果だけでコレクションが作られる。
Scalaのコレクションのcollect相当。
要素を収集して配列を返すcollect
保持 val rdd = sc.makeRDD(Seq(1, 2, 3))
val rdd2 = rdd.collect{ case 1 => "one"; case 2 => "two" }
one, two
RDD[T] distinct   numPartitions: Int = partitions.size RDD[T] 重複を排除する。 × val rdd = sc.makeRDD(Seq(1, 2, 3, 2, 1))
val rdd2 = rdd.distinct()
1, 2, 3
RDD[T] repartition   numPartitions: Int RDD[T] パーティション数を変更する。
使用例
shuffleをtrueにしたcoalesceと同じ。
× rdd.repartition(100)  
RDD[T] coalesce   numPartitions: Int
shuffle: Boolean = false
RDD[T] パーティション数を変更する。
使用例

shuffleがtrueの場合、並び順は保持されない。
(たぶん)新しい各パーティション内の要素数がなるべく均等になるよう分散される。

shuffleがfalseの場合、パーティション数を減らすことは出来るが増やすことは出来ない(多くしようとしても変わらない)。
減らす場合、一部のパーティションを単純につなぐだけらしく、要素数が偏る。
「coalesce」は合体・合併させるといった意味らしい。

引数次第 rdd.coalesce(100)  
RDD[T] sample   withReplacement: Boolean
fraction: Double
seed: Long = Utils.random.nextLong
RDD[T] ランダムに値を抽出する。
withReplacementにtrueを指定すると、同じ値が何度も返ることがある。
takeSample
保持 val rdd = sc.makeRDD(1 to 10)
val rdd2 = rdd.sample(false, 0.5)
5, 7, 9, 10
RDD[T] randomSplit   weights: Array[Double]
seed: Long = Utils.random.nextLong
Array[RDD[T]] 指定した割合でランダムに分割する。 保持 val rdd = sc.makeRDD(1 to 10)
val rdds = rdd.randomSplit(Array(0.3, 0.7))
//val result = rdds.map(_.collect)
1, 5, 7, 8

2, 3, 4, 6, 9, 10
RDD[T] union   other: RDD[T] RDD[T] 他のRDDと結合する。
パーティションは元のパーティションとotherのパーティションの単純な合計になる。(パーティション数が単純に増える)
SparkContext.union
保持 val rdd1 = sc.makeRDD(Seq(1, 2, 3))
val rdd2 = sc.makeRDD(Seq(3, 4))
val rdd = rdd1.union(rdd2)
1, 2, 3, 3, 4
RDD[T] ++   other: RDD[T] RDD[T] unionと同じ。 保持 val rdd1 = sc.makeRDD(Seq(1, 2, 3))
val rdd2 = sc.makeRDD(Seq(3, 4))
val rdd = rdd1 ++ rdd2
1, 2, 3, 3, 4
RDD[T] glom     RDD[Array[T]] 各パーティションの内容を配列化したRDDを生成する。 保持 val rdd = sc.makeRDD(1 to 10).coalesce(3)
val rdd2 = rdd.glom()
Array(1, 2)
Array(3, 4, 5, 6)
Array(7, 8, 9, 10)
RDD[T] keyBy [K] f: T => K RDD[(K, T)] キーを生成して、“キーと要素のペア”のRDDを返す。 保持 val rdd = sc.makeRDD(Seq("a", "bc", "d", "e"))
val rdd2 = rdd.keyBy(s => s.length)
(1,a), (2,bc), (1,d), (1,e)
OrderedRDDFunctions[K, V, P] sortByKey   ascending: Boolean = true
numPartitions: Int = partitions.size
RDD[P] キーでソートする。[2014-09-15]
要素はタプル(K, V)でKはソート可能である必要がある。
ソート val rdd = sc.makeRDD(Seq(3 -> "a", 1 -> "b", 2 -> "c"))
val rdd2 = rdd.sortByKey(true)
(1,b), (2,c), (3,a)
RDD[T] groupBy [K] f: T => K RDD[(K, Iterable[T])] キーを生成してグルーピングする。
元のRDDにParitionerが定義されている場合はそれを使う。
定義されていない場合はHashPartitionerを使う。
× val rdd = sc.makeRDD(1 to 5)
val rdd2 = rdd.groupBy(_ % 2)
(0,ArrayBuffer(2, 4)), (1,ArrayBuffer(1, 3, 5))
RDD[T] groupBy [K] f: T => K
numPartitions: Int
RDD[(K, Iterable[T])] キーを生成してグルーピングする。
groupBy(f, new HashPartitioner(numPartitions))と同じ。
× val rdd = sc.makeRDD(1 to 5)
val rdd2 = rdd.groupBy((n: Int) => n % 2, 3)
(0,ArrayBuffer(2, 4))
(1,ArrayBuffer(1, 3, 5))
RDD[T] groupBy [K] f: T => K
p: Partitioner
RDD[(K, Iterable[T])] キーを生成してグルーピングする。

HashPartitionerが指定された場合、キーの並び順は保証されない。
キー毎の値の並び順は元の順序が保持されるようだ。
キーの種類数が指定したパーティション数より少ない場合、キーの種類数までのパーティションしか作られない。

     
PairRDDFunctions[K, V] groupByKey   numPartitions: Int RDD[(K, Iterable[V])] キー毎に値をグルーピングする。
要素はタプル(K, V)である必要がある。
groupByKey(new HashPartitioner(numPartitions))と同じ。
× val rdd = sc.makeRDD(Seq("a"->1, "b"->2, "c"->3, "a"->4))
val rdd2 = rdd.groupByKey(2)
(b,ArrayBuffer(2)),
(a,ArrayBuffer(1, 4)),
(c,ArrayBuffer(3))
PairRDDFunctions[K, V] groupByKey   partitioner: Partitioner RDD[(K, Iterable[V])] キー毎に値をグルーピングする。

HashPartitionerが指定された場合、キーの並び順は保証されない。
キー毎の値の並び順は元の順序が保持されるようだ。
キーの種類数が指定したパーティション数より少ない場合、キーの種類数までのパーティションしか作られない。

     
PairRDDFunctions[K, V] partitionBy   partitioner: Partitioner RDD[(K, V)] partitionerで分割されたRDDを返す。 × val rdd = sc.makeRDD(Seq("a"->1, "b"->2, "c"->3, "a"->4))
val rdd2 = rdd.partitionBy(new HashPartitioner(3))
(c,3), (a,1), (a,4), (b,2)
RDD[T] pipe   command: String RDD[String] シェルのコマンド(外部プロセス)を実行する。
データは標準入力に渡され、標準出力に出力された内容がRDDの内容になる。

commandにコマンドを書く。引数もスペース区切りで入れられる。
ただしスペース入りの引数を使いたい場合はSeq[String]版を使う。

envには環境変数を渡せる。

保持 val rdd = sc.makeRDD(Seq("a", "b", "c"))
val rdd2 = rdd.pipe("cat")
a, b, c
RDD[T] pipe   command: String
env: Map[String, String]
RDD[String] val rdd = sc.makeRDD(Seq("A", "B", "C"))
val rdd2 = rdd.pipe("/tmp/test.sh", Map("zzz" -> "s"))

test.shは以下のような感じ。実行権限が必要。
while read line
do
  echo $zzz$line
done
sA, sB, sC
RDD[T] pipe   command: Seq[String]
env: Map[String, String] = Map()
printPipeContext: (String => Unit) => Unit = null
printRDDElement: (T, String => Unit) => Unit = null
separateWorkingDir: Boolean = false
RDD[String] val rdd = sc.makeRDD(Seq("A", "B", "C"))
val rdd2 = rdd.pipe(Seq("awk", """{ print "p" $1 }"""))
pA, pB, pC
PairRDDFunctions[K, V] keys     RDD[K] 要素のタプル(K, V)からキーのみに変換する。 保持 val rdd = sc.makeRDD(Seq("a"->1, "b"->2, "c"->3, "a"->2))
val ks = rdd.keys
a, b, c, a
PairRDDFunctions[K, V] values     RDD[V] 要素のタプル(K, V)から値のみに変換する。 保持 val rdd = sc.makeRDD(Seq("a"->1, "b"->2, "c"->3, "a"->2))
val vs = rdd.values
1, 2, 3, 2
PairRDDFunctions[K, V] mapValues [U] f: V => U RDD[(K, U)] 値を別の型に変換する。
要素はタプル(K, V)である必要がある。
保持 val rdd = sc.makeRDD(Seq("a"->1, "b"->2, "c"->3))
val rdd2 = rdd.mapValues("a" + _)
(a,a1), (b,a2), (c,a3)
PairRDDFunctions[K, V] flatMapValues [U] f: V => TraversableOnce[U] RDD[(K, U)] 値を別の型(個数は複数可)に変換する。 保持 val rdd = sc.makeRDD(Seq("a"->0, "b"->1, "c"->2))
val rdd2 = rdd.flatMapValues(n => Seq(n, 'A'+n))
(a,0), (a,65), (b,1), (b,66), (c,2), (c,67)
RDD[T] mapPartitions [U] f: Iterator[T] => Iterator[U]
preservesPartitioning: Boolean = false
RDD[U] パーティションを別の型のパーティションに変換する。
map
保持 val rdd = sc.makeRDD(Seq(1, 2, 3))
val rdd2 = rdd.mapPartitions(i => i.map("a" * _))
a, aa, aaa
RDD[T] mapPartitionsWithIndex [U] f: (Int, Iterator[T]) => Iterator[U]
preservesPartitioning: Boolean = false
RDD[U] パーティション番号付きのmapPartitions 保持 val rdd = sc.makeRDD(Seq("a", "b", "c", "d", "e", "f", "g")).coalesce(3)
val rdd2 = rdd.mapPartitionsWithIndex((n, i) => i.map(s => s + n))
a0, b1, c1, d1, e2, f2, g2
RDD[T] zipWithIndex     RDD[(T, Long)] 要素と位置(インデックス)のペアを生成する。
ScalaのコレクションのzipWithIndex相当。
保持 val rdd = sc.makeRDD(Seq("a", "b", "c"))
val rdd2 = rdd.zipWithIndex()
(a,0), (b,1), (c,2)
RDD[T] zipWithUniqueId     RDD[(T, Long)] 要素と“重複しない番号”のペアを生成する。
この番号は必ず一意(ユニーク)になるが、連番になるとは限らない。
保持 val rdd = sc.makeRDD(Seq("a", "b", "c"))
val rdd2 = rdd.zipWithUniqueId()
(a,2), (b,5), (c,7)
PairRDDFunctions[K, V] combineByKey   createCombiner: V => C
mergeValue: (C, V) => C
mergeCombiners: (C, C) => C
RDD[(K, C)] キー毎に値を結合する。
要素はタプル(K, V)である必要がある。

デフォルトではHashPartitionerが使われるので、キーの並び順は保証されない。

× val rdd = sc.makeRDD(Seq("a"->1, "b"->2, "c"->3, "a"->4))
val rdd2 = rdd.combineByKey(
  (v: Int) => v.toLong,
  (c: Long, v: Int) => c + v,
  (c1: Long, c2: Long) => c1 + c2)
(a,5), (b,2), (c,3)
PairRDDFunctions[K, V] combineByKey [C] createCombiner: V => C
mergeValue: (C, V) => C
mergeCombiners: (C, C) => C
numPartitions: Int
RDD[(K, C)]    
PairRDDFunctions[K, V] combineByKey [C] createCombiner: V => C
mergeValue: (C, V) => C
mergeCombiners: (C, C) => C
partitioner: Partitioner
mapSideCombine: Boolean = true
serializer: Serializer = null
RDD[(K, C)]    
PairRDDFunctions[K, V] foldByKey   zeroValue: V
func: (V, V) => V
RDD[(K, V)] キー毎に値を畳み込む。
fold

デフォルトではHashPartitionerが使われるので、キーの並び順は保証されない。

× val rdd = sc.makeRDD(Seq("a"->1, "b"->2, "c"->3, "a"->4))
val rdd2 = rdd.foldByKey(0)(_ + _)
(a,5), (b,2), (c,3)
PairRDDFunctions[K, V] foldByKey   zeroValue: V
numPartitions: Int
func: (V, V) => V
RDD[(K, V)]    
PairRDDFunctions[K, V] foldByKey   zeroValue: V
partitioner: Partitioner
func: (V, V) => V
RDD[(K, V)]    
PairRDDFunctions[K, V] reduceByKey   func: (V, V) => V
numPartitions: Int
RDD[(K, V)] キー毎に要素同士を演算する。
reduceByKey(new HashPartitioner(numPartitions), func)と同じ。
reduce
× val rdd = sc.makeRDD(Seq("a"->1, "b"->2, "c"->3, "a"->4))
val rdd2 = rdd.reduceByKey(_ + _)
(a,5), (b,2), (c,3)
PairRDDFunctions[K, V] reduceByKey   partitioner: Partitioner
func: (V, V) => V
RDD[(K, V)] キー毎に要素同士を演算する。
combineByKey(v=>v, func, func, partitioner)と同じ。
×    
PairRDDFunctions[K, V] countApproxDistinctByKey   relativeSD: Double = 0.05 RDD[(K, Long)] キー毎に値の種類数を返す。
countApproxDistinct
× val rdd = sc.makeRDD(Seq("a"->1, "b"->2, "c"->3, "a"->4, "a"->1))
val count = rdd.
countApproxDistinctByKey()
(a,2), (b,1), (c,1)
PairRDDFunctions[K, V] countApproxDistinctByKey   relativeSD: Double
numPartitions: Int
RDD[(K, Long)]    
PairRDDFunctions[K, V] countApproxDistinctByKey   relativeSD: Double
partitioner: Partitioner
RDD[(K, Long)]    

結合処理

複数のRDDを結合して新しいRDDを生成する処理。

クラス メソッド 備考 出力順
名称 引数 戻り型 コーディング 結果
RDD[T] zip [U] other: RDD[U] RDD[(T, U)] 要素をペアにする。
Scalaのコレクションのzip相当。
双方のRDDのパーティション数は同じでなければならない。
また、互いのパーティションの要素数も同じでなければならない。
(例えば、片方のRDDが他方からmapによって作られた場合はこれらの条件を満たす)
保持 val rdd1 = sc.makeRDD(Seq("a", "b", "c")).coalesce(2)
val rdd2 = sc.makeRDD(Seq(1, 2, 3)).coalesce(2)
val rdd = rdd1.zip(rdd2)
(a,1), (b,2), (c,3)
RDD[T] zipPartitions [B, V] rdd2: RDD[B]
preservesPartitioning: Boolean
f: (Iterator[T], Iterator[B]) => Iterator[V]
RDD[V] パーティションをペアにする。
双方のRDDのパーティション数は同じでなければならない。
(パーティション毎の要素数は何でもよい)
保持 val rdd1 = sc.makeRDD(Seq("a", "b", "c", "d")).coalesce(2)
val rdd2 = sc.makeRDD(Seq(1, 2)).coalesce(2)
val rdd = rdd1.zipPartitions(rdd2, false)(
  (i1, i2) => new Iterator[String] {
    def hasNext: Boolean = i1.hasNext || i2.hasNext
    def next() = {
      val s = if (i1.hasNext) i1.next() else ""
      val n = if (i2.hasNext) i2.next() else 0
      s + n
    }
  })
a1, b0, c2, d0
RDD[T] zipPartitions [B, V] rdd2: RDD[B]
f: (Iterator[T], Iterator[B]) => Iterator[V]
RDD[V] val rdd1 = sc.makeRDD(Seq("a", "b", "c", "d")).coalesce(2)
val rdd2 = sc.makeRDD(Seq(1, 2)).coalesce(2)
val rdd = rdd1.zipPartitions(rdd2)(
  (i1, i2) => new Iterator[String] {
    def hasNext: Boolean = i1.hasNext || i2.hasNext
    def next() = {
      val s = if (i1.hasNext) i1.next() else ""
      val n = if (i2.hasNext) i2.next() else 0
      s + n
    }
  })
a1, b0, c2, d0
RDD[T] zipPartitions [B, C, V] rdd2: RDD[B]
rdd3: RDD[C]
preservesPartitioning: Boolean
f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]
RDD[V]        
RDD[T] zipPartitions [B, C, V] rdd2: RDD[B]
rdd3: RDD[C]
f: (Iterator[T], Iterator[B]) => Iterator[V]
RDD[V]    
RDD[T] zipPartitions [B, C, D, V] rdd2: RDD[B]
rdd3: RDD[C]
rdd4: RDD[D]
preservesPartitioning: Boolean
f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]
RDD[V]        
RDD[T] zipPartitions [B, C, D, V] rdd2: RDD[B]
rdd3: RDD[C]
rdd4: RDD[D]
f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]
RDD[V]    
RDD[T] cartesian [U] other: RDD[U] RDD[(T, U)] 他のRDDとのデカルト積(cartesian product)(直積)を生成する。
パーティション数も自分のRDDのパーティション数とotherのパーティション数の積になる。
保持 val rdd1 = sc.makeRDD(Seq("a", "b", "c"))
val rdd2 = sc.makeRDD(Seq(1, 2))
val rdd = rdd1.cartesian(rdd2)
(a,1), (a,2), (b,1), (b,2), (c,1), (c,2)
RDD[T] intersection   other: RDD[T] RDD[T] 他のRDDと共通の値を抽出する。 × val rdd1 = sc.makeRDD(Seq(1, 2, 3))
val rdd2 = sc.makeRDD(Seq(2, 3, 4))
val rdd = rdd1.intersection(rdd2)
2, 3
RDD[T] intersection   other: RDD[T],
partitioner: Partitioner
RDD[T]    
RDD[T] intersection   other: RDD[T]
numPartitions: Int
RDD[T]    
RDD[T] subtract   other: RDD[T] RDD[T] 他のRDDとの差集合を生成する。 × val rdd1 = sc.makeRDD(Seq(1, 2, 3, 4))
val rdd2 = sc.makeRDD(Seq(2, 4, 6))
val rdd = rdd1.subtract(rdd2)
1, 3
RDD[T] subtract   other: RDD[T]
numPartitions: Int
RDD[T]    
RDD[T] subtract   other: RDD[T]
p: Partitioner
RDD[T]    
PairRDDFunctions[K, V] subtractByKey [W] other: RDD[(K, W)] RDD[(K, V)] 他のRDDとの差集合(キーで比較)を生成する。
要素はタプル(K, V)である必要がある。
× val rdd1 = sc.makeRDD(Seq("a"->1, "b"->2, "c"->3, "d"->4))
val rdd2 = sc.makeRDD(Seq("b"->"B", "d"->"D", "e"->"E"))
val rdd = rdd1.subtractByKey(rdd2)
(a,1), (c,3)
PairRDDFunctions[K, V] subtractByKey [W] other: RDD[(K, W)]
numPartitions: Int
RDD[(K, V)]    
PairRDDFunctions[K, V] subtractByKey [W] other: RDD[(K, W)]
p: Partitioner
RDD[(K, V)]    
PairRDDFunctions[K, V] join [W] other: RDD[(K, W)] RDD[(K, (V, W))] 他のRDDを結合(inner join)する。
要素はタプル(K, V)である必要がある。
× val rdd1 = sc.makeRDD(Seq("a"->1, "b"->2, "a"->3, "b"->4))
val rdd2 = sc.makeRDD(Seq("b"->"foo", "c"->"bar"))
val rdd = rdd1.join(rdd2)
(b,(2,foo)), (b,(4,foo))
PairRDDFunctions[K, V] join [W] other: RDD[(K, W)]
numPartitions: Int
RDD[(K, (V, W))]    
PairRDDFunctions[K, V] join [W] other: RDD[(K, W)]
partitioner: Partitioner
RDD[(K, (V, W))]    
PairRDDFunctions[K, V] leftOuterJoin [W] other: RDD[(K, W)] RDD[(K, (V, Option[W]))] 他のRDDを結合(left outer join)する。
要素はタプル(K, V)である必要がある。
× val rdd1 = sc.makeRDD(Seq("a"->1, "b"->2, "a"->3, "b"->4))
val rdd2 = sc.makeRDD(Seq("b"->"foo", "c"->"bar"))
val rdd = rdd1.leftOuterJoin(rdd2)
(a,(1,None)), (a,(3,None)), (b,(2,Some(foo))), (b,(4,Some(foo)))
PairRDDFunctions[K, V] leftOuterJoin [W] other: RDD[(K, W)]
numPartitions: Int
RDD[(K, (V, Option[W]))]    
PairRDDFunctions[K, V] leftOuterJoin [W] other: RDD[(K, W)]
partitioner: Partitioner
RDD[(K, (V, Option[W]))]    
PairRDDFunctions[K, V] rightOuterJoin [W] other: RDD[(K, W)] RDD[(K, (Option[V], W))] 他のRDDを結合(right outer join)する。
要素はタプル(K, V)である必要がある。
× val rdd1 = sc.makeRDD(Seq("a"->1, "b"->2, "a"->3, "b"->4))
val rdd2 = sc.makeRDD(Seq("b"->"foo", "c"->"bar"))
val rdd = rdd1.rightOuterJoin(rdd2)
(b,(Some(2),foo)), (b,(Some(4),foo)), (c,(None,bar))
PairRDDFunctions[K, V] rightOuterJoin [W] other: RDD[(K, W)]
numPartitions: Int
RDD[(K, (Option[V], W))]    
PairRDDFunctions[K, V] rightOuterJoin [W] other: RDD[(K, W)]
partitioner: Partitioner
RDD[(K, (Option[V], W))]    
PairRDDFunctions[K, V] cogroup [W] other: RDD[(K, W)] RDD[(K, (Iterable[V], Iterable[W]))] 同じキー同士で値をまとめる。
要素はタプル(K, V)である必要がある。

キー毎の値の並び順は入力データと同じようだ。

× val rdd1 = sc.makeRDD(Seq("a"->1, "b"->2, "a"->3))
val rdd2 = sc.makeRDD(Seq("b"->"foo", "c"->"bar"))
val rdd = rdd1.cogroup(rdd2)
(a,(ArrayBuffer(1, 3),ArrayBuffer())),
(b,(ArrayBuffer(2),ArrayBuffer(foo))),
(c,(ArrayBuffer(),ArrayBuffer(bar)))
PairRDDFunctions[K, V] cogroup [W] other: RDD[(K, W)]
numPartitions: Int
RDD[(K, (Iterable[V], Iterable[W]))]    
PairRDDFunctions[K, V] cogroup [W] other: RDD[(K, W)]
partitioner: Partitioner
RDD[(K, (Iterable[V], Iterable[W]))]    
PairRDDFunctions[K, V] cogroup [W1, W2] other1: RDD[(K, W1)]
other2: RDD[(K, W2)]
RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] 同じキー同士で値をまとめる。
要素はタプル(K, V)である必要がある。
×    
PairRDDFunctions[K, V] cogroup [W1, W2] other1: RDD[(K, W1)]
other2: RDD[(K, W2)]
numPartitions: Int
RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]    
PairRDDFunctions[K, V] cogroup [W1, W2] other1: RDD[(K, W1)]
other2: RDD[(K, W2)]
partitioner: Partitioner
RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]    
PairRDDFunctions[K, V] groupWith [W] other: RDD[(K, W)] RDD[(K, (Iterable[V], Iterable[W]))] cogroupと同じ。 ×    
PairRDDFunctions[K, V] groupWith [W1, W2] other1: RDD[(K, W1)]
other2: RDD[(K, W2)]
RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] cogroupと同じ。 ×    

アクション

ジョブを実行して値を返す処理。(JavaのStreamの「終端処理」に当たる)

クラス メソッド 備考 出力順
名称 引数 戻り型 コーディング 結果
RDD[T] foreach   f: T => Unit Unit 要素を1回ずつ処理する。
この処理はexecutorで行われるはず。
出力なし val rdd = sc.makeRDD(Seq("a", "bc", "def"))
val len = sc.accumulator(0)
rdd.foreach(len += _.length)
println(len.value)
6
RDD[T] foreachPartition   f: Iterator[T] => Unit Unit パーティションを1回ずつ処理する。 出力なし val rdd = sc.makeRDD(11 to 20)
val counter = sc.accumulator(0)
rdd.foreachPartition(counter += _.size)
println(counter.value)
10
RDD[T] collect     Array[T] 要素を配列に入れて返す。
ScalaのコレクションのtoArray相当。
→Scalaのコレクションのcollectに相当するのは、filterとmapを実行するcollect
保持 val rdd = sc.makeRDD(Seq("a", "b", "c"))
val array = rdd.collect()
Array(a, b, c)
PairRDDFunctions[K, V] collectAsMap     Map[K, V] 要素をMapに入れて返す。
要素はタプル(K, V)である必要がある。
× val rdd = sc.makeRDD(Seq("a"->1, "b"->2, "c"->3))
val map = rdd.collectAsMap()
Map(b -> 2, a -> 1, c -> 3)
RDD[T] toLocalIterator     Iterator[T] ローカル(driver上)で動くIteratorを返す。
このIteratorを使うときにジョブが動く。
保持 val rdd = sc.makeRDD(Seq("a", "b", "c"))
val i = rdd.toLocalIterator
i.foreach(println)
a
b
c
RDD[T] toArray     Array[T] 要素を配列に入れて返す。
1.0.0で非推奨。→collect
保持 val rdd = sc.makeRDD(Seq("a", "b", "c"))
val array = rdd.toArray()
Array(a, b, c)
RDD[T] reduce   f: (T, T) => T T 要素同士の演算を行う。
Scalaのコレクションのreduce相当。
- val rdd = sc.makeRDD(Seq(1, 2, 3))
val sum = rdd.reduce(_ + _)
6
PairRDDFunctions[K, V] reduceByKeyLocally   func: (V, V) => V Map[K, V] キー毎に値同士の演算を行う。

値の処理順も不定。

× val rdd = sc.makeRDD(Seq("a"->1, "b"->2, "c"->3, "a"->4))
val map = rdd.reduceByKeyLocally(_ + _)
Map(a -> 5, b -> 2, c -> 3)
PairRDDFunctions[K, V] reduceByKeyToDriver   func: (V, V) => V Map[K, V] 1.0.0で非推奨。→reduceByKeyLocally      
RDD[T] fold   zeroValue: T
op: (T, T) => T
T 畳み込みを行う。
Scalaのコレクションのfold相当。
- val rdd = sc.makeRDD(Seq(1, 2, 3))
val sum = rdd.fold(0)(_ + _)
6
RDD[T] aggregate [U] zeroValue: U
seqOp: (U, T) => U
combOp: (U, U) => U
U 集約を行う。
Scalaのコレクションのaggregate相当。
各パーティション内の要素をseqOpで集約し、それらの結果をcombOpで集約する。
- val rdd = sc.makeRDD(Seq(1, 2, 3))
val sum = rdd.aggregate(BigDecimal(0))((d, n) => d + n, (d1, d2) => d1 + d2)
BigDecimal(6)
RDD[T] count     Long 要素数を返す。
Scalaのコレクションのsize相当。
- val rdd = sc.makeRDD(Seq("a", "b", "c"))
val size = rdd.count
3
RDD[T] countApprox   timeout: Long
confidence: Double = 0.95
PartialResult[BoundedDouble] タイムアウト付きcount - val rdd = sc.makeRDD(Seq("a", "b", "c"))
val size = rdd.countApprox(1000)
(final: [3.000, 3.000])
PairRDDFunctions[K, V] countByKey     Map[K, Long] キー毎に値の個数を返す。
要素はタプル(K, V)である必要がある。
× val rdd = sc.makeRDD(Seq("a"->1, "b"->2, "c"->3, "a"->4))
val map = rdd.countByKey()
Map(b -> 1, a -> 2, c -> 1)
PairRDDFunctions[K, V] countByKeyApprox   timeout: Long
confidence: Double = 0.95
PartialResult[Map[K, BoundedDouble]] タイムアウト付きcountByKey × val rdd = sc.makeRDD(Seq("a"->1, "b"->2, "c"->3, "a"->4))
val countMap = rdd.countByKeyApprox(1000)
(final: Map(a -> [2.000, 2.000], b -> [1.000, 1.000], c -> [1.000, 1.000]))
RDD[T] countByValue     Map[T, Long] 要素の種類毎の個数を返す。 × val rdd = sc.makeRDD(Seq("a", "b", "c", "b", "a", "b"))
val countMap = rdd.countByValue()
Map(b -> 3, a -> 2, c -> 1)
RDD[T] countByValueApprox   timeout: Long
confidence: Double = 0.95
PartialResult[Map[T, BoundedDouble]] タイムアウト付きcountByValue × val rdd = sc.makeRDD(Seq("a", "b", "c", "b", "a", "b"))
val countMap = rdd.countByValueApprox(1000)
(final: Map(a -> [2.000, 2.000], b -> [3.000, 3.000], c -> [1.000, 1.000]))
RDD[T] countApproxDistinct   relativeSD: Double = 0.05 Long 要素の種類数を返す。 - val rdd = sc.makeRDD(Seq("a", "b", "c", "b", "a", "b"))
val countMap = rdd.countApproxDistinct()
3
RDD[T] take   num: Int Array[T] 先頭num個の要素を配列に入れて返す。 保持 val rdd = sc.makeRDD(Seq(3, 2, 5, 1, 4))
val t = rdd.take(3)
Array(3, 2, 5)
RDD[T] first     T 先頭の要素を返す。
take(1)と同等だが、要素が1個も無い場合はUnsupportedOperationExceptionが発生する。
保持 val rdd = sc.makeRDD(Seq(3, 2, 5, 1, 4))
val element = rdd.first()
3
RDD[T] top   num: Int Array[T] 値が大きい方から順番にnum個の要素を返す。 - val rdd = sc.makeRDD(Seq(3, 2, 5, 1, 4))
val top = rdd.top(3)
Array(5, 4, 3)
RDD[T] takeOrdered   num: Int Array[T] 値が小さい方から順番にnum個の要素を返す。 - val rdd = sc.makeRDD(Seq(3, 2, 5, 1, 4))
val top = rdd.takeOrdered(3)
Array(1, 2, 3)
RDD[T] max     T 最大の値を返す。 - val rdd = sc.makeRDD(Seq(3, 2, 5, 1, 4))
val element = rdd.max()
5
RDD[T] min     T 最小の値を返す。 - val rdd = sc.makeRDD(Seq(3, 2, 5, 1, 4))
val element = rdd.min()
1
RDD[T] takeSample   withReplacement: Boolean
num: Int
seed: Long = Utils.random.nextLong
Array[T] 指定された個数の要素をランダムに返す。
withReplacementにtrueを指定すると、同じ値が何度も返ることがある。
sample
× val rdd = sc.makeRDD(1 to 10)
val result = rdd.takeSample(false, 3)
Array(6, 1, 5)
PairRDDFunctions[K, V] lookup   key: K Seq[V] キーを指定して値を取得する。 保持 val rdd = sc.makeRDD(Seq("a"->1, "b"->2, "c"->3, "a"->4))
val s = rdd.lookup("a")
WrappedArray(1, 4)
RDD[T] saveAsTextFile   path: String Unit テキストファイルとして保存する。
指定するパスはディレクトリー相当。
その下に各executorがそれぞれファイルを格納する。
(つまり、基本的にHDFS等の分散ファイルシステムのパスを指定する)
ディレクトリーが既に存在している場合はFileAlreadyExistsExceptionが発生する。
  rdd.saveAsTextFile("/user/hishidama/spark1")  
RDD[T] saveAsTextFile   path: String
codec: Class[_ <: CompressionCodec]
Unit 圧縮されたテキストファイルとして保存する。
Codecにはorg.apache.hadoop.io.compressパッケージのCompressionCodecの具象クラスを指定する。
  rdd.saveAsTextFile("/user/hishidama/spark2", classOf[parquet.hadoop.codec.SnappyCodec])  
SequenceFileRDDFunctions[K, V] saveAsSequenceFile   path: String
codec: Option[Class[_ <: CompressionCodec]] = None
Unit HadoopのSequenceFileとして保存する。[2014-09-15]      
RDD[T] saveAsObjectFile   path: String Unit キーがNullWritable、値がByteWritableであるHadoopのSequenceFileとして保存する。   rdd.saveAsObjectFile("/user/hishidama/spark3")  

永続化(キャッシュ)処理

クラス メソッド 備考
名称 引数 戻り型 コーディング 結果
RDD[T] persist   newLevel: StorageLevel  =  StorageLevel.MEMORY_ONLY 自分自身 RDDを永続化する。
一度StorageLevelを設定したら、unpersistするまで、変更することは出来ない。
(「永続化」と聞くとアプリケーションが終了してもデータが残るイメージだが、ここでは、アプリ実行中だけの話だと思われる)
チェックポイントとの違い
rdd.persist()
rdd.persist(StorageLevel.MEMORY_ONLY)
 
RDD[T] cache     自分自身 persistと同じ。 rdd.cache()  
RDD[T] unpersist   blocking: Boolean = true 自分自身 永続化を解除する。 rdd.unpersist(true)  
RDD[T] getStorageLevel     StorageLevel   rdd.getStorageLevel  
RDD[T] checkpoint       チェックポイントを作る。
SparkContextでチェックポイントのディレクトリー(HDFS)を指定しておく必要がある。
永続化との違い
sc.setCheckpointDir("/tmp/spark-checkpoint")
val rdd = rdd1.〜
rdd.checkpoint()
 
RDD[T] isCheckpointed     Boolean チェックポイント化し、アクションを実行した後はtrueになる。 rdd.isCheckpointed true
RDD[T] getCheckpointFile     Option[String] チェックポイントファイルの場所を返す。 rdd.getCheckpointFile Some(file:/tmp/spark-checkpoint/2a24bd25-29b9-449e-9d0c-5249b5af9974/rdd-464)

その他

クラス メソッド 備考
名称 引数 戻り型 コーディング 結果
RDD[T] dependencies     Seq[Dependency[_]] 依存しているRDD一覧を返すっぽい。 rdd.dependencies List(org.apache.spark.OneToOneDependency@d2c3a77)
RDD[T] partitions       パーティションの一覧を返す。 val size = rdd.partitions.size 8
RDD[T] preferredLocations   split: Partition Seq[String] データの置かれている場所を返す。
(データのローカリティー)
(例えば、HDFSのファイルのブロックが置かれている場所)
   

永続化とチェックポイント

永続化キャッシュ)でもチェックポイントでもデータをファイルに保存することが出来るので、似ているように感じられる。[2014-09-14]
しかし、永続化の保存先は自分のノード上であり、チェックポイントの保存先はHDFS等の分散ファイルシステム上である。

永続化のデフォルトの保存先はメモリー上だが、ディスクに保存するよう指定することが出来る。
チェックポイントはディレクトリーのパスを指定するが、これはHDFS(等の分散ファイルシステム)のパスである。
したがって、永続化によって保存されたデータは自分のノードだけでしか見られない(2箇所に保存するオプションだと、もう1つ別のノードにも保存されるかも?)が、
チェックポイントで保存されたデータは全ノードから参照することが出来る。

※「永続化」と聞くと、DBの永続化をイメージして「アプリケーションが終了してもデータが残り続ける」ような気がするが、Sparkにおける永続化は「アプリケーション実行中は残り続ける」という状態。


Spark目次へ戻る / Scalaへ戻る / 技術メモへ戻る
メールの送信先:ひしだま