Newer
Older
import org.rogach.scallop._
import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import ujson._
import shared.predictions._
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
package scaling {
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val json = opt[String]()
val users = opt[Int]()
val movies = opt[Int]()
val separator = opt[String](default=Some("\t"))
val num_measurements = opt[Int](default=Some(1))
verify()
}
object Optimizing extends App {
var conf = new Conf(args)
// conf object is not serializable, extract values that
// will be serialized with the parallelize implementations
val conf_users = conf.users()
val conf_movies = conf.movies()
// Remove these lines if encountering/debugging Spark
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = conf.master.toOption match {
case None => SparkSession.builder().getOrCreate();
case Some(master) => SparkSession.builder().master(master).getOrCreate();
}
spark.sparkContext.setLogLevel("ERROR")
val sc = spark.sparkContext
val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
0.0
}))
val timings = measurements.map(t => t._2)
val mae = measurements(0)._1
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"train" -> ujson.Str(conf.train()),
"test" -> ujson.Str(conf.test()),
"users" -> ujson.Num(conf.users()),
"movies" -> ujson.Num(conf.movies()),
"master" -> ujson.Str(conf.master()),
"num_measurements" -> ujson.Num(conf.num_measurements())
),
"BR.1" -> ujson.Obj(
"1.k10u1v1" -> ujson.Num(0.0),
"2.k10u1v864" -> ujson.Num(0.0),
"3.k10u1v886" -> ujson.Num(0.0),
"4.PredUser1Item1" -> ujson.Num(0.0),
"5.PredUser327Item2" -> ujson.Num(0.0),
"6.Mae" -> ujson.Num(0.0)
),
"BR.2" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
}
}