S-JIS[2014-08-23/2017-07-26] 変更履歴
Apache Sparkで主に使うRDDについて。
|
|
|
RDD(Resilient Distributed Dataset)は、不変で分散処理可能なコレクション。
最初のRDDはSparkContextを使って生成する。
Scalaのコレクション(Seq)から作る方法と、ファイル等から読み込んで作る方法がある。
RDDクラスにはfilterやmapといったScalaのコレクションと同様のメソッドが用意されている。
他にも特定用途に特化したRDDクラスがあり、暗黙変換によって意識せずに変換できる。
この暗黙変換メソッドはSparkContextオブジェクトに定義されているので、一緒にインポートしておくのが良い。
import org.apache.spark._ import org.apache.spark.SparkContext._
RDDクラス自身はorg.apache.spark.rddパッケージだが、明示的にクラス名を記述するとき以外はインポートしておく必要は無い。
クラス | RDDとしての定義 | 説明 |
---|---|---|
RDD[T] |
基本的なクラス。 | |
PairRDDFunctions[K, V] |
RDD[(K, V)] |
キーと値のペアのRDD。キーによる結合を行いたい場合に使う。 |
SchemaRDD |
RDD[Row] |
Spark SQLで使用するRDD。[2014-08-25] |
RDDの不変(イミュータブル)とは、“依存RDD(系譜)”と“処理内容”が変わらないという意味であり、「“処理した結果のデータ(値)”が常に変わらない」という意味ではない。[2014-09-08]
「一度出来上がったデータ(値)は、正常な手段では変更されない」という意味では、データが不変と言えなくもないが。
Sparkでは、障害が起きてRDDの一部が消失すると、入力元の(依存している)RDDを使って再処理してデータを作り直す。
基本的には、依存RDDのデータ(値)が障害前と同じであれば、再処理した結果のデータも障害前と同じになる。
しかし、障害前と同じにならないケースもある。
破損していない入力データが見つかるまで再帰的に依存RDDを辿っていくことになるが、一番先頭まで辿り着いてしまうと、HDFS等の外部システムからデータを再度読み込むことになる。
もしそのデータの内容が変わってしまっていたら、再処理したRDDの内容は障害前と変わってしまう。
また、RDDの処理の中でタイムスタンプを使ったり乱数を使ったりしていると、処理結果は当然毎回変わってしまう。
さすがにSparkはプログラマーが記述した処理の内容まで把握して毎回同じ結果を出すようにすることは出来ない。
そういったケースでも本当にデータを不変にしたいのであれば、要所々々で(信頼できる)外部ストレージにデータを保存(永続化)するようプログラミングする必要がある。
(「信頼できる」というのは、耐障害性のある、といった意味)
Sparkでは、Sparkクラスターの各スレーブノード上でexecutor(JavaVM)が動き、その中で複数のタスクを(マルチスレッドで)並列に処理するので、スレッドセーフになるようにプログラミングする必要がある。[2017-07-26]
ただし、RDDのmapやfilter等に渡す関数(関数オブジェクト)はexecutorに渡す際にシリアライズされ、executorの各タスクで別々にデシリアライズされる。
つまりタスク毎に別インスタンスになるので、関数の中のインスタンスが共有されることはない。
(例えば関数の外でインスタンス化したSimpleDateFormat(SimpleDateFormatはスレッドセーフではない)を関数の中で使ってもよい)
(むしろ複数タスク間で共有したい場合は共有変数を使う必要がある)
参考: maropuさんのツイート
新しいRDDを返す処理。(JavaのStreamの「中間処理」に当たる)
これらのメソッドは新しいRDDを返すだけで、実際の処理は行わない(実行されない)。
アクションのメソッドを実行すると実際の処理が行われる。
ほとんどの処理では、処理結果(出力)の並び順は入力データの並び順が保持される。[2014-09-07]
パーティション数やParitionerを指定するメソッドでは、基本的に並び順は保持されない(Paritionerに応じて順序が入れ替わる)。
クラス | メソッド | 備考 | 出力順 | 例 | ||||
---|---|---|---|---|---|---|---|---|
名称 | 型 | 引数 | 戻り型 | コーディング | 結果 | |||
RDD[T] | map |
[U] |
f: T => U |
RDD[U] |
要素を別の型に変換する。 | 保持 | val rdd = sc.makeRDD(Seq(1,
2, 3)) |
a1, a2, a3 |
RDD[T] | flatMap |
[U] |
f: T => TraversableOnce[U] |
RDD[U] |
要素を別の型(要素数は複数可)に変換する。 | 保持 | val rdd = sc.makeRDD(Seq(0,
1, 2)) |
0, 65, 1, 66, 2, 67 |
RDD[T] | filter |
f: T => Boolean |
RDD[T] |
条件を満たす要素だけ抽出する。 | 保持 | val rdd = sc.makeRDD(1
to 5) |
1, 3, 5 |
|
RDD[T] | collect |
[U] |
f: PartialFunction[T,
U] |
RDD[U] |
filterとmapを合わせたようなもの。 caseにマッチした結果だけでコレクションが作られる。 Scalaのコレクションのcollect相当。 →要素を収集して配列を返すcollect |
保持 | val rdd = sc.makeRDD(Seq(1,
2, 3)) |
one, two |
RDD[T] | distinct |
numPartitions: Int =
partitions.size |
RDD[T] |
重複を排除する。 | × | val rdd = sc.makeRDD(Seq(1,
2, 3, 2, 1)) |
1, 2, 3 |
|
RDD[T] | repartition |
numPartitions: Int |
RDD[T] |
パーティション数を変更する。 →使用例 shuffleをtrueにしたcoalesceと同じ。 |
× | rdd.repartition(100) |
||
RDD[T] | coalesce |
numPartitions: Int |
RDD[T] |
パーティション数を変更する。 →使用例 shuffleがtrueの場合、並び順は保持されない。 shuffleがfalseの場合、パーティション数を減らすことは出来るが増やすことは出来ない(多くしようとしても変わらない)。 |
引数次第 | rdd.coalesce(100) |
||
RDD[T] | sample |
withReplacement: Boolean |
RDD[T] |
ランダムに値を抽出する。 withReplacementにtrueを指定すると、同じ値が何度も返ることがある。 →takeSample |
保持 | val rdd = sc.makeRDD(1
to 10) |
5, 7, 9, 10 |
|
RDD[T] | randomSplit |
weights: Array[Double] |
Array[RDD[T]] |
指定した割合でランダムに分割する。 | 保持 | val rdd = sc.makeRDD(1
to 10) |
1, 5, 7, 8 |
|
RDD[T] | union |
other: RDD[T] |
RDD[T] |
他のRDDと結合する。 パーティションは元のパーティションとotherのパーティションの単純な合計になる。(パーティション数が単純に増える) →SparkContext.union |
保持 | val rdd1 = sc.makeRDD(Seq(1,
2, 3)) |
1, 2, 3, 3, 4 |
|
RDD[T] | ++ |
other: RDD[T] |
RDD[T] |
unionと同じ。 | 保持 | val rdd1 = sc.makeRDD(Seq(1,
2, 3)) |
1, 2, 3, 3, 4 |
|
RDD[T] | glom |
RDD[Array[T]] |
各パーティションの内容を配列化したRDDを生成する。 | 保持 | val rdd = sc.makeRDD(1
to 10).coalesce(3) |
Array(1, 2) |
||
RDD[T] | keyBy |
[K] |
f: T => K |
RDD[(K, T)] |
キーを生成して、“キーと要素のペア”のRDDを返す。 | 保持 | val rdd = sc.makeRDD(Seq("a", "bc", "d", "e")) |
(1,a), (2,bc), (1,d), (1,e) |
OrderedRDDFunctions[K, V, P] | sortByKey |
ascending: Boolean = true |
RDD[P] |
キーでソートする。[2014-09-15] 要素はタプル(K, V)でKはソート可能である必要がある。 |
ソート | val rdd = sc.makeRDD(Seq(3
-> "a", 1 -> "b", 2 -> "c")) |
(1,b), (2,c), (3,a) |
|
RDD[T] | groupBy |
[K] |
f: T => K |
RDD[(K, Iterable[T])] |
キーを生成してグルーピングする。 元のRDDにParitionerが定義されている場合はそれを使う。 定義されていない場合はHashPartitionerを使う。 |
× | val rdd = sc.makeRDD(1
to 5) |
(0,ArrayBuffer(2, 4)), (1,ArrayBuffer(1, 3, 5)) |
RDD[T] | groupBy |
[K] |
f: T => K |
RDD[(K, Iterable[T])] |
キーを生成してグルーピングする。groupBy(f, new HashPartitioner(numPartitions)) と同じ。 |
× | val rdd = sc.makeRDD(1
to 5) |
(0,ArrayBuffer(2, 4)) |
RDD[T] | groupBy |
[K] |
f: T => K |
RDD[(K, Iterable[T])] |
キーを生成してグルーピングする。 HashPartitionerが指定された場合、キーの並び順は保証されない。 |
|||
PairRDDFunctions[K, V] | groupByKey |
numPartitions: Int |
RDD[(K, Iterable[V])] |
キー毎に値をグルーピングする。 要素はタプル(K, V)である必要がある。 groupByKey(new HashPartitioner(numPartitions)) と同じ。 |
× | val rdd = sc.makeRDD(Seq("a"->1,
"b"->2, "c"->3, "a"->4)) |
(b,ArrayBuffer(2)), |
|
PairRDDFunctions[K, V] | groupByKey |
partitioner: Partitioner |
RDD[(K, Iterable[V])] |
キー毎に値をグルーピングする。 HashPartitionerが指定された場合、キーの並び順は保証されない。 |
||||
PairRDDFunctions[K, V] | partitionBy |
partitioner: Partitioner |
RDD[(K, V)] |
partitionerで分割されたRDDを返す。 | × | val rdd = sc.makeRDD(Seq("a"->1,
"b"->2, "c"->3, "a"->4)) |
(c,3), (a,1), (a,4), (b,2) |
|
RDD[T] | pipe |
command: String |
RDD[String] |
シェルのコマンド(外部プロセス)を実行する。 データは標準入力に渡され、標準出力に出力された内容がRDDの内容になる。 commandにコマンドを書く。引数もスペース区切りで入れられる。 envには環境変数を渡せる。 |
保持 | val rdd = sc.makeRDD(Seq("a",
"b", "c")) |
a, b, c |
|
RDD[T] | pipe |
command: String |
RDD[String] |
val rdd = sc.makeRDD(Seq("A",
"B", "C")) |
sA, sB, sC |
|||
RDD[T] | pipe |
command: Seq[String] |
RDD[String] |
val rdd = sc.makeRDD(Seq("A",
"B", "C")) |
pA, pB, pC |
|||
PairRDDFunctions[K, V] | keys |
RDD[K] |
要素のタプル(K, V)からキーのみに変換する。 | 保持 | val rdd = sc.makeRDD(Seq("a"->1,
"b"->2, "c"->3, "a"->2)) |
a, b, c, a |
||
PairRDDFunctions[K, V] | values |
RDD[V] |
要素のタプル(K, V)から値のみに変換する。 | 保持 | val rdd = sc.makeRDD(Seq("a"->1,
"b"->2, "c"->3, "a"->2)) |
1, 2, 3, 2 |
||
PairRDDFunctions[K, V] | mapValues |
[U] |
f: V => U |
RDD[(K, U)] |
値を別の型に変換する。 要素はタプル(K, V)である必要がある。 |
保持 | val rdd = sc.makeRDD(Seq("a"->1,
"b"->2, "c"->3)) |
(a,a1), (b,a2), (c,a3) |
PairRDDFunctions[K, V] | flatMapValues |
[U] |
f: V => TraversableOnce[U] |
RDD[(K, U)] |
値を別の型(個数は複数可)に変換する。 | 保持 | val rdd = sc.makeRDD(Seq("a"->0,
"b"->1, "c"->2)) |
(a,0), (a,65), (b,1), (b,66), (c,2), (c,67) |
RDD[T] | mapPartitions |
[U] |
f: Iterator[T] => Iterator[U] |
RDD[U] |
パーティションを別の型のパーティションに変換する。 →map |
保持 | val rdd = sc.makeRDD(Seq(1,
2, 3)) |
a, aa, aaa |
RDD[T] |
mapPartitionsWithIndex |
[U] |
f: (Int, Iterator[T]) => Iterator[U] |
RDD[U] |
パーティション番号付きのmapPartitions | 保持 | val rdd = sc.makeRDD(Seq("a",
"b", "c", "d", "e", "f", "g")).coalesce(3) |
a0, b1, c1, d1, e2, f2, g2 |
RDD[T] | zipWithIndex |
RDD[(T, Long)] |
要素と位置(インデックス)のペアを生成する。 ScalaのコレクションのzipWithIndex相当。 |
保持 | val rdd = sc.makeRDD(Seq("a",
"b", "c")) |
(a,0), (b,1), (c,2) |
||
RDD[T] | zipWithUniqueId |
RDD[(T, Long)] |
要素と“重複しない番号”のペアを生成する。 この番号は必ず一意(ユニーク)になるが、連番になるとは限らない。 |
保持 | val rdd = sc.makeRDD(Seq("a",
"b", "c")) |
(a,2), (b,5), (c,7) |
||
PairRDDFunctions[K, V] | combineByKey |
createCombiner: V => C |
RDD[(K, C)] |
キー毎に値を結合する。 要素はタプル(K, V)である必要がある。 デフォルトではHashPartitionerが使われるので、キーの並び順は保証されない。 |
× | val rdd = sc.makeRDD(Seq("a"->1,
"b"->2, "c"->3, "a"->4)) |
(a,5), (b,2), (c,3) |
|
PairRDDFunctions[K, V] | combineByKey |
[C] |
createCombiner: V => C |
RDD[(K, C)] |
||||
PairRDDFunctions[K, V] | combineByKey |
[C] |
createCombiner: V => C |
RDD[(K, C)] |
||||
PairRDDFunctions[K, V] | foldByKey |
zeroValue: V |
RDD[(K, V)] |
キー毎に値を畳み込む。 →fold デフォルトではHashPartitionerが使われるので、キーの並び順は保証されない。 |
× | val rdd = sc.makeRDD(Seq("a"->1,
"b"->2, "c"->3, "a"->4)) |
(a,5), (b,2), (c,3) |
|
PairRDDFunctions[K, V] | foldByKey |
zeroValue: V |
RDD[(K, V)] |
|||||
PairRDDFunctions[K, V] | foldByKey |
zeroValue: V |
RDD[(K, V)] |
|||||
PairRDDFunctions[K, V] | reduceByKey |
func: (V, V) => V |
RDD[(K, V)] |
キー毎に要素同士を演算する。reduceByKey(new HashPartitioner(numPartitions), func) と同じ。→reduce |
× | val rdd = sc.makeRDD(Seq("a"->1,
"b"->2, "c"->3, "a"->4)) |
(a,5), (b,2), (c,3) |
|
PairRDDFunctions[K, V] | reduceByKey |
partitioner: Partitioner |
RDD[(K, V)] |
キー毎に要素同士を演算する。combineByKey(v=>v, func, func, partitioner) と同じ。 |
× | |||
PairRDDFunctions[K, V] |
countApproxDistinctByKey |
relativeSD: Double = 0.05 |
RDD[(K, Long)] |
キー毎に値の種類数を返す。 →countApproxDistinct |
× | val rdd = sc.makeRDD(Seq("a"->1,
"b"->2, "c"->3, "a"->4, "a"->1)) countApproxDistinctByKey () |
(a,2), (b,1), (c,1) |
|
PairRDDFunctions[K, V] | countApproxDistinctByKey |
relativeSD: Double |
RDD[(K, Long)] |
|||||
PairRDDFunctions[K, V] | countApproxDistinctByKey |
relativeSD: Double |
RDD[(K, Long)] |
複数のRDDを結合して新しいRDDを生成する処理。
クラス | メソッド | 備考 | 出力順 | 例 | ||||
---|---|---|---|---|---|---|---|---|
名称 | 型 | 引数 | 戻り型 | コーディング | 結果 | |||
RDD[T] | zip |
[U] |
other: RDD[U] |
RDD[(T, U)] |
要素をペアにする。 Scalaのコレクションのzip相当。 双方のRDDのパーティション数は同じでなければならない。 また、互いのパーティションの要素数も同じでなければならない。 (例えば、片方のRDDが他方からmapによって作られた場合はこれらの条件を満たす) |
保持 | val rdd1 = sc.makeRDD(Seq("a",
"b", "c")).coalesce(2) |
(a,1), (b,2), (c,3) |
RDD[T] | zipPartitions |
[B, V] |
rdd2: RDD[B] |
RDD[V] |
パーティションをペアにする。 双方のRDDのパーティション数は同じでなければならない。 (パーティション毎の要素数は何でもよい) |
保持 | val rdd1 = sc.makeRDD(Seq("a",
"b", "c", "d")).coalesce(2) |
a1, b0, c2, d0 |
RDD[T] | zipPartitions |
[B, V] |
rdd2: RDD[B] |
RDD[V] |
val rdd1 = sc.makeRDD(Seq("a",
"b", "c", "d")).coalesce(2) |
a1, b0, c2, d0 |
||
RDD[T] | zipPartitions |
[B, C, V] |
rdd2: RDD[B] |
RDD[V] |
||||
RDD[T] | zipPartitions |
[B, C, V] |
rdd2: RDD[B] |
RDD[V] |
||||
RDD[T] | zipPartitions |
[B, C, D, V] |
rdd2: RDD[B] |
RDD[V] |
||||
RDD[T] | zipPartitions |
[B, C, D, V] |
rdd2: RDD[B] |
RDD[V] |
||||
RDD[T] | cartesian |
[U] |
other: RDD[U] |
RDD[(T, U)] |
他のRDDとのデカルト積(cartesian product)(直積)を生成する。 パーティション数も自分のRDDのパーティション数とotherのパーティション数の積になる。 |
保持 | val rdd1 = sc.makeRDD(Seq("a",
"b", "c")) |
(a,1), (a,2), (b,1), (b,2), (c,1), (c,2) |
RDD[T] | intersection |
other: RDD[T] |
RDD[T] |
他のRDDと共通の値を抽出する。 | × | val rdd1 = sc.makeRDD(Seq(1,
2, 3)) |
2, 3 |
|
RDD[T] | intersection |
other: RDD[T], |
RDD[T] |
|||||
RDD[T] | intersection |
other: RDD[T] |
RDD[T] |
|||||
RDD[T] | subtract |
other: RDD[T] |
RDD[T] |
他のRDDとの差集合を生成する。 | × | val rdd1 = sc.makeRDD(Seq(1,
2, 3, 4)) |
1, 3 |
|
RDD[T] | subtract |
other: RDD[T] |
RDD[T] |
|||||
RDD[T] | subtract |
other: RDD[T] |
RDD[T] |
|||||
PairRDDFunctions[K, V] | subtractByKey |
[W] |
other: RDD[(K, W)] |
RDD[(K, V)] |
他のRDDとの差集合(キーで比較)を生成する。 要素はタプル(K, V)である必要がある。 |
× | val rdd1 = sc.makeRDD(Seq("a"->1,
"b"->2, "c"->3, "d"->4)) |
(a,1), (c,3) |
PairRDDFunctions[K, V] | subtractByKey |
[W] |
other: RDD[(K, W)] |
RDD[(K, V)] |
||||
PairRDDFunctions[K, V] | subtractByKey |
[W] |
other: RDD[(K, W)] |
RDD[(K, V)] |
||||
PairRDDFunctions[K, V] | join |
[W] |
other: RDD[(K, W)] |
RDD[(K, (V, W))] |
他のRDDを結合(inner join)する。 要素はタプル(K, V)である必要がある。 |
× | val rdd1 = sc.makeRDD(Seq("a"->1,
"b"->2, "a"->3, "b"->4)) |
(b,(2,foo)), (b,(4,foo)) |
PairRDDFunctions[K, V] | join |
[W] |
other: RDD[(K, W)] |
RDD[(K, (V, W))] |
||||
PairRDDFunctions[K, V] | join |
[W] |
other: RDD[(K, W)] |
RDD[(K, (V, W))] |
||||
PairRDDFunctions[K, V] | leftOuterJoin |
[W] |
other: RDD[(K, W)] |
RDD[(K, (V, Option[W]))] |
他のRDDを結合(left outer join)する。 要素はタプル(K, V)である必要がある。 |
× | val rdd1 = sc.makeRDD(Seq("a"->1,
"b"->2, "a"->3, "b"->4)) |
(a,(1,None)), (a,(3,None)), (b,(2,Some(foo))),
(b,(4,Some(foo))) |
PairRDDFunctions[K, V] | leftOuterJoin |
[W] |
other: RDD[(K, W)] |
RDD[(K, (V, Option[W]))] |
||||
PairRDDFunctions[K, V] | leftOuterJoin |
[W] |
other: RDD[(K, W)] |
RDD[(K, (V, Option[W]))] |
||||
PairRDDFunctions[K, V] | rightOuterJoin |
[W] |
other: RDD[(K, W)] |
RDD[(K, (Option[V], W))] |
他のRDDを結合(right outer join)する。 要素はタプル(K, V)である必要がある。 |
× | val rdd1 = sc.makeRDD(Seq("a"->1,
"b"->2, "a"->3, "b"->4)) |
(b,(Some(2),foo)), (b,(Some(4),foo)),
(c,(None,bar)) |
PairRDDFunctions[K, V] | rightOuterJoin |
[W] |
other: RDD[(K, W)] |
RDD[(K, (Option[V], W))] |
||||
PairRDDFunctions[K, V] | rightOuterJoin |
[W] |
other: RDD[(K, W)] |
RDD[(K, (Option[V], W))] |
||||
PairRDDFunctions[K, V] | cogroup |
[W] |
other: RDD[(K, W)] |
RDD[(K, (Iterable[V], Iterable[W]))] |
同じキー同士で値をまとめる。 要素はタプル(K, V)である必要がある。 キー毎の値の並び順は入力データと同じようだ。 |
× | val rdd1 = sc.makeRDD(Seq("a"->1,
"b"->2, "a"->3)) |
(a,(ArrayBuffer(1, 3),ArrayBuffer())), |
PairRDDFunctions[K, V] | cogroup |
[W] |
other: RDD[(K, W)] |
RDD[(K, (Iterable[V], Iterable[W]))] |
||||
PairRDDFunctions[K, V] | cogroup |
[W] |
other: RDD[(K, W)] |
RDD[(K, (Iterable[V], Iterable[W]))] |
||||
PairRDDFunctions[K, V] | cogroup |
[W1, W2] |
other1: RDD[(K, W1)] |
RDD[(K, (Iterable[V], Iterable[W1],
Iterable[W2]))] |
同じキー同士で値をまとめる。 要素はタプル(K, V)である必要がある。 |
× | ||
PairRDDFunctions[K, V] | cogroup |
[W1, W2] |
other1: RDD[(K, W1)] |
RDD[(K, (Iterable[V], Iterable[W1],
Iterable[W2]))] |
||||
PairRDDFunctions[K, V] | cogroup |
[W1, W2] |
other1: RDD[(K, W1)] |
RDD[(K, (Iterable[V], Iterable[W1],
Iterable[W2]))] |
||||
PairRDDFunctions[K, V] | groupWith |
[W] |
other: RDD[(K, W)] |
RDD[(K, (Iterable[V], Iterable[W]))] |
cogroupと同じ。 | × | ||
PairRDDFunctions[K, V] | groupWith |
[W1, W2] |
other1: RDD[(K, W1)] |
RDD[(K, (Iterable[V], Iterable[W1],
Iterable[W2]))] |
cogroupと同じ。 | × |
ジョブを実行して値を返す処理。(JavaのStreamの「終端処理」に当たる)
クラス | メソッド | 備考 | 出力順 | 例 | ||||
---|---|---|---|---|---|---|---|---|
名称 | 型 | 引数 | 戻り型 | コーディング | 結果 | |||
RDD[T] | foreach |
f: T => Unit |
Unit |
要素を1回ずつ処理する。 この処理はexecutorで行われるはず。 |
出力なし | val rdd = sc.makeRDD(Seq("a",
"bc", "def")) |
6 |
|
RDD[T] | foreachPartition |
f: Iterator[T] => Unit |
Unit |
パーティションを1回ずつ処理する。 | 出力なし | val rdd = sc.makeRDD(11
to 20) |
10 |
|
RDD[T] | collect |
Array[T] |
要素を配列に入れて返す。 ScalaのコレクションのtoArray相当。 →Scalaのコレクションのcollectに相当するのは、filterとmapを実行するcollect |
保持 | val rdd = sc.makeRDD(Seq("a",
"b", "c")) |
Array(a, b, c) |
||
PairRDDFunctions[K, V] | collectAsMap |
Map[K, V] |
要素をMapに入れて返す。 要素はタプル(K, V)である必要がある。 |
× | val rdd = sc.makeRDD(Seq("a"->1,
"b"->2, "c"->3)) |
Map(b -> 2, a -> 1, c -> 3) |
||
RDD[T] | toLocalIterator |
Iterator[T] |
ローカル(driver上)で動くIteratorを返す。 このIteratorを使うときにジョブが動く。 |
保持 | val rdd = sc.makeRDD(Seq("a",
"b", "c")) |
a |
||
RDD[T] | toArray |
Array[T] |
要素を配列に入れて返す。 1.0.0で非推奨。→collect |
保持 | val rdd = sc.makeRDD(Seq("a",
"b", "c")) |
Array(a, b, c) |
||
RDD[T] | reduce |
f: (T, T) => T |
T |
要素同士の演算を行う。 Scalaのコレクションのreduce相当。 |
- | val rdd = sc.makeRDD(Seq(1,
2, 3)) |
6 |
|
PairRDDFunctions[K, V] | reduceByKeyLocally |
func: (V, V) => V |
Map[K, V] |
キー毎に値同士の演算を行う。 値の処理順も不定。 |
× | val rdd = sc.makeRDD(Seq("a"->1,
"b"->2, "c"->3, "a"->4)) |
Map(a -> 5, b -> 2, c -> 3) |
|
PairRDDFunctions[K, V] | reduceByKeyToDriver |
func: (V, V) => V |
Map[K, V] |
1.0.0で非推奨。→reduceByKeyLocally | ||||
RDD[T] | fold |
zeroValue: T |
T |
畳み込みを行う。 Scalaのコレクションのfold相当。 |
- | val rdd = sc.makeRDD(Seq(1,
2, 3)) |
6 |
|
RDD[T] | aggregate |
[U] |
zeroValue: U |
U |
集約を行う。 Scalaのコレクションのaggregate相当。 各パーティション内の要素をseqOpで集約し、それらの結果をcombOpで集約する。 |
- | val rdd = sc.makeRDD(Seq(1,
2, 3)) |
BigDecimal(6) |
RDD[T] | count |
Long |
要素数を返す。 Scalaのコレクションのsize相当。 |
- | val rdd = sc.makeRDD(Seq("a",
"b", "c")) |
3 |
||
RDD[T] | countApprox |
timeout: Long |
PartialResult[BoundedDouble] |
タイムアウト付きcount。 | - | val rdd = sc.makeRDD(Seq("a",
"b", "c")) |
(final: [3.000, 3.000]) |
|
PairRDDFunctions[K, V] | countByKey |
Map[K, Long] |
キー毎に値の個数を返す。 要素はタプル(K, V)である必要がある。 |
× | val rdd = sc.makeRDD(Seq("a"->1,
"b"->2, "c"->3, "a"->4)) |
Map(b -> 1, a -> 2, c -> 1) |
||
PairRDDFunctions[K, V] | countByKeyApprox |
timeout: Long |
PartialResult[Map[K, BoundedDouble]] |
タイムアウト付きcountByKey。 | × | val rdd = sc.makeRDD(Seq("a"->1,
"b"->2, "c"->3, "a"->4)) |
(final: Map(a -> [2.000, 2.000], b -> [1.000,
1.000], c -> [1.000, 1.000])) |
|
RDD[T] | countByValue |
Map[T, Long] |
要素の種類毎の個数を返す。 | × | val rdd = sc.makeRDD(Seq("a",
"b", "c", "b", "a", "b")) |
Map(b -> 3, a -> 2, c -> 1) |
||
RDD[T] | countByValueApprox |
timeout: Long |
PartialResult[Map[T, BoundedDouble]] |
タイムアウト付きcountByValue。 | × | val rdd = sc.makeRDD(Seq("a",
"b", "c", "b", "a", "b")) |
(final: Map(a -> [2.000, 2.000], b -> [3.000,
3.000], c -> [1.000, 1.000])) |
|
RDD[T] | countApproxDistinct |
relativeSD: Double = 0.05 |
Long |
要素の種類数を返す。 | - | val rdd = sc.makeRDD(Seq("a",
"b", "c", "b", "a", "b")) |
3 |
|
RDD[T] | take |
num: Int |
Array[T] |
先頭num個の要素を配列に入れて返す。 | 保持 | val rdd = sc.makeRDD(Seq(3,
2, 5, 1, 4)) |
Array(3, 2, 5) |
|
RDD[T] | first |
T |
先頭の要素を返す。 take(1)と同等だが、要素が1個も無い場合はUnsupportedOperationExceptionが発生する。 |
保持 | val rdd = sc.makeRDD(Seq(3,
2, 5, 1, 4)) |
3 |
||
RDD[T] | top |
num: Int |
Array[T] |
値が大きい方から順番にnum個の要素を返す。 | - | val rdd = sc.makeRDD(Seq(3,
2, 5, 1, 4)) |
Array(5, 4, 3) |
|
RDD[T] | takeOrdered |
num: Int |
Array[T] |
値が小さい方から順番にnum個の要素を返す。 | - | val rdd = sc.makeRDD(Seq(3,
2, 5, 1, 4)) |
Array(1, 2, 3) |
|
RDD[T] | max |
T |
最大の値を返す。 | - | val rdd = sc.makeRDD(Seq(3,
2, 5, 1, 4)) |
5 |
||
RDD[T] | min |
T |
最小の値を返す。 | - | val rdd = sc.makeRDD(Seq(3,
2, 5, 1, 4)) |
1 |
||
RDD[T] | takeSample |
withReplacement: Boolean |
Array[T] |
指定された個数の要素をランダムに返す。 withReplacementにtrueを指定すると、同じ値が何度も返ることがある。 →sample |
× | val rdd = sc.makeRDD(1
to 10) |
Array(6, 1, 5) |
|
PairRDDFunctions[K, V] | lookup |
key: K |
Seq[V] |
キーを指定して値を取得する。 | 保持 | val rdd = sc.makeRDD(Seq("a"->1,
"b"->2, "c"->3, "a"->4)) |
WrappedArray(1, 4) |
|
RDD[T] | saveAsTextFile |
path: String |
Unit |
テキストファイルとして保存する。 指定するパスはディレクトリー相当。 その下に各executorがそれぞれファイルを格納する。 (つまり、基本的にHDFS等の分散ファイルシステムのパスを指定する) ディレクトリーが既に存在している場合はFileAlreadyExistsExceptionが発生する。 |
rdd.saveAsTextFile("/user/hishidama/spark1") |
|||
RDD[T] | saveAsTextFile |
path: String |
Unit |
圧縮されたテキストファイルとして保存する。 Codecにはorg.apache.hadoop.io.compressパッケージのCompressionCodecの具象クラスを指定する。 |
rdd.saveAsTextFile("/user/hishidama/spark2",
classOf[parquet.hadoop.codec.SnappyCodec]) |
|||
SequenceFileRDDFunctions[K, V] | saveAsSequenceFile |
path: String |
Unit |
HadoopのSequenceFileとして保存する。[2014-09-15] | ||||
RDD[T] | saveAsObjectFile |
path: String |
Unit |
キーがNullWritable、値がByteWritableであるHadoopのSequenceFileとして保存する。 | rdd.saveAsObjectFile("/user/hishidama/spark3") |
クラス | メソッド | 備考 | 例 | ||||
---|---|---|---|---|---|---|---|
名称 | 型 | 引数 | 戻り型 | コーディング | 結果 | ||
RDD[T] | persist |
newLevel: StorageLevel =
StorageLevel.MEMORY_ONLY |
自分自身 |
RDDを永続化する。 一度StorageLevelを設定したら、unpersistするまで、変更することは出来ない。 (「永続化」と聞くとアプリケーションが終了してもデータが残るイメージだが、ここでは、アプリ実行中だけの話だと思われる) →チェックポイントとの違い |
rdd.persist() |
||
RDD[T] | cache |
自分自身 |
persistと同じ。 | rdd.cache() |
|||
RDD[T] | unpersist |
blocking: Boolean = true |
自分自身 |
永続化を解除する。 | rdd.unpersist(true) |
||
RDD[T] | getStorageLevel |
StorageLevel |
rdd.getStorageLevel |
||||
RDD[T] | checkpoint |
チェックポイントを作る。 SparkContextでチェックポイントのディレクトリー(HDFS)を指定しておく必要がある。 →永続化との違い |
sc.setCheckpointDir("/tmp/spark-checkpoint") |
||||
RDD[T] | isCheckpointed |
Boolean |
チェックポイント化し、アクションを実行した後はtrueになる。 | rdd.isCheckpointed |
true |
||
RDD[T] | getCheckpointFile |
Option[String] |
チェックポイントファイルの場所を返す。 | rdd.getCheckpointFile |
Some(file:/tmp/spark-checkpoint/2a24bd25-29b9-449e-9d0c-5249b5af9974/rdd-464) |
クラス | メソッド | 備考 | 例 | ||||
---|---|---|---|---|---|---|---|
名称 | 型 | 引数 | 戻り型 | コーディング | 結果 | ||
RDD[T] | dependencies |
Seq[Dependency[_]] |
依存しているRDD一覧を返すっぽい。 | rdd.dependencies |
List(org.apache.spark.OneToOneDependency@d2c3a77) |
||
RDD[T] | partitions |
パーティションの一覧を返す。 | val size = rdd.partitions.size |
8 |
|||
RDD[T] | preferredLocations |
split: Partition |
Seq[String] |
データの置かれている場所を返す。 (データのローカリティー) (例えば、HDFSのファイルのブロックが置かれている場所) |
永続化(キャッシュ)でもチェックポイントでもデータをファイルに保存することが出来るので、似ているように感じられる。[2014-09-14]
しかし、永続化の保存先は自分のノード上であり、チェックポイントの保存先はHDFS等の分散ファイルシステム上である。
永続化のデフォルトの保存先はメモリー上だが、ディスクに保存するよう指定することが出来る。
チェックポイントはディレクトリーのパスを指定するが、これはHDFS(等の分散ファイルシステム)のパスである。
したがって、永続化によって保存されたデータは自分のノードだけでしか見られない(2箇所に保存するオプションだと、もう1つ別のノードにも保存されるかも?)が、
チェックポイントで保存されたデータは全ノードから参照することが出来る。
※「永続化」と聞くと、DBの永続化をイメージして「アプリケーションが終了してもデータが残り続ける」ような気がするが、Sparkにおける永続化は「アプリケーション実行中は残り続ける」という状態。