S-JIS[2014-09-02] 変更履歴

Spark Streaming

Apache SparkのStreamingのメモ。


概要

Spark Streamingは、流れてくるデータ(ストリーム)を処理する機能。

次々に流れてくるデータを(短い間隔で)繰り返しバッチ処理する。
大抵は結果をファイルシステム上に格納する。バッチ処理の都度書き込まれるので、ファイルが増えていくことになる。

短い間隔でバッチ処理を行う形なので、高スループット(単位時間当たりの処理能力が高い)であるが、レスポンス(応答時間)は遅くなる。


Spark Streamingを扱う場合、StreamingContextクラスを使う。
入力データはDStream(discretized stream、離散ストリーム)というクラスで扱う。
DStreamはRDDではないが、RDDと似たメソッドを持っている。(それらのメソッドを使って処理を記述することを「バッチ処理」と呼んでいるように思う)

import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
  val sc: SparkContext = 
  val batchDuration = Seconds(1)
  val ssc = new StreamingContext(sc, batchDuration)

StreamingContextインスタンスはSparkContextを使って生成する。
引数のbatchDurationには、入力のストリームデータを後続のバッチ処理に送る間隔(スライド間隔)を指定する。

処理(メソッド)によっては、ウィンドウ間隔を指定するものもある。

短いバッチ処理を繰り返し実行していくことになるが、バッチ1回分の対象データをRDDで保持する形になっているようだ。


Spark Streamingの例

TCPソケットからテキストを読み込んでファイルに出力する例。

import org.apache.spark.SparkContext
import org.apache.spark.streaming._
object StreamingExample {

  val sc = new SparkContext("local[2]", "streaming-example")
  val ssc = new StreamingContext(sc, Seconds(1))
  def main(args: Array[String]): Unit = {
    // バッチ処理
    val stream1 = ssc.socketTextStream("localhost", 56789) // DStream[String]
    val stream2 = stream1.flatMap(_.split(" "))
    stream2.saveAsTextFiles("/tmp/spark/streaming1");

    // ストリーミング処理を3秒間実行
    ssc.start()
    try {
      Thread.sleep(3 * 1000)
    } finally {
      ssc.stop(true) // SparkContextもstopする
    }
  }
}

ストリームの読み込みを開始(ssc.start())する前に、DStreamのアクション(処理内容)を記述しておく。
この例では「socketTextStreamでサーバーに接続してStringを受け取り、変換を行ってファイルに書き込む」という処理を記述している。
RDDを記述するのとは異なり、DStreamの場合は、この時点ではまだ実際の処理は行われない。
 いわば、バッチ処理の内容を事前に定義(宣言)しているような感じ)

DStreamの処理内容を記述したら、StreamingContextのstart()でストリーム読み込みを開始する。
これによってDStreamの処理が実際に行われる。
stop()で読み込みを終了する。


サーバー側のサンプルは以下のような感じ。(何の変哲も無いTCPサーバー)

import java.net.ServerSocket

import scala.concurrent.ops.spawn
object StramingExampleServer {

  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(56789)

    while (true) {
      val socket = server.accept()

      spawn {
        val os = socket.getOutputStream()
        try {
          for (i <- 0 to 9) {
            os.write(s"[${i}]\n".getBytes("UTF-8"))
            os.write(s"${new java.util.Date()}\n".getBytes("UTF-8"))

            Thread.sleep(200)
          }
        } finally {
          os.close()
        }
      }
    }
  }
}

StreamingContext

StreamingContextは、Spark Streaming用の基本的なクラス。

import org.apache.spark.streaming.StreamingContext

StreamingContextのコンストラクター

コンストラクター 説明
this(
  sparkContext: SparkContext,
  batchDuration: Duration
)
  val sc = new SparkContext(〜)
val ssc = new StreamingContext(sc, Seconds(1))
this(
  conf: SparkConf,
  batchDuration: Duration
)
内部では、confからSparkContextを生成する。 val conf = new SparkConf()
val ssc = new StreamingContext(conf, Seconds(1))
this(
  master: String,
  appName: String,
  batchDuration: Duration,
  sparkHome: String = null,
  jars: Seq[String] = Nil,
  environment: Map[String, String] = Map()
)
内部では、引数からSparkContextを生成する。
引数はSparkConfと同様。
val ssc = new StreamingContext("local[2]", "streaming-example", Seconds(1))
this(
  path: String,
  hadoopConf: Configuration = new Configuration
)
チェックポイントファイルからStreamingContextを再作成する。
pathはチェックポイントファイルのパス。
 

引数のbatchDurationには、入力のストリームデータを後続のバッチ処理に送る間隔を指定する。
Durationクラス


StreamingContextには以下のようなメソッドがある。

DStream生成系

