diff --git a/run.sh b/run.sh
index 3100e84374d439cbb5ee6b1ac16616113cc30aef..daf7e2ed1c2d8859061d8b8ebe4c7d884fb5c22c 100755
--- a/run.sh
+++ b/run.sh
@@ -9,7 +9,7 @@ mkdir -p $RUN
 LOGS=$RUN/log.txt
 source ./config.sh
 echo "------------------- OPTIMIZING ---------------------" >> $LOGS
-sbt "runMain scaling.Optimizing --train $ML100Ku2base --test $ML100Ku2test --json $RUN/optimizing-100k.json --users 943 --movies 1682" 2>&1 >>$LOGS
+sbt "runMain scaling.Optimizing --train $ML100Ku2base --test $ML100Ku2test --json $RUN/optimizing-100k.json --users 943 --movies 1682 --master local[1]" 2>&1 >>$LOGS
 echo "------------------- DISTRIBUTED EXACT ---------------------" >> $LOGS
 sbt "runMain distributed.Exact --train $ML100Ku2base --test $ML100Ku2test --json $RUN/exact-100k-4.json --k 10 --master local[4] --users 943 --movies 1682" 2>&1 >>$LOGS
 sbt "runMain distributed.Exact --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --json $RUN/exact-1m-4.json --k 300 --master local[4] --users 6040 --movies 3952" 2>&1 >>$LOGS
diff --git a/src/main/scala/distributed/Approximate.scala b/src/main/scala/distributed/Approximate.scala
index a4c4ca601099f370ad3740a1541e8bb8a6fc91ea..dac81bb42efc36b202259087330f65377e195c55 100644
--- a/src/main/scala/distributed/Approximate.scala
+++ b/src/main/scala/distributed/Approximate.scala
@@ -54,8 +54,8 @@ object Approximate {
     val conf_k = conf.k()
 
     println("Loading training data")
-    val train = load(conf.train(), conf.separator(), conf.users(), conf.movies())
-    val test = load(conf.test(), conf.separator(), conf.users(), conf.movies())
+    val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
+    val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
 
     var knn : CSCMatrix[Double] = null
     println("Partitioning users")
diff --git a/src/main/scala/distributed/Exact.scala b/src/main/scala/distributed/Exact.scala
index 94dcad37dea73eeedba52e68b49e250d53ba2738..b2b6ebfcf2ea7d876d8d11ba8484223ddd943a2c 100644
--- a/src/main/scala/distributed/Exact.scala
+++ b/src/main/scala/distributed/Exact.scala
@@ -52,8 +52,8 @@ object Exact {
     val conf_k = conf.k()
 
     println("Loading training data from: " + conf.train())
-    val train = load(conf.train(), conf.separator(), conf.users(), conf.movies())
-    val test = load(conf.test(), conf.separator(), conf.users(), conf.movies())
+    val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
+    val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
 
     val measurements = (1 to scala.math.max(1,conf.num_measurements())).map(_ => timingInMs( () => {
       0.0
diff --git a/src/main/scala/optimizing/Optimizing.scala b/src/main/scala/optimizing/Optimizing.scala
index 3432900572753e62e536509cd80accdd5bb8737a..0afcc5f8e486b04b55e6baa01212f010a9714aa5 100644
--- a/src/main/scala/optimizing/Optimizing.scala
+++ b/src/main/scala/optimizing/Optimizing.scala
@@ -6,6 +6,10 @@
 import scala.collection.mutable.ArrayBuffer
 import ujson._
 import shared.predictions._
 
+import org.apache.spark.sql.SparkSession
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
 package scaling {
 
@@ -15,6 +19,7 @@ class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
   val users = opt[Int]()
   val movies = opt[Int]()
   val separator = opt[String](default=Some("\t"))
+  val master = opt[String]()
   val num_measurements = opt[Int](default=Some(1))
   verify()
 }
@@ -25,10 +30,20 @@ object Optimizing extends App {
   // will be serialized with the parallelize implementations
   val conf_users = conf.users()
   val conf_movies = conf.movies()
+
+  // Remove these lines if encountering/debugging Spark
+  Logger.getLogger("org").setLevel(Level.OFF)
+  Logger.getLogger("akka").setLevel(Level.OFF)
+  val spark = conf.master.toOption match {
+    case None => SparkSession.builder().getOrCreate();
+    case Some(master) => SparkSession.builder().master(master).getOrCreate();
+  }
+  spark.sparkContext.setLogLevel("ERROR")
+  val sc = spark.sparkContext
 
   println("Loading training data from: " + conf.train())
-  val train = load(conf.train(), conf.separator(), conf.users(), conf.movies())
-  val test = load(conf.test(), conf.separator(), conf.users(), conf.movies())
+  val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
+  val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
 
   val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
     0.0
@@ -53,6 +68,7 @@ object Optimizing extends App {
       "test" -> ujson.Str(conf.test()),
       "users" -> ujson.Num(conf.users()),
      "movies" -> ujson.Num(conf.movies()),
+      "master" -> ujson.Str(conf.master()),
       "num_measurements" -> ujson.Num(conf.num_measurements())
     ),
     "BR.1" -> ujson.Obj(
diff --git a/src/main/scala/shared/predictions.scala b/src/main/scala/shared/predictions.scala
index 63485e37174cd3daf42ae99f7b92b16dcdf16dc6..09f08c9f456a241e89aab1a0731d7bed5bbcbbc1 100644
--- a/src/main/scala/shared/predictions.scala
+++ b/src/main/scala/shared/predictions.scala
@@ -51,6 +51,34 @@ package object predictions
     builder.result()
   }
 
+  def loadSpark(sc : org.apache.spark.SparkContext, path : String, sep : String, nbUsers : Int, nbMovies : Int) : CSCMatrix[Double] = {
+    val file = sc.textFile(path)
+    val ratings = file
+      .map(l => {
+        val cols = l.split(sep).map(_.trim)
+        toInt(cols(0)) match {
+          case Some(_) => Some(((cols(0).toInt-1, cols(1).toInt-1), cols(2).toDouble))
+          case None => None
+        }
+      })
+      .filter({ case Some(_) => true
+                case None => false })
+      .map({ case Some(x) => x
+             case None => ((-1, -1), -1) }).collect()
+
+    val builder = new CSCMatrix.Builder[Double](rows=nbUsers, cols=nbMovies)
+    for ((k,v) <- ratings) {
+      v match {
+        case d: Double => {
+          val u = k._1
+          val i = k._2
+          builder.add(u, i, d)
+        }
+      }
+    }
+    return builder.result
+  }
+
   def partitionUsers (nbUsers : Int, nbPartitions : Int, replication : Int) : Seq[Set[Int]] = {
     val r = new scala.util.Random(1337)
     val bins : Map[Int, collection.mutable.ListBuffer[Int]] = (0 to (nbPartitions-1))
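
Two reviewer notes on this change, neither part of the patch itself.

First, the new `--master local[1]` flag on the scaling.Optimizing run presumably pins Spark to a single local core, so that the timings of the single-machine optimized implementation are not skewed by parallelism; the distributed.Exact runs keep `--master local[4]`.

Second, the added loadSpark threads Option values through a filter/map pair, which types the collected ratings as ((Int, Int), AnyVal) and forces the partial `case d: Double` match when filling the builder. A more compact equivalent is sketched below. This is an illustration only, assuming the existing toInt : String => Option[Int] helper and breeze.linalg.CSCMatrix are in scope; the name loadSparkCompact is hypothetical:

    // Hypothetical, more compact variant of loadSpark (not part of this patch).
    // flatMap over an Option drops unparseable lines directly, so no placeholder
    // tuple like ((-1, -1), -1) is needed and the element type stays ((Int, Int), Double).
    def loadSparkCompact(sc : org.apache.spark.SparkContext, path : String, sep : String,
                         nbUsers : Int, nbMovies : Int) : CSCMatrix[Double] = {
      val ratings = sc.textFile(path)
        .flatMap(l => {
          val cols = l.split(sep).map(_.trim)
          // toInt returns None for header or malformed lines; flatMap skips them
          toInt(cols(0)).map(_ => ((cols(0).toInt - 1, cols(1).toInt - 1), cols(2).toDouble))
        })
        .collect()
      val builder = new CSCMatrix.Builder[Double](rows = nbUsers, cols = nbMovies)
      for (((u, i), r) <- ratings) builder.add(u, i, r)
      builder.result()
    }

Because the builder loop then destructures a concrete ((Int, Int), Double), the runtime type match disappears; the behavior is otherwise the same as the version added in predictions.scala.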