From c66fb035d0071db790045f08c3771af7fbccf709 Mon Sep 17 00:00:00 2001
From: Erick Lavoie <erick.lavoie@epfl.ch>
Date: Wed, 16 Feb 2022 21:04:42 +0100
Subject: [PATCH] Updated for 2022 version of the course

---
 .gitignore                                    |   2 +
 README.md                                     | 181 +++++++++++++++---
 build.sbt                                     |  17 +-
 config.sh                                     |  16 ++
 data/personal.csv                             | 153 +++++++--------
 logs/README.md                                |   1 +
 project/plugins.sbt                           |   1 +
 run.sh                                        |  21 ++
 .../distributed/DistributedBaseline.scala     |  94 +++++++++
 src/main/scala/predict/Baseline.scala         | 108 +++++++++++
 src/main/scala/predict/Personalized.scala     |  85 ++++++++
 src/main/scala/predict/Predictor.scala        | 114 -----------
 src/main/scala/predict/kNN.scala              |  97 ++++++++++
 src/main/scala/recommend/Recommender.scala    |  66 ++++---
 src/main/scala/shared/predictions.scala       |  46 +++++
 src/main/scala/stats/Analyzer.scala           |  93 ---------
 src/test/scala/AllTests.scala                 |  16 ++
 src/test/scala/PredictionTest.scala           |   6 -
 src/test/scala/RecommendationTest.scala       |   6 -
 .../DistributedBaselineTests.scala            |  62 ++++++
 src/test/scala/predict/BaselineTests.scala    |  65 +++++++
 .../scala/predict/PersonalizedTests.scala     |  81 ++++++++
 src/test/scala/predict/kNNTests.scala         |  74 +++++++
 .../scala/recommend/RecommenderTests.scala    |  67 +++++++
 src/test/scala/shared/helpers.scala           |   9 +
 test.sh                                       |  10 +
 timeCluster.sh                                |  16 ++
 timeOthers.sh                                 |  15 ++
 timeTrials.sh                                 |  11 ++
 29 files changed, 1172 insertions(+), 361 deletions(-)
 create mode 100755 config.sh
 create mode 100644 logs/README.md
 create mode 100755 run.sh
 create mode 100644 src/main/scala/distributed/DistributedBaseline.scala
 create mode 100644 src/main/scala/predict/Baseline.scala
 create mode 100644 src/main/scala/predict/Personalized.scala
 delete mode 100644 src/main/scala/predict/Predictor.scala
 create mode 100644 src/main/scala/predict/kNN.scala
 create mode 100644 src/main/scala/shared/predictions.scala
 delete mode 100644 src/main/scala/stats/Analyzer.scala
 create mode 100644 src/test/scala/AllTests.scala
 delete mode 100644 src/test/scala/PredictionTest.scala
 delete mode 100644 src/test/scala/RecommendationTest.scala
 create mode 100644 src/test/scala/distributed/DistributedBaselineTests.scala
 create mode 100644 src/test/scala/predict/BaselineTests.scala
 create mode 100644 src/test/scala/predict/PersonalizedTests.scala
 create mode 100644 src/test/scala/predict/kNNTests.scala
 create mode 100644 src/test/scala/recommend/RecommenderTests.scala
 create mode 100644 src/test/scala/shared/helpers.scala
 create mode 100755 test.sh
 create mode 100755 timeCluster.sh
 create mode 100755 timeOthers.sh
 create mode 100755 timeTrials.sh

