S-JIS[2014-09-02] 変更履歴
Apache SparkのStreamingのメモ。
Spark Streamingは、流れてくるデータ(ストリーム)を処理する機能。
次々に流れてくるデータを(短い間隔で)繰り返しバッチ処理する。
大抵は結果をファイルシステム上に格納する。バッチ処理の都度書き込まれるので、ファイルが増えていくことになる。
短い間隔でバッチ処理を行う形なので、高スループット(単位時間当たりの処理能力が高い)であるが、レスポンス(応答時間)は遅くなる。
Spark Streamingを扱う場合、StreamingContextクラスを使う。
入力データはDStream(discretized stream、離散ストリーム)というクラスで扱う。
DStreamはRDDではないが、RDDと似たメソッドを持っている。(それらのメソッドを使って処理を記述することを「バッチ処理」と呼んでいるように思う)
import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._
val sc: SparkContext = 〜 val batchDuration = Seconds(1) val ssc = new StreamingContext(sc, batchDuration)
StreamingContextインスタンスはSparkContextを使って生成する。
引数のbatchDurationには、入力のストリームデータを後続のバッチ処理に送る間隔(スライド間隔)を指定する。
処理(メソッド)によっては、ウィンドウ間隔を指定するものもある。
短いバッチ処理を繰り返し実行していくことになるが、バッチ1回分の対象データをRDDで保持する形になっているようだ。
TCPソケットからテキストを読み込んでファイルに出力する例。
import org.apache.spark.SparkContext import org.apache.spark.streaming._
object StreamingExample { val sc = new SparkContext("local[2]", "streaming-example") val ssc = new StreamingContext(sc, Seconds(1))
def main(args: Array[String]): Unit = { // バッチ処理 val stream1 = ssc.socketTextStream("localhost", 56789) // DStream[String] val stream2 = stream1.flatMap(_.split(" ")) stream2.saveAsTextFiles("/tmp/spark/streaming1"); // ストリーミング処理を3秒間実行 ssc.start() try { Thread.sleep(3 * 1000) } finally { ssc.stop(true) // SparkContextもstopする } } }
ストリームの読み込みを開始(ssc.start()
)する前に、DStreamのアクション(処理内容)を記述しておく。
この例では「socketTextStreamでサーバーに接続してStringを受け取り、変換を行ってファイルに書き込む」という処理を記述している。
(RDDを記述するのとは異なり、DStreamの場合は、この時点ではまだ実際の処理は行われない。
いわば、バッチ処理の内容を事前に定義(宣言)しているような感じ)
DStreamの処理内容を記述したら、StreamingContextのstart()でストリーム読み込みを開始する。
これによってDStreamの処理が実際に行われる。
stop()で読み込みを終了する。
サーバー側のサンプルは以下のような感じ。(何の変哲も無いTCPサーバー)
import java.net.ServerSocket import scala.concurrent.ops.spawn
object StramingExampleServer { def main(args: Array[String]): Unit = { val server = new ServerSocket(56789) while (true) { val socket = server.accept() spawn { val os = socket.getOutputStream() try { for (i <- 0 to 9) { os.write(s"[${i}]\n".getBytes("UTF-8")) os.write(s"${new java.util.Date()}\n".getBytes("UTF-8")) Thread.sleep(200) } } finally { os.close() } } } } }
StreamingContextは、Spark Streaming用の基本的なクラス。
import org.apache.spark.streaming.StreamingContext
コンストラクター | 説明 | 例 |
---|---|---|
this( |
val sc = new SparkContext(〜) |
|
this( |
内部では、confからSparkContextを生成する。 | val conf = new SparkConf() |
this( |
内部では、引数からSparkContextを生成する。 引数はSparkConfと同様。 |
val ssc = new StreamingContext("local[2]",
"streaming-example", Seconds(1)) |
this( |
チェックポイントファイルからStreamingContextを再作成する。 pathはチェックポイントファイルのパス。 |
引数のbatchDurationには、入力のストリームデータを後続のバッチ処理に送る間隔を指定する。
→Durationクラス
StreamingContextには以下のようなメソッドがある。
メソッド | 説明 | 例 |
---|---|---|
networkStream[T]( |
1.0.0で非推奨。→receiverStream | |
receiverStream[T]( |
Receiverを入力とするDStreamを生成する。 | |
actorStream[T]( |
AkkaのActor(Props)を入力とするDStreamを生成する。 | |
socketTextStream( |
TCPサーバーに接続し、TCPソケットからテキストを入力するDStreamを生成する。 | val stream = ssc.socketTextStream("localhost",
12345) |
socketStream[T]( |
TCPソケットを入力とするDStreamを生成する。 | |
rawSocketStream[T]( |
TCPソケットを入力とするDStreamを生成する。 受け取ったデータがそのまま後続処理に渡される。 |
|
fileStream[K, V, F <: NewInputFormat[K, V]] ( |
ファイル(HDFS)を入力とするDStreamを生成する。 ピリオド「 . 」から始まるファイルは無視される。NewInputFormatは、Hadoopの新APIのInputFormatのこと。 |
|
fileStream[K, V, F <: NewInputFormat[K, V]] ( |
ファイル(HDFS)を入力とするDStreamを生成する。 引数filterは、ファイル名(パス)のフィルター。 |
|
textFileStream( |
ファイル(HDFS)をテキストとして読み込むDStreamを生成する。 | |
queueStream[T]( |
RDDを入力とするDStreamを生成する。 引数defaultRDDは、queueが空になった後に使われるRDD。 後からqueueにRDDを追加することも出来るが、同期する必要があるだろうからSynchronizedQueueを使った方がいいかも。 |
val rdd = sc.makeRDD(Seq("a",
"b", "c")) |
queueStream[T: ClassTag]( |
val rdd = sc.makeRDD(Seq("a",
"b", "c")) |
|
union[T]( |
複数のDStreamを統合して1つのDStreamにする。 →DStream.union |
|
transform[T]( |
新しいDStreamを生成する。 →DStream.transform |
メソッド | 説明 | 例 |
---|---|---|
start(): Unit |
ストリーム処理の実行を開始する。 | ssc.start() |
awaitTermination(): Unit |
実行が停止するのを待つ。 引数timeoutはミリ秒単位。 |
|
awaitTermination(timeout: Long): Unit |
||
stop(stopSparkContext: Boolean = true): Unit |
stop(stopSparkContext, false) と同じ。 |
ssc.stop() |
stop( |
ストリーム処理の実行を停止する。 引数stopSparkContextをtrueにすると、SparkContextもstopする。 引数stopGracefullyをtrueにすると、全データの処理が終わるまで待つ。 |
メソッド | 説明 | 例 |
---|---|---|
sparkContext: SparkContext |
SparkContextを返す。 | val sc = ssc.sparkContext |
remember(duration: Duration ): Unit |
||
checkpoint(directory: String): Unit |
チェックポイントをセットする。 引数directoryにチェックポイントのディレクトリー(HDFS)を指定する。 nullを指定するとチェックポイントを解除する? |
|
addStreamingListener(streamingListener:
StreamingListener): Unit |
StreamingListenerを登録する。 |
DStream(discretized stream、離散ストリーム)は、Spark Streaming用の“RDD相当”のクラス。
(直接RDDを継承してはいないが、似たメソッドを持っている)
DStream内部では、スライド間隔に応じたデータ群をRDDとして扱っているようだ。
新しいDStreamを返す処理。
reduceやcountといった複数データを扱う(かつウィンドウ間隔を指定しない)メソッドの場合、処理対象データの範囲は、前回処理以降に来たデータになる模様。
クラス | メソッド | 備考 | 例 | ||||
---|---|---|---|---|---|---|---|
名称 | 型 | 引数 | 戻り型 | コーディング | 結果 | ||
DStream[T] | map |
[U] |
mapFunc: T => U |
DStream[U] |
要素を別の型に変換する。 | val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("abc",
"de", "f")))) |
3 |
DStream[T] | flatMap |
[U] |
flatMapFunc: T => Traversable[U] |
DStream[U] |
要素を別の型(要素数は複数可)に変換する。 | val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("a
b", "c d")))) |
a |
DStream[T] | filter |
filterFunc: T => Boolean |
DStream[T] |
条件を満たす要素だけ抽出する。 | val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("abc",
"de", "f")))) |
abc |
|
DStream[T] | glom |
DStream[Array[T]] |
各パーティションの内容を配列化したDStreamを生成する。 | val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("abc",
"de", "f")))) |
Array(abc) |
||
DStream[T] | repartition |
numPartitions: Int |
DStream[T] |
パーティション数を変更する。 | val s2 = s1.repartition(3) |
||
DStream[T] | mapPartitions |
[U] |
mapPartFunc: Iterator[T] => Iterator[U] |
DStream[U] |
パーティションを別の型のパーティションに変換する。 | val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq(1,
2, 3)))) |
a |
DStream[T] | reduce |
reduceFunc: (T, T) => T |
DStream[T] |
要素同士の演算を行う。 | val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq(1,
2, 3)))) |
6 |
|
DStream[T] | count |
DStream[Long] |
要素数を返す。 | val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("a",
"b", "c")))) |
3 |
||
DStream[T] | countByValue |
numPartitions: Int =
defaultParallelism |
DStream[(T, Long)] |
要素の種類毎の個数を返す。 | val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("a",
"b", "c", "b", "a", "b")))) |
(b,3) |
|
DStream[T] | transform |
[U] |
transformFunc: RDD[T] => RDD[U] |
DStream[U] |
DStream内のRDDを別の型のRDDに変換する。 | val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("a",
"b", "c", "b")))) |
(a,0) |
DStream[T] | transform |
[U] |
transformFunc: (RDD[T], Time) => RDD[U] |
DStream[U] |
val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("a",
"b", "c")))) |
(Tue Sep 02 12:57:32 JST 2014,a) |
|
DStream[T] | transformWith |
[U, V] |
other: DStream[U] |
DStream[V] |
自分のRDDと他DStreamのRDDを別のRDDに変換する。 | val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq(10,
20, 30)))) |
(60,3) |
DStream[T] | transformWith |
[U, V] |
other: DStream[U] |
DStream[V] |
|||
DStream[T] | window |
windowDuration: Duration |
DStream[T] |
指定されたウィンドウ間隔・スライド間隔のDStreamを返す。 スライド間隔を指定しない場合は元のDStreamと同じになる。 新しいウィンドウ間隔およびスライド間隔は、元のスライド間隔の整数倍の必要がある。 |
println(s1.slideDuration) |
Duration(1000) |
|
DStream[T] | window |
windowDuration: Duration
Duration |
DStream[T] |
println(s1.slideDuration) |
Duration(1000) |
||
DStream[T] | reduceByWindow |
reduceFunc: (T, T) => T Duration
Duration |
DStream[T] |
ウィンドウ内の要素の演算を行う。 | val s2 = s1.reduceByWindow(_ + _, Seconds(5), Seconds(2)) |
||
DStream[T] | reduceByWindow |
reduceFunc: (T, T) => T Duration
Duration |
DStream[T] |
ウィンドウ内の要素の演算を行う。 引数invReduceFuncは逆写像(inverse)の関数。 |
|||
DStream[T] | countByWindow |
windowDuration: Duration
Duration |
DStream[Long] |
ウィンドウ内の要素の個数を返す。 | |||
DStream[T] |
countByValueAndWindow |
windowDuration: Duration
Duration
|
DStream[(T, Long)] |
ウィンドウ内の要素の種類毎の個数を返す。 | |||
DStream[T] | union |
that: DStream[T] |
DStream[T] |
他のDStreamと統合する。 |
ストリーム処理の結果を保存する処理。(RDDに合わせて、便宜上アクションという分類にしてみた)
Spark Streamingの場合はデータが次々と来るので、基本的にはファイルに出力することになるだろう。
(RDDのアクション系メソッドの場合、そのメソッドを実行したら実際に処理が行われるが、Spark
Streamingの場合は、これらのメソッドを呼び出しても実際の処理は行われない。
StreamingContextをstartすると(定期的に)実行される)
printやforeachRDD・saveといった処理は、スライド間隔(slideDuration)の時間ごとに実行される。
クラス | メソッド | 備考 | 例 | ||||
---|---|---|---|---|---|---|---|
名称 | 型 | 引数 | 戻り型 | コーディング | 結果 | ||
DStream[T] | print |
Unit |
先頭10件をコンソールに表示する。 たぶんデバッグ用。 |
val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("abc",
"de", "f")))) |
------------------------------------------- |
||
DStream[T] | foreach |
foreachFunc: RDD[T] => Unit |
Unit |
0.9.0で非推奨。→foreachRDD | |||
DStream[T] | foreach |
foreachFunc: (RDD[T], Time) => Unit |
Unit |
||||
DStream[T] | foreachRDD |
foreachFunc: RDD[T] => Unit |
Unit |
DStream内のRDDを処理する。 たぶんrddのファイル保存メソッドを呼び出す使い方をする。 |
val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("a",
"b", "c")))) |
b |
|
DStream[T] | foreachRDD |
foreachFunc: (RDD[T], Time) => Unit |
Unit |
val s1 = ssc.queueStream(Queue(sc.makeRDD(Seq("a",
"b", "c")))) |
b Tue Sep 02 14:18:43 JST 2014 |
||
DStream[T] | saveAsTextFiles |
prefix: String |
Unit |
テキストファイルとして保存する。 引数に指定するのはディレクトリー名の一部。 実際に生成されるディレクトリーは、prefixとsuffixの間に(タイムスタンプの)数字列が入る。 →RDD.saveAsTextFile |
s1.saveAsTextFiles("/tmp/spark/streaming1") |
||
DStream[T] |
saveAsObjectFiles |
prefix: String |
Unit |
キーがNullWritable、値がByteWritableであるHadoopのSequenceFileとして保存する。 実際に生成されるディレクトリーは、prefixとsuffixの間に(タイムスタンプの)数字列が入る。 →RDD.saveAsObjectFile |
s1.saveAsObjectFiles("/tmp/spark/streaming2") |
||
DStream[T] | slice |
fromTime: Time |
Seq[RDD[T]] |
指定された時刻間のRDDを返す。 StreamingContextがstartした後でのみ使える。 |
val toTime = Time(System.currentTimeMillis()) |
a |
クラス | メソッド | 備考 | 例 | ||||
---|---|---|---|---|---|---|---|
名称 | 型 | 引数 | 戻り型 | コーディング | 結果 | ||
DStream[T] | persist |
level: StorageLevel =
StorageLevel.MEMORY_ONLY_SER |
DStream[T] |
||||
DStream[T] | cache |
DStream[T] |
persistと同じ。 | ||||
DStream[T] | checkpoint |
interval: Duration |
DStream[T] |
クラス | メソッド | 備考 | 例 | ||||
---|---|---|---|---|---|---|---|
名称 | 型 | 引数 | 戻り型 | コーディング | 結果 | ||
DStream[T] | slideDuration |
Duration |
スライド間隔を返す。 初期値はStreamingContext生成時に指定したbatchDuration。 |
||||
DStream[T] | dependencies |
List[DStream[_]] |
依存しているDStreamを返すっぽい。 | ||||
DStream[T] | compute |
validTime: Time |
Option[RDD[T]] |
指定された時刻のRDDを返す。 | |||
DStream[T] | context |
StreamingContext |
StreamingContextを返す。 |
Durationは間隔[ms]を表すクラス。
Spark Streamingで使用するDurationはScala標準のscala.concurrent.duration
ではなく、Spark
Streaming独自のorg.apache.spark.streaming.Duration
である。
import org.apache.spark.streaming._
Durationを生成する為のオブジェクトとして以下のものが用意されている。
オブジェクト | 説明 | 例 | |
---|---|---|---|
コーディング | 結果 | ||
Milliseconds |
ミリ秒。 | val d = Milliseconds(1) |
Duration(1) |
Seconds |
秒。 | val d = Seconds(1) |
Duration(1000) |
Minutes |
分。 | val d = Minutes(1) |
Duration(60000) |
Durationには四則演算+,-,*,/
や比較演算<,<=,>=,>
等のメソッドが定義されている。
Timeは日時を表すクラス。
(内部では値をミリ秒で保持している)
import org.apache.spark.streaming.Time
val time1 = Time(System.currentTimeMillis()) val time2 = time1 + Seconds(1) println(new java.util.Date(time2.milliseconds))
Timeには加減算+,-
や比較演算<,<=,>=,>
等のメソッドが定義されている。