メソッド 説明
networkStream[T](
  receiver: Receiver[T]
): ReceiverInputDStream[T]
1.0.0で非推奨。→receiverStream  
receiverStream[T](
  receiver: Receiver[T]
): ReceiverInputDStream[T]
Receiverを入力とするDStreamを生成する。  
actorStream[T](
  props: Props,
  name: String,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
  supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
): ReceiverInputDStream[T]
AkkaのActor(Props)を入力とするDStreamを生成する。  
socketTextStream(
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
TCPサーバーに接続し、TCPソケットからテキストを入力するDStreamを生成する。 val stream = ssc.socketTextStream("localhost", 12345)
socketStream[T](
  hostname: String,
  port: Int,
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): ReceiverInputDStream[T]
TCPソケットを入力とするDStreamを生成する。  
rawSocketStream[T](
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]
TCPソケットを入力とするDStreamを生成する。
受け取ったデータがそのまま後続処理に渡される。
 
fileStream[K, V, F <: NewInputFormat[K, V]] (
  directory: String
): InputDStream[(K, V)]
ファイル(HDFS)を入力とするDStreamを生成する。
ピリオド「.」から始まるファイルは無視される。
NewInputFormatは、Hadoopの新APIのInputFormatのこと。
 
fileStream[K, V, F <: NewInputFormat[K, V]] (
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean
): InputDStream[(K, V)]
ファイル(HDFS)を入力とするDStreamを生成する。
引数filterは、ファイル名(パス)のフィルター。
 
textFileStream(
  directory: String
): DStream[String]
ファイル(HDFS)をテキストとして読み込むDStreamを生成する。  
queueStream[T](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean = true
): InputDStream[T]
RDDを入力とするDStreamを生成する。
引数defaultRDDは、queueが空になった後に使われるRDD。
後からqueueにRDDを追加することも出来るが、同期する必要があるだろうからSynchronizedQueueを使った方がいいかも。
val rdd = sc.makeRDD(Seq("a", "b", "c"))
val queue = scala.collection.mutable.Queue(rdd)
val stream = ssc.queueStream(queue)
queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean,
  defaultRDD: RDD[T]
): InputDStream[T]
val rdd = sc.makeRDD(Seq("a", "b", "c"))
val queue = scala.collection.mutable.Queue(rdd)
val stream = ssc.queueStream(queue, true, sc.makeRDD(Seq("empty")))
union[T](
  streams: Seq[DStream[T]]
): DStream[T]
複数のDStreamを統合して1つのDStreamにする。
DStream.union
 
transform[T](
  dstreams: Seq[DStream[_]],
  transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T]
新しいDStreamを生成する。
DStream.transform
 

実行系

メソッド 説明
start(): Unit ストリーム処理の実行を開始する。 ssc.start()
awaitTermination(): Unit 実行が停止するのを待つ。
引数timeoutはミリ秒単位。
 
awaitTermination(timeout: Long): Unit  
stop(stopSparkContext: Boolean = true): Unit stop(stopSparkContext, false)と同じ。 ssc.stop()
stop(
  stopSparkContext: Boolean,
  stopGracefully: Boolean
): Unit
ストリーム処理の実行を停止する。
引数stopSparkContextをtrueにすると、SparkContextもstopする。
引数stopGracefullyをtrueにすると、全データの処理が終わるまで待つ。
 

設定系

メソッド 説明
sparkContext: SparkContext SparkContextを返す。 val sc = ssc.sparkContext
remember(duration: Duration): Unit    
checkpoint(directory: String): Unit チェックポイントをセットする。
引数directoryにチェックポイントのディレクトリー(HDFS)を指定する。
nullを指定するとチェックポイントを解除する?
 
addStreamingListener(streamingListener: StreamingListener): Unit StreamingListenerを登録する。  

DStream

DStream(discretized stream、離散ストリーム)は、Spark Streaming用の“RDD相当”のクラス。
(直接RDDを継承してはいないが、似たメソッドを持っている)

DStream内部では、スライド間隔に応じたデータ群をRDDとして扱っているようだ。


変換処理

新しいDStreamを返す処理。

reducecountといった複数データを扱う(かつウィンドウ間隔を指定しない)メソッドの場合、処理対象データの範囲は、前回処理以降に来たデータになる模様。

