S-JIS[2017-01-16] 変更履歴
Apache SparkのDatasetで使用するKeyValueGroupedDatasetクラスについて。
|
|
DatasetのgroupByKeyメソッドで返ってくるのがKeyValueGroupedDatasetクラス。
つまり、キー毎に集められたデータを処理するにはKeyValueGroupedDatasetのメソッドを呼び出す。
KeyValueGroupedDatasetは「Dataset」という名前が付いているが、Datasetクラスを継承してはいないので、独自メソッドしか無い。
KeyValueGroupedDataset[K, V]の主なメソッド。
例として、以下のようなデータを使っている。
import org.apache.spark.sql.SparkSession
case class Person(name: String, age: Long)
val spark = SparkSession.builder().getOrCreate() import spark.implicits._ val ds0 = Seq( Person("hoge", 20), Person("hoge", 40), Person("hoge", 30), Person("foo", 20), Person("bar", 30) ).toDS() val kv = ds0.groupByKey(person => person.name) // KeyValueGroupedDatasetにはshowメソッドが無いので、Datasetに変換して表示 kv.mapGroups((k, i) => (k, i.toSeq)).show()
↓実行結果
+----+--------------------+ | _1| _2| +----+--------------------+ | bar| [[bar,30]]| | foo| [[foo,20]]| |hoge|[[hoge,20], [hoge...| +----+--------------------+
メソッド | ver | 説明 | 例 | 実行結果 |
---|---|---|---|---|
keyAs[L]: KeyValueGroupedDataset[L, V] |
1.6.0 | キーの型を変更する。(Datasetのasと同様) LはEncoderが対応している必要がある。(ケースクラスならOK) ケースクラスに変換する場合、スキーマ(のカラム名「value」およびデータ型)と一致している必要がある。 |
case class MyName(value: String) |
+------+--------------------+ |
keys: Dataset[K] |
1.6.0 | キーのDatasetを返す。 (カラム名は「value」になるので、変えた方が分かりやすそう) |
val ds = kv.keys.withColumnRenamed("value",
"name") |
+----+ |
mapValues[W](func: V => W): KeyValueGroupedDataset[K, W] |
2.1.0 | 値を変更する。 | val kv2 = kv.mapValues(person =>
person.age) |
+----+------------+ |
mapValues[W](func: MapFunction[V, W], encoder: Encoder[W]): KeyValueGroupedDataset[K, W] |
2.1.0 | Java8の関数インターフェース(ラムダ式)用。 | ||
mapGroups[U](f: (K, Iterator[V]) => U): Dataset[U] |
1.6.0 | 値を処理する。(1つの値を返す) →flatMapGroups →キー毎に重複排除する例 |
val ds = kv.mapGroups((k, i) => i.toSeq.sortBy(person
=> person.age).head) |
+----+---+ |
mapGroups[U](f: MapGroupsFunction[K, V, U], encoder: Encoder[U]): Dataset[U] |
1.6.0 | Java8の関数インターフェース(ラムダ式)用。 | ||
flatMapGroups[U](f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] |
1.6.0 | 値を処理する。(0個の値や複数の値を返せる) →mapGroups |
val ds = kv.flatMapGroups((k, i) => i.toSeq.sortBy(person
=> person.age).take(2)) |
+----+---+ |
flatMapGroups[U](f: FlatMapGroupsFunction[K, V, U], encoder: Encoder[U]): Dataset[U] |
1.6.0 | Java8の関数インターフェース(ラムダ式)用。 | ||
reduceGroups(f: (V, V) => V): Dataset[(K, V)] |
1.6.0 | 値を集約する。 返ってくるDatasetはタプルになっているので、Vがケースクラスの場合は_2だけ取り出すのが良さそう。 |
val ds = kv.reduceGroups((p, r) =>
Person(p.name, math.min(p.age, r.age))).map(_._2) |
+----+---+ |
reduceGroups(f: ReduceFunction[V]): Dataset[(K, V)] |
1.6.0 | Java8の関数インターフェース(ラムダ式)用。 | ||
agg[U1](col1:
TypedColumn[V, U1]): Dataset[(K, U1)] 引数4個版まで |
1.6.0 | カラムの集約を行う。 sql.functionsでColumnを取得でき、Columnに「 .as[型] 」を付けるとTypedColumnになる。→Datasetのagg |
import org.apache.spark.sql.functions.min |
+-----+---+ |
count(): Dataset[(K, Long)] |
1.6.0 | キー毎の件数を返す。 | val ds = kv.count() |
+-----+--------+ |
cogroup[U, R]( |
1.6.0 | 他のKeyValueGroupedDatasetと結合する。 | case class Master(name: String, age: Long) |
+----+---+ |
ogroup[U, R]( |
1.6.0 | Java8の関数インターフェース(ラムダ式)用。 |