Commit 3d100ebc authored by hugolan

approx

parents 9e0e6c3a 8f051ddb
@@ -249,35 +249,41 @@ package object predictions
}
//5
def distributed_knn_approximate(preprocessed_ratings : DenseMatrix[Double], k : Int, spark_context: SparkContext, nbPartitions : Int, replication : Int) : CSCMatrix[Double] = {
val new_ratings = new CSCMatrix.Builder[Double](rows=preprocessed_ratings.rows, cols=preprocessed_ratings.cols)
//Seq[Set[Int]]
val users_partition = partitionUsers(preprocessed_ratings.rows, nbPartitions, replication)
val broadcast = spark_context.broadcast(preprocessed_ratings)
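//The dense preprocessed ratings are broadcast once so every partition task can slice the rows of its own users locally instead of re-sending rating data with each task.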
val approximate_topk = spark_context.parallelize(users_partition).map(partition_iterator => {
val ratings = broadcast.value
//val all_users = (0 until ratings.rows).toSeq
//val exclude_users = all_users.diff(partition_iterator.toSeq)
//val partition = ratings.delete(exclude_users,Axis._0)
val slice = ratings(partition_iterator.toSeq.sortWith(_ < _), ::).toDenseMatrix
val similarities = slice * slice.t
val partition_index = (0 until similarities.rows).zip(partition_iterator.toSeq.sortWith(_ < _)).toMap
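//Row u of `similarities` corresponds to the u-th user of the sorted partition, so partition_index maps the local row index back to the global user id before results are emitted.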
//TODO how to integrate indexes in knn
val topk = (0 until similarities.rows).toList.map(u => similarities(u, ::).t
.toArray
.zip(partition_iterator.toSeq.sortWith(_ < _))
.sortWith(_._1 > _._1)
.slice(1, k+1)
.map(v => (u, v._2, v._1))).flatMap(x => x)
//val res = topk.map(x => knn(x,k,similarities))
topk.map{case (u, v, s) => (partition_index(u), v, s)}
//val partition_index = partition.zipWith(Array[Int](partition.rows))
//TODO how to integrate indexes in knn
//val sorted_users =partition_iterator.toArray.sorted
//val topk = partition_iterator.map(x => (x,knn(sorted_users.indexOf(x),k,similarities)))
}).collect()
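//With replication > 1 a user can appear in several partitions, so its candidate neighbours may be split across the collected lists; the lists are therefore flattened and grouped by user below so the per-user candidates can be re-truncated to the k most similar neighbours.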
//redo knn
//val group = approximate_topk.groupby(_._1).map(x => x._2.map((_._2,_._3)).toArray.sortBy(-_).slice(0,k).map(z => (x._1,z._1,z._2)))
val group = approximate_topk.flatMap(x => x)
.groupBy(x => x._1)
.map(x => x._2.map(y => (y._2, y._3))
.toList
@@ -290,22 +296,19 @@ def distributed_knn_approximate(preprocessed_ratings : CSCMatrix[Double], k : In
new_ratings.add(x._1,x._2,x._3)
}
return new_ratings.result
}
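//Hypothetical sketch, not part of this commit: `partitionUsers` is assumed (from the Seq[Set[Int]] hint above) to assign every user id to `replication` of the `nbPartitions` partitions at random, so neighbours lost to partitioning can still be found in another replica when the per-partition top-k lists are merged.
def partitionUsersSketch(nbUsers : Int, nbPartitions : Int, replication : Int, seed : Long = 42L) : Seq[Set[Int]] = {
val rng = new scala.util.Random(seed)
val partitions = Array.fill(nbPartitions)(scala.collection.mutable.Set[Int]())
//assign each user to `replication` randomly chosen partitions
for (u <- 0 until nbUsers; p <- rng.shuffle((0 until nbPartitions).toList).take(replication)) {
partitions(p) += u
}
partitions.map(_.toSet).toSeq
}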
def knn_with_index(user_pair : (Int, Int), k : Int, similarities : DenseMatrix[Double]) : Array[Int] = {
val (global_user, local_user) = user_pair
//first element is itself so take the tail
return argtopk(similarities(::, local_user), k+1).toArray.tail
}
def distributed_knn_predictor_approximate(ratings: CSCMatrix[Double], k: Int, spark_context: SparkContext, nbPartitions : Int, replication : Int): (Int, Int) => Double = {
var user_average = compute_user_averages(ratings)
var normalized_ratings_ = normalized_ratings(ratings, user_average)
var preprocessed_ratings_ = preprocessed_ratings(normalized_ratings_)
val similarities_knn = distributed_knn_approximate(preprocessed_ratings_, k, spark_context, nbPartitions, replication)
val item_deviations = compute_item_deviations(ratings, normalized_ratings_, similarities_knn.toDense)
return (u : Int, i : Int) => predict(user_average(u), item_deviations(u,i))
}
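//Hypothetical usage sketch, not part of this commit: assuming `train` and `test` are CSCMatrix[Double] rating matrices loaded elsewhere and `spark_context` is an active SparkContext,
//the approximate predictor could be evaluated roughly as follows:
//val predictor = distributed_knn_predictor_approximate(train, 300, spark_context, 10, 2)
//val mae = test.activeIterator.map{ case ((u, i), r) => math.abs(predictor(u, i) - r) }.sum / test.activeSize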
...