S-JIS[2011-07-20/2021-06-10] 変更履歴

Apache Spark

Apache Sparkは、Scalaで(Hadoopのような)分散処理を行う為のライブラリー。


概要

Apache Sparkは、Scalaで(Hadoopのような)分散処理を行う為のライブラリー(OSS)。

最初はカリフォルニア大学バークレー校(UC Berkeley)のAMPLabで開発されていたが、
2013年6月にApacheのプロジェクト(incubator)になり、2014年2月にトップレベルプロジェクトに昇格した。[2014-08-16]
今はDatabricksという会社が主にサポート(開発)しているらしい。
(「spark」という名前だけで検索しようとすると色々なものが引っかかるので、昔は「mesos spark」「scala spark」等で検索する必要があったが、今は「apache spark」で検索できる)

Sparkは、Scala2.9の並列コレクションが(ローカルマシン上での)スレッドによる並列処理を普通のプログラミングで出来るようにしているのと同じような考えで、複数マシン上での分散処理を普通のプログラミングで出来るのを目指しているのではないかと思う。

種類 備考
普通のScala
(シングルスレッドの直列処理)
val file = scala.io.Source.fromFile("/tmp/log.txt")
val lines = file.getLines
val error = lines.filter(_.contains("ERROR")).size
filterメソッド等を使ってコーディングする。
(普通のプログラミング)
普通のScala
(マルチスレッドによる並列処理)
val file = scala.io.Source.fromFile("/tmp/log.txt")
val lines = file.getLines.toSeq.par
val error = lines.filter(_.contains("ERROR")).size
parを付けるだけで並列コレクション(マルチスレッドによる並列処理)になる。
Spark
(複数マシンに分散した並列処理)
val lines = sc.textFile("/tmp/log.txt")
val error = lines.filter(_.contains("ERROR")).count
左記のscSparkContextクラスの変数。
それを使う以外のコーディングスタイルは、普通のScalaプログラミングとほぼ同じ。

※Spark1系はRDDを使ってプログラミングしていたが、Spark2系ではDatasetを使う。[2017-01-14]


実際に分散させて実行するには、Spark0.3の頃はMesosというクラスターマネージャー(ライブラリー)が必要だった。[/2014-08-20]
今はMesosでなくてもHadoop2(YARN)上でも動くし、それら無しのSpark単独(standalone cluster manager)でも動く。
開発環境として単独(ローカル)で動かすときは、そういったクラスターは不要。
(どのモードで動かすのかは、アプリケーション実行時のmasterに指定するマスターURLによって決まる)

MesosやYARNはリソース管理(実行するコンテナーの管理)に使われるものであり、ファイルシステムではない。
Sparkアプリケーションの入出力先として分散ファイルシステムが必要になるが、そこでHadoopのHDFSが利用できる。
(Mesos経由でHDFSを対象に処理することも出来るらしい)
他にもJDBC経由でRDBを対象にしたりすることも出来るようだ。


Sparkアプリケーションを起動する側(メインプログラム)をdriverと呼ぶ。
各マシン(ワーカーノード)上で分散して実際に稼動する側(プロセス)をexecutorと呼ぶ。


Hadoopとの比較

SparkはHadoopよりも速い?

Sparkのトップページの右側に、WordCountのプログラムの例が載っている。
HDFSをインプットにし、Scalaのコレクションでよく見かける感じの関数名で処理を書いていて、これはなかなか楽しそう!

その近くにグラフがあって、実行時間がHadoopよりSparkの方が圧倒的に短くなっている。SparkはHadoopよりこんなに速いのか!
と、一見すると誤解しそうだが、これはWordCountの速度比較ではない。『Logistic regression』というサンプルの実行時間の比較。
詳細はSparkのサンプルのページ(Spark Examples)に出ているが、『Logistic Regression』は最初にファイルを読み込んでキャッシュし、それを使って処理しているようだ。この方式がHadoopよりも速い、ということらしい。