クラス メソッド 備考
名称 引数 戻り型 コーディング 結果
DStream[T] map [U] mapFunc: T => U DStream[U] 要素を別の型に変換する。 val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("abc", "de", "f"))))
val s2 = s1.map(_.length)
3
2
1
DStream[T] flatMap [U] flatMapFunc: T => Traversable[U] DStream[U] 要素を別の型(要素数は複数可)に変換する。 val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("a b", "c d"))))
val s2 = s1.flatMap(_.split(" "))
a
b
c
d
DStream[T] filter   filterFunc: T => Boolean DStream[T] 条件を満たす要素だけ抽出する。 val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("abc", "de", "f"))))
val s2 = s1.filter(_.startsWith("a"))
abc
DStream[T] glom     DStream[Array[T]] 各パーティションの内容を配列化したDStreamを生成する。 val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("abc", "de", "f"))))
val s2 = s1.glom()
Array(abc)
Array(de,f)
DStream[T] repartition   numPartitions: Int DStream[T] パーティション数を変更する。 val s2 = s1.repartition(3)  
DStream[T] mapPartitions [U] mapPartFunc: Iterator[T] => Iterator[U]
preservePartitioning: Boolean = false
DStream[U] パーティションを別の型のパーティションに変換する。 val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq(1, 2, 3))))
val s2 = s1.mapPartitions(i => i.map("a" * _))
a
aa
aaa
DStream[T] reduce   reduceFunc: (T, T) => T DStream[T] 要素同士の演算を行う。 val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq(1, 2, 3))))
val s2 = s1.reduce(_ + _)
6
DStream[T] count     DStream[Long] 要素数を返す。 val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("a", "b", "c"))))
val s2 = s1.count()
3
DStream[T] countByValue   numPartitions: Int = defaultParallelism DStream[(T, Long)] 要素の種類毎の個数を返す。 val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("a", "b", "c", "b", "a", "b"))))
val s2 = s1.countByValue()
(b,3)
(a,2)
(c,1)
DStream[T] transform [U] transformFunc: RDD[T] => RDD[U] DStream[U] DStream内のRDDを別の型のRDDに変換する。 val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("a", "b", "c", "b"))))
val s2 = s1.transform(rdd => rdd.zipWithIndex)
(a,0)
(b,1)
(c,2)
(b,3)
DStream[T] transform [U] transformFunc: (RDD[T], Time) => RDD[U] DStream[U] val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("a", "b", "c"))))
val s2 = s1.transform((rdd, time) => rdd.map(s => (new java.util.Date(time.milliseconds), s)))
(Tue Sep 02 12:57:32 JST 2014,a)
(Tue Sep 02 12:57:32 JST 2014,b)
(Tue Sep 02 12:57:32 JST 2014,c)
DStream[T] transformWith [U, V] other: DStream[U]
transformFunc: (RDD[T], RDD[U]) => RDD[V]
DStream[V] 自分のRDDと他DStreamのRDDを別のRDDに変換する。 val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq(10, 20, 30))))
val s2 = s1.reduce(_ + _).repartition(1)
val s3 = s1.count().repartition(1)
val s4 = s2.transformWith(s3, (r: RDD[Int], c: RDD[Long]) => r.zip(c))
(60,3)
DStream[T] transformWith [U, V] other: DStream[U]
transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
DStream[V]    
DStream[T] window   windowDuration: Duration DStream[T] 指定されたウィンドウ間隔・スライド間隔のDStreamを返す。
スライド間隔を指定しない場合は元のDStreamと同じになる。
新しいウィンドウ間隔およびスライド間隔は、元のスライド間隔の整数倍の必要がある。
println(s1.slideDuration)
val s2 = s1.window(Seconds(5))
println(s2.slideDuration)
Duration(1000)
Duration(1000)
DStream[T] window   windowDuration: Duration
slideDuration:
Duration
DStream[T] println(s1.slideDuration)
val s2 = s1.window(Seconds(5), Seconds(2))
println(s2.slideDuration)
Duration(1000)
Duration(2000)
DStream[T] reduceByWindow   reduceFunc: (T, T) => T
windowDuration:
Duration
slideDuration:
Duration
DStream[T] ウィンドウ内の要素の演算を行う。 val s2 = s1.reduceByWindow(_ + _, Seconds(5), Seconds(2))  
DStream[T] reduceByWindow   reduceFunc: (T, T) => T
invReduceFunc: (T, T) => T
windowDuration:
Duration
slideDuration:
Duration
DStream[T] ウィンドウ内の要素の演算を行う。
引数invReduceFuncは逆写像(inverse)の関数。
   
DStream[T] countByWindow   windowDuration: Duration
slideDuration:
Duration
DStream[Long] ウィンドウ内の要素の個数を返す。    
DStream[T] countByValueAndWindow   windowDuration: Duration
slideDuration:
Duration
numPartitions: Int = defaultParallelism
DStream[(T, Long)] ウィンドウ内の要素の種類毎の個数を返す。    
DStream[T] union   that: DStream[T] DStream[T] 他のDStreamと統合する。    

アクション

ストリーム処理の結果を保存する処理。RDDに合わせて、便宜上アクションという分類にしてみた)
Spark Streamingの場合はデータが次々と来るので、基本的にはファイルに出力することになるだろう。