diff --git a/.gitignore b/.gitignore
index 3ce4e01..d30092b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,8 +3,10 @@
 **/*.swp
 data/.DS_Store
 data/ml-100k
+data/ml-25m
 project/project
 project/target
 src/main/scala/project/
 src/main/scala/target/
 target/
+logs/
diff --git a/README.md b/README.md
index 7950df6..6ad4c8a 100644
--- a/README.md
+++ b/README.md
@@ -1,71 +1,195 @@
 # Milestone Description
 
-[Milestone-1.pdf](./Milestone-1.pdf)
+[To Be Released](./Milestone-1.pdf)
 
-Note: Section 7 (Updates) lists the updates since the original release of the Milestone on February 23rd.
+Note: Section 'Updates' lists the updates since the original release of the Milestone..
 
 # Dependencies
 
 ````
     sbt >= 1.4.7
+    openjdk@8
 ````
 
-Should be available by default on the IC Cluster. Otherwise, refer to each project installation instructions.
+Should be available by default on ````iccluster028.iccluster.epfl.ch````. Otherwise, refer to each project installation instructions. Prefer working locally on your own machine, you will have less interference in your measurements from other students.
 
-# Dataset
-
-Download the ````ml-100k.zip```` dataset in the ````data/```` folder:
+If you have multiple installations of openjdk, you need to specify the one to use as JAVA_HOME, e.g. on OSX with
+openjdk@8 installed through Homebrew, you would do:
 ````
-> mkdir -p data
-> cd data
-> wget http://files.grouplens.org/datasets/movielens/ml-100k.zip   
+    export JAVA_HOME="/usr/local/Cellar/openjdk@8/1.8.0+282";
 ````
 
-Check the integrity of the file with (it should give the same number as below):
-````
-> md5 -q ml-100k.zip
-0e33842e24a9c977be4e0107933c0723 
-````
+# Dataset
+
+Download [data.zip](https://gitlab.epfl.ch/sacs/cs-449-sds-public/project/dataset/-/raw/main/data.zip).
 
 Unzip:
 ````
-> unzip ml-100k.zip
+> unzip data.zip
 ````
 
+It should unzip into ````data/```` by default. If not, manually move ````ml-100k```` and ````ml-25m```` into ````data/````.
+
+
 # Personal Ratings
 
-Add your ratings in the 'data/personal.csv' file, by providing a numerical rating between [1,5] for at least 20 movies. For example, to rate the 'Toy Story' movie with '5', modify this line:
+Additional personal ratings are provided in the 'data/personal.csv' file in a
+csv format with ````<movie>, <movie title>, <rating>```` to test your recommender.
+You can copy this file and change the ratings, with values [1,5] to obtain
+references more to your liking!
 
+Entries with no rating are in the following format:
 ````
 1,Toy Story (1995),
 ````
 
-to this:
+Entries with ratings are in the following format:
 ````
 1,Toy Story (1995),5
 ````
 
-Do include your own ratings in your final submission so we can check your answers against those provided in your report.
+# Repository Structure
+
+````src/main/scala/shared/predictions.scala````:
+All the functionalities of your code for all questions should be defined there.
+This code should then be used in the following applications and tests.
+
+## Applications
+
+````src/main/scala/predict/Baseline.scala````: Output answers to questions **B.X**.
+````src/main/scala/distributed/DistributedBaseline.scala````: Output answers to questions **D.X**.
+````src/main/scala/predict/Personalized.scala````: Output answers to questions questions **P.X**.
+````src/main/scala/predict/kNN.scala````: Output answers to questions questions **N.X**.
+````src/main/scala/recommend/Recommender.scala````: Output answers to questions questions **N.X**.
+
+Applications are separate from tests to make it easier to test with different
+inputs and permit outputting your answers and timings in JSON format for easier
+grading.
+
+## Unit Tests
+
+Corresponding unit tests for each application:
+
+````
+    src/test/scala/predict/BaselineTests.scala
+    src/test/scala/distributed/DistributedBaselineTests.scala
+    src/test/scala/predict/PersonalizedTests.scala
+    src/test/scala/predict/kNNTests.scala
+    src/test/scala/recommend/RecommenderTests.scala
+````
+
+Your tests should demonstrate how to call your code to obtain the answers of
+the applications, and should make exactly the same calls as for the
+applications above. This structure intentionally encourages you to put as
+little as possible functionality in the application. This also gives the TA a
+clear and regular structure to check its correctness.
 
 # Usage
 
-## Compute statistics
+## Execute unit tests
+
+````sbt "testOnly test.AllTests"````
 
+You should fill all tests and ensure they all succeed prior to submission.
+
+## Run applications 
+
+### Baseline
+
+On ````ml-100k````:
 ````
-> sbt "runMain stats.Analyzer --data data/ml-100k/u.data --json statistics.json"
+    sbt "runMain predict.Baseline --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json baseline-100k.json"
 ````
 
-## Compute predictions
+On ````ml-25m````:
+````
+    sbt "runMain predict.Baseline --train data/ml-25m/r2.train --test data/ml-25m/r2.test --json baseline-25m.json"
+````
+
+### Distributed Baseline
 
 ````
-> sbt "runMain predict.Predictor --train data/ml-100k/u1.base --test data/ml-100k/u1.test --json predictions.json"
+    sbt "runMain distributed.DistributedBaseline --train data/ml-25m/r2.train --test data/ml-25m/r2.test  --separator , --json distributed-25m-4.json --master local[4]"
 ````
 
-## Compute recommendations
+You can vary the number of executors used locally by using ````local[X]```` with X being an integer representing the number of cores you want to use locally.
+
+### Personalized
+
 ````
-> sbt "runMain recommend.Recommender --data data/ml-100k/u.data --personal data/personal.csv --json recommendations.json"
+    sbt "runMain predict.Personalized --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json personalized-100k.json"
 ````
 
+### kNN
+
+````
+    sbt "runMain predict.kNN --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json knn-100k.json"
+````
+
+### Recommender
+
+````
+    sbt "runMain recommend.Recommender --data data/ml-100k/u.data --personal data/personal.csv --json recommender-100k.json"
+````
+
+## Time applications
+
+For all the previous applications, you can set the number of measurements for timings by adding the following option ````--num_measurements X```` where X is an integer. The default value is ````0````.
+
+## IC Cluster
+
+Test your application locally as much as possible and only test on the iccluster
+once everything works, to keep the cluster and the driver node maximally available
+for other students.
+
+### Assemble Application for Spark Submit
+
+````sbt clean````: clean up temporary files and previous assembly packages.
+
+````sbt assembly````: create a new jar
+````target/scala-2.11/m1_yourid-assembly-1.0.jar```` that can be used with
+````spark-submit````.
+
+Prefer packaging your application locally and upload the tar archive of your application
+before running on cluster.
+
+### Upload jar on Cluster 
+
+````
+    scp target/scala-2.11/m1_yourid-assembly-1.0.jar <username>@iccluster028.iccluster.epfl.ch:~
+````
+
+### Run on Cluster
+
+````
+spark-submit --class distributed.DistributedBaseline --master yarn --num-executors 1 target/scala-2.11/m1_yourid-assembly-1.0.jar  --train TRAIN --test TEST --separator , --json distributed-25m-1.json --num_measurements 1
+````
+
+See [config.sh](./config.sh) for HDFS paths to pre-uploaded TRAIN and TEST datasets. You can vary the number of executors with ````--num-executors X````, and number of measurements with ````--num_measurements Y````.
+
+## Grading scripts
+
+We will use the following scripts to grade your submission:
+
+    1. ````./test.sh````: Run all unit tests.
+    2. ````./run.sh````: Run all applications without timing measurements.
+    3. ````./timeTrials.sh````: Time applications to determine which student implementations are fastest.
+    4. ````./timeOthers.sh````: Time applications to check report answers against independent measurements. 
+    4. ````./timeCluster.sh````: Package and time applications on Spark Cluster.
+
+All scripts will produce execution logs in the ````logs````
+directory, including answers produced in the JSON format. Logs directories are
+in the format ````logs/<scriptname>-<datetime>-<machine>/```` and include at
+least an execution log ````log.txt```` as well as possible JSON outputs from
+applications. 
+
+Ensure all scripts run correctly locally before submitting. Avoid running
+````timeCluster.sh```` on iccluster as the packaging and measurements will
+interfere with other students working on their Milestone at the same time. If
+````timeCluster.sh```` correctly runs locally on your machine, this should be
+sufficient.
+
+
 ## Package for submission
 
 Steps:
@@ -75,9 +199,10 @@ Steps:
     3. Remove ````project/project````, ````project/target````, and ````target/````.  
     4. Test that all previous commands for generating statistics, predictions, and recommendations correctly produce a JSON file (after downloading/reinstalling dependencies).
     5. Remove the ml-100k dataset (````data/ml-100k.zip````, and ````data/ml-100k````), as well as the````project/project````, ````project/target````, and ````target/````. 
-    6. Add your report and any other necessary files listed in the Milestone description (see ````Deliverables````).
-    7. Zip the archive.
-    8. Submit to the TA for grading.
+    6. Remove the ````.git```` repository information.
+    7. Add your report and any other necessary files listed in the Milestone description (see ````Deliverables````).
+    8. Zip the archive.
+    9. Submit to the TA for grading.
 
 # References
 
@@ -89,8 +214,6 @@ Scallop Argument Parsing: https://github.com/scallop/scallop/wiki
 
 Spark Resilient Distributed Dataset (RDD): https://spark.apache.org/docs/3.0.1/api/scala/org/apache/spark/rdd/RDD.html
 
-JSON Serialization: https://github.com/json4s/json4s#serialization
-
 # Credits
 
 Erick Lavoie (Design, Implementation, Tests)
diff --git a/build.sbt b/build.sbt
index 762b925..78536a5 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,13 +1,18 @@
 name := "m1_yourid"
 version := "1.0"
-maintainer := "your.name@epfl.ch"
 
-libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.0" % Test
 libraryDependencies += "org.rogach" %% "scallop" % "4.0.2"
-libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.6.10"
-libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"
-libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
+libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.7"
+libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.7"
+libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.0" % Test
+libraryDependencies += "com.lihaoyi" %% "ujson" % "1.5.0"
 
-scalaVersion in ThisBuild := "2.12.13"
+scalaVersion in ThisBuild := "2.11.12"
 enablePlugins(JavaAppPackaging)
+logBuffered in Test := false
 
+test in assembly := {}
+assemblyMergeStrategy in assembly := {   
+  case PathList("META-INF", xs @ _*) => MergeStrategy.discard   
+  case x => MergeStrategy.first 
+}
diff --git a/config.sh b/config.sh
new file mode 100755
index 0000000..54ad473
--- /dev/null
+++ b/config.sh
@@ -0,0 +1,16 @@
+if [ $(hostname) == 'iccluster028' ]; 
+then  
+    export ML100Ku2base=hdfs://iccluster028.iccluster.epfl.ch:8020/user/lavoie/cs449/data/ml-100k/u2.base;
+    export ML100Ku2test=hdfs://iccluster028.iccluster.epfl.ch:8020/user/lavoie/cs449/data/ml-100k/u2.test;
+    export ML100Kudata=hdfs://iccluster028.iccluster.epfl.ch:8020/user/lavoie/cs449/data/ml-100k/u.data;
+    export ML25Mr2train=hdfs://iccluster028.iccluster.epfl.ch:8020/user/lavoie/cs449/data/ml-25m/r2.train;
+    export ML25Mr2test=hdfs://iccluster028.iccluster.epfl.ch:8020/user/lavoie/cs449/data/ml-25m/r2.test;
+    export SPARKMASTER='yarn'
+else 
+    export ML100Ku2base=data/ml-100k/u2.base;
+    export ML100Ku2test=data/ml-100k/u2.test;
+    export ML100Kudata=data/ml-100k/u.data;
+    export ML25Mr2train=data/ml-25m/r2.train;
+    export ML25Mr2test=data/ml-25m/r2.test;
+    export SPARKMASTER='local[4]'
+fi;
diff --git a/data/personal.csv b/data/personal.csv
index ee74efa..880b202 100644
--- a/data/personal.csv
+++ b/data/personal.csv
@@ -1,5 +1,6 @@
-1,Toy Story (1995),
-2,GoldenEye (1995),
+id,title,rating
+1,Toy Story (1995),5
+2,GoldenEye (1995),3
 3,Four Rooms (1995),
 4,Get Shorty (1995),
 5,Copycat (1995),
@@ -19,13 +20,13 @@
 19,Antonia's Line (1995),
 20,Angels and Insects (1995),
 21,Muppet Treasure Island (1996),
-22,Braveheart (1995),
+22,Braveheart (1995),3
 23,Taxi Driver (1976),
 24,Rumble in the Bronx (1995),
 25,Birdcage,
 26,Brothers McMullen,
 27,Bad Boys (1995),
-28,Apollo 13 (1995),
+28,Apollo 13 (1995),3
 29,Batman Forever (1995),
 30,Belle de jour (1967),
 31,Crimson Tide (1995),
@@ -47,13 +48,13 @@
 47,Ed Wood (1994),
 48,Hoop Dreams (1994),
 49,I.Q. (1994),
-50,Star Wars (1977),
+50,Star Wars (1977),4
 51,Legends of the Fall (1994),
 52,Madness of King George,
 53,Natural Born Killers (1994),
 54,Outbreak (1995),
 55,Professional,
-56,Pulp Fiction (1994),
+56,Pulp Fiction (1994),5
 57,Priest (1994),
 58,Quiz Show (1994),
 59,Three Colors: Red (1994),
@@ -61,14 +62,14 @@
 61,Three Colors: White (1994),
 62,Stargate (1994),
 63,Santa Clause,
-64,Shawshank Redemption,
+64,Shawshank Redemption,5
 65,What's Eating Gilbert Grape (1993),
 66,While You Were Sleeping (1995),
 67,Ace Ventura: Pet Detective (1994),
 68,Crow,
-69,Forrest Gump (1994),
+69,Forrest Gump (1994),5
 70,Four Weddings and a Funeral (1994),
-71,Lion King,
+71,Lion King,5
 72,Mask,
 73,Maverick (1994),
 74,Faster Pussycat! Kill! Kill! (1965),
@@ -79,24 +80,24 @@
 79,Fugitive,
 80,Hot Shots! Part Deux (1993),
 81,Hudsucker Proxy,
-82,Jurassic Park (1993),
+82,Jurassic Park (1993),3
 83,Much Ado About Nothing (1993),
 84,Robert A. Heinlein's The Puppet Masters (1994),
 85,Ref,
 86,Remains of the Day,
 87,Searching for Bobby Fischer (1993),
 88,Sleepless in Seattle (1993),
-89,Blade Runner (1982),
+89,Blade Runner (1982),3
 90,So I Married an Axe Murderer (1993),
 91,Nightmare Before Christmas,
 92,True Romance (1993),
 93,Welcome to the Dollhouse (1995),
 94,Home Alone (1990),
-95,Aladdin (1992),
-96,Terminator 2: Judgment Day (1991),
+95,Aladdin (1992),4
+96,Terminator 2: Judgment Day (1991),5
 97,Dances with Wolves (1990),
 98,Silence of the Lambs,
-99,Snow White and the Seven Dwarfs (1937),
+99,Snow White and the Seven Dwarfs (1937),1
 100,Fargo (1996),
 101,Heavy Metal (1981),
 102,Aristocats,
@@ -118,13 +119,13 @@
 118,Twister (1996),
 119,Maya Lin: A Strong Clear Vision (1994),
 120,Striptease (1996),
-121,Independence Day (ID4) (1996),
-122,Cable Guy,
+121,Independence Day (ID4) (1996),1
+122,Cable Guy,1
 123,Frighteners,
 124,Lone Star (1996),
 125,Phenomenon (1996),
 126,Spitfire Grill,
-127,Godfather,
+127,Godfather,5
 128,Supercop (1992),
 129,Bound (1996),
 130,Kansas City (1996),
@@ -135,73 +136,73 @@
 135,2001: A Space Odyssey (1968),
 136,Mr. Smith Goes to Washington (1939),
 137,Big Night (1996),
-138,D3: The Mighty Ducks (1996),
+138,D3: The Mighty Ducks (1996),2
 139,Love Bug,
 140,Homeward Bound: The Incredible Journey (1993),
 141,20,
 142,Bedknobs and Broomsticks (1971),
-143,Sound of Music,
-144,Die Hard (1988),
+143,Sound of Music,4
+144,Die Hard (1988),3
 145,Lawnmower Man,
 146,Unhook the Stars (1996),
 147,Long Kiss Goodnight,
 148,Ghost and the Darkness,
 149,Jude (1996),
 150,Swingers (1996),
-151,Willy Wonka and the Chocolate Factory (1971),
+151,Willy Wonka and the Chocolate Factory (1971),4
 152,Sleeper (1973),
 153,Fish Called Wanda,
-154,Monty Python's Life of Brian (1979),
-155,Dirty Dancing (1987),
-156,Reservoir Dogs (1992),
+154,Monty Python's Life of Brian (1979),5
+155,Dirty Dancing (1987),3
+156,Reservoir Dogs (1992),5
 157,Platoon (1986),
 158,Weekend at Bernie's (1989),
 159,Basic Instinct (1992),
 160,Glengarry Glen Ross (1992),
-161,Top Gun (1986),
+161,Top Gun (1986),3
 162,On Golden Pond (1981),
-163,Return of the Pink Panther,
-164,Abyss,
+163,Return of the Pink Panther,4
+164,Abyss,2
 165,Jean de Florette (1986),
 166,Manon of the Spring (Manon des sources) (1986),
 167,Private Benjamin (1980),
-168,Monty Python and the Holy Grail (1974),
+168,Monty Python and the Holy Grail (1974),5
 169,Wrong Trousers,
 170,Cinema Paradiso (1988),
 171,Delicatessen (1991),
-172,Empire Strikes Back,
+172,Empire Strikes Back,2
 173,Princess Bride,
 174,Raiders of the Lost Ark (1981),
 175,Brazil (1985),
 176,Aliens (1986),
-177,The Good The Bad and the Ugly,
+177,"The Good the Bad and the Ugly",5
 178,12 Angry Men (1957),
-179,Clockwork Orange,
-180,Apocalypse Now (1979),
-181,Return of the Jedi (1983),
+179,Clockwork Orange,4
+180,Apocalypse Now (1979),3
+181,Return of the Jedi (1983),3
 182,GoodFellas (1990),
-183,Alien (1979),
-184,Army of Darkness (1993),
-185,Psycho (1960),
+183,Alien (1979),5
+184,Army of Darkness (1993),2
+185,Psycho (1960),4
 186,Blues Brothers,
-187,Godfather: Part II,
-188,Full Metal Jacket (1987),
+187,Godfather: Part II,4
+188,Full Metal Jacket (1987),5
 189,Grand Day Out,
 190,Henry V (1989),
-191,Amadeus (1984),
+191,Amadeus (1984),3
 192,Raging Bull (1980),
 193,Right Stuff,
 194,Sting,
 195,Terminator,
-196,Dead Poets Society (1989),
+196,Dead Poets Society (1989),5
 197,Graduate,
 198,Nikita (La Femme Nikita) (1990),
 199,Bridge on the River Kwai,
 200,Shining,
 201,Evil Dead II (1987),
-202,Groundhog Day (1993),
+202,Groundhog Day (1993),5
 203,Unforgiven (1992),
-204,Back to the Future (1985),
+204,Back to the Future (1985),2
 205,Patton (1970),
 206,Akira (1988),
 207,Cyrano de Bergerac (1990),
@@ -211,7 +212,7 @@
 211,M*A*S*H (1970),
 212,Unbearable Lightness of Being,
 213,Room with a View,
-214,Pink Floyd - The Wall (1982),
+214,Pink Floyd - The Wall (1982),4
 215,Field of Dreams (1989),
 216,When Harry Met Sally... (1989),
 217,Bram Stoker's Dracula (1992),
@@ -232,9 +233,9 @@
 232,Young Guns (1988),
 233,Under Siege (1992),
 234,Jaws (1975),
-235,Mars Attacks! (1996),
+235,Mars Attacks! (1996),2
 236,Citizen Ruth (1996),
-237,Jerry Maguire (1996),
+237,Jerry Maguire (1996),3
 238,Raising Arizona (1987),
 239,Sneakers (1992),
 240,Beavis and Butt-head Do America (1996),
@@ -247,16 +248,16 @@
 247,Turbo: A Power Rangers Movie (1997),
 248,Grosse Pointe Blank (1997),
 249,Austin Powers: International Man of Mystery (1997),
-250,Fifth Element,
+250,Fifth Element,2
 251,Shall We Dance? (1996),
 252,Lost World: Jurassic Park,
 253,Pillow Book,
 254,Batman & Robin (1997),
 255,My Best Friend's Wedding (1997),
 256,When the Cats Away (Chacun cherche son chat) (1996),
-257,Men in Black (1997),
-258,Contact (1997),
-259,George of the Jungle (1997),
+257,Men in Black (1997),2
+258,Contact (1997),3
+259,George of the Jungle (1997),3
 260,Event Horizon (1997),
 261,Air Bud (1997),
 262,In the Company of Men (1997),
@@ -267,9 +268,9 @@
 267,unknown,
 268,Chasing Amy (1997),
 269,Full Monty,
-270,Gattaca (1997),
+270,Gattaca (1997),3
 271,Starship Troopers (1997),
-272,Good Will Hunting (1997),
+272,Good Will Hunting (1997),5
 273,Heat (1995),
 274,Sabrina (1995),
 275,Sense and Sensibility (1995),
@@ -291,7 +292,7 @@
 291,Absolute Power (1997),
 292,Rosewood (1997),
 293,Donnie Brasco (1997),
-294,Liar Liar (1997),
+294,Liar Liar (1997),2
 295,Breakdown (1997),
 296,Promesse,
 297,Ulee's Gold (1997),
@@ -310,7 +311,7 @@
 310,Rainmaker,
 311,Wings of the Dove,
 312,Midnight in the Garden of Good and Evil (1997),
-313,Titanic (1997),
+313,Titanic (1997),3
 314,3 Ninjas: High Noon At Mega Mountain (1998),
 315,Apt Pupil (1998),
 316,As Good As It Gets (1997),
@@ -359,9 +360,9 @@
 359,Assignment,
 360,Wonderland (1997),
 361,Incognito (1997),
-362,Blues Brothers 2000 (1998),
+362,Blues Brothers 2000 (1998),1
 363,Sudden Death (1995),
-364,Ace Ventura: When Nature Calls (1995),
+364,Ace Ventura: When Nature Calls (1995),1
 365,Powder (1995),
 366,Dangerous Minds (1995),
 367,Clueless (1995),
@@ -371,7 +372,7 @@
 371,Bridges of Madison County,
 372,Jeffrey (1995),
 373,Judge Dredd (1995),
-374,Mighty Morphin Power Rangers: The Movie (1995),
+374,Mighty Morphin Power Rangers: The Movie (1995),1
 375,Showgirls (1995),
 376,Houseguest (1994),
 377,Heavyweights (1994),
@@ -381,8 +382,8 @@
 381,Muriel's Wedding (1994),
 382,Adventures of Priscilla,
 383,Flintstones,
-384,Naked Gun 33 1/3: The Final Insult (1994),
-385,True Lies (1994),
+384,Naked Gun 33 1/3: The Final Insult (1994),3
+385,True Lies (1994),2
 386,Addams Family Values (1993),
 387,Age of Innocence,
 388,Beverly Hills Cop III (1994),
@@ -395,14 +396,14 @@
 395,Robin Hood: Men in Tights (1993),
 396,Serial Mom (1994),
 397,Striking Distance (1993),
-398,Super Mario Bros. (1993),
+398,Super Mario Bros. (1993),1
 399,Three Musketeers,
 400,Little Rascals,
 401,Brady Bunch Movie,
 402,Ghost (1990),
 403,Batman (1989),
 404,Pinocchio (1940),
-405,Mission: Impossible (1996),
+405,Mission: Impossible (1996),3
 406,Thinner (1996),
 407,Spy Hard (1996),
 408,Close Shave,
@@ -428,7 +429,7 @@
 428,Harold and Maude (1971),
 429,Day the Earth Stood Still,
 430,Duck Soup (1933),
-431,Highlander (1986),
+431,Highlander (1986),2
 432,Fantasia (1940),
 433,Heathers (1989),
 434,Forbidden Planet (1956),
@@ -498,7 +499,7 @@
 498,African Queen,
 499,Cat on a Hot Tin Roof (1958),
 500,Fly Away Home (1996),
-501,Dumbo (1941),
+501,Dumbo (1941),3
 502,Bananas (1971),
 503,Candidate,
 504,Bonnie and Clyde (1967),
@@ -538,7 +539,7 @@
 538,Anastasia (1997),
 539,Mouse Hunt (1997),
 540,Money Train (1995),
-541,Mortal Kombat (1995),
+541,Mortal Kombat (1995),1
 542,Pocahontas (1995),
 543,Misérables,
 544,Things to Do in Denver when You're Dead (1995),
@@ -669,14 +670,14 @@
 669,Body Parts (1991),
 670,Body Snatchers (1993),
 671,Bride of Frankenstein (1935),
-672,Candyman (1992),
+672,Candyman (1992),1
 673,Cape Fear (1962),
 674,Cat People (1982),
 675,Nosferatu (Nosferatu,
 676,Crucible,
 677,Fire on the Mountain (1996),
 678,Volcano (1997),
-679,Conan the Barbarian (1981),
+679,Conan the Barbarian (1981),5
 680,Kull the Conqueror (1997),
 681,Wishmaster (1997),
 682,I Know What You Did Last Summer (1997),
@@ -688,7 +689,7 @@
 688,Leave It to Beaver (1997),
 689,Jackal,
 690,Seven Years in Tibet (1997),
-691,Dark City (1998),
+691,Dark City (1998),3
 692,American President,
 693,Casino (1995),
 694,Persuasion (1995),
@@ -735,7 +736,7 @@
 735,Philadelphia (1993),
 736,Shadowlands (1993),
 737,Sirens (1994),
-738,Threesome (1994),
+738,Threesome (1994),1
 739,Pretty Woman (1990),
 740,Jane Eyre (1996),
 741,Last Supper,
@@ -747,7 +748,7 @@
 747,Benny & Joon (1993),
 748,Saint,
 749,MatchMaker,
-750,Amistad (1997),
+750,Amistad (1997),4
 751,Tomorrow Never Dies (1997),
 752,Replacement Killers,
 753,Burnt By the Sun (1994),
@@ -765,10 +766,10 @@
 765,Boomerang (1992),
 766,Man of the Year (1995),
 767,Addiction,
-768,Casper (1995),
+768,Casper (1995),1
 769,Congo (1995),
 770,Devil in a Blue Dress (1995),
-771,Johnny Mnemonic (1995),
+771,Johnny Mnemonic (1995),2
 772,Kids (1995),
 773,Mute Witness (1994),
 774,Prophecy,
@@ -899,7 +900,7 @@
 899,Winter Guest,
 900,Kundun (1997),
 901,Mr. Magoo (1997),
-902,Big Lebowski,
+902,Big Lebowski,3
 903,Afterglow (1997),
 904,Ma vie en rose (My Life in Pink) (1997),
 905,Great Expectations (1998),
@@ -1062,7 +1063,7 @@
 1062,Four Days in September (1997),
 1063,Little Princess,
 1064,Crossfire (1947),
-1065,Koyaanisqatsi (1983),
+1065,Koyaanisqatsi (1983),4
 1066,Balto (1995),
 1067,Bottle Rocket (1996),
 1068,Star Maker,
@@ -1124,7 +1125,7 @@
 1124,Farewell to Arms,
 1125,Innocents,
 1126,Old Man and the Sea,
-1127,Truman Show,
+1127,Truman Show,1
 1128,Heidi Fleiss: Hollywood Madam (1995),
 1129,Chungking Express (1994),
 1130,Jupiter's Wife (1994),
@@ -1136,7 +1137,7 @@
 1136,Ghosts of Mississippi (1996),
 1137,Beautiful Thing (1996),
 1138,Best Men (1997),
-1139,Hackers (1995),
+1139,Hackers (1995),2
 1140,Road to Wellville,
 1141,War Room,
 1142,When We Were Kings (1996),
@@ -1232,12 +1233,12 @@
 1232,Madonna: Truth or Dare (1991),
 1233,Nénette et Boni (1996),
 1234,Chairman of the Board (1998),
-1235,Big Bang Theory,
+1235,Big Bang Theory,1
 1236,Other Voices,
 1237,Twisted (1996),
 1238,Full Speed (1996),
 1239,Cutthroat Island (1995),
-1240,Ghost in the Shell (Kokaku kidotai) (1995),
+1240,Ghost in the Shell (Kokaku kidotai) (1995),5
 1241,Van,
 1242,Old Lady Who Walked in the Sea,
 1243,Night Flier (1997),
diff --git a/logs/README.md b/logs/README.md
new file mode 100644
index 0000000..e45b8c3
--- /dev/null
+++ b/logs/README.md
@@ -0,0 +1 @@
+This directory should hold execution results.
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 4081e3e..29ad220 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1 +1,2 @@
 addSbtPlugin("com.typesafe.sbt" %  "sbt-native-packager" % "1.7.4")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")
diff --git a/run.sh b/run.sh
new file mode 100755
index 0000000..75e8876
--- /dev/null
+++ b/run.sh
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+# If your default java install does not work, explicitly 
+# provide the path to the JDK 1.8 installation. On OSX
+# with homebrew:
+# export JAVA_HOME=/usr/local/Cellar/openjdk@8/1.8.0+282; ./run.sh
+export JAVA_OPTS="-Xmx8G";
+RUN=./logs/run-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
+mkdir -p $RUN
+LOGS=$RUN/log.txt
+source ./config.sh 
+echo "------------------- BASELINE    ---------------------" >> $LOGS
+sbt "runMain predict.Baseline --train $ML100Ku2base --test $ML100Ku2test --json $RUN/baseline-100k.json" 2>&1 >>$LOGS
+echo "------------------- DISTRIBUTED ---------------------" >> $LOGS
+sbt "runMain predict.Baseline --train $ML25Mr2train --test $ML25Mr2test --separator , --json $RUN/baseline-25m.json" 2>&1 >>$LOGS
+sbt "runMain distributed.DistributedBaseline --train $ML25Mr2train --test $ML25Mr2test --separator , --json $RUN/distributed-25m-4.json --master $SPARKMASTER" 2>&1 >>$LOGS
+echo "------------------- PERSONALIZED --------------------" >> $LOGS
+sbt "runMain predict.Personalized --train $ML100Ku2base --test $ML100Ku2test --json $RUN/personalized-100k.json" 2>&1 >>$LOGS
+echo "------------------- KNN -----------------------------" >> $LOGS
+sbt "runMain predict.kNN --train $ML100Ku2base --test $ML100Ku2test --json $RUN/knn-100k.json" 2>&1 >>$LOGS
+echo "------------------- RECOMMEND -----------------------" >> $LOGS
+sbt "runMain recommend.Recommender --data $ML100Kudata --personal data/personal.csv --json $RUN/recommender-100k.json" 2>&1 >>$LOGS
diff --git a/src/main/scala/distributed/DistributedBaseline.scala b/src/main/scala/distributed/DistributedBaseline.scala
new file mode 100644
index 0000000..cf9e832
--- /dev/null
+++ b/src/main/scala/distributed/DistributedBaseline.scala
@@ -0,0 +1,94 @@
+package distributed
+
+import org.rogach.scallop._
+import org.apache.spark.rdd.RDD
+import ujson._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import scala.math
+import shared.predictions._
+
+class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
+  val train = opt[String](required = true)
+  val test = opt[String](required = true)
+  val separator = opt[String](default=Some("\t"))
+  val master = opt[String](default=Some(""))
+  val num_measurements = opt[Int](default=Some(0))
+  val json = opt[String]()
+  verify()
+}
+
+object DistributedBaseline extends App {
+  var conf = new Conf(args) 
+
+  // Remove these lines if encountering/debugging Spark
+  Logger.getLogger("org").setLevel(Level.OFF)
+  Logger.getLogger("akka").setLevel(Level.OFF)
+  val spark = if (conf.master() != "") {
+    SparkSession.builder().master(conf.master()).getOrCreate()
+  } else {
+    SparkSession.builder().getOrCreate()
+  }
+  spark.sparkContext.setLogLevel("ERROR") 
+
+  println("")
+  println("******************************************************")
+
+  println("Loading training data from: " + conf.train()) 
+  val train = load(spark, conf.train(), conf.separator())
+  println("Loading test data from: " + conf.test()) 
+  val test = load(spark, conf.test(), conf.separator())
+
+  val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
+    Thread.sleep(1000) // Do everything here from train and test
+    42        // Output answer as last value
+  }))
+  val timings = measurements.map(t => t._2) // Retrieve the timing measurements
+
+  // Save answers as JSON
+  def printToFile(content: String, 
+                  location: String = "./answers.json") =
+    Some(new java.io.PrintWriter(location)).foreach{
+      f => try{
+        f.write(content)
+      } finally{ f.close }
+  }
+  conf.json.toOption match {
+    case None => ; 
+    case Some(jsonFile) => {
+      val answers = ujson.Obj(
+        "Meta" -> ujson.Obj(
+          "1.Train" -> conf.train(),
+          "2.Test" -> conf.test(),
+          "3.Master" -> conf.master(),
+          "4.Measurements" -> conf.num_measurements()
+        ),
+        "D.1" -> ujson.Obj(
+          "1.GlobalAvg" -> ujson.Num(0.0), // Datatype of answer: Double
+          "2.User1Avg" -> ujson.Num(0.0),  // Datatype of answer: Double
+          "3.Item1Avg" -> ujson.Num(0.0),   // Datatype of answer: Double
+          "4.Item1AvgDev" -> ujson.Num(0.0), // Datatype of answer: Double,
+          "5.PredUser1Item1" -> ujson.Num(0.0), // Datatype of answer: Double
+          "6.Mae" -> ujson.Num(0.0) // Datatype of answer: Double
+        ),
+        "D.2" -> ujson.Obj(
+          "1.DistributedBaseline" -> ujson.Obj(
+            "average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
+            "stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
+          )            
+        )
+      )
+      val json = write(answers, 4)
+
+      println(json)
+      println("Saving answers in: " + jsonFile)
+      printToFile(json, jsonFile)
+    }
+  }
+
+  println("")
+  spark.close()
+}
diff --git a/src/main/scala/predict/Baseline.scala b/src/main/scala/predict/Baseline.scala
new file mode 100644
index 0000000..b3775d7
--- /dev/null
+++ b/src/main/scala/predict/Baseline.scala
@@ -0,0 +1,108 @@
+package predict
+
+import org.rogach.scallop._
+import org.apache.spark.rdd.RDD
+
+import org.apache.spark.sql.SparkSession
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import scala.math
+import shared.predictions._
+
+
+class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
+  val train = opt[String](required = true)
+  val test = opt[String](required = true)
+  val separator = opt[String](default=Some("\t"))
+  val num_measurements = opt[Int](default=Some(0))
+  val json = opt[String]()
+  verify()
+}
+
+object Baseline extends App {
+  // Remove these lines if encountering/debugging Spark
+  Logger.getLogger("org").setLevel(Level.OFF)
+  Logger.getLogger("akka").setLevel(Level.OFF)
+  val spark = SparkSession.builder()
+    .master("local[1]")
+    .getOrCreate()
+  spark.sparkContext.setLogLevel("ERROR") 
+
+  println("")
+  println("******************************************************")
+
+  var conf = new Conf(args) 
+  // For these questions, data is collected in a scala Array 
+  // to not depend on Spark
+  println("Loading training data from: " + conf.train()) 
+  val train = load(spark, conf.train(), conf.separator()).collect()
+  println("Loading test data from: " + conf.test()) 
+  val test = load(spark, conf.test(), conf.separator()).collect()
+
+  val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
+    Thread.sleep(1000) // Do everything here from train and test
+    42        // Output answer as last value
+  }))
+  val timings = measurements.map(t => t._2) // Retrieve the timing measurements
+
+  // Save answers as JSON
+  def printToFile(content: String, 
+                  location: String = "./answers.json") =
+    Some(new java.io.PrintWriter(location)).foreach{
+      f => try{
+        f.write(content)
+      } finally{ f.close }
+  }
+  conf.json.toOption match {
+    case None => ; 
+    case Some(jsonFile) => {
+      var answers = ujson.Obj(
+        "Meta" -> ujson.Obj(
+          "1.Train" -> ujson.Str(conf.train()),
+          "2.Test" -> ujson.Str(conf.test()),
+          "3.Measurements" -> ujson.Num(conf.num_measurements())
+        ),
+        "B.1" -> ujson.Obj(
+          "1.GlobalAvg" -> ujson.Num(0.0), // Datatype of answer: Double
+          "2.User1Avg" -> ujson.Num(0.0),  // Datatype of answer: Double
+          "3.Item1Avg" -> ujson.Num(0.0),   // Datatype of answer: Double
+          "4.Item1AvgDev" -> ujson.Num(0.0), // Datatype of answer: Double
+          "5.PredUser1Item1" -> ujson.Num(0.0) // Datatype of answer: Double
+        ),
+        "B.2" -> ujson.Obj(
+          "1.GlobalAvgMAE" -> ujson.Num(0.0), // Datatype of answer: Double
+          "2.UserAvgMAE" -> ujson.Num(0.0),  // Datatype of answer: Double
+          "3.ItemAvgMAE" -> ujson.Num(0.0),   // Datatype of answer: Double
+          "4.BaselineMAE" -> ujson.Num(0.0)   // Datatype of answer: Double
+        ),
+        "B.3" -> ujson.Obj(
+          "1.GlobalAvg" -> ujson.Obj(
+            "average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
+            "stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
+          ),
+          "2.UserAvg" -> ujson.Obj(
+            "average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
+            "stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
+          ),
+          "3.ItemAvg" -> ujson.Obj(
+            "average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
+            "stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
+          ),
+          "4.Baseline" -> ujson.Obj(
+            "average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
+            "stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
+          )
+        )
+      )
+
+      val json = ujson.write(answers, 4)
+      println(json)
+      println("Saving answers in: " + jsonFile)
+      printToFile(json.toString, jsonFile)
+    }
+  }
+
+  println("")
+  spark.close()
+}
diff --git a/src/main/scala/predict/Personalized.scala b/src/main/scala/predict/Personalized.scala
new file mode 100644
index 0000000..3f1d709
--- /dev/null
+++ b/src/main/scala/predict/Personalized.scala
@@ -0,0 +1,85 @@
+package predict
+
+import org.rogach.scallop._
+import org.apache.spark.rdd.RDD
+import ujson._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import scala.math
+import shared.predictions._
+
+
+class PersonalizedConf(arguments: Seq[String]) extends ScallopConf(arguments) {
+  val train = opt[String](required = true)
+  val test = opt[String](required = true)
+  val separator = opt[String](default=Some("\t"))
+  val num_measurements = opt[Int](default=Some(0))
+  val json = opt[String]()
+  verify()
+}
+
+object Personalized extends App {
+  // Remove these lines if encountering/debugging Spark
+  Logger.getLogger("org").setLevel(Level.OFF)
+  Logger.getLogger("akka").setLevel(Level.OFF)
+  val spark = SparkSession.builder()
+    .master("local[1]")
+    .getOrCreate()
+  spark.sparkContext.setLogLevel("ERROR") 
+
+  println("")
+  println("******************************************************")
+
+  var conf = new PersonalizedConf(args) 
+  println("Loading training data from: " + conf.train()) 
+  val train = load(spark, conf.train(), conf.separator()).collect()
+  println("Loading test data from: " + conf.test()) 
+  val test = load(spark, conf.test(), conf.separator()).collect()
+  
+  // Compute here
+
+  // Save answers as JSON
+  def printToFile(content: String, 
+                  location: String = "./answers.json") =
+    Some(new java.io.PrintWriter(location)).foreach{
+      f => try{
+        f.write(content)
+      } finally{ f.close }
+  }
+  conf.json.toOption match {
+    case None => ; 
+    case Some(jsonFile) => {
+      val answers = ujson.Obj(
+        "Meta" -> ujson.Obj(
+          "1.Train" -> ujson.Str(conf.train()),
+          "2.Test" -> ujson.Str(conf.test()),
+          "3.Measurements" -> ujson.Num(conf.num_measurements())
+        ),
+        "P.1" -> ujson.Obj(
+          "1.PredUser1Item1" -> ujson.Num(0.0), // Prediction of item 1 for user 1 (similarity 1 between users)
+          "2.OnesMAE" -> ujson.Num(0.0)         // MAE when using similarities of 1 between all users
+        ),
+        "P.2" -> ujson.Obj(
+          "1.AdjustedCosineUser1User2" -> ujson.Num(0.0), // Similarity between user 1 and user 2 (adjusted Cosine)
+          "2.PredUser1Item1" -> ujson.Num(0.0),  // Prediction item 1 for user 1 (adjusted cosine)
+          "3.AdjustedCosineMAE" -> ujson.Num(0.0) // MAE when using adjusted cosine similarity
+        ),
+        "P.3" -> ujson.Obj(
+          "1.JaccardUser1User2" -> ujson.Num(0.0), // Similarity between user 1 and user 2 (jaccard similarity)
+          "2.PredUser1Item1" -> ujson.Num(0.0),  // Prediction item 1 for user 1 (jaccard)
+          "3.JaccardPersonalizedMAE" -> ujson.Num(0.0) // MAE when using jaccard similarity
+        )
+      )
+      val json = write(answers, 4)
+      println(json)
+      println("Saving answers in: " + jsonFile)
+      printToFile(json, jsonFile)
+    }
+  }
+
+  println("")
+  spark.close()
+}
diff --git a/src/main/scala/predict/Predictor.scala b/src/main/scala/predict/Predictor.scala
deleted file mode 100644
index 91ba1fb..0000000
--- a/src/main/scala/predict/Predictor.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-package predict
-
-import org.rogach.scallop._
-import org.json4s.jackson.Serialization
-import org.apache.spark.rdd.RDD
-
-import org.apache.spark.sql.SparkSession
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-
-class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
-  val train = opt[String](required = true)
-  val test = opt[String](required = true)
-  val json = opt[String]()
-  verify()
-}
-
-case class Rating(user: Int, item: Int, rating: Double)
-
-object Predictor extends App {
-  // Remove these lines if encountering/debugging Spark
-  Logger.getLogger("org").setLevel(Level.OFF)
-  Logger.getLogger("akka").setLevel(Level.OFF)
-  val spark = SparkSession.builder()
-    .master("local[1]")
-    .getOrCreate()
-  spark.sparkContext.setLogLevel("ERROR") 
-
-  println("")
-  println("******************************************************")
-
-  var conf = new Conf(args) 
-  println("Loading training data from: " + conf.train()) 
-  val trainFile = spark.sparkContext.textFile(conf.train())
-  val train = trainFile.map(l => {
-      val cols = l.split("\t").map(_.trim)
-      Rating(cols(0).toInt, cols(1).toInt, cols(2).toDouble)
-  }) 
-  assert(train.count == 80000, "Invalid training data")
-
-  println("Loading test data from: " + conf.test()) 
-  val testFile = spark.sparkContext.textFile(conf.test())
-  val test = testFile.map(l => {
-      val cols = l.split("\t").map(_.trim)
-      Rating(cols(0).toInt, cols(1).toInt, cols(2).toDouble)
-  }) 
-  assert(test.count == 20000, "Invalid test data")
-
-  val globalPred = 3.0
-  val globalMae = test.map(r => scala.math.abs(r.rating - globalPred)).reduce(_+_) / test.count.toDouble
-
-  // Save answers as JSON
-  def printToFile(content: String, 
-                  location: String = "./answers.json") =
-    Some(new java.io.PrintWriter(location)).foreach{
-      f => try{
-        f.write(content)
-      } finally{ f.close }
-  }
-  conf.json.toOption match {
-    case None => ; 
-    case Some(jsonFile) => {
-      var json = "";
-      {
-        // Limiting the scope of implicit formats with {}
-        implicit val formats = org.json4s.DefaultFormats
-        val answers: Map[String, Any] = Map(
-            "Q3.1.4" -> Map(
-              "MaeGlobalMethod" -> 0.0, // Datatype of answer: Double
-              "MaePerUserMethod" -> 0.0, // Datatype of answer: Double
-              "MaePerItemMethod" -> 0.0, // Datatype of answer: Double
-              "MaeBaselineMethod" -> 0.0 // Datatype of answer: Double
-            ),
-
-            "Q3.1.5" -> Map(
-              "DurationInMicrosecForGlobalMethod" -> Map(
-                "min" -> 0.0,  // Datatype of answer: Double
-                "max" -> 0.0,  // Datatype of answer: Double
-                "average" -> 0.0, // Datatype of answer: Double
-                "stddev" -> 0.0 // Datatype of answer: Double
-              ),
-              "DurationInMicrosecForPerUserMethod" -> Map(
-                "min" -> 0.0,  // Datatype of answer: Double
-                "max" -> 0.0,  // Datatype of answer: Double
-                "average" -> 0.0, // Datatype of answer: Double
-                "stddev" -> 0.0 // Datatype of answer: Double
-              ),
-              "DurationInMicrosecForPerItemMethod" -> Map(
-                "min" -> 0.0,  // Datatype of answer: Double
-                "max" -> 0.0,  // Datatype of answer: Double
-                "average" -> 0.0, // Datatype of answer: Double
-                "stddev" -> 0.0 // Datatype of answer: Double
-              ),
-              "DurationInMicrosecForBaselineMethod" -> Map(
-                "min" -> 0.0,  // Datatype of answer: Double
-                "max" -> 0.0, // Datatype of answer: Double
-                "average" -> 0.0, // Datatype of answer: Double
-                "stddev" -> 0.0 // Datatype of answer: Double
-              ),
-              "RatioBetweenBaselineMethodAndGlobalMethod" -> 0.0 // Datatype of answer: Double
-            ),
-         )
-        json = Serialization.writePretty(answers)
-      }
-
-      println(json)
-      println("Saving answers in: " + jsonFile)
-      printToFile(json, jsonFile)
-    }
-  }
-
-  println("")
-  spark.close()
-}
diff --git a/src/main/scala/predict/kNN.scala b/src/main/scala/predict/kNN.scala
new file mode 100644
index 0000000..ea602cd
--- /dev/null
+++ b/src/main/scala/predict/kNN.scala
@@ -0,0 +1,97 @@
+package predict
+
+import org.rogach.scallop._
+import org.apache.spark.rdd.RDD
+import ujson._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import scala.math
+import shared.predictions._
+
+
+class kNNConf(arguments: Seq[String]) extends ScallopConf(arguments) {
+  val train = opt[String](required = true)
+  val test = opt[String](required = true)
+  val separator = opt[String](default=Some("\t"))
+  val num_measurements = opt[Int](default=Some(0))
+  val json = opt[String]()
+  verify()
+}
+
+object kNN extends App {
+  // Remove these lines if encountering/debugging Spark
+  Logger.getLogger("org").setLevel(Level.OFF)
+  Logger.getLogger("akka").setLevel(Level.OFF)
+  val spark = SparkSession.builder()
+    .master("local[1]")
+    .getOrCreate()
+  spark.sparkContext.setLogLevel("ERROR") 
+
+  println("")
+  println("******************************************************")
+
+  var conf = new PersonalizedConf(args) 
+  println("Loading training data from: " + conf.train()) 
+  val train = load(spark, conf.train(), conf.separator()).collect()
+  println("Loading test data from: " + conf.test()) 
+  val test = load(spark, conf.test(), conf.separator()).collect()
+
+
+  val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
+    Thread.sleep(1000) // Do everything here from train and test
+    42        // Output answer as last value
+  }))
+  val timings = measurements.map(t => t._2) // Retrieve the timing measurements
+
+  // Save answers as JSON
+  def printToFile(content: String, 
+                  location: String = "./answers.json") =
+    Some(new java.io.PrintWriter(location)).foreach{
+      f => try{
+        f.write(content)
+      } finally{ f.close }
+  }
+  conf.json.toOption match {
+    case None => ; 
+    case Some(jsonFile) => {
+      val answers = ujson.Obj(
+        "Meta" -> ujson.Obj(
+          "1.Train" -> conf.train(),
+          "2.Test" -> conf.test(),
+          "3.Measurements" -> conf.num_measurements()
+        ),
+        "N.1" -> ujson.Obj(
+          "1.k10u1v1" -> ujson.Num(0.0), // Similarity between user 1 and user 1 (k=10)
+          "2.k10u1v864" -> ujson.Num(0.0), // Similarity between user 1 and user 864 (k=10)
+          "3.k10u1v886" -> ujson.Num(0.0), // Similarity between user 1 and user 886 (k=10)
+          "4.PredUser1Item1" -> ujson.Num(0.0) // Prediction of item 1 for user 1 (k=10)
+        ),
+        "N.2" -> ujson.Obj(
+          "1.kNN-Mae" -> List(10,30,50,100,200,300,400,800,943).map(k => 
+              List(
+                k,
+                0.0 // Compute MAE
+              )
+          ).toList
+        ),
+        "N.3" -> ujson.Obj(
+          "1.kNN" -> ujson.Obj(
+            "average (ms)" -> ujson.Num(mean(timings)),
+            "stddev (ms)" -> ujson.Num(std(timings))
+          )
+        )
+      )
+      val json = write(answers, 4)
+
+      println(json)
+      println("Saving answers in: " + jsonFile)
+      printToFile(json, jsonFile)
+    }
+  }
+
+  println("")
+  spark.close()
+}
diff --git a/src/main/scala/recommend/Recommender.scala b/src/main/scala/recommend/Recommender.scala
index 6e14620..83c7a1f 100644
--- a/src/main/scala/recommend/Recommender.scala
+++ b/src/main/scala/recommend/Recommender.scala
@@ -1,22 +1,23 @@
 package recommend
 
 import org.rogach.scallop._
-import org.json4s.jackson.Serialization
 import org.apache.spark.rdd.RDD
+import ujson._
 
 import org.apache.spark.sql.SparkSession
 import org.apache.log4j.Logger
 import org.apache.log4j.Level
 
+import shared.predictions._
+
 class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
   val data = opt[String](required = true)
   val personal = opt[String](required = true)
+  val separator = opt[String](default = Some("\t"))
   val json = opt[String]()
   verify()
 }
 
-case class Rating(user: Int, item: Int, rating: Double)
-
 object Recommender extends App {
   // Remove these lines if encountering/debugging Spark
   Logger.getLogger("org").setLevel(Level.OFF)
@@ -31,19 +32,27 @@ object Recommender extends App {
 
   var conf = new Conf(args) 
   println("Loading data from: " + conf.data()) 
-  val dataFile = spark.sparkContext.textFile(conf.data())
-  val data = dataFile.map(l => {
-      val cols = l.split("\t").map(_.trim)
-      Rating(cols(0).toInt, cols(1).toInt, cols(2).toDouble)
-  }) 
-  assert(data.count == 100000, "Invalid data")
+  val data = load(spark, conf.data(), conf.separator()).collect()
+  assert(data.length == 100000, "Invalid data")
 
   println("Loading personal data from: " + conf.personal()) 
   val personalFile = spark.sparkContext.textFile(conf.personal())
-  // TODO: Extract ratings and movie titles
-  assert(personalFile.count == 1682, "Invalid personal data")
+  val personal = personalFile.map(l => {
+      val cols = l.split(",").map(_.trim)
+      if (cols(0) == "id") 
+        Rating(944,0,0.0)
+      else 
+        if (cols.length < 3) 
+          Rating(944, cols(0).toInt, 0.0)
+        else
+          Rating(944, cols(0).toInt, cols(2).toDouble)
+  }).filter(r => r.rating != 0).collect()
+  val movieNames = personalFile.map(l => {
+      val cols = l.split(",").map(_.trim)
+      if (cols(0) == "id") (0, "header")
+      else (cols(0).toInt, cols(1).toString)
+  }).collect().toMap
 
- 
 
   // Save answers as JSON
   def printToFile(content: String, 
@@ -56,26 +65,21 @@ object Recommender extends App {
   conf.json.toOption match {
     case None => ; 
     case Some(jsonFile) => {
-      var json = "";
-      {
-        // Limiting the scope of implicit formats with {}
-        implicit val formats = org.json4s.DefaultFormats
-        val answers: Map[String, Any] = Map(
-
-            // IMPORTANT: To break ties and ensure reproducibility of results,
-            // please report the top-5 recommendations that have the smallest
-            // movie identifier.
+      val answers = ujson.Obj(
+        "Meta" -> ujson.Obj(
+          "data" -> conf.data(),
+          "personal" -> conf.personal()
+        ),
+        "R.1" -> ujson.Obj(
+          "PredUser1Item1" -> ujson.Num(0.0) // Prediction for user 1 of item 1
+        ),
+          // IMPORTANT: To break ties and ensure reproducibility of results,
+          // please report the top-3 recommendations that have the smallest
+          // movie identifier.
 
-            "Q4.1.1" -> List[Any](
-              List(254, "Batman & Robin (1997)", 5.0), // Datatypes for answer: Int, String, Double
-              List(338, "Bean (1997)", 5.0),
-              List(615, "39 Steps", 5.0),
-              List(741, "Last Supper", 5.0),
-              List(587, "Hour of the Pig", 5.0)
-            )
-         )
-        json = Serialization.writePretty(answers)
-      }
+        "R.2" -> List((254, 0.0), (338, 0.0), (615, 0.0)).map(x => ujson.Arr(x._1, movieNames(x._1), x._2))
+       )
+      val json = write(answers, 4)
 
       println(json)
       println("Saving answers in: " + jsonFile)
diff --git a/src/main/scala/shared/predictions.scala b/src/main/scala/shared/predictions.scala
new file mode 100644
index 0000000..89bff73
--- /dev/null
+++ b/src/main/scala/shared/predictions.scala
@@ -0,0 +1,46 @@
+package shared
+
+package object predictions
+{
+  case class Rating(user: Int, item: Int, rating: Double)
+
+  def timingInMs(f : ()=>Double ) : (Double, Double) = {
+    val start = System.nanoTime() 
+    val output = f()
+    val end = System.nanoTime()
+    return (output, (end-start)/1000000.0)
+  }
+
+  def mean(s :Seq[Double]): Double =  if (s.size > 0) s.reduce(_+_) / s.length else 0.0
+  def std(s :Seq[Double]): Double = {
+    if (s.size == 0) 0.0
+    else {
+      val m = mean(s)
+      scala.math.sqrt(s.map(x => scala.math.pow(m-x, 2)).sum / s.length.toDouble)
+    }
+  }
+
+  def toInt(s: String): Option[Int] = {
+    try {
+      Some(s.toInt)
+    } catch {
+      case e: Exception => None
+    }
+  }
+
+  def load(spark : org.apache.spark.sql.SparkSession,  path : String, sep : String) : org.apache.spark.rdd.RDD[Rating] = {
+       val file = spark.sparkContext.textFile(path)
+       return file
+         .map(l => {
+           val cols = l.split(sep).map(_.trim)
+           toInt(cols(0)) match {
+             case Some(_) => Some(Rating(cols(0).toInt, cols(1).toInt, cols(2).toDouble))
+             case None => None
+           }
+       })
+         .filter({ case Some(_) => true 
+                   case None => false })
+         .map({ case Some(x) => x 
+                case None => Rating(-1, -1, -1)})
+  }
+}
diff --git a/src/main/scala/stats/Analyzer.scala b/src/main/scala/stats/Analyzer.scala
deleted file mode 100644
index 97a9ee9..0000000
--- a/src/main/scala/stats/Analyzer.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-package stats
-
-import org.rogach.scallop._
-import org.json4s.jackson.Serialization
-import org.apache.spark.rdd.RDD
-
-import org.apache.spark.sql.SparkSession
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-
-class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
-  val data = opt[String](required = true)
-  val json = opt[String]()
-  verify()
-}
-
-case class Rating(user: Int, item: Int, rating: Double)
-
-object Analyzer extends App {
-  // Remove these lines if encountering/debugging Spark
-  Logger.getLogger("org").setLevel(Level.OFF)
-  Logger.getLogger("akka").setLevel(Level.OFF)
-  val spark = SparkSession.builder()
-    .master("local[1]")
-    .getOrCreate()
-  spark.sparkContext.setLogLevel("ERROR") 
-
-  println("")
-  println("******************************************************")
-
-  var conf = new Conf(args) 
-  println("Loading data from: " + conf.data()) 
-  val dataFile = spark.sparkContext.textFile(conf.data())
-  val data = dataFile.map(l => {
-      val cols = l.split("\t").map(_.trim)
-      Rating(cols(0).toInt, cols(1).toInt, cols(2).toDouble)
-  }) 
-  assert(data.count == 100000, "Invalid data")
-
-  // Save answers as JSON
-  def printToFile(content: String, 
-                  location: String = "./answers.json") =
-    Some(new java.io.PrintWriter(location)).foreach{
-      f => try{
-        f.write(content)
-      } finally{ f.close }
-  }
-  conf.json.toOption match {
-    case None => ; 
-    case Some(jsonFile) => {
-      var json = "";
-      {
-        // Limiting the scope of implicit formats with {}
-        implicit val formats = org.json4s.DefaultFormats
-        val answers: Map[String, Any] = Map(
-          "Q3.1.1" -> Map(
-            "GlobalAverageRating" -> 0.0 // Datatype of answer: Double
-          ),
-          "Q3.1.2" -> Map(
-            "UsersAverageRating" -> Map(
-                // Using as your input data the average rating for each user,
-                // report the min, max and average of the input data.
-                "min" -> 0.0,  // Datatype of answer: Double
-                "max" -> 0.0, // Datatype of answer: Double
-                "average" -> 0.0 // Datatype of answer: Double
-            ),
-            "AllUsersCloseToGlobalAverageRating" -> true, // Datatype of answer: Boolean
-            "RatioUsersCloseToGlobalAverageRating" -> 0.0 // Datatype of answer: Double
-          ),
-          "Q3.1.3" -> Map(
-            "ItemsAverageRating" -> Map(
-                // Using as your input data the average rating for each item,
-                // report the min, max and average of the input data.
-                "min" -> 0.0,  // Datatype of answer: Double
-                "max" -> 0.0, // Datatype of answer: Double
-                "average" -> 0.0 // Datatype of answer: Double
-            ),
-            "AllItemsCloseToGlobalAverageRating" -> true, // Datatype of answer: Boolean
-            "RatioItemsCloseToGlobalAverageRating" -> 0.0 // Datatype of answer: Double
-          ),
-         )
-        json = Serialization.writePretty(answers)
-      }
-
-      println(json)
-      println("Saving answers in: " + jsonFile)
-      printToFile(json, jsonFile)
-    }
-  }
-
-  println("")
-  spark.close()
-}
diff --git a/src/test/scala/AllTests.scala b/src/test/scala/AllTests.scala
new file mode 100644
index 0000000..56825e9
--- /dev/null
+++ b/src/test/scala/AllTests.scala
@@ -0,0 +1,16 @@
+package test
+
+import org.scalatest._
+import funsuite._
+
+import test.distributed._
+import test.predict._
+
+class AllTests extends Sequential(
+  new test.predict.BaselineTests,
+  new test.distributed.DistributedBaselineTests,
+  new test.predict.PersonalizedTests,
+  new test.predict.kNNTests,
+  new test.recommend.RecommenderTests
+)
+
diff --git a/src/test/scala/PredictionTest.scala b/src/test/scala/PredictionTest.scala
deleted file mode 100644
index 48cab00..0000000
--- a/src/test/scala/PredictionTest.scala
+++ /dev/null
@@ -1,6 +0,0 @@
-import org.scalatest.funsuite._
-final class Prediction extends AnyFunSuite {
-  test("prediction works") {
-    assert(true)
-  }
-}
diff --git a/src/test/scala/RecommendationTest.scala b/src/test/scala/RecommendationTest.scala
deleted file mode 100644
index d8ead18..0000000
--- a/src/test/scala/RecommendationTest.scala
+++ /dev/null
@@ -1,6 +0,0 @@
-import org.scalatest.funsuite._
-final class Recommendation extends AnyFunSuite {
-  test("recommendation works") {
-    assert(true)
-  }
-}
diff --git a/src/test/scala/distributed/DistributedBaselineTests.scala b/src/test/scala/distributed/DistributedBaselineTests.scala
new file mode 100644
index 0000000..927f925
--- /dev/null
+++ b/src/test/scala/distributed/DistributedBaselineTests.scala
@@ -0,0 +1,62 @@
+package test.distributed
+
+import org.scalatest._
+
+import funsuite._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import shared.predictions._
+import tests.shared.helpers._
+
+class DistributedBaselineTests extends AnyFunSuite with BeforeAndAfterAll {
+
+   val separator = "\t"
+   var spark : org.apache.spark.sql.SparkSession = _
+
+   val train2Path = "data/ml-100k/u2.base"
+   val test2Path = "data/ml-100k/u2.test"
+   var train2 : org.apache.spark.rdd.RDD[shared.predictions.Rating] = null
+   var test2 : org.apache.spark.rdd.RDD[shared.predictions.Rating] = null
+
+   override def beforeAll {
+       Logger.getLogger("org").setLevel(Level.OFF)
+       Logger.getLogger("akka").setLevel(Level.OFF)
+       spark = SparkSession.builder()
+           .master("local[1]")
+           .getOrCreate()
+       spark.sparkContext.setLogLevel("ERROR")
+       train2 = load(spark, train2Path, separator)
+       test2 = load(spark, test2Path, separator)
+   }
+
+   // All the functions definitions for the tests below (and the tests in other suites) 
+   // should be in a single library, 'src/main/scala/shared/predictions.scala'.
+
+   // Provide tests to show how to call your code to do the following tasks (each in with their own test):
+   // each method should be invoked with a single function call. 
+   // Ensure you use the same function calls to produce the JSON outputs in
+   // src/main/scala/predict/Baseline.scala.
+   // Add assertions with the answer you expect from your code, up to the 4th
+   // decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
+   test("Compute global average")                           { assert(within(1.0, 0.0, 0.0001)) }
+   test("Compute user 1 average")                           { assert(within(1.0, 0.0, 0.0001)) }
+   test("Compute item 1 average")                           { assert(within(1.0, 0.0, 0.0001)) }
+   test("Compute item 1 average deviation")                 { assert(within(1.0, 0.0, 0.0001)) }
+   test("Compute baseline prediction for user 1 on item 1") { assert(within(1.0, 0.0, 0.0001)) }
+
+   // Show how to compute the MAE on all four non-personalized methods:
+   // 1. There should be four different functions, one for each method, to create a predictor
+   // with the following signature: ````predictor: (train: Seq[shared.predictions.Rating]) => ((u: Int, i: Int) => Double)````;
+   // 2. There should be a single reusable function to compute the MAE on the test set, given a predictor;
+   // 3. There should be invocations of both to show they work on the following datasets.
+   test("MAE on all four non-personalized methods on data/ml-100k/u2.base and data/ml-100k/u2.test") {
+     assert(within(1.0, 0.0, 0.0001))
+     assert(within(1.0, 0.0, 0.0001))
+     assert(within(1.0, 0.0, 0.0001))
+     assert(within(1.0, 0.0, 0.0001))
+   }
+}
diff --git a/src/test/scala/predict/BaselineTests.scala b/src/test/scala/predict/BaselineTests.scala
new file mode 100644
index 0000000..65765f8
--- /dev/null
+++ b/src/test/scala/predict/BaselineTests.scala
@@ -0,0 +1,65 @@
+package test.predict
+
+import org.scalatest._
+import funsuite._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import shared.predictions._
+import tests.shared.helpers._
+import ujson._
+
+class BaselineTests extends AnyFunSuite with BeforeAndAfterAll {
+
+   val separator = "\t"
+   var spark : org.apache.spark.sql.SparkSession = _
+
+   val train2Path = "data/ml-100k/u2.base"
+   val test2Path = "data/ml-100k/u2.test"
+   var train2 : Array[shared.predictions.Rating] = null
+   var test2 : Array[shared.predictions.Rating] = null
+
+   override def beforeAll {
+       Logger.getLogger("org").setLevel(Level.OFF)
+       Logger.getLogger("akka").setLevel(Level.OFF)
+       spark = SparkSession.builder()
+           .master("local[1]")
+           .getOrCreate()
+       spark.sparkContext.setLogLevel("ERROR")
+
+       // For these questions, train and test are collected in a scala Array
+       // to not depend on Spark
+       train2 = load(spark, train2Path, separator).collect()
+       test2 = load(spark, test2Path, separator).collect()
+   }
+
+   // All the functions definitions for the tests below (and the tests in other suites) 
+   // should be in a single library, 'src/main/scala/shared/predictions.scala'.
+
+   // Provide tests to show how to call your code to do the following tasks (each in with their own test):
+   // each method should be invoked with a single function call. 
+   // Ensure you use the same function calls to produce the JSON outputs in
+   // src/main/scala/predict/Baseline.scala.
+   // Add assertions with the answer you expect from your code, up to the 4th
+   // decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
+   test("Compute global average")                           { assert(within(1.0, 0.0, 0.0001)) }
+   test("Compute user 1 average")                           { assert(within(1.0, 0.0, 0.0001)) }
+   test("Compute item 1 average")                           { assert(within(1.0, 0.0, 0.0001)) }
+   test("Compute item 1 average deviation")                 { assert(within(1.0, 0.0, 0.0001)) }
+   test("Compute baseline prediction for user 1 on item 1") { assert(within(1.0, 0.0, 0.0001)) }
+
+   // Show how to compute the MAE on all four non-personalized methods:
+   // 1. There should be four different functions, one for each method, to create a predictor
+   // with the following signature: ````predictor: (train: Seq[shared.predictions.Rating]) => ((u: Int, i: Int) => Double)````;
+   // 2. There should be a single reusable function to compute the MAE on the test set, given a predictor;
+   // 3. There should be invocations of both to show they work on the following datasets.
+   test("MAE on all four non-personalized methods on data/ml-100k/u2.base and data/ml-100k/u2.test") {
+     assert(within(1.0, 0.0, 0.0001))
+     assert(within(1.0, 0.0, 0.0001))
+     assert(within(1.0, 0.0, 0.0001))
+     assert(within(1.0, 0.0, 0.0001))
+   }
+}
diff --git a/src/test/scala/predict/PersonalizedTests.scala b/src/test/scala/predict/PersonalizedTests.scala
new file mode 100644
index 0000000..0685979
--- /dev/null
+++ b/src/test/scala/predict/PersonalizedTests.scala
@@ -0,0 +1,81 @@
+package test.predict
+
+import org.scalatest._
+import funsuite._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import shared.predictions._
+import tests.shared.helpers._
+import ujson._
+
+class PersonalizedTests extends AnyFunSuite with BeforeAndAfterAll {
+
+   val separator = "\t"
+   var spark : org.apache.spark.sql.SparkSession = _
+
+   val train2Path = "data/ml-100k/u2.base"
+   val test2Path = "data/ml-100k/u2.test"
+   var train2 : Array[shared.predictions.Rating] = null
+   var test2 : Array[shared.predictions.Rating] = null
+
+   override def beforeAll {
+       Logger.getLogger("org").setLevel(Level.OFF)
+       Logger.getLogger("akka").setLevel(Level.OFF)
+       spark = SparkSession.builder()
+           .master("local[1]")
+           .getOrCreate()
+       spark.sparkContext.setLogLevel("ERROR")
+       // For these questions, train and test are collected in a scala Array
+       // to not depend on Spark
+       train2 = load(spark, train2Path, separator).collect()
+       test2 = load(spark, test2Path, separator).collect()
+   }
+
+   // All the functions definitions for the tests below (and the tests in other suites) 
+   // should be in a single library, 'src/main/scala/shared/predictions.scala'.
+
+   // Provide tests to show how to call your code to do the following tasks.
+   // Ensure you use the same function calls to produce the JSON outputs in
+   // src/main/scala/predict/Baseline.scala.
+   // Add assertions with the answer you expect from your code, up to the 4th
+   // decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
+   test("Test uniform unary similarities") { 
+     // Create predictor with uniform similarities
+     
+     // Compute personalized prediction for user 1 on item 1
+     assert(within(1.0, 0.0, 0.0001))
+
+     // MAE 
+     assert(within(1.0, 0.0, 0.0001))
+   } 
+
+   test("Test ajusted cosine similarity") { 
+     // Create predictor with adjusted cosine similarities
+
+     // Similarity between user 1 and user 2
+     assert(within(1.0, 0.0, 0.0001))
+
+     // Compute personalized prediction for user 1 on item 1
+     assert(within(1.0, 0.0, 0.0001))
+
+     // MAE 
+     assert(within(1.0, 0.0, 0.0001))
+   }
+
+   test("Test jaccard similarity") { 
+     // Create predictor with jaccard similarities
+
+     // Similarity between user 1 and user 2
+     assert(within(1.0, 0.0, 0.0001))
+
+     // Compute personalized prediction for user 1 on item 1
+     assert(within(1.0, 0.0, 0.0001))
+
+     // MAE 
+     assert(within(1.0, 0.0, 0.0001))
+   }
+}
diff --git a/src/test/scala/predict/kNNTests.scala b/src/test/scala/predict/kNNTests.scala
new file mode 100644
index 0000000..fe4c348
--- /dev/null
+++ b/src/test/scala/predict/kNNTests.scala
@@ -0,0 +1,74 @@
+package test.predict
+
+import org.scalatest._
+import funsuite._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import shared.predictions._
+import tests.shared.helpers._
+import ujson._
+
+class kNNTests extends AnyFunSuite with BeforeAndAfterAll {
+
+   val separator = "\t"
+   var spark : org.apache.spark.sql.SparkSession = _
+
+   val train2Path = "data/ml-100k/u2.base"
+   val test2Path = "data/ml-100k/u2.test"
+   var train2 : Array[shared.predictions.Rating] = null
+   var test2 : Array[shared.predictions.Rating] = null
+
+   var adjustedCosine : Map[Int, Map[Int, Double]] = null
+
+   override def beforeAll {
+       Logger.getLogger("org").setLevel(Level.OFF)
+       Logger.getLogger("akka").setLevel(Level.OFF)
+       spark = SparkSession.builder()
+           .master("local[1]")
+           .getOrCreate()
+       spark.sparkContext.setLogLevel("ERROR")
+
+       // For these questions, train and test are collected in a scala Array
+       // to not depend on Spark
+       train2 = load(spark, train2Path, separator).collect()
+       test2 = load(spark, test2Path, separator).collect()
+   }
+
+   // All the functions definitions for the tests below (and the tests in other suites) 
+   // should be in a single library, 'src/main/scala/shared/predictions.scala'.
+
+   // Provide tests to show how to call your code to do the following tasks.
+   // Ensure you use the same function calls to produce the JSON outputs in
+   // src/main/scala/predict/Baseline.scala.
+   // Add assertions with the answer you expect from your code, up to the 4th
+   // decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
+   test("kNN predictor with k=10") { 
+     // Create predictor on train2
+
+     // Similarity between user 1 and itself
+     assert(within(1.0, 0.0, 0.0001))
+ 
+     // Similarity between user 1 and 864
+     assert(within(1.0, 0.0, 0.0001))
+
+     // Similarity between user 1 and 886
+     assert(within(1.0, 0.0, 0.0001))
+
+     // Prediction user 1 and item 1
+     assert(within(1.0, 0.0, 0.0001))
+
+     // MAE on test2 
+     assert(within(1.0, 0.0, 0.0001))
+   } 
+
+   test("kNN Mae") {
+     // Compute MAE for k around the baseline MAE
+     
+     // Ensure the MAEs are indeed lower/higher than baseline
+     assert(1.0 < 0.0)
+   }
+}
diff --git a/src/test/scala/recommend/RecommenderTests.scala b/src/test/scala/recommend/RecommenderTests.scala
new file mode 100644
index 0000000..4a1a420
--- /dev/null
+++ b/src/test/scala/recommend/RecommenderTests.scala
@@ -0,0 +1,67 @@
+package test.recommend
+
+import org.scalatest._
+import funsuite._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import shared.predictions._
+import tests.shared.helpers._
+import ujson._
+
+class RecommenderTests extends AnyFunSuite with BeforeAndAfterAll {
+
+   val separator = "\t"
+   var spark : org.apache.spark.sql.SparkSession = _
+
+   val dataPath = "data/ml-100k/u.data"
+   val personalPath = "data/personal.csv"
+   var data : Array[shared.predictions.Rating] = null
+   var personal : Array[shared.predictions.Rating] = null
+   var train : Array[shared.predictions.Rating] = null
+   var predictor : (Int, Int) => Double = null
+
+   override def beforeAll {
+     Logger.getLogger("org").setLevel(Level.OFF)
+     Logger.getLogger("akka").setLevel(Level.OFF)
+     spark = SparkSession.builder()
+         .master("local[1]")
+         .getOrCreate()
+     spark.sparkContext.setLogLevel("ERROR")
+    
+     data = load(spark, dataPath, separator).collect()
+
+     println("Loading personal data from: " + personalPath) 
+     val personalFile = spark.sparkContext.textFile(personalPath)
+     personal = personalFile.map(l => {
+         val cols = l.split(",").map(_.trim)
+         if (cols(0) == "id") 
+           Rating(944,0,0.0)
+         else 
+           if (cols.length < 3) 
+             Rating(944, cols(0).toInt, 0.0)
+           else
+             Rating(944, cols(0).toInt, cols(2).toDouble)
+     }).filter(r => r.rating != 0).collect()
+
+     // TODO: Create predictor
+   }
+
+   // All the functions definitions for the tests below (and the tests in other suites) 
+   // should be in a single library, 'src/main/scala/shared/predictions.scala'.
+   //
+   test("Prediction for user 1 of item 1") {
+     assert(within(1.0, 0.0, 0.0001))
+   }
+
+   test("Top 3 recommendations for user 944") {
+     val recommendations = List((1,0.0), (2,0.0), (3,0.0))
+     assert(recommendations(0)._1 == 4)
+     assert(within(recommendations(0)._2, 5.0, 0.0001))
+     // Idem recommendation 2 and 3
+   }
+
+}
diff --git a/src/test/scala/shared/helpers.scala b/src/test/scala/shared/helpers.scala
new file mode 100644
index 0000000..f7c0769
--- /dev/null
+++ b/src/test/scala/shared/helpers.scala
@@ -0,0 +1,9 @@
+package tests.shared
+
+package object helpers {
+
+  def within(actual :Double, expected :Double, interval :Double) : Boolean = {
+    return actual >= (expected - interval) && actual <= (expected + interval)
+  }
+
+}
diff --git a/test.sh b/test.sh
new file mode 100755
index 0000000..c242faf
--- /dev/null
+++ b/test.sh
@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+# If your default java install does not work, explicitly 
+# provide the path to the JDK 1.8 installation. On OSX
+# with homebrew:
+# export JAVA_HOME=/usr/local/Cellar/openjdk@8/1.8.0+282; ./test.sh
+export JAVA_OPTS="-Xmx8G";
+RUN=./logs/test-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
+mkdir -p $RUN
+LOGS=$RUN/log.txt
+sbt "testOnly test.AllTests" 2>&1 >>$LOGS
diff --git a/timeCluster.sh b/timeCluster.sh
new file mode 100755
index 0000000..597d3c3
--- /dev/null
+++ b/timeCluster.sh
@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+# If your default java install does not work, explicitly 
+# provide the path to the JDK 1.8 installation. On OSX
+# with homebrew:
+# export JAVA_HOME=/usr/local/Cellar/openjdk@8/1.8.0+282; ./run.sh
+export JAVA_OPTS="-Xmx8G";
+RUN=./logs/timecluster-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
+mkdir -p $RUN
+LOGS=$RUN/log.txt
+source ./config.sh 
+echo "------------------- DISTRIBUTED ---------------------" >> $LOGS
+sbt assembly
+# 1 Executor
+spark-submit --class distributed.DistributedBaseline --master $SPARKMASTER --num-executors 1 target/scala-2.11/m1_yourid-assembly-1.0.jar  --train $ML25Mr2train --test $ML25Mr2test --separator , --json $RUN/distributed-25m-1.json --num_measurements 3 2>&1 >>$LOGS
+# 4 Executors
+spark-submit --class distributed.DistributedBaseline --master $SPARKMASTER --num-executors 4 target/scala-2.11/m1_yourid-assembly-1.0.jar  --train $ML25Mr2train --test $ML25Mr2test --separator , --json $RUN/distributed-25m-4.json --num_measurements 3 2>&1 >>$LOGS
diff --git a/timeOthers.sh b/timeOthers.sh
new file mode 100755
index 0000000..6dd08e6
--- /dev/null
+++ b/timeOthers.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+# If your default java install does not work, explicitly 
+# provide the path to the JDK 1.8 installation. On OSX
+# with homebrew:
+# export JAVA_HOME=/usr/local/Cellar/openjdk@8/1.8.0+282;
+export JAVA_OPTS="-Xmx8G";
+RUN=./logs/timeOthers-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
+mkdir -p $RUN
+LOGS=$RUN/log.txt
+echo "------------------- BASELINE    ---------------------" >> $LOGS
+sbt "runMain predict.Baseline --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json $RUN/baseline-100k.json --num_measurements 3" 2>&1 >>$LOGS
+echo "------------------- DISTRIBUTED ---------------------" >> $LOGS
+sbt "runMain predict.Baseline --train data/ml-25m/r2.train --test data/ml-25m/r2.test --separator , --json $RUN/baseline-25m.json --num_measurements 3" 2>&1 >> $LOGS
+sbt "runMain distributed.DistributedBaseline --train data/ml-25m/r2.train --test data/ml-25m/r2.test --separator , --json $RUN/distributed-25m-1.json --num_measurements 3 --master local[1]" 2>&1 >>$LOGS
+sbt "runMain distributed.DistributedBaseline --train data/ml-25m/r2.train --test data/ml-25m/r2.test --separator , --json $RUN/distributed-25m-4.json --num_measurements 3 --master local[4]" 2>&1 >>$LOGS
diff --git a/timeTrials.sh b/timeTrials.sh
new file mode 100755
index 0000000..7329d07
--- /dev/null
+++ b/timeTrials.sh
@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+# If your default java install does not work, explicitly 
+# provide the path to the JDK 1.8 installation. On OSX
+# with homebrew:
+# export JAVA_HOME=/usr/local/Cellar/openjdk@8/1.8.0+282;
+export JAVA_OPTS="-Xmx8G";
+RUN=./logs/timetrials-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
+mkdir -p $RUN
+LOGS=$RUN/log.txt
+echo "------------------- KNN -----------------------------" >> $LOGS
+sbt "runMain predict.kNN --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json $RUN/knn-100k.json --num_measurements 3" 2>&1 >>$LOGS
-- 
GitLab