Commit 7ad58cf1 authored by Erick Lavoie

Updated loading function to use Spark

parent a1d503bf
@@ -9,7 +9,7 @@ mkdir -p $RUN
 LOGS=$RUN/log.txt
 source ./config.sh
 echo "------------------- OPTIMIZING ---------------------" >> $LOGS
-sbt "runMain scaling.Optimizing --train $ML100Ku2base --test $ML100Ku2test --json $RUN/optimizing-100k.json --users 943 --movies 1682" 2>&1 >>$LOGS
+sbt "runMain scaling.Optimizing --train $ML100Ku2base --test $ML100Ku2test --json $RUN/optimizing-100k.json --users 943 --movies 1682 --master local[1]" 2>&1 >>$LOGS
 echo "------------------- DISTRIBUTED EXACT ---------------------" >> $LOGS
 sbt "runMain distributed.Exact --train $ML100Ku2base --test $ML100Ku2test --json $RUN/exact-100k-4.json --k 10 --master local[4] --users 943 --movies 1682" 2>&1 >>$LOGS
 sbt "runMain distributed.Exact --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --json $RUN/exact-1m-4.json --k 300 --master local[4] --users 6040 --movies 3952" 2>&1 >>$LOGS
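For context on the new --master local[1] flag: a Spark master URL of the form local[N] runs driver and executors in a single JVM with N worker threads, so the optimizing benchmark above is pinned to one thread while the distributed Exact runs get four. A minimal standalone sketch (not part of this commit) to confirm the thread count Spark picks up:

import org.apache.spark.sql.SparkSession

object MasterCheck extends App {
  // local[1] gives one worker thread; local[4] would give four.
  val spark = SparkSession.builder().master("local[1]").appName("master-check").getOrCreate()
  // For local[N], defaultParallelism reports N (unless spark.default.parallelism overrides it).
  println(spark.sparkContext.defaultParallelism)
  spark.stop()
}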
@@ -54,8 +54,8 @@ object Approximate {
     val conf_k = conf.k()
     println("Loading training data")
-    val train = load(conf.train(), conf.separator(), conf.users(), conf.movies())
-    val test = load(conf.test(), conf.separator(), conf.users(), conf.movies())
+    val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
+    val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
     var knn : CSCMatrix[Double] = null
     println("Partitioning users")
@@ -52,8 +52,8 @@ object Exact {
     val conf_k = conf.k()
     println("Loading training data from: " + conf.train())
-    val train = load(conf.train(), conf.separator(), conf.users(), conf.movies())
-    val test = load(conf.test(), conf.separator(), conf.users(), conf.movies())
+    val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
+    val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
     val measurements = (1 to scala.math.max(1, conf.num_measurements())).map(_ => timingInMs(() => {
       0.0
@@ -6,6 +6,10 @@ import scala.collection.mutable.ArrayBuffer
 import ujson._
 import shared.predictions._
+import org.apache.spark.sql.SparkSession
+import org.apache.log4j.Logger
+import org.apache.log4j.Level

 package scaling {

 class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {

@@ -15,6 +19,7 @@ class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
   val users = opt[Int]()
   val movies = opt[Int]()
   val separator = opt[String](default=Some("\t"))
+  val master = opt[String]()
   val num_measurements = opt[Int](default=Some(1))
   verify()
 }
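The new master option is declared without a default, which in Scallop means an absent flag parses to None and is read back with .toOption, as the SparkSession construction below does. A small sketch of that behaviour, using a hypothetical MiniConf:

import org.rogach.scallop._

class MiniConf(args: Seq[String]) extends ScallopConf(args) {
  val master = opt[String]() // no default: the flag may be absent
  verify()
}

object MiniConfDemo extends App {
  println(new MiniConf(Seq("--master", "local[2]")).master.toOption) // Some(local[2])
  println(new MiniConf(Seq()).master.toOption)                       // None
}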
@@ -25,10 +30,20 @@ object Optimizing extends App {
   // will be serialized with the parallelize implementations
   val conf_users = conf.users()
   val conf_movies = conf.movies()
+  // Remove these two lines when debugging Spark problems
+  Logger.getLogger("org").setLevel(Level.OFF)
+  Logger.getLogger("akka").setLevel(Level.OFF)
+  val spark = conf.master.toOption match {
+    case None => SparkSession.builder().getOrCreate()
+    case Some(master) => SparkSession.builder().master(master).getOrCreate()
+  }
+  spark.sparkContext.setLogLevel("ERROR")
+  val sc = spark.sparkContext
   println("Loading training data from: " + conf.train())
-  val train = load(conf.train(), conf.separator(), conf.users(), conf.movies())
-  val test = load(conf.test(), conf.separator(), conf.users(), conf.movies())
+  val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
+  val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
   val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
     0.0
@@ -53,6 +68,7 @@ object Optimizing extends App {
       "test" -> ujson.Str(conf.test()),
       "users" -> ujson.Num(conf.users()),
       "movies" -> ujson.Num(conf.movies()),
+      "master" -> ujson.Str(conf.master()),
       "num_measurements" -> ujson.Num(conf.num_measurements())
     ),
     "BR.1" -> ujson.Obj(
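One caveat in the hunk above: unlike conf.master.toOption in the session setup, conf.master() requires the flag to have been supplied, which holds for the updated run.sh but would throw for an invocation without --master. A defensive variant (an observation, not what the commit does) would fall back explicitly:

// Hypothetical defensive variant: record Spark's own default when --master is absent
"master" -> ujson.Str(conf.master.toOption.getOrElse("local[*]"))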
@@ -51,6 +51,34 @@ package object predictions
     builder.result()
   }

+  def loadSpark(sc : org.apache.spark.SparkContext, path : String, sep : String, nbUsers : Int, nbMovies : Int) : CSCMatrix[Double] = {
+    // Parse each line into ((user, movie), rating), dropping lines whose first
+    // column is not an integer; 1-based ids are shifted to 0-based indices.
+    val ratings = sc.textFile(path)
+      .flatMap(l => {
+        val cols = l.split(sep).map(_.trim)
+        toInt(cols(0)) match {
+          case Some(_) => Some(((cols(0).toInt - 1, cols(1).toInt - 1), cols(2).toDouble))
+          case None => None
+        }
+      })
+      .collect()
+    // Collect the parsed ratings on the driver and assemble the sparse matrix there.
+    val builder = new CSCMatrix.Builder[Double](rows = nbUsers, cols = nbMovies)
+    for (((u, i), r) <- ratings) {
+      builder.add(u, i, r)
+    }
+    builder.result()
+  }
+
   def partitionUsers (nbUsers : Int, nbPartitions : Int, replication : Int) : Seq[Set[Int]] = {
     val r = new scala.util.Random(1337)
     val bins : Map[Int, collection.mutable.ListBuffer[Int]] = (0 to (nbPartitions-1))
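A minimal usage sketch for the new loadSpark, assuming an existing SparkContext sc and the tab-separated ML-100k layout (user, movie, rating, timestamp); the path is illustrative:

// Returns a 943 x 1682 user-by-movie CSCMatrix of ratings
val train = loadSpark(sc, "data/ml-100k/u2.base", "\t", 943, 1682)
println(s"${train.rows} x ${train.cols}, ${train.activeSize} non-zeros")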