RDDのアクション系メソッドの場合、そのメソッドを実行したら実際に処理が行われるが、Spark Streamingの場合は、これらのメソッドを呼び出しても実際の処理は行われない。
StreamingContextをstartすると(定期的に)実行される)

printforeachRDDsaveといった処理は、スライド間隔(slideDuration)の時間ごとに実行される。

クラス メソッド 備考
名称 引数 戻り型 コーディング 結果
DStream[T] print     Unit 先頭10件をコンソールに表示する。
たぶんデバッグ用。
val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("abc", "de", "f"))))
s1.print()
-------------------------------------------
Time: 1409634129000 ms
-------------------------------------------
abc
de
f
DStream[T] foreach   foreachFunc: RDD[T] => Unit Unit 0.9.0で非推奨。→foreachRDD    
DStream[T] foreach   foreachFunc: (RDD[T], Time) => Unit Unit    
DStream[T] foreachRDD   foreachFunc: RDD[T] => Unit Unit DStream内のRDDを処理する。
たぶんrddのファイル保存メソッドを呼び出す使い方をする。
val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("a", "b", "c"))))
s1.foreachRDD(rdd => rdd.foreach(println))
b
a
c
DStream[T] foreachRDD   foreachFunc: (RDD[T], Time) => Unit Unit val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("a", "b", "c"))))
s1.foreachRDD((rdd, time) => rdd.foreach { s =>
  printf("%s %s\n", s, new java.util.Date(time.milliseconds))
})
b Tue Sep 02 14:18:43 JST 2014
c Tue Sep 02 14:18:43 JST 2014
a Tue Sep 02 14:18:43 JST 2014
DStream[T] saveAsTextFiles   prefix: String
suffix: String = ""
Unit テキストファイルとして保存する。
引数に指定するのはディレクトリー名の一部。
実際に生成されるディレクトリーは、prefixとsuffixの間に(タイムスタンプの)数字列が入る。
RDD.saveAsTextFile
s1.saveAsTextFiles("/tmp/spark/streaming1")  
DStream[T] saveAsObjectFiles   prefix: String
suffix: String = ""
Unit キーがNullWritable、値がByteWritableであるHadoopのSequenceFileとして保存する。
実際に生成されるディレクトリーは、prefixとsuffixの間に(タイムスタンプの)数字列が入る。
RDD.saveAsObjectFile
s1.saveAsObjectFiles("/tmp/spark/streaming2")  
DStream[T] slice   fromTime: Time
toTime: Time
Seq[RDD[T]] 指定された時刻間のRDDを返す。
StreamingContextがstartした後でのみ使える。
val toTime = Time(System.currentTimeMillis())
val fromTime = toTime - Seconds(1)
val seq = s1.slice(fromTime, toTime)
seq.foreach(rdd => rdd.foreach(println))
a
b
c

永続化(キャッシュ)処理

クラス メソッド 備考
名称 引数 戻り型 コーディング 結果
DStream[T] persist   level: StorageLevel = StorageLevel.MEMORY_ONLY_SER DStream[T]      
DStream[T] cache     DStream[T] persistと同じ。    
DStream[T] checkpoint   interval: Duration DStream[T]      

その他

クラス メソッド 備考
名称 引数 戻り型 コーディング 結果
DStream[T] slideDuration     Duration スライド間隔を返す。
初期値はStreamingContext生成時に指定したbatchDuration。
   
DStream[T] dependencies     List[DStream[_]] 依存しているDStreamを返すっぽい。    
DStream[T] compute   validTime: Time Option[RDD[T]] 指定された時刻のRDDを返す。    
DStream[T] context     StreamingContext StreamingContextを返す。    

Duration

Durationは間隔[ms]を表すクラス。
Spark Streamingで使用するDurationはScala標準のscala.concurrent.durationではなく、Spark Streaming独自のorg.apache.spark.streaming.Durationである。

import org.apache.spark.streaming._

Durationを生成する為のオブジェクトとして以下のものが用意されている。

オブジェクト 説明
コーディング 結果
Milliseconds ミリ秒。 val d = Milliseconds(1) Duration(1)
Seconds 秒。 val d = Seconds(1) Duration(1000)
Minutes 分。 val d = Minutes(1) Duration(60000)

Durationには四則演算+,-,*,/や比較演算<,<=,>=,>等のメソッドが定義されている。


Time

Timeは日時を表すクラス。
(内部では値をミリ秒で保持している)

import org.apache.spark.streaming.Time
val time1 = Time(System.currentTimeMillis())
val time2 = time1 + Seconds(1)

println(new java.util.Date(time2.milliseconds))

Timeには加減算+,-や比較演算<,<=,>=,>等のメソッドが定義されている。


Spark目次へ戻る / Scalaへ戻る / 技術メモへ戻る
メールの送信先:ひしだま