S-JIS[2017-01-16] 変更履歴

KeyValueGroupedDataset

Apache SparkDatasetで使用するKeyValueGroupedDatasetクラスについて。


概要

DatasetgroupByKeyメソッドで返ってくるのがKeyValueGroupedDatasetクラス。
つまり、キー毎に集められたデータを処理するにはKeyValueGroupedDatasetのメソッドを呼び出す。

KeyValueGroupedDatasetは「Dataset」という名前が付いているが、Datasetクラスを継承してはいないので、独自メソッドしか無い。


KeyValueGroupedDatasetのメソッド

KeyValueGroupedDataset[K, V]の主なメソッド。


例として、以下のようなデータを使っている。

import org.apache.spark.sql.SparkSession
case class Person(name: String, age: Long)
    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    val ds0 = Seq(
      Person("hoge", 20), Person("hoge", 40), Person("hoge", 30),
      Person("foo", 20),
      Person("bar", 30)
    ).toDS()
    val kv = ds0.groupByKey(person => person.name)

    // KeyValueGroupedDatasetにはshowメソッドが無いので、Datasetに変換して表示
    kv.mapGroups((k, i) => (k, i.toSeq)).show()

↓実行結果

+----+--------------------+
|  _1|                  _2|
+----+--------------------+
| bar|          [[bar,30]]|
| foo|          [[foo,20]]|
|hoge|[[hoge,20], [hoge...|
+----+--------------------+
メソッド ver 説明 実行結果
keyAs[L]: KeyValueGroupedDataset[L, V] 1.6.0 キーの型を変更する。(Datasetのasと同様)
LはEncoderが対応している必要がある。(ケースクラスならOK)
ケースクラスに変換する場合、スキーマ(のカラム名「value」およびデータ型)と一致している必要がある。
case class MyName(value: String)
val kv2 = kv.keyAs[MyName]
kv2.mapGroups((k, i) => (k, i.toSeq)).show()
+------+--------------------+
|    _1|                  _2|
+------+--------------------+
| [bar]|          [[bar,30]]|
| [foo]|          [[foo,20]]|
|[hoge]|[[hoge,20], [hoge...|
+------+--------------------+
keys: Dataset[K] 1.6.0 キーのDatasetを返す。
(カラム名は「value」になるので、変えた方が分かりやすそう)
val ds = kv.keys.withColumnRenamed("value", "name")
ds.show()
+----+
|name|
+----+
| bar|
| foo|
|hoge|
+----+
mapValues[W](func: V => W): KeyValueGroupedDataset[K, W] 2.1.0 値を変更する。 val kv2 = kv.mapValues(person => person.age)
kv2.mapGroups((k, i) => (k, i.toSeq)).show()
+----+------------+
|  _1|          _2|
+----+------------+
| bar|        [30]|
| foo|        [20]|
|hoge|[20, 40, 30]|
+----+------------+
mapValues[W](func: MapFunction[V, W], encoder: Encoder[W]): KeyValueGroupedDataset[K, W] 2.1.0 Java8の関数インターフェースラムダ式)用。    
mapGroups[U](f: (K, Iterator[V]) => U): Dataset[U] 1.6.0 値を処理する。(1つの値を返す)
flatMapGroups
キー毎に重複排除する例
val ds = kv.mapGroups((k, i) => i.toSeq.sortBy(person => person.age).head)
ds.show()
+----+---+
|name|age|
+----+---+
| bar| 30|
| foo| 20|
|hoge| 20|
+----+---+
mapGroups[U](f: MapGroupsFunction[K, V, U], encoder: Encoder[U]): Dataset[U] 1.6.0 Java8の関数インターフェースラムダ式)用。    
flatMapGroups[U](f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] 1.6.0 値を処理する。(0個の値や複数の値を返せる)
mapGroups
val ds = kv.flatMapGroups((k, i) => i.toSeq.sortBy(person => person.age).take(2))
ds.show()
+----+---+
|name|age|
+----+---+
| bar| 30|
| foo| 20|
|hoge| 20|
|hoge| 30|
+----+---+
flatMapGroups[U](f: FlatMapGroupsFunction[K, V, U], encoder: Encoder[U]): Dataset[U] 1.6.0 Java8の関数インターフェースラムダ式)用。    
reduceGroups(f: (V, V) => V): Dataset[(K, V)] 1.6.0 値を集約する。
返ってくるDatasetはタプルになっているので、Vがケースクラスの場合は_2だけ取り出すのが良さそう。
val ds = kv.reduceGroups((p, r) => Person(p.name, math.min(p.age, r.age))).map(_._2)
ds.show()
+----+---+
|name|age|
+----+---+
| bar| 30|
| foo| 20|
|hoge| 20|
+----+---+
reduceGroups(f: ReduceFunction[V]): Dataset[(K, V)] 1.6.0 Java8の関数インターフェースラムダ式)用。    
agg[U1](col1: TypedColumn[V, U1]): Dataset[(K, U1)]
agg[U1, U2](col1: TypedColumn[V, U1], col2: TypedColumn[V, U2]): Dataset[(K, U1, U2)]

引数4個版まで
1.6.0 カラムの集約を行う。
sql.functionsでColumnを取得でき、Columnに「.as[型]」を付けるとTypedColumnになる。
→Datasetのagg
import org.apache.spark.sql.functions.min

val ds = kv.agg(min("age").alias("m").as[Long])
ds.show()
+-----+---+
|value|  m|
+-----+---+
|  bar| 30|
|  foo| 20|
| hoge| 20|
+-----+---+
count(): Dataset[(K, Long)] 1.6.0 キー毎の件数を返す。 val ds = kv.count()
ds.show()
+-----+--------+
|value|count(1)|
+-----+--------+
|  bar|       1|
|  foo|       1|
| hoge|       3|
+-----+--------+
cogroup[U, R](
  other: KeyValueGroupedDataset[K, U])(
  f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]
): Dataset[R]
1.6.0 他のKeyValueGroupedDatasetと結合する。 case class Master(name: String, age: Long)
val ku = Seq(Master("hoge", 25), Master("zzz", 10)).toDS().groupByKey(_.name)
val ds = kv.cogroup(ku) { (k, ps, ms) =>
  val minAge = ms.toSeq.headOption.map(_.age).getOrElse(0L)
  ps.filter(_.age >= minAge).toSeq.sortBy(_.age).headOption
}
ds.show()
+----+---+
|name|age|
+----+---+
| bar| 30|
| foo| 20|
|hoge| 30|
+----+---+
ogroup[U, R](
  other: KeyValueGroupedDataset[K, U],
  f: CoGroupFunction[K, V, U, R],
  encoder: Encoder[R]
): Dataset[R]
1.6.0 Java8の関数インターフェースラムダ式)用。    

Datasetへ戻る / Spark目次へ戻る / Scalaへ戻る / 技術メモへ戻る
メールの送信先:ひしだま