もっとも、WordCountもHadoopよりSparkの方が速いらしいけど(笑)[2014-09-02]

SparkとHadoopとの比較

聞きかじった範囲での、SparkとHadoopの相違点・同じ点。[2014-09-02]

  Hadoop Spark
アプリケーションの実装方法 MapクラスReducerクラスを実装する。
場合によってはFormatWritableComparator等も実装する必要がある。
Asakusa Frameworkだと演算子のチェーンのような実装方法)
RDDのメソッドチェーンで処理を記述する。
アプリケーションの実行方法 1個のHadoopジョブは1回のMapReduceで終わる。
複数ジョブを連携させたい場合はHadoopの外側で何かしらの仕組みが必要。
Asakusa Frameworkは1回分のバッチを複数のHadoopジョブに変換する。
1つのHadoopジョブには複数の演算子が入れられる。
演算子のまとめ方を決める際にはDAGが利用される)
Sparkアプリケーションは複数のステージに変換される。
複数のステージが順次(あるいは並行に)実行される。
1つのステージにはRDDの複数のメソッドが入れられる。
(メソッドのまとめ方を決める際にはDAGが利用される)
処理のアルゴリズム
[/2014-09-12]
MapReduce。
Map Shuffle Reduce
各フェーズの出力は必ずファイルに書き出す。
Mapフェーズは(変換処理が不要でも)省略できないし、必ずファイルから読み込む。
ベースとなるアルゴリズムはMapReduce。
ステージ(Map) Shuffle ステージ(Reduce→Map) Shuffle →…
シャッフルが必要となる処理(Reduce系処理)があると、そこでステージが切られる。
つまりステージとステージの境界がシャッフル処理になる。
ステージ内の処理(Reduceのデータ(キーや値一覧)も全て)はメモリー上で行われる。
ステージ内の最後の処理の後(シャッフル処理の前)で、データはファイルに書き込まれる。
(なるべくシャッフルが起きないようにする最適化もあるらしい)
Reduce前にメモリー上にハッシュマップ(キーと値一覧)を構築するが、メモリーが足りない場合はファイルに退避(spill)する仕組みがあるらしい。
しかしファイル出力が追いつかなくてOutOfMemoryErrorで落ちることの方が多いらしい^^;
(Spark1.1.0で、そういった大規模データの為にソートベースのシャッフルが出来るようになった)
シリアライズ
[2014-09-08]
Java標準のシリアライズは使用しない)
Hadoop独自のWritableを使ってシリアライズする。
デフォルトではJava標準のシリアライズを使用する。
Kryoを使うことも出来る。(Kryoの方が高速なので、実際にはKryoを使うべき。なんでデフォルトになっていないのか疑うレベルw
Data Serialization
リソース管理 Hadoop1ではJobTracker。
Hadoop2ではYARNがリソース管理・コンテナー管理を行う。
自前のstandalone cluster managerで動くが、MesosやYARNを利用可能。
分散ファイルシステム HDFS(Hadoop分散ファイルシステム)を持っている。
(HDFS上に置かれたファイルは、(物理的には)分割・複製して複数のサーバー上に置かれる)
自前では分散ファイルシステムを持っていない。
HDFSを利用可能。
HDFSのローカリティー タスクの実行にはHDFSのローカリティーが考慮される。
(データの置かれているサーバー上でタスクが実行される)
HDFSから読み込むRDDのパーティションは、HDFSのローカリティーが考慮される。
(データの置かれているサーバー上でタスクが実行される)
ジョブの耐障害性
[/2014-09-08]
障害によってタスクの実行が失敗したら、別サーバーで再実行される。
データに関しては、Hadoopは実行結果を必ずファイルとして出力しているし、HDFSの機能により別サーバー上に複製されている。
障害によってタスクの実行が失敗したら、別サーバーで再実行される。
RDDの一部が消失したら、RDDの元データを使って再作成される。
RDDは“入力元となるRDD”と“演算方法(処理内容)”を保持しているので、基本的には再作成することが出来る。
(これは、RDDが「不変」という特徴を持っているから出来る)
タスクの起動
[/2014-09-08]
タスクはスレーブノード(ワーカーノード)上で実行される。
タスクを実行するJavaVMはタスク毎に起動される。
このオーバーヘッドが大きい為、Hadoop税などと呼ばれる。
executorは(YARN等の機能を使って)ワーカーノード上に起動される。
executorのJavaVMはアプリケーション実行中(SparkContextstopされるまで)は常駐する。
executorは複数のタスクで使い回される。
各タスクはexecutorのプロセス内でマルチスレッド(ExecutorService)で実行される。
ちなみに、通信はAkkaで行われる。
タスク数
[/2014-09-09]
Reduceタスク数は、クラスター全体のコア数(スロット数)より少し少ない程度が良いとされている。
つまり、10ノードのクラスターだと数十タスクになる。
パーティション数はHadoopよりかなり多め(100〜1万くらい)で良い。
Sparkはメモリー内で処理するので、1パーティションのサイズは(メモリーに乗り切るよう)小さめにし、パーティション数を多くする。
タスクはスレッドで実行されるので、オーバーヘッドもほとんどかからない。
さすがに1万を超えるとオーバーヘッドも無視できなくなってくるようだが。
push型/pull型
[/2014-09-08]
例として、データAからBを作り、BからC,Dを作ることを考える。
A→B→C
A→B→D→E
Hadoopの場合はAを作るジョブが終わったらBを作るジョブを実行することになる。
CやDを作る際にはBが出来ているので、それを読み込む。
(もしBが存在していなかったらエラーになる)
いわば、作ったデータを後続処理に渡す(push型)ということになる。
仮にEが作成不要となったら、Dを作るのは無駄になってしまう。
(push型では、作ったデータが後続処理で使われるかどうかは分からない(というか、Hadoopは後続ジョブのことなんか気にしないし)
Asakusa Frameworkだと、データが使われない場合はそのデータを作らないという最適化が行われる)
Sparkは、最終的に作るデータから見ていく仕組みになっているらしい。
Cを作る為にはBが必要、Bを作る為にはAが必要、と判断されて、Aの作成から順に実行されていく。
つまり必要なデータを引っ張ってくる(pull型)ということになる。
Eも同じく逆順に見ていって、Aの作成から順に実行されていく。
ここで、BはCとDから必要とされるが、B(やA)はそれぞれ用に(2回)作成されてしまう。
(RDDは「不変」という特徴を持っているので、基本的には何回実行しても結果は同じになるが、大きなデータの場合、効率は良くないだろう)
ただし明示的にBをキャッシュしておけば、初回に作成したデータが2回目以降で使われる。
何も考えずに全てのRDDをキャッシュしておけば必ず再利用されるだろうが、当然メモリー不足になる可能性が高い。
なお、ステージ内で最後に作られたデータは(ファイルに出力されるので)、明示的にキャッシュしなくても再利用されるらしい。
仮にEが作成不要となったら、(DはEがきっかけとなって作られるので、)Dも作られない。
ローカルファイルの共有 分散キャッシュにより、ファイルを各サーバーに配布することが出来る。 filesaddFileにより、ファイルを各サーバーに配布することが出来る。
共有変数 各フェーズ間で変数(値)を共有することは出来ない。
(強いて言えばカウンターが近いが、アプリ内からは使えない)
定数(固定値)を各executorに配布するブロードキャスト
各executorで集計した値をdriverで読み込むアキュムレーター
がある。

まとめると、Sparkも基本的なアルゴリズムはHadoopと同じくMapReduceだが、実装面などの色々な考慮でHadoopよりも高速化しているらしい。


Scalaへ戻る / 技術メモへ戻る
メールの送信先:ひしだま