package shared import breeze.linalg._ import breeze.numerics._ import scala.io.Source import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkContext package object predictions { // ------------------------ For template case class Rating(user: Int, item: Int, rating: Double) def timingInMs(f : ()=>Double ) : (Double, Double) = { val start = System.nanoTime() val output = f() val end = System.nanoTime() return (output, (end-start)/1000000.0) } def toInt(s: String): Option[Int] = { try { Some(s.toInt) } catch { case e: Exception => None } } def mean(s :Seq[Double]): Double = if (s.size > 0) s.reduce(_+_) / s.length else 0.0 def std(s :Seq[Double]): Double = { if (s.size == 0) 0.0 else { val m = mean(s) scala.math.sqrt(s.map(x => scala.math.pow(m-x, 2)).sum / s.length.toDouble) } } def load(path : String, sep : String, nbUsers : Int, nbMovies : Int) : CSCMatrix[Double] = { val file = Source.fromFile(path) val builder = new CSCMatrix.Builder[Double](rows=nbUsers, cols=nbMovies) for (line <- file.getLines) { val cols = line.split(sep).map(_.trim) toInt(cols(0)) match { case Some(_) => builder.add(cols(0).toInt-1, cols(1).toInt-1, cols(2).toDouble) case None => None } } file.close builder.result() } def loadSpark(sc : org.apache.spark.SparkContext, path : String, sep : String, nbUsers : Int, nbMovies : Int) : CSCMatrix[Double] = { val file = sc.textFile(path) val ratings = file .map(l => { val cols = l.split(sep).map(_.trim) toInt(cols(0)) match { case Some(_) => Some(((cols(0).toInt-1, cols(1).toInt-1), cols(2).toDouble)) case None => None } }) .filter({ case Some(_) => true case None => false }) .map({ case Some(x) => x case None => ((-1, -1), -1) }).collect() val builder = new CSCMatrix.Builder[Double](rows=nbUsers, cols=nbMovies) for ((k,v) <- ratings) { v match { case d: Double => { val u = k._1 val i = k._2 builder.add(u, i, d) } } } return builder.result } def partitionUsers (nbUsers : Int, nbPartitions : Int, replication : Int) : Seq[Set[Int]] = { val r = new scala.util.Random(1337) val bins : Map[Int, collection.mutable.ListBuffer[Int]] = (0 to (nbPartitions-1)) .map(p => (p -> collection.mutable.ListBuffer[Int]())).toMap (0 to (nbUsers-1)).foreach(u => { val assignedBins = r.shuffle(0 to (nbPartitions-1)).take(replication) for (b <- assignedBins) { bins(b) += u } }) bins.values.toSeq.map(_.toSet) } }