S-JIS[2011-07-22] 変更履歴
Sparkのサンプルとして、偏差値を算出する処理を作ってみる。
Cascadingで作った偏差値算出サンプルと同様の処理をSparkで作ってみる。
偏差値は以下のような計算式で算出できる。
まずはクラスやメソッドのインポート部分。何の変哲も無し。
import spark._ import SparkContext._
各個人の科目毎の点数・偏差値を格納するクラスを用意。
case class Student( val sid: String, //番号 val sname: String, //名前 val kid: String, //科目名 val ten: Int, //点数 val average: Double = 0, //平均点 val score: Double = 0 //偏差値 )
最初に番号から点数までを格納し、最終的に平均点・偏差値を入れる。
(Scalaらしく、ケースクラスを使い、各フィールドは全て不変(笑))
ファイルを読み込み、Student一覧を作成する。
val host = "local[2]" val spark = new SparkContext(host, "SparkStandardScore") //ファイル読み込み val fname = "file://C:/temp/ten.utf8.txt" val file = spark.textFile(fname) val students = file.flatMap { line => if (line.startsWith("#")) Seq.empty else { val arr = line.split(",") val sid = arr(0).trim val sname = arr(1).trim var kid = -1 var sum = 0 val list = for (t <- arr.drop(2)) yield { kid += 1 val ten = t.trim.toInt sum += ten Student(sid, sname, kid2kname(kid), ten) } Student(sid, sname, "合計", sum) +: list } }.cache()
private val knames = "数学,国語,理科,社会,英語".split(",") //科目名の配列 private def kid2kname(kid: Int): String = knames(kid)
SparkContextを作り、textFile()メソッドでファイルを指定する。
処理本体はflatMap。
「#」で始まっている行はコメントなのでスキップする。
それ以外の場合はデータ行なので、カンマで分割し、各項目を割り当てたStudentインスタンス(個人・科目別の点数を保持)を作っている。
(ループする際に、drop関数で配列の最初2つをとばしている。初めてまともにdropを使ったけど、こういう時には便利だなぁ(笑))
データは5教科分なので、ついでに合計も算出している。
最後にcache()を呼び出している。
これが何なのかよく分かっていないのだけれども(爆)、名前からしてデータをキャッシュするのだと思う。(実際に分散環境で動かしたときに大容量データをキャッシュできるのかは、知らない)
ここで作ったstudentsは後で何度も使うのでキャッシュしている。
students(個人・科目別の点数の一覧)から、科目別に平均点を算出する。
val average = students.map { s => (s.kid, (s.ten, 1)) } .reduceByKey { (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2) } .map { t => val (kid, (sum, count)) = t (kid, (sum.toDouble / count, count)) } val amap = spark.accumulator(average.toArray().toMap) println("平均点・人数: " + amap)
平均点を出すためには、合計点と人数を出す必要がある。(これはWordCountと同じ要領)
キーは科目名。
reduceByKeyでキー毎に演算(合算)を行えるが、これを使う場合はタプル(Tuple2)でないとダメなようだ。
合計点と人数の集計を同時にしたいので、データ部もタプルという、ちょっと微妙な構造になっている(苦笑)
(理想論的には点数と人数を集計する専用クラスを作る方が美しいし、平均点と人数を保持するクラスを作った方が後続処理の中が分かりやすくなる
が、今回は手抜き…)
reduceByKeyでデータ部を集計した後で、mapによって平均点に変換している。
valによるパターンマッチを使っているが、ここは下記のようにcaseで書き換えられるなぁ。
.map { case (kid, (sum, count)) => (kid, (sum.toDouble / count, count)) }
さて、こうして算出した科目毎の平均点(average)だが、当然、後続処理の中で使いたい。
Sparkには2つのRDDを結合する関数(union・++)はあるのだが、これは同じ型でないと結合できない。(studentsはRDD[Student]でaverageはRDD[(String,(Double,Int))]なので、結合できない)
MapReduce(Hadoop)ならComparatorとかを駆使してソート順を上手くすることで異なる型で結合する技法があるが、Spark0.3にはソートメソッドも無さそう。
groupByはあるので無理矢理AnyRefに統一して一緒に扱えるが、科目毎に“学生一覧+平均点データ”(のArrayBuffer)になるので、分散処理時にデータ件数が多くなることを考えると、使うべきではない。
そもそも科目数はたかが知れているので、Mapに入れておいて科目をキーにして取り出す方が(結合するよりも)手っ取り早い。
Mapにするには、RDDはtoArrayを使って配列に変換できるので、配列からtoMapで作成できる。
しかし、ローカル変数のMapに入れて後続処理で使おうとしても、Scalaの文法上は問題ないが、Sparkで実際に動かすときは(分散処理なので)データが転送(共有)されない。
//ダメな例 val kmap = average.toArray().toMap students.map{ s => val t = kmap(s.kid) }
そこで、かなり無理矢理ではあるが、accumulatorを使ってMapを共有してみることにした。
accumulatorは円周率算出のサンプルで使っているが、数値データを集計する為の共有変数のようなものだと思う。
しかしaccumulatorの型パラメーターは単なるTなので、数値以外も入れられる!w
ただし暗黙の引数としてAccumulatorParam[T]トレイトのインスタンスが必要になるので、Map用のを作ってやった。
private implicit def MapAccumulator[K, V] = new AccumulatorParam[Map[K, V]] { type T = Map[K, V] def addInPlace(t1: T, t2: T): T = { if (t1 == t2) t1 else throw new UnsupportedOperationException("t1=" + t1 + ", t2=" + t2) } def zero(initialValue: T): T = initialValue }
AccumulatorParamでは2つのメソッドを実装しなければならない。
zeroの方は初期化用だと思うので、引数をそのまま返せば問題ないだろう。
addInPlaceは2つの値を合算するもの。本来は数値の加算の為に呼ばれると思うので、今回のMapの使い方では呼ばれないだろうと思っていたら、
別々のスレッド(あるいは分散されたマシン)での集計結果をさらに合算する為にも呼ばれるようだ。
その場合は同じMapが入ってくるはずなので、一応同じかどうかをチェックして、どちらかを返すことにしてみた。
平均点が算出できたので、次に標準偏差を計算する。
標準偏差σ = √((煤i各学生の点数 - 平均点)2)÷人数)
val sigma = students.map { s => val ave = amap.value(s.kid)._1 (s.kid, scala.math.pow(s.ten - ave, 2)) //(点数-平均点)の二乗 }.reduceByKey(_ + _) //全合計 .map { t => val kid = t._1 val count = amap.value(kid)._2 (kid, scala.math.sqrt(t._2 / count)) //人数で割って平方根をとる } val smap = spark.accumulator(sigma.toArray().toMap) println("標準偏差: " + smap)
accumulatorに関しては、平均点算出のと同様。
最後に各個人・科目の偏差値を算出して表示する。
val score = students.map { s => val ave = amap.value(s.kid)._1 val sco = (s.ten - ave) * 10 / smap.value(s.kid) + 50 s.copy(average = ave, score = sco) } score.foreach(println)
出力結果はこんな感じ。
平均点・人数: Map(合計 -> (278.25,8), 英語 -> (37.375,8), 数学 -> (56.25,8), 理科 -> (60.0,8), 国語 -> (66.5,8), 社会 -> (58.125,8)) 標準偏差: Map(合計 -> 123.2129761835173, 英語 -> 29.01696701931475, 数学 -> 33.04826016600571, 理科 -> 28.83140648667699, 国語 -> 24.68805379125702, 社会 -> 23.443749166888814) Student(101,天才,合計,500,278.25,67.99729272586669) Student(101,天才,数学,100,56.25,63.23821580326409) Student(101,天才,国語,100,66.5,63.56931586558015) Student(101,天才,理科,100,60.0,63.87375951238592) Student(101,天才,社会,100,58.125,67.86190412715338) Student(101,天才,英語,100,37.375,71.58220049611475) 〜 Student(102,ひしだま,合計,315,278.25,52.982640395380386) Student(102,ひしだま,数学,90,56.25,60.21233790537516) Student(102,ひしだま,国語,85,66.5,57.49350279143978) Student(102,ひしだま,理科,80,60.0,56.93687975619296) Student(102,ひしだま,社会,50,58.125,46.534257408164265) Student(102,ひしだま,英語,10,37.375,40.565864453794156) 〜
ソート処理が無いので、出てくる順番はけっこう適当。
出来上がったソースはこちら→SparkStandardScore1.scala
(ちなみに、インデントはEclipse(Scalaプラグイン)のソース整形機能によるもの)
見てみると分かるけど、mainメソッドの中に長々とロジックを書いているので、ちょっと嫌な感じ^^;
というわけでメソッド分割してみた。→SparkStandardScore2.scala
しかしこれ、分割された各メソッドの引数の型を書くのが意外と面倒(苦笑)
valを使って“型を書くのが省略できる”効果を生かすには、一メソッドの中が多くなっちゃっても仕方ないのかなぁ。