diff --git a/Milestone-1-QA-template.tex b/Milestone-1-QA-template.tex
deleted file mode 100644
index fa3d99db295354743cc684885336a749e89a9945..0000000000000000000000000000000000000000
--- a/Milestone-1-QA-template.tex
+++ /dev/null
@@ -1,149 +0,0 @@
-\documentclass{article}
-\usepackage{hyperref}
-\usepackage{algorithm}
-\usepackage{algpseudocode}
-\usepackage{dsfont}
-\usepackage{amsmath}
-\usepackage{filemod}
-\usepackage{ulem}
-\usepackage{graphicx}
-\usepackage{todonotes}
-
-\newcommand{\MilestoneOneGlobalDeviationEq}{4~}
-\newcommand{\MilestoneOneBaselineEq}{5~}
-\newcommand{\MilestoneOneComputingTime}{3.1.5~}
-
-% If you use BibTeX in apalike style, activate the following line:
-\bibliographystyle{acm}
-
-\title{CS-449 Project Milestone 1: Personalized Recommender with k-NN}
-
-\author{
-\textbf{Name}: xxx\\
-\textbf{Sciper}: xxx\\
-\textbf{Email:} xxx\\
-\textbf{Name}: xxx\\
-\textbf{Sciper}: xxx\\
-\textbf{Email:} xxx\\
-}
-
-\begin{document}
-\maketitle
-
-\section{Motivation: Movie Recommender}
-(No Q)
-\section{Proxy Problem: Predicting Ratings}
-(No Q)
-
-\section{Baseline: Prediction based on Global Average Deviation}
-\subsection{Questions}
-\label{section:q1}
-
-Implement the previous prediction methods using Scala's standard library, without using Spark.
-
-\begin{equation}
-    \label{eq:baseline}
-    p_{u,i} = \bar r_{u,\bullet} + \bar{\hat r}_{\bullet,i} * scale( (\bar r_{u,\bullet} + \bar{\hat r}_{\bullet,i}), \bar r_{u,\bullet})
-\end{equation}
-  
-
-\begin{itemize}
-  % python stats.py data/ml-100k/u.data
-  
-    \item[\textbf{B.1}] \textit{Compute and output the global average rating ($\bar r_{\bullet,\bullet}$), the average rating for user 1 ($\bar r_{1,\bullet}$), the average rating for item 1 ($\bar r_{\bullet,1}$), the average deviation for item 1 ($\bar{\hat r}_{\bullet,1}$), and the predicted rating of user 1 for item 1 ($p_{1,1}$, Eq.~\ref{eq:baseline}) using \texttt{data/ml-100k/u2.base} for training. When computing the item average for items that do not have ratings in the training set, use the global average ($\bar r_{\bullet, \bullet}$). When making predictions for items that are not in the training set, use the user average if present, otherwise the global average.}
-  
-    \item [\textbf{B.2}] \textit{Compute the prediction accuracy (average MAE on \texttt{ml-100k/u2.test}) of the previous methods ($\bar r_{\bullet, \bullet}$, $\bar r_{u,\bullet}$, $\bar r_{\bullet,i}$) and that of the proposed baseline ($p_{u,i}$, Eq.~\ref{eq:baseline}). }
-
-  \item [\textbf{B.3}] \textit{Measure the time required for computing the MAE for all ratings in the test set (\texttt{ml-100k/u2.test}) with all four methods by recording the current time before and after (ex: with \texttt{System.nanoTime()} in Scala). The duration is the difference between the two. } 
- 
- \textit{ 
-Include the time for computing all values required to obtain the answer from the input dataset provided in the template: recompute from scratch all necessary values, even if they are already available from previous results (ex: global average $\bar r_{\bullet, \bullet}$). Also store the results in an auxiliary data structure (ex: $\texttt{Seq[(mae, timing)]}$) while performing the measurements, to ensure the compiler does not optimize away computations that would otherwise be unnecessary.}
-
-\textit{
- For all four methods, perform three measurements and compute the average and standard-deviation.}
- 
- \textit{In your report, show in a figure the relationship between prediction precision (MAE) on the x axis and computation time (including the standard deviation) on the y axis. Also report the technical specifications (model, CPU speed, RAM, OS, Scala language version, and JVM version) of the machine on which you ran the tests. Which of the four prediction methods is the most expensive to compute? Is the relationship between MAE and computation time linear? What do you conclude about the computing needs of more accurate prediction methods?}
-
-\end{itemize}
-
-\section{Spark Distribution Overhead}
-
-\subsection{Questions}
-\label{section:q5}
-
-Implement $p_{u,i}$ using Spark RDDs. Your distributed implementation should give the same results as your previous implementation using Scala's standard library. Once your implementation works well with \texttt{data/ml-100k/u2.base} and \texttt{data/ml-100k/u2.test}, stress test its performance with the bigger \newline \texttt{data/ml-25m/r2.train} and \texttt{data/ml-25m/r2.test}. 
-
-\begin{itemize}
-  
-   \item [\textbf{D.1}] \textit{Ensure the results of your distributed implementation are consistent with \textbf{B.1} and \textbf{B.2} on \texttt{data/ml-100k/u2.base} and \texttt{data/ml-100k/u2.test}. Compute and output the global average rating ($\bar r_{\bullet,\bullet}$), the average rating for user 1 ($\bar r_{1,\bullet}$), the average rating for item 1 ($\bar r_{\bullet,1}$), the average deviation for item 1 ($\bar{\hat r}_{\bullet,1}$), and the predicted rating of user 1 for item 1 ($p_{1,1}$, Eq.~\ref{eq:baseline}). Compute the prediction accuracy (average MAE on \texttt{ml-100k/u2.test}) of the proposed baseline ($p_{u,i}$, Eq.~\ref{eq:baseline}). } 
-  
-    \item [\textbf{D.2}] \textit{Measure the combined time to (1) pre-compute the required baseline values for predictions and (2) predict all values of the test set on the 25M dataset, \texttt{data/ml-25m/r2.train} and \texttt{data/ml-25m/r2.test}. Compare the time required by your implementation using Scala's standard library (\textbf{B.1} and \textbf{B.2}) on your machine with that of your new distributed implementation using Spark on \texttt{iccluster028}. Use 1 and 4 executors for Spark and repeat all three experiments (predict.Baseline, distributed.Baseline 1 worker, distributed.Baseline 4 workers) 3 times. Write in your report the average and standard deviation for all three experiments, as well as the specifications of the machine on which you ran the tests (similar to B.3).}
-    
-    \textit{As a comparison, our reference implementation runs in 44s on the cluster with 4 workers. Ensure you obtain results roughly in the same ballpark or faster. Don't worry if your code is slower during some executions because the cluster is busy.}
-    
-    \textit{Try optimizing your local Scala implementation by avoiding temporary objects, preferring instead mutable collections and data structures. Can you make it faster, running locally on your machine without Spark, than on the cluster with 4 workers? Explain in your report the changes you made to make your code faster.}
-  
-\end{itemize}
-
-\section{\textit{Personalized} Predictions}
-
-\subsection{Questions}
-\label{section:q3}
-
-\begin{equation}
-    \label{eq:similarity}
-    %s_{u,v} = \frac{\sum_{r_{u,i},r_{v,i} \in \text{Train}} \hat r_{u,i} * \hat r_{v,i}}
-    %                     {\sum_{r_{u,i},r_{v,i} \in \text{Train}} | \hat r_{u,i}| * |\hat r_{v,i}|}
-    s_{u,v} = \begin{cases}
-                   \frac{\sum_{i \in (I(u) \cap I(v))} \hat r_{u,i} * \hat r_{v,i}}
-                         { \sqrt{\sum_{i \in I(u)} {(\hat r_{u,i})}^{2}} * \sqrt{\sum_{i \in I(v)} {(\hat r_{v,i})}^{2}}} &
-                                    (I(u) \cup I(v)) \neq \emptyset;
-                                \exists_{i \in I(u)} \hat r_{u,i} \neq 0; 
-                                 \exists_{i \in I(v)} \hat r_{v,i} \neq 0 \\
-                         0 & \text{otherwise}
-                    \end{cases}
-\end{equation}
-
-
-\begin{equation}
-    \label{eq:personalized-prediction}
-    p_{u,i} = \bar r_{u,\bullet} + \bar{\hat r}_{\bullet,i}(u) * scale( (\bar r_{u,\bullet} + \bar{\hat r}_{\bullet,i}(u)), \bar r_{u,\bullet})
-\end{equation}
-
-  
-\begin{itemize}
-    \item [\textbf{P.1}] \textit{Using uniform similarities of 1 between all users, compute the predicted rating of user 1 for item 1 ($p_{1,1}$) and the prediction accuracy (MAE on \texttt{ml-100k/u2.test}) of the personalized baseline predictor.} 
-    
-    \item [\textbf{P.2}] \textit{Using the adjusted cosine similarity (Eq.~\ref{eq:similarity}), compute the similarity between user $1$ and user $2$ ($s_{1,2}$), the predicted rating of user 1 for item 1 ($p_{1,1}$, Eq.~\ref{eq:personalized-prediction}), and the prediction accuracy (MAE on \texttt{ml-100k/u2.test}) of the personalized baseline predictor.} 
-    
-        \item [\textbf{P.3}] \textit{Implement the Jaccard Coefficient\footnote{\url{https://en.wikipedia.org/wiki/Jaccard_index}}. Provide the mathematical formulation of your similarity metric in your report. Using the Jaccard similarity, compute the similarity between user $1$ and user $2$ ($s_{1,2}$), the predicted rating of user 1 for item 1 ($p_{1,1}$, Eq.~\ref{eq:personalized-prediction}), and the prediction accuracy (MAE on \texttt{ml-100k/u2.test}) of the personalized baseline predictor. Is the Jaccard Coefficient better or worse than the Adjusted Cosine similarity?}
-\end{itemize}
-
-\section{Neighbourhood-Based Predictions}
-
-
-\subsection{Questions}
-\label{section:q2}
-
-\begin{itemize}    
-        \item [\textbf{N.1}] \textit{Implement the k-NN predictor. Do not include self-similarity in the k-nearest neighbours. Using $k=10$ and \texttt{data/ml-100k/u2.base} for training, output the similarities between: (1) user $1$ and itself; (2) user $1$ and user $864$; (3) user $1$ and user $886$. Still using $k=10$, output the prediction for user 1 and item 1 ($p_{1,1}$), and make sure that you obtain an MAE of $0.8287 \pm 0.0001$ on \texttt{data/ml-100k/u2.test}.} 
-    
-    \item [\textbf{N.2}] \textit{Report the MAE on \texttt{data/ml-100k/u2.test} for $k \in \{10, 30, 50, 100, 200, 300, 400, 800, 943\}$. What is the lowest $k$ such that the MAE is lower than for the baseline (non-personalized) method?} 
-    
-     \item [\textbf{N.3}] \label{q-total-time} \textit{Measure the time required for computing predictions (without using Spark) on \texttt{data/ml-100k/u2.test}, using $k=300$. Include the time to train the predictor on \newline \texttt{data/ml-100k/u2.base}, which includes computing the similarities $s_{u,v}$. Try reducing the computation time with alternative implementation techniques (making sure you keep obtaining the same results). Mention in your report which alternatives you tried, which ones were fastest, and by how much. The teams with the correct answer and shortest times on a secret test set will obtain more points on this question.}
-\end{itemize}
-
-
-\section{Recommendation}
-
-\subsection{Questions}
-\label{section:q4}
-
-\begin{itemize} 
-   \item [\textbf{R.1}] \textit{Train a k-NN predictor with training data from \texttt{data/ml-100k/u.data}, augmented with additional ratings from user "$944$" provided in \texttt{personal.csv}, using adjusted cosine similarity and $k=300$. Report the prediction for user 1 item 1 ($p_{1,1}$)}.
-
-  \item [\textbf{R.2}] \textit{Report the top 3 recommendations for user "$944$" using the same k-NN predictor as for \textbf{R.1}. Include the movie identifier, the movie title, and the prediction score in the output. If additional recommendations have the same predicted value as the top 3 recommendations, prioritize the movies with the smallest identifiers in your top 3 (ex: if the top 8 recommendations all have predicted scores of \texttt{5.0}, choose the top 3 with the smallest ids), so your results do not depend on the initial permutation of the recommendations.}
-\end{itemize}
-
-
-\end{document}
diff --git a/Milestone-1.pdf b/Milestone-1.pdf
deleted file mode 100644
index 0d74ac6cba215d70cc3eb62fd8814da6cb6cc876..0000000000000000000000000000000000000000
Binary files a/Milestone-1.pdf and /dev/null differ
diff --git a/README.md b/README.md
index 6191a8ba4e5b1af56830aa92bcf6473a466c39a7..4752d19c11148369a5342f7a725d501ade4b7cf4 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,10 @@
 # Milestone Description
 
-[Milestone-1.pdf](./Milestone-1.pdf)
+[To Be Released](./Milestone-2.pdf)
 
 Note: Section 'Updates' lists the updates since the original release of the Milestone.
 
-Mu has prepared a report template for your convenience here: [Report Template](./Milestone-1-QA-template.tex).
+Mu has prepared a report template for your convenience here: [Report Template](./Milestone-2-QA-template.tex).
 
 # Dependencies
 
@@ -28,33 +28,16 @@ openjdk@8 installed through Homebrew, you would do:
 
 # Dataset
 
-Download [data.zip](https://gitlab.epfl.ch/sacs/cs-449-sds-public/project/dataset/-/raw/main/data.zip).
+Download [data-m2.zip](https://gitlab.epfl.ch/sacs/cs-449-sds-public/project/dataset/-/raw/main/data-m2.zip).
 
 Unzip:
 ````
-> unzip data.zip
+> unzip data-m2.zip
 ````
 
-It should unzip into ````data/```` by default. If not, manually move ````ml-100k```` and ````ml-25m```` into ````data/````.
+It should unzip into ````data/```` by default. If not, manually move ````ml-100k```` and ````ml-1m```` into ````data/````.
 
 
-# Personal Ratings
-
-Additional personal ratings are provided in the 'data/personal.csv' file in a
-csv format with ````<movie>, <movie title>, <rating>```` to test your recommender.
-You can copy this file and change the ratings, with values [1,5] to obtain
-references more to your liking!
-
-Entries with no rating are in the following format:
-````
-1,Toy Story (1995),
-````
-
-Entries with ratings are in the following format:
-````
-1,Toy Story (1995),5
-````
-
 # Repository Structure
 
 ````src/main/scala/shared/predictions.scala````:
@@ -63,11 +46,10 @@ This code should then be used in the following applications and tests.
 
 ## Applications
 
-    1. ````src/main/scala/predict/Baseline.scala````: Output answers to questions **B.X**.
-    2. ````src/main/scala/distributed/DistributedBaseline.scala````: Output answers to questions **D.X**.
-    3. ````src/main/scala/predict/Personalized.scala````: Output answers to questions questions **P.X**.
-    4. ````src/main/scala/predict/kNN.scala````: Output answers to questions questions **N.X**.
-    5. ````src/main/scala/recommend/Recommender.scala````: Output answers to questions questions **R.X**.
+    1. ````src/main/scala/optimizing/Optimizing.scala````: Output answers to questions **BR.X**.
+    2. ````src/main/scala/distributed/Exact.scala````: Output answers to questions **EK.X**.
+    3. ````src/main/scala/distributed/Approximate.scala````: Output answers to questions **AK.X**.
+    4. ````src/main/scala/economics/Economics.scala````: Output answers to questions **E.X**.
 
 Applications are separate from tests to make it easier to test with different
 inputs and permit outputting your answers and timings in JSON format for easier
@@ -75,14 +57,12 @@ grading.
 
 ## Unit Tests
 
-Corresponding unit tests for each application:
+Corresponding unit tests for each application (except Economics.scala):
 
 ````
-    src/test/scala/predict/BaselineTests.scala
-    src/test/scala/distributed/DistributedBaselineTests.scala
-    src/test/scala/predict/PersonalizedTests.scala
-    src/test/scala/predict/kNNTests.scala
-    src/test/scala/recommend/RecommenderTests.scala
+    src/test/scala/optimizing/OptimizingTests.scala
+    src/test/scala/distributed/ExactTests.scala
+    src/test/scala/distributed/ApproximateTests.scala
 ````
 
 Your tests should demonstrate how to call your code to obtain the answers of
@@ -103,44 +83,28 @@ You should fill all tests and ensure they all succeed prior to submission.
 
 ## Run applications 
 
-### Baseline
-
-On ````ml-100k````:
-````
-    sbt "runMain predict.Baseline --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json baseline-100k.json"
-````
-
-On ````ml-25m````:
-````
-    sbt "runMain predict.Baseline --train data/ml-25m/r2.train --test data/ml-25m/r2.test --separator , --json baseline-25m.json"
-````
-
-### Distributed Baseline
+### Optimizing
 
 ````
-    sbt "runMain distributed.DistributedBaseline --train data/ml-25m/r2.train --test data/ml-25m/r2.test  --separator , --json distributed-25m-4.json --master local[4]"
+sbt "runMain scaling.Optimizing --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json optimizing-100k.json --users 943 --movies 1682"
 ````
 
-You can vary the number of executors used locally by using ````local[X]```` with X being an integer representing the number of cores you want to use locally.
-
-You can vary the number of executors on the cluster by disabling dynamic allocation with ````--conf "spark.dynamicAllocation.enabled=false"````, using ````--master yarn````, and setting the number of executors with ````--num-executors X````.
-
-### Personalized
+### Parallel Exact KNN
 
 ````
-    sbt "runMain predict.Personalized --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json personalized-100k.json"
+sbt "runMain distributed.Exact --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json exact-100k-4.json --k 10 --master local[4] --users 943 --movies 1682"
 ````
 
-### kNN
+### Approximate KNN
 
 ````
-    sbt "runMain predict.kNN --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json knn-100k.json"
+sbt "runMain distributed.Approximate --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json approximate-100k-4-k10-r2.json --k 10 --master local[4] --users 943 --movies 1682 --partitions 10 --replication 2"
 ````
 
-### Recommender
+### Economics
 
 ````
-    sbt "runMain recommend.Recommender --data data/ml-100k/u.data --personal data/personal.csv --json recommender-100k.json"
+sbt "runMain economics.Economics --json economics.json"
 ````
 
 ## Time applications
@@ -158,7 +122,7 @@ for other students.
 ````sbt clean````: clean up temporary files and previous assembly packages.
 
 ````sbt assembly````: create a new jar
-````target/scala-2.11/m1_yourid-assembly-1.0.jar```` that can be used with
+````target/scala-2.11/m2_yourid-assembly-1.0.jar```` that can be used with
 ````spark-submit````.
 
 Prefer packaging your application locally and upload the tar archive of your application
@@ -167,31 +131,24 @@ before running on cluster.
 ### Upload jar on Cluster 
 
 ````
-    scp target/scala-2.11/m1_yourid-assembly-1.0.jar <username>@iccluster028.iccluster.epfl.ch:~
+    scp target/scala-2.11/m2_yourid-assembly-1.0.jar <username>@iccluster028.iccluster.epfl.ch:~
 ````
 
 ### Run on Cluster
 
+See [config.sh](./config.sh) for the HDFS paths to the pre-uploaded train and test datasets; substitute them for TRAIN and TEST in the example command below:
+ 
 ````
-spark-submit --class distributed.DistributedBaseline --master yarn --num-executors 1 m1_yourid-assembly-1.0.jar  --train TRAIN --test TEST --separator , --json distributed-25m-1.json --num_measurements 1
+spark-submit --class distributed.Exact --master yarn --conf "spark.dynamicAllocation.enabled=false" --num-executors 1 m2_yourid-assembly-1.0.jar --train TRAIN --test TEST
 ````
 
-See [config.sh](./config.sh) for HDFS paths to pre-uploaded train and test datasets to replace TRAIN and TEST with in the command. For instance, if you want to run on ML-25m, you should first run [config.sh](./config.sh) and then use the above command adapted as such:
-````
-spark-submit --class distributed.DistributedBaseline --master yarn --num-executors 1 m1_yourid-assembly-1.0.jar  --train $ML25Mr2train --test $ML25Mr2test --separator , --json distributed-25m-1.json --num_measurements 1
-````
-
-You can vary the number of executors with ````--num-executors X````, and number of measurements with ````--num_measurements Y````.
-
 ## Grading scripts
 
 We will use the following scripts to grade your submission:
 
     1. ````./test.sh````: Run all unit tests.
     2. ````./run.sh````: Run all applications without timing measurements.
-    3. ````./timeTrials.sh````: Time applications to determine which student implementations are fastest.
-    4. ````./timeOthers.sh````: Time applications to check report answers against independent measurements. 
-    5. ````./timeCluster.sh````: Package and time applications on Spark Cluster.
+    3. ````./time.sh````: Run all timing measurements. 
 
 All scripts will produce execution logs in the ````logs````
 directory, including answers produced in the JSON format. Logs directories are
@@ -199,12 +156,7 @@ in the format ````logs/<scriptname>-<datetime>-<machine>/```` and include at
 least an execution log ````log.txt```` as well as possible JSON outputs from
 applications. 
 
-Ensure all scripts run correctly locally before submitting. Avoid running
-````timeCluster.sh```` on iccluster as the packaging and measurements will
-interfere with other students working on their Milestone at the same time. If
-````timeCluster.sh```` correctly runs locally on your machine, this should be
-sufficient.
-
+Ensure all scripts run correctly locally before submitting. 
 
 ## Submission
 
diff --git a/build.sbt b/build.sbt
index 78536a555f1e1dbcdb6587eb649dd47f95cc9785..078728a600d683c5de56f44ab08ae05facf00afa 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,9 +1,11 @@
-name := "m1_yourid"
+name := "m2_yourid"
 version := "1.0"
 
 libraryDependencies += "org.rogach" %% "scallop" % "4.0.2"
 libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.7"
 libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.7"
+libraryDependencies += "org.scalanlp" %% "breeze" % "0.13.2"
+libraryDependencies += "org.scalanlp" %% "breeze-natives" % "0.13.2"
 libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.0" % Test
 libraryDependencies += "com.lihaoyi" %% "ujson" % "1.5.0"
 
diff --git a/config.sh b/config.sh
index 38a9cd241d464e6b73f98fb38553d5b710d3b772..550004b48b5914dc4d5896c157d2a4822986f6bb 100755
--- a/config.sh
+++ b/config.sh
@@ -1,16 +1,17 @@
 if [ $(hostname) == 'iccluster028' ]; 
 then  
-    export ML100Ku2base=hdfs://iccluster028.iccluster.epfl.ch:8020/cs449/data/ml-100k/u2.base;
-    export ML100Ku2test=hdfs://iccluster028.iccluster.epfl.ch:8020/cs449/data/ml-100k/u2.test;
-    export ML100Kudata=hdfs://iccluster028.iccluster.epfl.ch:8020/cs449/data/ml-100k/u.data;
-    export ML25Mr2train=hdfs://iccluster028.iccluster.epfl.ch:8020/cs449/data/ml-25m/r2-min-1.train;
-    export ML25Mr2test=hdfs://iccluster028.iccluster.epfl.ch:8020/cs449/data/ml-25m/r2-min-1.test;
+    ICCLUSTER=hdfs://iccluster028.iccluster.epfl.ch:8020
+    export ML100Ku2base=$ICCLUSTER/cs449/data/ml-100k/u2.base;
+    export ML100Ku2test=$ICCLUSTER/cs449/data/ml-100k/u2.test;
+    export ML100Kudata=$ICCLUSTER/cs449/data/ml-100k/u.data;
+    export ML1Mrbtrain=$ICCLUSTER/cs449/data/ml-1m/rb.train;
+    export ML1Mrbtest=$ICCLUSTER/cs449/data/ml-1m/rb.test;
     export SPARKMASTER='yarn'
 else 
     export ML100Ku2base=data/ml-100k/u2.base;
     export ML100Ku2test=data/ml-100k/u2.test;
     export ML100Kudata=data/ml-100k/u.data;
-    export ML25Mr2train=data/ml-25m/r2.train;
-    export ML25Mr2test=data/ml-25m/r2.test;
+    export ML1Mrbtrain=data/ml-1m/rb.train;
+    export ML1Mrbtest=data/ml-1m/rb.test;
     export SPARKMASTER='local[4]'
 fi;
diff --git a/run.sh b/run.sh
index 75e8876083c98c2ac29edf8d9e71e28ccf8ac288..3100e84374d439cbb5ee6b1ac16616113cc30aef 100755
--- a/run.sh
+++ b/run.sh
@@ -8,14 +8,16 @@ RUN=./logs/run-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
 mkdir -p $RUN
 LOGS=$RUN/log.txt
 source ./config.sh 
-echo "------------------- BASELINE    ---------------------" >> $LOGS
-sbt "runMain predict.Baseline --train $ML100Ku2base --test $ML100Ku2test --json $RUN/baseline-100k.json" 2>&1 >>$LOGS
-echo "------------------- DISTRIBUTED ---------------------" >> $LOGS
-sbt "runMain predict.Baseline --train $ML25Mr2train --test $ML25Mr2test --separator , --json $RUN/baseline-25m.json" 2>&1 >>$LOGS
-sbt "runMain distributed.DistributedBaseline --train $ML25Mr2train --test $ML25Mr2test --separator , --json $RUN/distributed-25m-4.json --master $SPARKMASTER" 2>&1 >>$LOGS
-echo "------------------- PERSONALIZED --------------------" >> $LOGS
-sbt "runMain predict.Personalized --train $ML100Ku2base --test $ML100Ku2test --json $RUN/personalized-100k.json" 2>&1 >>$LOGS
-echo "------------------- KNN -----------------------------" >> $LOGS
-sbt "runMain predict.kNN --train $ML100Ku2base --test $ML100Ku2test --json $RUN/knn-100k.json" 2>&1 >>$LOGS
-echo "------------------- RECOMMEND -----------------------" >> $LOGS
-sbt "runMain recommend.Recommender --data $ML100Kudata --personal data/personal.csv --json $RUN/recommender-100k.json" 2>&1 >>$LOGS
+echo "------------------- OPTIMIZING    ---------------------" >> $LOGS
+sbt "runMain scaling.Optimizing --train $ML100Ku2base --test $ML100Ku2test --json $RUN/optimizing-100k.json --users 943 --movies 1682" 2>&1 >>$LOGS
+echo "------------------- DISTRIBUTED EXACT ---------------------" >> $LOGS
+sbt "runMain distributed.Exact --train $ML100Ku2base --test $ML100Ku2test --json $RUN/exact-100k-4.json --k 10 --master local[4] --users 943 --movies 1682" 2>&1 >>$LOGS
+sbt "runMain distributed.Exact --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --json $RUN/exact-1m-4.json --k 300 --master local[4] --users 6040 --movies 3952" 2>&1 >>$LOGS
+echo "------------------- DISTRIBUTED APPROXIMATE ---------------------" >> $LOGS
+sbt "runMain distributed.Approximate --train $ML100Ku2base --test $ML100Ku2test --json $RUN/approximate-100k-4-k10-r2.json --k 10 --master local[4] --users 943 --movies 1682 --partitions 10 --replication 2" 2>&1 >>$LOGS;
+for R in 1 2 3 4 6 8; do
+    sbt "runMain distributed.Approximate --train $ML100Ku2base --test $ML100Ku2test --json $RUN/approximate-100k-4-k300-r$R.json --k 300 --master local[4] --users 943 --movies 1682 --partitions 10 --replication $R" 2>&1 >>$LOGS;
+done
+sbt "runMain distributed.Approximate --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --json $RUN/approximate-1m-4.json --k 300 --master local[4] --users 6040 --movies 3952 --partitions 8 --replication 1" 2>&1 >>$LOGS
+echo "------------------- ECONOMICS -----------------------------------" >> $LOGS
+sbt "runMain economics.Economics --json $RUN/economics.json" 2>&1 >>$LOGS
diff --git a/src/main/scala/distributed/Approximate.scala b/src/main/scala/distributed/Approximate.scala
new file mode 100644
index 0000000000000000000000000000000000000000..a4c4ca601099f370ad3740a1541e8bb8a6fc91ea
--- /dev/null
+++ b/src/main/scala/distributed/Approximate.scala
@@ -0,0 +1,131 @@
+import org.rogach.scallop._
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+import breeze.linalg._
+import breeze.numerics._
+import scala.io.Source
+import scala.collection.mutable.ArrayBuffer
+import ujson._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import shared.predictions._
+
+package distributed {
+
+class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
+  val train = opt[String](required = true)
+  val test = opt[String](required = true)
+  val k = opt[Int]()
+  val json = opt[String]()
+  val users = opt[Int]()
+  val movies = opt[Int]()
+  val separator = opt[String](default=Some("\t"))
+  val replication = opt[Int](default=Some(1))
+  val partitions = opt[Int](default=Some(1))
+  val master = opt[String]()
+  val num_measurements = opt[Int](default=Some(1))
+  verify()
+}
+
+object Approximate {
+  def main(args: Array[String]) {
+    var conf = new Conf(args)
+
+    // Remove these lines if you need Spark's own logs for debugging
+    Logger.getLogger("org").setLevel(Level.OFF)
+    Logger.getLogger("akka").setLevel(Level.OFF)
+    val spark = conf.master.toOption match {
+      case None => SparkSession.builder().getOrCreate();
+      case Some(master) => SparkSession.builder().master(master).getOrCreate();
+    }
+    val sc = spark.sparkContext
+
+    println("")
+    println("******************************************************")
+
+    // conf object is not serializable, extract values that
+    // will be serialized with the parallelize implementations
+    val conf_users = conf.users()
+    val conf_movies = conf.movies()
+    val conf_k = conf.k()
+
+    println("Loading training data")
+    val train = load(conf.train(), conf.separator(), conf.users(), conf.movies())
+    val test = load(conf.test(), conf.separator(), conf.users(), conf.movies())
+    var knn : CSCMatrix[Double] = null
+
+    println("Partitioning users")
+    var partitionedUsers : Seq[Set[Int]] = partitionUsers(
+      conf.users(), 
+      conf.partitions(), 
+      conf.replication()
+    )
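+    // Each Set[Int] holds the user ids assigned to one partition; with
+    // --replication R, partitionUsers (shared/predictions.scala) assigns every
+    // user to R of the --partitions partitions, so partitions overlap.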
+    val measurements = (1 to scala.math.max(1,conf.num_measurements()))
+      .map(_ => timingInMs( () => {
+      // Use partitionedUsers here
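+      // Sketch of what belongs in this timed closure (to be implemented):
+      // compute similarities and k nearest neighbours separately within each
+      // user partition, merge the per-partition neighbourhoods into knn,
+      // predict the test ratings, and return the resulting MAE.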
+      0.0
+    }))
+    val mae = measurements(0)._1
+    val timings = measurements.map(_._2)
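+    // timingInMs returns (result, elapsed ms): the result of the first run is
+    // reported as the MAE, and the elapsed times feed the average/stddev below.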
+
+    // Save answers as JSON
+    def printToFile(content: String,
+                    location: String = "./answers.json") =
+      Some(new java.io.PrintWriter(location)).foreach{
+        f => try{
+          f.write(content)
+        } finally{ f.close }
+    }
+    conf.json.toOption match {
+      case None => ;
+      case Some(jsonFile) => {
+        val answers = ujson.Obj(
+          "Meta" -> ujson.Obj(
+            "train" -> ujson.Str(conf.train()),
+            "test" -> ujson.Str(conf.test()),
+            "k" -> ujson.Num(conf.k()),
+            "users" -> ujson.Num(conf.users()),
+            "movies" -> ujson.Num(conf.movies()),
+            "master" -> ujson.Str(sc.getConf.get("spark.master")),
+            "num-executors" -> ujson.Str(if (sc.getConf.contains("spark.executor.instances"))
+                                            sc.getConf.get("spark.executor.instances")
+                                         else
+                                            ""),
+            "num_measurements" -> ujson.Num(conf.num_measurements()),
+            "partitions" -> ujson.Num(conf.partitions()),
+            "replication" -> ujson.Num(conf.replication()) 
+          ),
+          "AK.1" -> ujson.Obj(
+            "knn_u1v1" -> ujson.Num(0.0),
+            "knn_u1v864" -> ujson.Num(0.0),
+            "knn_u1v344" -> ujson.Num(0.0),
+            "knn_u1v16" -> ujson.Num(0.0),
+            "knn_u1v334" -> ujson.Num(0.0),
+            "knn_u1v2" -> ujson.Num(0.0)
+          ),
+          "AK.2" -> ujson.Obj(
+            "mae" -> ujson.Num(mae) 
+          ),
+          "AK.3" -> ujson.Obj(
+            "average (ms)" -> ujson.Num(mean(timings)),
+            "stddev (ms)" -> ujson.Num(std(timings))
+          )
+        )
+        val json = write(answers, 4)
+
+        println(json)
+        println("Saving answers in: " + jsonFile)
+        printToFile(json, jsonFile)
+      }
+    }
+
+    println("")
+    spark.stop()
+  } 
+}
+
+}
diff --git a/src/main/scala/distributed/DistributedBaseline.scala b/src/main/scala/distributed/DistributedBaseline.scala
deleted file mode 100644
index cf9e8320fe86b07dd782032bd96a502644548564..0000000000000000000000000000000000000000
--- a/src/main/scala/distributed/DistributedBaseline.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-package distributed
-
-import org.rogach.scallop._
-import org.apache.spark.rdd.RDD
-import ujson._
-
-import org.apache.spark.sql.SparkSession
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-
-import scala.math
-import shared.predictions._
-
-class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
-  val train = opt[String](required = true)
-  val test = opt[String](required = true)
-  val separator = opt[String](default=Some("\t"))
-  val master = opt[String](default=Some(""))
-  val num_measurements = opt[Int](default=Some(0))
-  val json = opt[String]()
-  verify()
-}
-
-object DistributedBaseline extends App {
-  var conf = new Conf(args) 
-
-  // Remove these lines if encountering/debugging Spark
-  Logger.getLogger("org").setLevel(Level.OFF)
-  Logger.getLogger("akka").setLevel(Level.OFF)
-  val spark = if (conf.master() != "") {
-    SparkSession.builder().master(conf.master()).getOrCreate()
-  } else {
-    SparkSession.builder().getOrCreate()
-  }
-  spark.sparkContext.setLogLevel("ERROR") 
-
-  println("")
-  println("******************************************************")
-
-  println("Loading training data from: " + conf.train()) 
-  val train = load(spark, conf.train(), conf.separator())
-  println("Loading test data from: " + conf.test()) 
-  val test = load(spark, conf.test(), conf.separator())
-
-  val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
-    Thread.sleep(1000) // Do everything here from train and test
-    42        // Output answer as last value
-  }))
-  val timings = measurements.map(t => t._2) // Retrieve the timing measurements
-
-  // Save answers as JSON
-  def printToFile(content: String, 
-                  location: String = "./answers.json") =
-    Some(new java.io.PrintWriter(location)).foreach{
-      f => try{
-        f.write(content)
-      } finally{ f.close }
-  }
-  conf.json.toOption match {
-    case None => ; 
-    case Some(jsonFile) => {
-      val answers = ujson.Obj(
-        "Meta" -> ujson.Obj(
-          "1.Train" -> conf.train(),
-          "2.Test" -> conf.test(),
-          "3.Master" -> conf.master(),
-          "4.Measurements" -> conf.num_measurements()
-        ),
-        "D.1" -> ujson.Obj(
-          "1.GlobalAvg" -> ujson.Num(0.0), // Datatype of answer: Double
-          "2.User1Avg" -> ujson.Num(0.0),  // Datatype of answer: Double
-          "3.Item1Avg" -> ujson.Num(0.0),   // Datatype of answer: Double
-          "4.Item1AvgDev" -> ujson.Num(0.0), // Datatype of answer: Double,
-          "5.PredUser1Item1" -> ujson.Num(0.0), // Datatype of answer: Double
-          "6.Mae" -> ujson.Num(0.0) // Datatype of answer: Double
-        ),
-        "D.2" -> ujson.Obj(
-          "1.DistributedBaseline" -> ujson.Obj(
-            "average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
-            "stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
-          )            
-        )
-      )
-      val json = write(answers, 4)
-
-      println(json)
-      println("Saving answers in: " + jsonFile)
-      printToFile(json, jsonFile)
-    }
-  }
-
-  println("")
-  spark.close()
-}
diff --git a/src/main/scala/distributed/Exact.scala b/src/main/scala/distributed/Exact.scala
new file mode 100644
index 0000000000000000000000000000000000000000..94dcad37dea73eeedba52e68b49e250d53ba2738
--- /dev/null
+++ b/src/main/scala/distributed/Exact.scala
@@ -0,0 +1,112 @@
+import org.rogach.scallop._
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+import breeze.linalg._
+import breeze.numerics._
+import scala.io.Source
+import scala.collection.mutable.ArrayBuffer
+import ujson._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import shared.predictions._
+
+package distributed {
+
+class ExactConf(arguments: Seq[String]) extends ScallopConf(arguments) {
+  val train = opt[String](required = true)
+  val test = opt[String](required = true)
+  val k = opt[Int](default=Some(10))
+  val json = opt[String]()
+  val users = opt[Int]()
+  val movies = opt[Int]()
+  val separator = opt[String](default=Some("\t"))
+  val master = opt[String]()
+  val num_measurements = opt[Int](default=Some(1))
+  verify()
+}
+
+object Exact {
+  def main(args: Array[String]) {
+    var conf = new ExactConf(args)
+
+    // Remove these lines if you need Spark's own logs for debugging
+    Logger.getLogger("org").setLevel(Level.OFF)
+    Logger.getLogger("akka").setLevel(Level.OFF)
+    val spark = conf.master.toOption match {
+      case None => SparkSession.builder().getOrCreate();
+      case Some(master) => SparkSession.builder().master(master).getOrCreate();
+    }
+    spark.sparkContext.setLogLevel("ERROR")
+    val sc = spark.sparkContext
+
+    println("")
+    println("******************************************************")
+    // conf object is not serializable, extract values that
+    // will be serialized with the parallelize implementations
+    val conf_users = conf.users()
+    val conf_movies = conf.movies()
+    val conf_k = conf.k()
+
+    println("Loading training data from: " + conf.train())
+    val train = load(conf.train(), conf.separator(), conf.users(), conf.movies())
+    val test = load(conf.test(), conf.separator(), conf.users(), conf.movies())
+
+    val measurements = (1 to scala.math.max(1,conf.num_measurements())).map(_ => timingInMs( () => {
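+      // To be implemented: compute the exact k-NN similarities on the full
+      // rating matrix, predict all test ratings, and return the MAE so the
+      // whole computation is covered by this timing measurement.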
+      0.0
+    }))
+    val timings = measurements.map(_._2)
+
+    // Save answers as JSON
+    def printToFile(content: String,
+                    location: String = "./answers.json") =
+      Some(new java.io.PrintWriter(location)).foreach{
+        f => try{
+          f.write(content)
+        } finally{ f.close }
+    }
+    conf.json.toOption match {
+      case None => ;
+      case Some(jsonFile) => {
+        val answers = ujson.Obj(
+          "Meta" -> ujson.Obj(
+            "train" -> ujson.Str(conf.train()),
+            "test" -> ujson.Str(conf.test()),
+            "k" -> ujson.Num(conf.k()),
+            "users" -> ujson.Num(conf.users()),
+            "movies" -> ujson.Num(conf.movies()),
+            "master" -> ujson.Str(sc.getConf.get("spark.master")),
+            "num-executors" -> ujson.Str(if (sc.getConf.contains("spark.executor.instances"))
+                                            sc.getConf.get("spark.executor.instances")
+                                         else
+                                            ""),
+            "num_measurements" -> ujson.Num(conf.num_measurements())
+          ),
+          "EK.1" -> ujson.Obj(
+            "1.knn_u1v1" -> ujson.Num(0.0),
+            "2.knn_u1v864" -> ujson.Num(0.0),
+            "3.knn_u1v886" -> ujson.Num(0.0),
+            "4.PredUser1Item1" -> ujson.Num(0.0),
+            "5.Mae" -> ujson.Num(measurements(0)._1)
+          ),
+          "EK.2" ->  ujson.Obj(
+            "average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
+            "stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
+          )
+        )
+        val json = write(answers, 4)
+        println(json)
+        println("Saving answers in: " + jsonFile)
+        printToFile(json, jsonFile)
+      }
+    }
+
+    println("")
+    spark.stop()
+  } 
+}
+
+}
diff --git a/src/main/scala/economics/Economics.scala b/src/main/scala/economics/Economics.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f8ab904658d9d0df73bf6bce708de00fa3c25b2e
--- /dev/null
+++ b/src/main/scala/economics/Economics.scala
@@ -0,0 +1,63 @@
+import org.rogach.scallop._
+import breeze.linalg._
+import breeze.numerics._
+import scala.io.Source
+import scala.collection.mutable.ArrayBuffer
+import ujson._
+
+package economics {
+
+class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
+  val json = opt[String]()
+  verify()
+}
+
+object Economics {
+  def main(args: Array[String]) {
+    println("")
+    println("******************************************************")
+
+    var conf = new Conf(args)
+
+    // Save answers as JSON
+    def printToFile(content: String,
+                    location: String = "./answers.json") =
+      Some(new java.io.PrintWriter(location)).foreach{
+        f => try{
+          f.write(content)
+        } finally{ f.close }
+    }
+    conf.json.toOption match {
+      case None => ;
+      case Some(jsonFile) => {
+
+        val answers = ujson.Obj(
+          "E.1" -> ujson.Obj(
+            "MinRentingDays" -> ujson.Num(0.0) // Datatype of answer: Double
+          ),
+          "E.2" -> ujson.Obj(
+            "ContainerDailyCost" -> ujson.Num(0.0),
+            "4RPisDailyCostIdle" -> ujson.Num(0.0),
+            "4RPisDailyCostComputing" -> ujson.Num(0.0),
+            "MinRentingDaysIdleRPiPower" -> ujson.Num(0.0),
+            "MinRentingDaysComputingRPiPower" -> ujson.Num(0.0) 
+          ),
+          "E.3" -> ujson.Obj(
+            "NbRPisEqBuyingICCM7" -> ujson.Num(0.0),
+            "RatioRAMRPisVsICCM7" -> ujson.Num(0.0),
+            "RatioComputeRPisVsICCM7" -> ujson.Num(0.0)
+          )
+        )
+
+        val json = write(answers, 4)
+        println(json)
+        println("Saving answers in: " + jsonFile)
+        printToFile(json, jsonFile)
+      }
+    }
+
+    println("")
+  } 
+}
+
+}
diff --git a/src/main/scala/optimizing/Optimizing.scala b/src/main/scala/optimizing/Optimizing.scala
new file mode 100644
index 0000000000000000000000000000000000000000..3432900572753e62e536509cd80accdd5bb8737a
--- /dev/null
+++ b/src/main/scala/optimizing/Optimizing.scala
@@ -0,0 +1,81 @@
+import org.rogach.scallop._
+import breeze.linalg._
+import breeze.numerics._
+import scala.io.Source
+import scala.collection.mutable.ArrayBuffer
+import ujson._
+import shared.predictions._
+
+package scaling {
+
+class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
+  val train = opt[String](required = true)
+  val test = opt[String](required = true)
+  val json = opt[String]()
+  val users = opt[Int]()
+  val movies = opt[Int]()
+  val separator = opt[String](default=Some("\t"))
+  val num_measurements = opt[Int](default=Some(1))
+  verify()
+}
+
+object Optimizing extends App {
+    var conf = new Conf(args)
+    // conf object is not serializable, extract values that
+    // will be serialized with the parallelize implementations
+    val conf_users = conf.users()
+    val conf_movies = conf.movies()
+
+    println("Loading training data from: " + conf.train())
+    val train = load(conf.train(), conf.separator(), conf.users(), conf.movies())
+    val test = load(conf.test(), conf.separator(), conf.users(), conf.movies())
+
+    val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
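+      // To be implemented: recompute the k-NN predictions from the train
+      // CSCMatrix inside this closure and return the MAE on the test set,
+      // so the timings below cover the full computation.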
+      0.0
+    }))
+    val timings = measurements.map(t => t._2)
+    val mae = measurements(0)._1
+
+    // Save answers as JSON
+    def printToFile(content: String,
+                    location: String = "./answers.json") =
+      Some(new java.io.PrintWriter(location)).foreach{
+        f => try{
+          f.write(content)
+        } finally{ f.close }
+    }
+    conf.json.toOption match {
+      case None => ;
+      case Some(jsonFile) => {
+        val answers = ujson.Obj(
+          "Meta" -> ujson.Obj(
+            "train" -> ujson.Str(conf.train()),
+            "test" -> ujson.Str(conf.test()),
+            "users" -> ujson.Num(conf.users()),
+            "movies" -> ujson.Num(conf.movies()),
+            "num_measurements" -> ujson.Num(conf.num_measurements())
+          ),
+          "BR.1" -> ujson.Obj(
+            "1.k10u1v1" -> ujson.Num(0.0),
+            "2.k10u1v864" -> ujson.Num(0.0),
+            "3.k10u1v886" -> ujson.Num(0.0),
+            "4.PredUser1Item1" -> ujson.Num(0.0),
+            "5.Mae" -> ujson.Num(0.0)
+          ),
+          "BR.2" ->  ujson.Obj(
+            "average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
+            "stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
+          )
+        )
+
+        val json = write(answers, 4)
+
+        println(json)
+        println("Saving answers in: " + jsonFile)
+        printToFile(json, jsonFile)
+      }
+    }
+
+    println("")
+} 
+}
diff --git a/src/main/scala/predict/Baseline.scala b/src/main/scala/predict/Baseline.scala
deleted file mode 100644
index b3775d7e49b4be7ed491a089fab0d70ee5840b53..0000000000000000000000000000000000000000
--- a/src/main/scala/predict/Baseline.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-package predict
-
-import org.rogach.scallop._
-import org.apache.spark.rdd.RDD
-
-import org.apache.spark.sql.SparkSession
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-
-import scala.math
-import shared.predictions._
-
-
-class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
-  val train = opt[String](required = true)
-  val test = opt[String](required = true)
-  val separator = opt[String](default=Some("\t"))
-  val num_measurements = opt[Int](default=Some(0))
-  val json = opt[String]()
-  verify()
-}
-
-object Baseline extends App {
-  // Remove these lines if encountering/debugging Spark
-  Logger.getLogger("org").setLevel(Level.OFF)
-  Logger.getLogger("akka").setLevel(Level.OFF)
-  val spark = SparkSession.builder()
-    .master("local[1]")
-    .getOrCreate()
-  spark.sparkContext.setLogLevel("ERROR") 
-
-  println("")
-  println("******************************************************")
-
-  var conf = new Conf(args) 
-  // For these questions, data is collected in a scala Array 
-  // to not depend on Spark
-  println("Loading training data from: " + conf.train()) 
-  val train = load(spark, conf.train(), conf.separator()).collect()
-  println("Loading test data from: " + conf.test()) 
-  val test = load(spark, conf.test(), conf.separator()).collect()
-
-  val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
-    Thread.sleep(1000) // Do everything here from train and test
-    42        // Output answer as last value
-  }))
-  val timings = measurements.map(t => t._2) // Retrieve the timing measurements
-
-  // Save answers as JSON
-  def printToFile(content: String, 
-                  location: String = "./answers.json") =
-    Some(new java.io.PrintWriter(location)).foreach{
-      f => try{
-        f.write(content)
-      } finally{ f.close }
-  }
-  conf.json.toOption match {
-    case None => ; 
-    case Some(jsonFile) => {
-      var answers = ujson.Obj(
-        "Meta" -> ujson.Obj(
-          "1.Train" -> ujson.Str(conf.train()),
-          "2.Test" -> ujson.Str(conf.test()),
-          "3.Measurements" -> ujson.Num(conf.num_measurements())
-        ),
-        "B.1" -> ujson.Obj(
-          "1.GlobalAvg" -> ujson.Num(0.0), // Datatype of answer: Double
-          "2.User1Avg" -> ujson.Num(0.0),  // Datatype of answer: Double
-          "3.Item1Avg" -> ujson.Num(0.0),   // Datatype of answer: Double
-          "4.Item1AvgDev" -> ujson.Num(0.0), // Datatype of answer: Double
-          "5.PredUser1Item1" -> ujson.Num(0.0) // Datatype of answer: Double
-        ),
-        "B.2" -> ujson.Obj(
-          "1.GlobalAvgMAE" -> ujson.Num(0.0), // Datatype of answer: Double
-          "2.UserAvgMAE" -> ujson.Num(0.0),  // Datatype of answer: Double
-          "3.ItemAvgMAE" -> ujson.Num(0.0),   // Datatype of answer: Double
-          "4.BaselineMAE" -> ujson.Num(0.0)   // Datatype of answer: Double
-        ),
-        "B.3" -> ujson.Obj(
-          "1.GlobalAvg" -> ujson.Obj(
-            "average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
-            "stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
-          ),
-          "2.UserAvg" -> ujson.Obj(
-            "average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
-            "stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
-          ),
-          "3.ItemAvg" -> ujson.Obj(
-            "average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
-            "stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
-          ),
-          "4.Baseline" -> ujson.Obj(
-            "average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
-            "stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
-          )
-        )
-      )
-
-      val json = ujson.write(answers, 4)
-      println(json)
-      println("Saving answers in: " + jsonFile)
-      printToFile(json.toString, jsonFile)
-    }
-  }
-
-  println("")
-  spark.close()
-}
diff --git a/src/main/scala/predict/Personalized.scala b/src/main/scala/predict/Personalized.scala
deleted file mode 100644
index 3f1d7093a9c3078a62aa40866a758bc9c73e4166..0000000000000000000000000000000000000000
--- a/src/main/scala/predict/Personalized.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-package predict
-
-import org.rogach.scallop._
-import org.apache.spark.rdd.RDD
-import ujson._
-
-import org.apache.spark.sql.SparkSession
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-
-import scala.math
-import shared.predictions._
-
-
-class PersonalizedConf(arguments: Seq[String]) extends ScallopConf(arguments) {
-  val train = opt[String](required = true)
-  val test = opt[String](required = true)
-  val separator = opt[String](default=Some("\t"))
-  val num_measurements = opt[Int](default=Some(0))
-  val json = opt[String]()
-  verify()
-}
-
-object Personalized extends App {
-  // Remove these lines if encountering/debugging Spark
-  Logger.getLogger("org").setLevel(Level.OFF)
-  Logger.getLogger("akka").setLevel(Level.OFF)
-  val spark = SparkSession.builder()
-    .master("local[1]")
-    .getOrCreate()
-  spark.sparkContext.setLogLevel("ERROR") 
-
-  println("")
-  println("******************************************************")
-
-  var conf = new PersonalizedConf(args) 
-  println("Loading training data from: " + conf.train()) 
-  val train = load(spark, conf.train(), conf.separator()).collect()
-  println("Loading test data from: " + conf.test()) 
-  val test = load(spark, conf.test(), conf.separator()).collect()
-  
-  // Compute here
-
-  // Save answers as JSON
-  def printToFile(content: String, 
-                  location: String = "./answers.json") =
-    Some(new java.io.PrintWriter(location)).foreach{
-      f => try{
-        f.write(content)
-      } finally{ f.close }
-  }
-  conf.json.toOption match {
-    case None => ; 
-    case Some(jsonFile) => {
-      val answers = ujson.Obj(
-        "Meta" -> ujson.Obj(
-          "1.Train" -> ujson.Str(conf.train()),
-          "2.Test" -> ujson.Str(conf.test()),
-          "3.Measurements" -> ujson.Num(conf.num_measurements())
-        ),
-        "P.1" -> ujson.Obj(
-          "1.PredUser1Item1" -> ujson.Num(0.0), // Prediction of item 1 for user 1 (similarity 1 between users)
-          "2.OnesMAE" -> ujson.Num(0.0)         // MAE when using similarities of 1 between all users
-        ),
-        "P.2" -> ujson.Obj(
-          "1.AdjustedCosineUser1User2" -> ujson.Num(0.0), // Similarity between user 1 and user 2 (adjusted Cosine)
-          "2.PredUser1Item1" -> ujson.Num(0.0),  // Prediction item 1 for user 1 (adjusted cosine)
-          "3.AdjustedCosineMAE" -> ujson.Num(0.0) // MAE when using adjusted cosine similarity
-        ),
-        "P.3" -> ujson.Obj(
-          "1.JaccardUser1User2" -> ujson.Num(0.0), // Similarity between user 1 and user 2 (jaccard similarity)
-          "2.PredUser1Item1" -> ujson.Num(0.0),  // Prediction item 1 for user 1 (jaccard)
-          "3.JaccardPersonalizedMAE" -> ujson.Num(0.0) // MAE when using jaccard similarity
-        )
-      )
-      val json = write(answers, 4)
-      println(json)
-      println("Saving answers in: " + jsonFile)
-      printToFile(json, jsonFile)
-    }
-  }
-
-  println("")
-  spark.close()
-}
diff --git a/src/main/scala/predict/kNN.scala b/src/main/scala/predict/kNN.scala
deleted file mode 100644
index ea602cd1a5025b9d99a21fa205b1159f1d48f72d..0000000000000000000000000000000000000000
--- a/src/main/scala/predict/kNN.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-package predict
-
-import org.rogach.scallop._
-import org.apache.spark.rdd.RDD
-import ujson._
-
-import org.apache.spark.sql.SparkSession
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-
-import scala.math
-import shared.predictions._
-
-
-class kNNConf(arguments: Seq[String]) extends ScallopConf(arguments) {
-  val train = opt[String](required = true)
-  val test = opt[String](required = true)
-  val separator = opt[String](default=Some("\t"))
-  val num_measurements = opt[Int](default=Some(0))
-  val json = opt[String]()
-  verify()
-}
-
-object kNN extends App {
-  // Remove these lines if encountering/debugging Spark
-  Logger.getLogger("org").setLevel(Level.OFF)
-  Logger.getLogger("akka").setLevel(Level.OFF)
-  val spark = SparkSession.builder()
-    .master("local[1]")
-    .getOrCreate()
-  spark.sparkContext.setLogLevel("ERROR") 
-
-  println("")
-  println("******************************************************")
-
-  var conf = new PersonalizedConf(args) 
-  println("Loading training data from: " + conf.train()) 
-  val train = load(spark, conf.train(), conf.separator()).collect()
-  println("Loading test data from: " + conf.test()) 
-  val test = load(spark, conf.test(), conf.separator()).collect()
-
-
-  val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
-    Thread.sleep(1000) // Do everything here from train and test
-    42        // Output answer as last value
-  }))
-  val timings = measurements.map(t => t._2) // Retrieve the timing measurements
-
-  // Save answers as JSON
-  def printToFile(content: String, 
-                  location: String = "./answers.json") =
-    Some(new java.io.PrintWriter(location)).foreach{
-      f => try{
-        f.write(content)
-      } finally{ f.close }
-  }
-  conf.json.toOption match {
-    case None => ; 
-    case Some(jsonFile) => {
-      val answers = ujson.Obj(
-        "Meta" -> ujson.Obj(
-          "1.Train" -> conf.train(),
-          "2.Test" -> conf.test(),
-          "3.Measurements" -> conf.num_measurements()
-        ),
-        "N.1" -> ujson.Obj(
-          "1.k10u1v1" -> ujson.Num(0.0), // Similarity between user 1 and user 1 (k=10)
-          "2.k10u1v864" -> ujson.Num(0.0), // Similarity between user 1 and user 864 (k=10)
-          "3.k10u1v886" -> ujson.Num(0.0), // Similarity between user 1 and user 886 (k=10)
-          "4.PredUser1Item1" -> ujson.Num(0.0) // Prediction of item 1 for user 1 (k=10)
-        ),
-        "N.2" -> ujson.Obj(
-          "1.kNN-Mae" -> List(10,30,50,100,200,300,400,800,943).map(k => 
-              List(
-                k,
-                0.0 // Compute MAE
-              )
-          ).toList
-        ),
-        "N.3" -> ujson.Obj(
-          "1.kNN" -> ujson.Obj(
-            "average (ms)" -> ujson.Num(mean(timings)),
-            "stddev (ms)" -> ujson.Num(std(timings))
-          )
-        )
-      )
-      val json = write(answers, 4)
-
-      println(json)
-      println("Saving answers in: " + jsonFile)
-      printToFile(json, jsonFile)
-    }
-  }
-
-  println("")
-  spark.close()
-}
diff --git a/src/main/scala/recommend/Recommender.scala b/src/main/scala/recommend/Recommender.scala
deleted file mode 100644
index 83c7a1fc9419228e3f02a89f1faeb09a799bc69b..0000000000000000000000000000000000000000
--- a/src/main/scala/recommend/Recommender.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-package recommend
-
-import org.rogach.scallop._
-import org.apache.spark.rdd.RDD
-import ujson._
-
-import org.apache.spark.sql.SparkSession
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-
-import shared.predictions._
-
-class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
-  val data = opt[String](required = true)
-  val personal = opt[String](required = true)
-  val separator = opt[String](default = Some("\t"))
-  val json = opt[String]()
-  verify()
-}
-
-object Recommender extends App {
-  // Remove these lines if encountering/debugging Spark
-  Logger.getLogger("org").setLevel(Level.OFF)
-  Logger.getLogger("akka").setLevel(Level.OFF)
-  val spark = SparkSession.builder()
-    .master("local[1]")
-    .getOrCreate()
-  spark.sparkContext.setLogLevel("ERROR") 
-
-  println("")
-  println("******************************************************")
-
-  var conf = new Conf(args) 
-  println("Loading data from: " + conf.data()) 
-  val data = load(spark, conf.data(), conf.separator()).collect()
-  assert(data.length == 100000, "Invalid data")
-
-  println("Loading personal data from: " + conf.personal()) 
-  val personalFile = spark.sparkContext.textFile(conf.personal())
-  val personal = personalFile.map(l => {
-      val cols = l.split(",").map(_.trim)
-      if (cols(0) == "id") 
-        Rating(944,0,0.0)
-      else 
-        if (cols.length < 3) 
-          Rating(944, cols(0).toInt, 0.0)
-        else
-          Rating(944, cols(0).toInt, cols(2).toDouble)
-  }).filter(r => r.rating != 0).collect()
-  val movieNames = personalFile.map(l => {
-      val cols = l.split(",").map(_.trim)
-      if (cols(0) == "id") (0, "header")
-      else (cols(0).toInt, cols(1).toString)
-  }).collect().toMap
-
-
-  // Save answers as JSON
-  def printToFile(content: String, 
-                  location: String = "./answers.json") =
-    Some(new java.io.PrintWriter(location)).foreach{
-      f => try{
-        f.write(content)
-      } finally{ f.close }
-  }
-  conf.json.toOption match {
-    case None => ; 
-    case Some(jsonFile) => {
-      val answers = ujson.Obj(
-        "Meta" -> ujson.Obj(
-          "data" -> conf.data(),
-          "personal" -> conf.personal()
-        ),
-        "R.1" -> ujson.Obj(
-          "PredUser1Item1" -> ujson.Num(0.0) // Prediction for user 1 of item 1
-        ),
-          // IMPORTANT: To break ties and ensure reproducibility of results,
-          // please report the top-3 recommendations that have the smallest
-          // movie identifier.
-
-        "R.2" -> List((254, 0.0), (338, 0.0), (615, 0.0)).map(x => ujson.Arr(x._1, movieNames(x._1), x._2))
-       )
-      val json = write(answers, 4)
-
-      println(json)
-      println("Saving answers in: " + jsonFile)
-      printToFile(json, jsonFile)
-    }
-  }
-
-  println("")
-  spark.close()
-}
diff --git a/src/main/scala/shared/predictions.scala b/src/main/scala/shared/predictions.scala
index 89bff73ca0dc7a8776b539bd251fda1a206b5c26..63485e37174cd3daf42ae99f7b92b16dcdf16dc6 100644
--- a/src/main/scala/shared/predictions.scala
+++ b/src/main/scala/shared/predictions.scala
@@ -1,7 +1,14 @@
 package shared
 
+import breeze.linalg._
+import breeze.numerics._
+import scala.io.Source
+import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.SparkContext
+
 package object predictions
 {
+  // ------------------------ For template
   case class Rating(user: Int, item: Int, rating: Double)
 
   def timingInMs(f : ()=>Double ) : (Double, Double) = {
@@ -11,36 +18,53 @@ package object predictions
     return (output, (end-start)/1000000.0)
   }
 
+  def toInt(s: String): Option[Int] = {
+    try {
+      Some(s.toInt)
+    } catch {
+      case e: Exception => None
+    }
+  }
+
   def mean(s :Seq[Double]): Double =  if (s.size > 0) s.reduce(_+_) / s.length else 0.0
+
   def std(s :Seq[Double]): Double = {
     if (s.size == 0) 0.0
-    else {
+    else { 
       val m = mean(s)
-      scala.math.sqrt(s.map(x => scala.math.pow(m-x, 2)).sum / s.length.toDouble)
+      scala.math.sqrt(s.map(x => scala.math.pow(m-x, 2)).sum / s.length.toDouble) 
     }
   }
 
-  def toInt(s: String): Option[Int] = {
-    try {
-      Some(s.toInt)
-    } catch {
-      case e: Exception => None
+
+  def load(path : String, sep : String, nbUsers : Int, nbMovies : Int) : CSCMatrix[Double] = {
+    val file = Source.fromFile(path)
+    val builder = new CSCMatrix.Builder[Double](rows=nbUsers, cols=nbMovies) 
+    for (line <- file.getLines) {
+      val cols = line.split(sep).map(_.trim)
+      toInt(cols(0)) match {
+        case Some(_) => builder.add(cols(0).toInt-1, cols(1).toInt-1, cols(2).toDouble)
+        case None => None
+      }
     }
+    file.close
+    builder.result()
   }
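+  // Usage sketch (path and dimensions below are the ml-100k values used by the
+  // test suites in this patch, not fixed by this function):
+  //   val train = load("data/ml-100k/u2.base", "\t", 943, 1682)
+  //   train(0, 0) then holds user 1's rating for item 1 (0.0 if unrated).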
 
-  def load(spark : org.apache.spark.sql.SparkSession,  path : String, sep : String) : org.apache.spark.rdd.RDD[Rating] = {
-       val file = spark.sparkContext.textFile(path)
-       return file
-         .map(l => {
-           val cols = l.split(sep).map(_.trim)
-           toInt(cols(0)) match {
-             case Some(_) => Some(Rating(cols(0).toInt, cols(1).toInt, cols(2).toDouble))
-             case None => None
-           }
-       })
-         .filter({ case Some(_) => true 
-                   case None => false })
-         .map({ case Some(x) => x 
-                case None => Rating(-1, -1, -1)})
+  def partitionUsers (nbUsers : Int, nbPartitions : Int, replication : Int) : Seq[Set[Int]] = {
+    val r = new scala.util.Random(1337)
+    val bins : Map[Int, collection.mutable.ListBuffer[Int]] = (0 to (nbPartitions-1))
+       .map(p => (p -> collection.mutable.ListBuffer[Int]())).toMap
+    (0 to (nbUsers-1)).foreach(u => {
+      val assignedBins = r.shuffle(0 to (nbPartitions-1)).take(replication)
+      for (b <- assignedBins) {
+        bins(b) += u
+      }
+    })
+    bins.values.toSeq.map(_.toSet)
   }
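+  // Usage sketch: partitionUsers(943, 10, 2) assigns each of the 943 users
+  // (0-indexed) to exactly 2 of the 10 partitions; the fixed seed (1337)
+  // makes the partitioning reproducible across runs.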
+
+
 }
+
+
diff --git a/src/test/scala/AllTests.scala b/src/test/scala/AllTests.scala
index 56825e9d3dcde00fd09b364a88db4eef498d3524..ffdb8425e3859a2c3f124ac606b3d1fa536a10b0 100644
--- a/src/test/scala/AllTests.scala
+++ b/src/test/scala/AllTests.scala
@@ -3,14 +3,12 @@ package test
 import org.scalatest._
 import funsuite._
 
+import test.optimizing._
 import test.distributed._
-import test.predict._
 
 class AllTests extends Sequential(
-  new test.predict.BaselineTests,
-  new test.distributed.DistributedBaselineTests,
-  new test.predict.PersonalizedTests,
-  new test.predict.kNNTests,
-  new test.recommend.RecommenderTests
+  new OptimizingTests,
+  new ExactTests,
+  new ApproximateTests
 )
 
diff --git a/src/test/scala/distributed/ApproximateTests.scala b/src/test/scala/distributed/ApproximateTests.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e688899ada25f04f6e58661608ac480b99dc9fdf
--- /dev/null
+++ b/src/test/scala/distributed/ApproximateTests.scala
@@ -0,0 +1,63 @@
+package test.distributed
+
+import breeze.linalg._
+import breeze.numerics._
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.BeforeAndAfterAll
+import shared.predictions._
+import test.shared.helpers._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.SparkContext
+
+class ApproximateTests extends AnyFunSuite with BeforeAndAfterAll {
+  
+   val separator = "\t"
+   val train2Path = "data/ml-100k/u2.base"
+   val test2Path = "data/ml-100k/u2.test"
+   var train2 : CSCMatrix[Double] = null
+   var test2 : CSCMatrix[Double] = null
+   var sc : SparkContext = null
+
+   override def beforeAll {
+     train2 = load(train2Path, separator, 943, 1682)
+     test2 = load(test2Path, separator, 943, 1682)
+
+     val spark = SparkSession.builder().master("local[2]").getOrCreate();
+     spark.sparkContext.setLogLevel("ERROR")
+     sc = spark.sparkContext
+   }
+
+   // Provide tests to show how to call your code to do the following tasks.
+   // Ensure you use the same function calls to produce the JSON outputs in
+   // the corresponding application.
+   // Add assertions with the answer you expect from your code, up to the 4th
+   // decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
+   test("Approximate kNN predictor with 10 partitions and replication of 2") { 
+     val partitionedUsers : Seq[Set[Int]] = partitionUsers(943, 10, 2)
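+     // A minimal sketch of what this test might exercise (helper names such as
+     // approximateKNN and evaluateMAE are hypothetical, not part of the template):
+     //   val predictor = approximateKNN(train2, sc, partitionedUsers, k = 10)
+     //   val mae = evaluateMAE(predictor, test2)
+     // The assertions below would then compare these values to the expected ones.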
+
+     // Similarity between user 1 and itself
+     assert(within(1.0, 0.0, 0.0001))
+ 
+     // Similarity between user 1 and 864
+     assert(within(1.0, 0.0, 0.0001))
+
+     // Similarity between user 1 and 344
+     assert(within(1.0, 0.0, 0.0001))
+
+     // Similarity between user 1 and 16
+     assert(within(1.0, 0.0, 0.0001))
+
+     // Similarity between user 1 and 334
+     assert(within(1.0, 0.0, 0.0001))
+
+     // Similarity between user 1 and 2
+     assert(within(1.0, 0.0, 0.0001))
+
+     // MAE on test
+     assert(within(1.0, 0.0, 0.0001))
+   } 
+}
diff --git a/src/test/scala/distributed/DistributedBaselineTests.scala b/src/test/scala/distributed/DistributedBaselineTests.scala
deleted file mode 100644
index 927f925546070e78134e831627c5e7cacc25b9ae..0000000000000000000000000000000000000000
--- a/src/test/scala/distributed/DistributedBaselineTests.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-package test.distributed
-
-import org.scalatest._
-
-import funsuite._
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-
-import shared.predictions._
-import tests.shared.helpers._
-
-class DistributedBaselineTests extends AnyFunSuite with BeforeAndAfterAll {
-
-   val separator = "\t"
-   var spark : org.apache.spark.sql.SparkSession = _
-
-   val train2Path = "data/ml-100k/u2.base"
-   val test2Path = "data/ml-100k/u2.test"
-   var train2 : org.apache.spark.rdd.RDD[shared.predictions.Rating] = null
-   var test2 : org.apache.spark.rdd.RDD[shared.predictions.Rating] = null
-
-   override def beforeAll {
-       Logger.getLogger("org").setLevel(Level.OFF)
-       Logger.getLogger("akka").setLevel(Level.OFF)
-       spark = SparkSession.builder()
-           .master("local[1]")
-           .getOrCreate()
-       spark.sparkContext.setLogLevel("ERROR")
-       train2 = load(spark, train2Path, separator)
-       test2 = load(spark, test2Path, separator)
-   }
-
-   // All the functions definitions for the tests below (and the tests in other suites) 
-   // should be in a single library, 'src/main/scala/shared/predictions.scala'.
-
-   // Provide tests to show how to call your code to do the following tasks (each in with their own test):
-   // each method should be invoked with a single function call. 
-   // Ensure you use the same function calls to produce the JSON outputs in
-   // src/main/scala/predict/Baseline.scala.
-   // Add assertions with the answer you expect from your code, up to the 4th
-   // decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
-   test("Compute global average")                           { assert(within(1.0, 0.0, 0.0001)) }
-   test("Compute user 1 average")                           { assert(within(1.0, 0.0, 0.0001)) }
-   test("Compute item 1 average")                           { assert(within(1.0, 0.0, 0.0001)) }
-   test("Compute item 1 average deviation")                 { assert(within(1.0, 0.0, 0.0001)) }
-   test("Compute baseline prediction for user 1 on item 1") { assert(within(1.0, 0.0, 0.0001)) }
-
-   // Show how to compute the MAE on all four non-personalized methods:
-   // 1. There should be four different functions, one for each method, to create a predictor
-   // with the following signature: ````predictor: (train: Seq[shared.predictions.Rating]) => ((u: Int, i: Int) => Double)````;
-   // 2. There should be a single reusable function to compute the MAE on the test set, given a predictor;
-   // 3. There should be invocations of both to show they work on the following datasets.
-   test("MAE on all four non-personalized methods on data/ml-100k/u2.base and data/ml-100k/u2.test") {
-     assert(within(1.0, 0.0, 0.0001))
-     assert(within(1.0, 0.0, 0.0001))
-     assert(within(1.0, 0.0, 0.0001))
-     assert(within(1.0, 0.0, 0.0001))
-   }
-}
diff --git a/src/test/scala/distributed/ExactTests.scala b/src/test/scala/distributed/ExactTests.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ad94055585c8652c8b7e2791d189c71ef3918ae1
--- /dev/null
+++ b/src/test/scala/distributed/ExactTests.scala
@@ -0,0 +1,52 @@
+package test.distributed
+
+import breeze.linalg._
+import breeze.numerics._
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.BeforeAndAfterAll
+import shared.predictions._
+import test.shared.helpers._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.SparkContext
+
+class ExactTests extends AnyFunSuite with BeforeAndAfterAll {
+  
+   val separator = "\t"
+   val train2Path = "data/ml-100k/u2.base"
+   val test2Path = "data/ml-100k/u2.test"
+   var train2 : CSCMatrix[Double] = null
+   var test2 : CSCMatrix[Double] = null
+   var sc : SparkContext = null
+
+   override def beforeAll {
+     train2 = load(train2Path, separator, 943, 1682)
+     test2 = load(test2Path, separator, 943, 1682)
+
+     val spark = SparkSession.builder().master("local[2]").getOrCreate();
+     spark.sparkContext.setLogLevel("ERROR")
+     sc = spark.sparkContext
+   }
+
+   // Provide tests to show how to call your code to do the following tasks.
+   // Ensure you use the same function calls to produce the JSON outputs in
+   // the corresponding application.
+   // Add assertions with the answer you expect from your code, up to the 4th
+   // decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
+   test("kNN predictor with k=10") { 
+
+     // Similarity between user 1 and itself
+     assert(within(1.0, 0.0, 0.0001))
+ 
+     // Similarity between user 1 and 864
+     assert(within(1.0, 0.0, 0.0001))
+
+     // Similarity between user 1 and 886
+     assert(within(1.0, 0.0, 0.0001))
+
+     // Prediction user 1 and item 1
+     assert(within(1.0, 0.0, 0.0001))
+
+     // MAE on test
+     assert(within(1.0, 0.0, 0.0001)) 
+   } 
+}
diff --git a/src/test/scala/optimizing/OptimizingTests.scala b/src/test/scala/optimizing/OptimizingTests.scala
new file mode 100644
index 0000000000000000000000000000000000000000..b13b1c9aaa5994b585e7b9793ed1b20c7d6ae372
--- /dev/null
+++ b/src/test/scala/optimizing/OptimizingTests.scala
@@ -0,0 +1,47 @@
+package test.optimizing
+
+import breeze.linalg._
+import breeze.numerics._
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.BeforeAndAfterAll
+import shared.predictions._
+import test.shared.helpers._
+
+class OptimizingTests extends AnyFunSuite with BeforeAndAfterAll {
+  
+   val separator = "\t"
+   val train2Path = "data/ml-100k/u2.base"
+   val test2Path = "data/ml-100k/u2.test"
+   var train2 : CSCMatrix[Double] = null
+   var test2 : CSCMatrix[Double] = null
+
+   override def beforeAll {
+       // For these questions, train and test are loaded into Breeze CSCMatrix
+       // instances so that they do not depend on Spark
+       train2 = load(train2Path, separator, 943, 1682)
+       test2 = load(test2Path, separator, 943, 1682)
+   }
+
+   // Provide tests to show how to call your code to do the following tasks.
+   // Ensure you use the same function calls to produce the JSON outputs in
+   // the corresponding application.
+   // Add assertions with the answer you expect from your code, up to the 4th
+   // decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
+   test("kNN predictor with k=10") { 
+
+     // Similarity between user 1 and itself
+     assert(within(1.0, 0.0, 0.0001))
+ 
+     // Similarity between user 1 and 864
+     assert(within(1.0, 0.0, 0.0001))
+
+     // Similarity between user 1 and 886
+     assert(within(1.0, 0.0, 0.0001))
+
+     // Prediction user 1 and item 1
+     assert(within(1.0, 0.0, 0.0001))
+
+     // MAE on test2
+     assert(within(1.0, 0.0, 0.0001)) 
+   } 
+}
diff --git a/src/test/scala/predict/BaselineTests.scala b/src/test/scala/predict/BaselineTests.scala
deleted file mode 100644
index 65765f8ce4df6007020ced25683d5822643e250b..0000000000000000000000000000000000000000
--- a/src/test/scala/predict/BaselineTests.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-package test.predict
-
-import org.scalatest._
-import funsuite._
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-
-import shared.predictions._
-import tests.shared.helpers._
-import ujson._
-
-class BaselineTests extends AnyFunSuite with BeforeAndAfterAll {
-
-   val separator = "\t"
-   var spark : org.apache.spark.sql.SparkSession = _
-
-   val train2Path = "data/ml-100k/u2.base"
-   val test2Path = "data/ml-100k/u2.test"
-   var train2 : Array[shared.predictions.Rating] = null
-   var test2 : Array[shared.predictions.Rating] = null
-
-   override def beforeAll {
-       Logger.getLogger("org").setLevel(Level.OFF)
-       Logger.getLogger("akka").setLevel(Level.OFF)
-       spark = SparkSession.builder()
-           .master("local[1]")
-           .getOrCreate()
-       spark.sparkContext.setLogLevel("ERROR")
-
-       // For these questions, train and test are collected in a scala Array
-       // to not depend on Spark
-       train2 = load(spark, train2Path, separator).collect()
-       test2 = load(spark, test2Path, separator).collect()
-   }
-
-   // All the functions definitions for the tests below (and the tests in other suites) 
-   // should be in a single library, 'src/main/scala/shared/predictions.scala'.
-
-   // Provide tests to show how to call your code to do the following tasks (each in with their own test):
-   // each method should be invoked with a single function call. 
-   // Ensure you use the same function calls to produce the JSON outputs in
-   // src/main/scala/predict/Baseline.scala.
-   // Add assertions with the answer you expect from your code, up to the 4th
-   // decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
-   test("Compute global average")                           { assert(within(1.0, 0.0, 0.0001)) }
-   test("Compute user 1 average")                           { assert(within(1.0, 0.0, 0.0001)) }
-   test("Compute item 1 average")                           { assert(within(1.0, 0.0, 0.0001)) }
-   test("Compute item 1 average deviation")                 { assert(within(1.0, 0.0, 0.0001)) }
-   test("Compute baseline prediction for user 1 on item 1") { assert(within(1.0, 0.0, 0.0001)) }
-
-   // Show how to compute the MAE on all four non-personalized methods:
-   // 1. There should be four different functions, one for each method, to create a predictor
-   // with the following signature: ````predictor: (train: Seq[shared.predictions.Rating]) => ((u: Int, i: Int) => Double)````;
-   // 2. There should be a single reusable function to compute the MAE on the test set, given a predictor;
-   // 3. There should be invocations of both to show they work on the following datasets.
-   test("MAE on all four non-personalized methods on data/ml-100k/u2.base and data/ml-100k/u2.test") {
-     assert(within(1.0, 0.0, 0.0001))
-     assert(within(1.0, 0.0, 0.0001))
-     assert(within(1.0, 0.0, 0.0001))
-     assert(within(1.0, 0.0, 0.0001))
-   }
-}
diff --git a/src/test/scala/predict/PersonalizedTests.scala b/src/test/scala/predict/PersonalizedTests.scala
deleted file mode 100644
index 068597940ca231d57e31567b28b70120f0b433d9..0000000000000000000000000000000000000000
--- a/src/test/scala/predict/PersonalizedTests.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-package test.predict
-
-import org.scalatest._
-import funsuite._
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-
-import shared.predictions._
-import tests.shared.helpers._
-import ujson._
-
-class PersonalizedTests extends AnyFunSuite with BeforeAndAfterAll {
-
-   val separator = "\t"
-   var spark : org.apache.spark.sql.SparkSession = _
-
-   val train2Path = "data/ml-100k/u2.base"
-   val test2Path = "data/ml-100k/u2.test"
-   var train2 : Array[shared.predictions.Rating] = null
-   var test2 : Array[shared.predictions.Rating] = null
-
-   override def beforeAll {
-       Logger.getLogger("org").setLevel(Level.OFF)
-       Logger.getLogger("akka").setLevel(Level.OFF)
-       spark = SparkSession.builder()
-           .master("local[1]")
-           .getOrCreate()
-       spark.sparkContext.setLogLevel("ERROR")
-       // For these questions, train and test are collected in a scala Array
-       // to not depend on Spark
-       train2 = load(spark, train2Path, separator).collect()
-       test2 = load(spark, test2Path, separator).collect()
-   }
-
-   // All the functions definitions for the tests below (and the tests in other suites) 
-   // should be in a single library, 'src/main/scala/shared/predictions.scala'.
-
-   // Provide tests to show how to call your code to do the following tasks.
-   // Ensure you use the same function calls to produce the JSON outputs in
-   // src/main/scala/predict/Baseline.scala.
-   // Add assertions with the answer you expect from your code, up to the 4th
-   // decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
-   test("Test uniform unary similarities") { 
-     // Create predictor with uniform similarities
-     
-     // Compute personalized prediction for user 1 on item 1
-     assert(within(1.0, 0.0, 0.0001))
-
-     // MAE 
-     assert(within(1.0, 0.0, 0.0001))
-   } 
-
-   test("Test ajusted cosine similarity") { 
-     // Create predictor with adjusted cosine similarities
-
-     // Similarity between user 1 and user 2
-     assert(within(1.0, 0.0, 0.0001))
-
-     // Compute personalized prediction for user 1 on item 1
-     assert(within(1.0, 0.0, 0.0001))
-
-     // MAE 
-     assert(within(1.0, 0.0, 0.0001))
-   }
-
-   test("Test jaccard similarity") { 
-     // Create predictor with jaccard similarities
-
-     // Similarity between user 1 and user 2
-     assert(within(1.0, 0.0, 0.0001))
-
-     // Compute personalized prediction for user 1 on item 1
-     assert(within(1.0, 0.0, 0.0001))
-
-     // MAE 
-     assert(within(1.0, 0.0, 0.0001))
-   }
-}
diff --git a/src/test/scala/predict/kNNTests.scala b/src/test/scala/predict/kNNTests.scala
deleted file mode 100644
index fe4c348821cbb3ff950917af53c01c5b3d96e8b3..0000000000000000000000000000000000000000
--- a/src/test/scala/predict/kNNTests.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-package test.predict
-
-import org.scalatest._
-import funsuite._
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-
-import shared.predictions._
-import tests.shared.helpers._
-import ujson._
-
-class kNNTests extends AnyFunSuite with BeforeAndAfterAll {
-
-   val separator = "\t"
-   var spark : org.apache.spark.sql.SparkSession = _
-
-   val train2Path = "data/ml-100k/u2.base"
-   val test2Path = "data/ml-100k/u2.test"
-   var train2 : Array[shared.predictions.Rating] = null
-   var test2 : Array[shared.predictions.Rating] = null
-
-   var adjustedCosine : Map[Int, Map[Int, Double]] = null
-
-   override def beforeAll {
-       Logger.getLogger("org").setLevel(Level.OFF)
-       Logger.getLogger("akka").setLevel(Level.OFF)
-       spark = SparkSession.builder()
-           .master("local[1]")
-           .getOrCreate()
-       spark.sparkContext.setLogLevel("ERROR")
-
-       // For these questions, train and test are collected in a scala Array
-       // to not depend on Spark
-       train2 = load(spark, train2Path, separator).collect()
-       test2 = load(spark, test2Path, separator).collect()
-   }
-
-   // All the functions definitions for the tests below (and the tests in other suites) 
-   // should be in a single library, 'src/main/scala/shared/predictions.scala'.
-
-   // Provide tests to show how to call your code to do the following tasks.
-   // Ensure you use the same function calls to produce the JSON outputs in
-   // src/main/scala/predict/Baseline.scala.
-   // Add assertions with the answer you expect from your code, up to the 4th
-   // decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
-   test("kNN predictor with k=10") { 
-     // Create predictor on train2
-
-     // Similarity between user 1 and itself
-     assert(within(1.0, 0.0, 0.0001))
- 
-     // Similarity between user 1 and 864
-     assert(within(1.0, 0.0, 0.0001))
-
-     // Similarity between user 1 and 886
-     assert(within(1.0, 0.0, 0.0001))
-
-     // Prediction user 1 and item 1
-     assert(within(1.0, 0.0, 0.0001))
-
-     // MAE on test2 
-     assert(within(1.0, 0.0, 0.0001))
-   } 
-
-   test("kNN Mae") {
-     // Compute MAE for k around the baseline MAE
-     
-     // Ensure the MAEs are indeed lower/higher than baseline
-     assert(1.0 < 0.0)
-   }
-}
diff --git a/src/test/scala/recommend/RecommenderTests.scala b/src/test/scala/recommend/RecommenderTests.scala
deleted file mode 100644
index 4a1a420257a73688882ccbbcf975672673978946..0000000000000000000000000000000000000000
--- a/src/test/scala/recommend/RecommenderTests.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-package test.recommend
-
-import org.scalatest._
-import funsuite._
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-
-import shared.predictions._
-import tests.shared.helpers._
-import ujson._
-
-class RecommenderTests extends AnyFunSuite with BeforeAndAfterAll {
-
-   val separator = "\t"
-   var spark : org.apache.spark.sql.SparkSession = _
-
-   val dataPath = "data/ml-100k/u.data"
-   val personalPath = "data/personal.csv"
-   var data : Array[shared.predictions.Rating] = null
-   var personal : Array[shared.predictions.Rating] = null
-   var train : Array[shared.predictions.Rating] = null
-   var predictor : (Int, Int) => Double = null
-
-   override def beforeAll {
-     Logger.getLogger("org").setLevel(Level.OFF)
-     Logger.getLogger("akka").setLevel(Level.OFF)
-     spark = SparkSession.builder()
-         .master("local[1]")
-         .getOrCreate()
-     spark.sparkContext.setLogLevel("ERROR")
-    
-     data = load(spark, dataPath, separator).collect()
-
-     println("Loading personal data from: " + personalPath) 
-     val personalFile = spark.sparkContext.textFile(personalPath)
-     personal = personalFile.map(l => {
-         val cols = l.split(",").map(_.trim)
-         if (cols(0) == "id") 
-           Rating(944,0,0.0)
-         else 
-           if (cols.length < 3) 
-             Rating(944, cols(0).toInt, 0.0)
-           else
-             Rating(944, cols(0).toInt, cols(2).toDouble)
-     }).filter(r => r.rating != 0).collect()
-
-     // TODO: Create predictor
-   }
-
-   // All the functions definitions for the tests below (and the tests in other suites) 
-   // should be in a single library, 'src/main/scala/shared/predictions.scala'.
-   //
-   test("Prediction for user 1 of item 1") {
-     assert(within(1.0, 0.0, 0.0001))
-   }
-
-   test("Top 3 recommendations for user 944") {
-     val recommendations = List((1,0.0), (2,0.0), (3,0.0))
-     assert(recommendations(0)._1 == 4)
-     assert(within(recommendations(0)._2, 5.0, 0.0001))
-     // Idem recommendation 2 and 3
-   }
-
-}
diff --git a/src/test/scala/shared/helpers.scala b/src/test/scala/shared/helpers.scala
index f7c07691a8f57158342bcc4514020db0815a2693..44dc782ce6f30b766df91db243a320bafdcc550d 100644
--- a/src/test/scala/shared/helpers.scala
+++ b/src/test/scala/shared/helpers.scala
@@ -1,9 +1,8 @@
-package tests.shared
+package test.shared
 
 package object helpers {
 
   def within(actual :Double, expected :Double, interval :Double) : Boolean = {
     return actual >= (expected - interval) && actual <= (expected + interval)
   }
-
 }
diff --git a/timeCluster.sh b/timeCluster.sh
deleted file mode 100755
index c73d0ad20226fc4dca6083b5dc0234f9b2460a4c..0000000000000000000000000000000000000000
--- a/timeCluster.sh
+++ /dev/null
@@ -1,16 +0,0 @@
-#!/usr/bin/env bash
-# If your default java install does not work, explicitly 
-# provide the path to the JDK 1.8 installation. On OSX
-# with homebrew:
-# export JAVA_HOME=/usr/local/Cellar/openjdk@8/1.8.0+282; ./run.sh
-export JAVA_OPTS="-Xmx8G";
-RUN=./logs/timecluster-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
-mkdir -p $RUN
-LOGS=$RUN/log.txt
-source ./config.sh 
-echo "------------------- DISTRIBUTED ---------------------" >> $LOGS
-sbt assembly
-# 1 Executor
-spark-submit --class distributed.DistributedBaseline --master $SPARKMASTER --num-executors 1 --conf "spark.dynamicAllocation.enabled=false" target/scala-2.11/m1_yourid-assembly-1.0.jar  --train $ML25Mr2train --test $ML25Mr2test --separator , --json $RUN/distributed-25m-1.json --num_measurements 3 2>&1 >>$LOGS
-# 4 Executors
-spark-submit --class distributed.DistributedBaseline --master $SPARKMASTER --num-executors 4 --conf "spark.dynamicAllocation.enabled=false" target/scala-2.11/m1_yourid-assembly-1.0.jar  --train $ML25Mr2train --test $ML25Mr2test --separator , --json $RUN/distributed-25m-4.json --num_measurements 3 2>&1 >>$LOGS
diff --git a/timeOthers.sh b/timeOthers.sh
deleted file mode 100755
index 6dd08e6c6c87a7eea93977f41c1a6d2fc57aba81..0000000000000000000000000000000000000000
--- a/timeOthers.sh
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/usr/bin/env bash
-# If your default java install does not work, explicitly 
-# provide the path to the JDK 1.8 installation. On OSX
-# with homebrew:
-# export JAVA_HOME=/usr/local/Cellar/openjdk@8/1.8.0+282;
-export JAVA_OPTS="-Xmx8G";
-RUN=./logs/timeOthers-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
-mkdir -p $RUN
-LOGS=$RUN/log.txt
-echo "------------------- BASELINE    ---------------------" >> $LOGS
-sbt "runMain predict.Baseline --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json $RUN/baseline-100k.json --num_measurements 3" 2>&1 >>$LOGS
-echo "------------------- DISTRIBUTED ---------------------" >> $LOGS
-sbt "runMain predict.Baseline --train data/ml-25m/r2.train --test data/ml-25m/r2.test --separator , --json $RUN/baseline-25m.json --num_measurements 3" 2>&1 >> $LOGS
-sbt "runMain distributed.DistributedBaseline --train data/ml-25m/r2.train --test data/ml-25m/r2.test --separator , --json $RUN/distributed-25m-1.json --num_measurements 3 --master local[1]" 2>&1 >>$LOGS
-sbt "runMain distributed.DistributedBaseline --train data/ml-25m/r2.train --test data/ml-25m/r2.test --separator , --json $RUN/distributed-25m-4.json --num_measurements 3 --master local[4]" 2>&1 >>$LOGS
diff --git a/timeTrials.sh b/timeTrials.sh
deleted file mode 100755
index 7329d07fb0bcf99fb9dc968b9f0d63a223b6abc0..0000000000000000000000000000000000000000
--- a/timeTrials.sh
+++ /dev/null
@@ -1,11 +0,0 @@
-#!/usr/bin/env bash
-# If your default java install does not work, explicitly 
-# provide the path to the JDK 1.8 installation. On OSX
-# with homebrew:
-# export JAVA_HOME=/usr/local/Cellar/openjdk@8/1.8.0+282;
-export JAVA_OPTS="-Xmx8G";
-RUN=./logs/timetrials-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
-mkdir -p $RUN
-LOGS=$RUN/log.txt
-echo "------------------- KNN -----------------------------" >> $LOGS
-sbt "runMain predict.kNN --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json $RUN/knn-100k.json --num_measurements 3" 2>&1 >>$LOGS
diff --git a/timing.sh b/timing.sh
new file mode 100755
index 0000000000000000000000000000000000000000..8b64c79946dbd49a2e18c52d0c261cc42a555b21
--- /dev/null
+++ b/timing.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+# If your default java install does not work, explicitly 
+# provide the path to the JDK 1.8 installation. On OSX
+# with homebrew:
+# export JAVA_HOME=/usr/local/Cellar/openjdk@8/1.8.0+282; ./run.sh
+export JAVA_OPTS="-Xmx8G";
+RUN=./logs/timing-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
+mkdir -p $RUN
+LOGS=$RUN/log.txt
+source ./config.sh 
+echo "------------------- OPTIMIZING    ---------------------" >> $LOGS
+sbt "runMain scaling.Optimizing --train $ML100Ku2base --test $ML100Ku2test --json $RUN/optimized-100k.json --users 943 --movies 1682 --num_measurements 3" 2>&1 >>$LOGS
+echo "------------------- DISTRIBUTED EXACT ---------------------" >> $LOGS
+for W in 1 2 4; do
+    sbt "runMain distributed.Exact --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --json $RUN/exact-1m-$W.json --k 300 --master local[$W] --users 6040 --movies 3952 --num_measurements 3" 2>&1 >>$LOGS;
+done
+echo "------------------- APPROXIMATE EXACT ---------------------" >> $LOGS
+for W in 1 2 4; do
+    sbt "runMain distributed.Approximate --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --json $RUN/approximate-1m-$W.json --k 300 --master local[$W] --users 6040 --movies 3952 --num_measurements 3" 2>&1 >>$LOGS;
+done