package sample1

// Scala 2.9.0-1
// Spark 0.3
import spark._
import SparkContext._

/**
 * Per-student score and standard score (hensachi) for each subject.
 */
case class Student(
  sid: String,
  sname: String,
  kid: String,
  ten: Int,
  average: Double = 0,
  score: Double = 0)

/**
 * Standard score (hensachi) calculation.
 */
object SparkStandardScore {

  private val knames = "数学,国語,理科,社会,英語".split(",")

  private def kid2kname(kid: Int): String = knames(kid)

  private implicit def MapAccumulator[K, V]: AccumulatorParam[Map[K, V]] =
    new AccumulatorParam[Map[K, V]] {
      type T = Map[K, V]
      def addInPlace(t1: T, t2: T): T = {
        // Seems to be called when the results of the two threads are merged
        if (t1 == t2) t1
        else throw new UnsupportedOperationException("t1=" + t1 + ", t2=" + t2)
      }
      def zero(initialValue: T): T = initialValue
    }

  def main(args: Array[String]) {
    val host = "local[2]"
    val spark = new SparkContext(host, "SparkStandardScore")

    // Read the input file.
    // Each non-comment line is assumed to hold: sid, sname, then one score
    // per subject in knames order; lines starting with "#" are skipped.
    val fname = "file://C:/temp/ten.utf8.txt"
    val file = spark.textFile(fname)
    val students = file.flatMap { line =>
      if (line.startsWith("#")) Seq.empty
      else {
        val arr = line.split(",")
        val sid = arr(0).trim
        val sname = arr(1).trim
        var kid = -1
        var sum = 0
        val list = for (t <- arr.drop(2)) yield {
          kid += 1
          val ten = t.trim.toInt
          sum += ten
          Student(sid, sname, kid2kname(kid), ten)
        }
        Student(sid, sname, "合計", sum) +: list
      }
    }.cache()

    // Compute the average score and the number of students per subject.
    val average = students.map { s => (s.kid, (s.ten, 1)) }
      .reduceByKey { (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2) }
      .map { case (kid, (sum, count)) => (kid, (sum.toDouble / count, count)) }
    val amap = spark.accumulator(average.toArray().toMap)
    println("平均点・人数: " + amap)

    // Compute the (population) standard deviation per subject.
    val sigma = students.map { s =>
      val ave = amap.value(s.kid)._1
      (s.kid, scala.math.pow(s.ten - ave, 2))
    }.reduceByKey(_ + _)
      .map { t =>
        val kid = t._1
        val count = amap.value(kid)._2
        (kid, scala.math.sqrt(t._2 / count))
      }
    val smap = spark.accumulator(sigma.toArray().toMap)
    println("標準偏差: " + smap)

    // Compute each student's standard score per subject.
    val score = students.map { s =>
      val ave = amap.value(s.kid)._1
      val sco = (s.ten - ave) * 10 / smap.value(s.kid) + 50
      s.copy(average = ave, score = sco)
    }
    score.foreach(println)
  }
}
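
/*
 * A minimal local sketch (no Spark) of the same standard-score calculation,
 * added for illustration. The object name StandardScoreSketch and the sample
 * scores below are hypothetical; they are not taken from the original input
 * file. It only shows the formula used above: score = (x - mean) * 10 / sigma + 50,
 * with sigma as the population standard deviation.
 */
object StandardScoreSketch {
  def main(args: Array[String]): Unit = {
    val tens = Seq(60, 70, 80, 90)                    // hypothetical scores for one subject
    val ave = tens.sum.toDouble / tens.size           // mean (75.0 for this sample)
    val sigma = scala.math.sqrt(                      // population standard deviation
      tens.map(t => scala.math.pow(t - ave, 2)).sum / tens.size)
    // Standard score (hensachi): 50 at the mean, 10 points per standard deviation
    val scores = tens.map(t => (t - ave) * 10 / sigma + 50)
    println("average=" + ave + ", sigma=" + sigma)
    scores.foreach(println)
  }
}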