Commits on Source (27)
Showing with 745 additions and 601 deletions
File deleted
\documentclass{article}
\usepackage{hyperref}
\usepackage{algorithm}
\usepackage{algpseudocode}
\usepackage{dsfont}
\usepackage{amsmath}
\usepackage{filemod}
\usepackage{ulem}
\usepackage{graphicx}
\usepackage{todonotes}
\input{Milestone-2-questions.sty}
% If you use BibTeX in apalike style, activate the following line:
\bibliographystyle{acm}
\title{CS-449 Project Milestone 2: Optimizing, Scaling, and Economics}
\author{
\textbf{Name}: xxx\\
\textbf{Sciper}: xxx\\
\textbf{Email:} xxx\\
\textbf{Name}: xxx\\
\textbf{Sciper}: xxx\\
\textbf{Email:} xxx\\
}
\begin{document}
\maketitle
\section{Optimizing with Breeze, a Linear Algebra Library}
\label{section:optimization}
\begin{itemize}
\item[\textbf{BR.1}] \BROne
\item [\textbf{BR.2}] \BRTwoOne
\BRTwoTwo
\BRTwoThree
\end{itemize}
\section{Parallel k-NN Computations with Replicated Ratings}
\begin{enumerate}
\item [\textbf{EK.1}] \EKOne
\item [\textbf{EK.2}] \EKTwo
\end{enumerate}
\section{Distributed Approximate k-NN}
\begin{enumerate}
\item [\textbf{AK.1}] \AKOne
\item [\textbf{AK.2}] \AKTwo
\item [\textbf{AK.3}] \AKThree
\end{enumerate}
\section{Economics}
\textit{Implement the computations for the different answers in the Economics.scala file. You don't need to provide unit tests for this question, nor written answers for these questions in your report.}
\end{document}
\ProvidesPackage{m2questions}[2022/03/11 v1.0]
% Breeze
\newcommand{\BROne}{
Reimplement the kNN predictor of Milestone 1 using the Breeze library and without using Spark. Using $k=10$ and \texttt{data/ml-100k/u2.base} for training, output the similarities between: (1) user $1$ and itself; (2) user $1$ and user $864$; (3) user $1$ and user $886$. Still using $k=10$, output the prediction for user 1 and item 1 ($p_{1,1}$), the prediction for user 327 and item 2 ($p_{327,2}$), and make sure that you obtain an MAE of $0.8287 \pm 0.0001$ on \texttt{data/ml-100k/u2.test}.
}
\newcommand{\BRTwoOne}{
Try making your implementation as fast as possible, both for computing all k-nearest neighbours and for computing the predictions and MAE on a test set. Your implementation should be based around \texttt{CSCMatrix}, but may involve conversions for individual operations. We will test your implementation on a secret test set. The teams with both a correct answer and the shortest time will receive more points.
}
\newcommand{\BRTwoTwo}{
Using $k=300$, compare the time for predicting all values and computing the MAE on \texttt{ml-100k/u2.test} to the time you obtained in Milestone 1. What is the speedup of your new implementation (as a ratio of $\frac{\textit{average time}_{old}}{\textit{average time}_{new}}$)? Use the same machine to measure the time for both versions and provide the answer in your report.
}
\newcommand{\BRTwoThree}{
Also ensure your implementation works with \texttt{data/ml-1m/rb.train} and \texttt{data/ml-1m/rb.test} since you will reuse it in the next questions.
}
% Parallel Exact Knn
\newcommand{\EKOne}{
Test your parallel implementation of k-NN for correctness with two workers. Using $k=10$ and \texttt{data/ml-100k/u2.base} for training, output the similarities between: (1) user $1$ and itself; (2) user $1$ and user $864$; (3) user $1$ and user $886$. Still using $k=10$, output the prediction for user 1 and item 1 ($p_{1,1}$), the prediction for user 327 and item 2 ($p_{327,2}$), and make sure that you obtain an MAE of $0.8287 \pm 0.0001$ on \texttt{data/ml-100k/u2.test}.
}
\newcommand{\EKTwo}{
Measure and report the combined \textit{k-NN} and \textit{prediction} time when using 1, 2, 4 workers, $k=300$, \texttt{ml-1m/rb.train} for training, and \texttt{ml-1m/rb.test} for test, on the cluster (or a machine with at least 4 physical cores). Perform 3 measurements for each experiment and report the average and standard deviation of the total time, including training, making predictions, and computing the MAE. Do you observe a speedup? Does this speedup grow linearly with the number of executors, i.e. is the running time $X$ times faster when using $X$ executors compared to using a single executor? Answer both questions in your report.
}
% Approximate Knn
\newcommand{\AKOne}{
Implement the approximate k-NN using your previous Breeze implementation and Spark's RDDs. Using the partitioner of the template with 10 partitions and 2 replications, $k=10$, and \texttt{data/ml-100k/u2.base} for training, output the similarities of the approximate k-NN between user $1$ and the following users: $1,864,344,16,334,2$.
}
\newcommand{\AKTwo}{
Vary the number of partitions in which a given user appears. For the \texttt{data/ml-100k/u2.base} training set, partitioned equally between 10 workers, report the relationship between the level of replication (1,2,3,4,6,8) and the MAE you obtain on the \texttt{data/ml-100k/u2.test} test set. What is the minimum level of replication such that the MAE is still lower than the baseline predictor of Milestone 1 (MAE of 0.7604), when using $k=300$? Does this reduce the number of similarity computations compared to an exact k-NN? What is the ratio? Answer both questions in your report.
}
\newcommand{\AKThree}{
Measure and report the time required by your approximate \textit{k-NN} implementation, including both training on \texttt{data/ml-1m/rb.train} and computing the MAE on the test set \texttt{data/ml-1m/rb.test}, using $k=300$ on 8 partitions with a replication factor of 1, when using 1, 2, 4 workers. Perform each experiment 3 times and report the average and standard deviation. Do you observe a speedup compared to the parallel (exact) k-NN with replicated ratings for the same number of workers?
}
% Economics
\newcommand{\EOne}{
What is the minimum number of days of renting to make buying the ICC.M7 less expensive, excluding any operating costs such as electricity and maintenance? Round up to the nearest integer.
}
\newcommand{\ETwoOne}{
After how many days of renting a container does the cost become higher than buying and running 4 Raspberry Pis? To obtain a likely range, compute this (1) optimistically assuming no maintenance and minimum power usage for the RPis, and (2) assuming no maintenance and maximum power usage for the RPis. (Round up to the nearest integer in each case.)
}
\newcommand{\ETwoTwo}{
Assume a single processor for the container and a total amount of RAM equivalent to that of the 4 Raspberry Pis. Also provide unrounded intermediary results for (1) Container Daily Cost, (2) 4 RPis (Idle) Daily Electricity Cost, (3) 4 RPis (Computing) Daily Electricity Cost.
}
\newcommand{\EThree}{
For the same buying price as an ICC.M7, how many Raspberry Pis can you get (floor the result to remove the decimal)? Assuming perfect scaling, would you obtain a larger overall throughput and RAM from these? If so, by how much? Compute the ratios using the previous floored number of RPis, but do not round the final results.
}
\ No newline at end of file
File added
# Milestone Description
[To Be Released](./Milestone-1.pdf)
[Milestone-2.pdf](./Milestone-2.pdf)
Note: Section 'Updates' lists the updates since the original release of the Milestone.
Mu has prepared a report template for your convenience here: [Report Template](./Milestone-2-QA-template.tex).
# Dependencies
@@ -13,6 +15,11 @@ Note: Section 'Updates' lists the updates since the original release of the Mile
Should be available by default on ````iccluster028.iccluster.epfl.ch````. Otherwise, refer to each project's installation instructions. Prefer working locally on your own machine; you will have less interference in your measurements from other students.
If you work on ````iccluster028.iccluster.epfl.ch````, you need to modify the PATH by default by adding the following line in ````~/.bashrc````:
````
export PATH=$PATH:/opt/sbt/sbt/bin
````
If you have multiple installations of openjdk, you need to specify the one to use as JAVA_HOME, e.g. on OSX with
openjdk@8 installed through Homebrew, you would do:
````
@@ -21,33 +28,16 @@ openjdk@8 installed through Homebrew, you would do:
# Dataset
Download [data.zip](https://gitlab.epfl.ch/sacs/cs-449-sds-public/project/dataset/-/raw/main/data.zip).
Download [data-m2.zip](https://gitlab.epfl.ch/sacs/cs-449-sds-public/project/dataset/-/raw/main/data-m2.zip).
Unzip:
````
> unzip data.zip
> unzip data-m2.zip
````
It should unzip into ````data/```` by default. If not, manually move ````ml-100k```` and ````ml-25m```` into ````data/````.
It should unzip into ````data/```` by default. If not, manually move ````ml-100k```` and ````ml-1m```` into ````data/````.
# Personal Ratings
Additional personal ratings are provided in the 'data/personal.csv' file in
CSV format with ````<movie>, <movie title>, <rating>```` to test your recommender.
You can copy this file and change the ratings, with values in [1,5], to obtain
recommendations more to your liking!
Entries with no rating are in the following format:
````
1,Toy Story (1995),
````
Entries with ratings are in the following format:
````
1,Toy Story (1995),5
````
# Repository Structure
````src/main/scala/shared/predictions.scala````:
@@ -56,11 +46,10 @@ This code should then be used in the following applications and tests.
## Applications
````src/main/scala/predict/Baseline.scala````: Output answers to questions **B.X**.
````src/main/scala/distributed/DistributedBaseline.scala````: Output answers to questions **D.X**.
````src/main/scala/predict/Personalized.scala````: Output answers to questions **P.X**.
````src/main/scala/predict/kNN.scala````: Output answers to questions **N.X**.
````src/main/scala/recommend/Recommender.scala````: Output answers to questions **R.X**.
1. ````src/main/scala/optimizing/Optimizing.scala````: Output answers to questions **BR.X**.
2. ````src/main/scala/distributed/Exact.scala````: Output answers to questions **EK.X**.
3. ````src/main/scala/distributed/Approximate.scala````: Output answers to questions **AK.X**.
4. ````src/main/scala/economics/Economics.scala````: Output answers to questions **E.X**.
Applications are separate from tests to make it easier to test with different
inputs and permit outputting your answers and timings in JSON format for easier
@@ -68,14 +57,12 @@ grading.
## Unit Tests
Corresponding unit tests for each application:
Corresponding unit tests for each application (except Economics.scala):
````
src/test/scala/predict/BaselineTests.scala
src/test/scala/distributed/DistributedBaselineTests.scala
src/test/scala/predict/PersonalizedTests.scala
src/test/scala/predict/kNNTests.scala
src/test/scala/recommend/RecommenderTests.scala
src/test/scala/optimizing/OptimizingTests.scala
src/test/scala/distributed/ExactTests.scala
src/test/scala/distributed/ApproximateTests.scala
````
Your tests should demonstrate how to call your code to obtain the answers of
@@ -88,48 +75,36 @@ clear and regular structure to check its correctness.
## Execute unit tests
````sbt "testOnly test.AllTests"````
````
sbt "testOnly test.AllTests"
````
You should fill all tests and ensure they all succeed prior to submission.
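As an illustration, here is a minimal sketch of a filled-in test, assuming ````within(a, b, tol)```` (the tolerance helper imported by the provided test suites) checks that two doubles agree within ````tol````, and where ````mySimilarity```` stands for a hypothetical function of your own in ````src/main/scala/shared/predictions.scala````:
````
// Sketch only: `mySimilarity` is a hypothetical name for your own similarity function;
// replace the expected value 0.0 with the value you obtain and report.
test("kNN similarity between user 1 and user 864 (k=10)") {
  val sim = mySimilarity(train2, 1, 864)
  assert(within(sim, 0.0, 0.0001))
}
````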
## Run applications
### Baseline
On ````ml-100k````:
````
sbt "runMain predict.Baseline --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json baseline-100k.json"
````
### Optimizing
On ````ml-25m````:
````
sbt "runMain predict.Baseline --train data/ml-25m/r2.train --test data/ml-25m/r2.test --json baseline-25m.json"
sbt "runMain scaling.Optimizing --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json optimizing-100k.json --master local[1] --users 943 --movies 1682"
````
### Distributed Baseline
### Parallel Exact KNN
````
sbt "runMain distributed.DistributedBaseline --train data/ml-25m/r2.train --test data/ml-25m/r2.test --separator , --json distributed-25m-4.json --master local[4]"
sbt "runMain distributed.Exact --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json exact-100k-4.json --k 10 --master local[4] --users 943 --movies 1682"
````
You can vary the number of executors used locally with ````local[X]````, where X is the number of cores you want to use.
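For instance, the same Exact command with a single core (as a sketch; only ````--master```` and the JSON output name change):
````
sbt "runMain distributed.Exact --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json exact-100k-1.json --k 10 --master local[1] --users 943 --movies 1682"
````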
### Personalized
### Approximate KNN
````
sbt "runMain predict.Personalized --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json personalized-100k.json"
sbt "runMain distributed.Approximate --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json approximate-100k-4-k10-r2.json --k 10 --master local[4] --users 943 --movies 1682 --partitions 10 --replication 2"
````
### kNN
### Economics
````
sbt "runMain predict.kNN --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json knn-100k.json"
````
### Recommender
````
sbt "runMain recommend.Recommender --data data/ml-100k/u.data --personal data/personal.csv --json recommender-100k.json"
sbt "runMain economics.Economics --json economics.json"
````
## Time applications
@@ -147,7 +122,7 @@ for other students.
````sbt clean````: clean up temporary files and previous assembly packages.
````sbt assembly````: create a new jar
````target/scala-2.11/m1_yourid-assembly-1.0.jar```` that can be used with
````target/scala-2.11/m2_yourid-assembly-1.0.jar```` that can be used with
````spark-submit````.
Prefer packaging your application locally and upload the tar archive of your application
@@ -156,26 +131,33 @@ before running on cluster.
### Upload jar on Cluster
````
scp target/scala-2.11/m1_yourid-assembly-1.0.jar <username>@iccluster028.iccluster.epfl.ch:~
scp target/scala-2.11/m2_yourid-assembly-1.0.jar <username>@iccluster028.iccluster.epfl.ch:~
````
### Run on Cluster
See [config.sh](./config.sh) for HDFS paths to pre-uploaded train and test datasets to replace TRAIN and TEST, like in the example commands below:
#### When using ML-100k
````
spark-submit --class distributed.Exact --master yarn --conf "spark.dynamicAllocation.enabled=false" --num-executors 1 m2_yourid-assembly-1.0.jar --json exact-100k-1.json --train $ML100Ku2base --test $ML100Ku2test
````
spark-submit --class distributed.DistributedBaseline --master yarn --num-executors 1 target/scala-2.11/m1_yourid-assembly-1.0.jar --train TRAIN --test TEST --separator , --json distributed-25m-1.json --num_measurements 1
#### When using ML-1m
````
spark-submit --class distributed.Exact --master yarn --conf "spark.dynamicAllocation.enabled=false" --num-executors 1 m2_yourid-assembly-1.0.jar --json exact-1m-1.json --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --k 300 --users 6040 --movies 3952
````
See [config.sh](./config.sh) for HDFS paths to pre-uploaded TRAIN and TEST datasets. You can vary the number of executors with ````--num-executors X````, and number of measurements with ````--num_measurements Y````.
In order to keep results obtained with different parameters in separate .json files, simply change the value passed to the corresponding parameter (```--json```). For instance, with ```--num-executors 4```, use ```--json exact-1m-4.json```.
Note that when changing from ML-100k to ML-1M, the parameter ```--separator ::``` should be added, and the number of users and movies should be modified.
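For instance, a sketch of the ML-1M command above with 4 executors (assuming the same jar and HDFS paths from [config.sh](./config.sh)):
````
spark-submit --class distributed.Exact --master yarn --conf "spark.dynamicAllocation.enabled=false" --num-executors 4 m2_yourid-assembly-1.0.jar --json exact-1m-4.json --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --k 300 --users 6040 --movies 3952
````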
## Grading scripts
We will use the following scripts to grade your submission:
1. ````./test.sh````: Run all unit tests.
2. ````./run.sh````: Run all applications without timing measurements.
3. ````./timeTrials.sh````: Time applications to determine which student implementations are fastest.
4. ````./timeOthers.sh````: Time applications to check report answers against independent measurements.
5. ````./timeCluster.sh````: Package and time applications on Spark Cluster.
3. ````./time.sh````: Run all timing measurements.
All scripts will produce execution logs in the ````logs````
directory, including answers produced in the JSON format. Logs directories are
@@ -183,26 +165,15 @@ in the format ````logs/<scriptname>-<datetime>-<machine>/```` and include at
least an execution log ````log.txt```` as well as possible JSON outputs from
applications.
Ensure all scripts run correctly locally before submitting. Avoid running
````timeCluster.sh```` on iccluster as the packaging and measurements will
interfere with other students working on their Milestone at the same time. If
````timeCluster.sh```` correctly runs locally on your machine, this should be
sufficient.
Ensure all scripts run correctly locally before submitting.
## Package for submission
## Submission
Steps:
1. Update the ````name````, ````maintainer```` fields of ````build.sbt````, with the correct Milestone number, your ID, and your email.
2. Ensure you only used the dependencies listed in ````build.sbt```` in this template, and did not add any other.
3. Remove ````project/project````, ````project/target````, and ````target/````.
4. Test that all previous commands for generating statistics, predictions, and recommendations correctly produce a JSON file (after downloading/reinstalling dependencies).
5. Remove the ml-100k dataset (````data/ml-100k.zip````, and ````data/ml-100k````), as well as the ````project/project````, ````project/target````, and ````target/````.
6. Remove the ````.git```` repository information.
7. Add your report and any other necessary files listed in the Milestone description (see ````Deliverables````).
8. Zip the archive.
9. Submit to the TA for grading.
1. Create a zip archive with all your code within ````src/````, as well as your report: ````zip sciper1-sciper2.zip -r src/ report.pdf````
2. Submit ````sciper1-sciper2.zip```` to the TA for grading on
https://cs449-submissions.epfl.ch:8083/m2 using the passcode you have previously received by email.
# References
......
name := "m1_yourid"
name := "m2_yourid"
version := "1.0"
libraryDependencies += "org.rogach" %% "scallop" % "4.0.2"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.7"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.7"
libraryDependencies += "org.scalanlp" %% "breeze" % "0.13.2"
libraryDependencies += "org.scalanlp" %% "breeze-natives" % "0.13.2"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.0" % Test
libraryDependencies += "com.lihaoyi" %% "ujson" % "1.5.0"
......
if [ $(hostname) == 'iccluster028' ];
then
export ML100Ku2base=hdfs://iccluster028.iccluster.epfl.ch:8020/user/lavoie/cs449/data/ml-100k/u2.base;
export ML100Ku2test=hdfs://iccluster028.iccluster.epfl.ch:8020/user/lavoie/cs449/data/ml-100k/u2.test;
export ML100Kudata=hdfs://iccluster028.iccluster.epfl.ch:8020/user/lavoie/cs449/data/ml-100k/u.data;
export ML25Mr2train=hdfs://iccluster028.iccluster.epfl.ch:8020/user/lavoie/cs449/data/ml-25m/r2.train;
export ML25Mr2test=hdfs://iccluster028.iccluster.epfl.ch:8020/user/lavoie/cs449/data/ml-25m/r2.test;
ICCLUSTER=hdfs://iccluster028.iccluster.epfl.ch:8020
export ML100Ku2base=$ICCLUSTER/cs449/data/ml-100k/u2.base;
export ML100Ku2test=$ICCLUSTER/cs449/data/ml-100k/u2.test;
export ML100Kudata=$ICCLUSTER/cs449/data/ml-100k/u.data;
export ML1Mrbtrain=$ICCLUSTER/cs449/data/ml-1m/rb.train;
export ML1Mrbtest=$ICCLUSTER/cs449/data/ml-1m/rb.test;
export SPARKMASTER='yarn'
else
export ML100Ku2base=data/ml-100k/u2.base;
export ML100Ku2test=data/ml-100k/u2.test;
export ML100Kudata=data/ml-100k/u.data;
export ML25Mr2train=data/ml-25m/r2.train;
export ML25Mr2test=data/ml-25m/r2.test;
export ML1Mrbtrain=data/ml-1m/rb.train;
export ML1Mrbtest=data/ml-1m/rb.test;
export SPARKMASTER='local[4]'
fi;
@@ -8,14 +8,16 @@ RUN=./logs/run-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
mkdir -p $RUN
LOGS=$RUN/log.txt
source ./config.sh
echo "------------------- BASELINE ---------------------" >> $LOGS
sbt "runMain predict.Baseline --train $ML100Ku2base --test $ML100Ku2test --json $RUN/baseline-100k.json" 2>&1 >>$LOGS
echo "------------------- DISTRIBUTED ---------------------" >> $LOGS
sbt "runMain predict.Baseline --train $ML25Mr2train --test $ML25Mr2test --separator , --json $RUN/baseline-25m.json" 2>&1 >>$LOGS
sbt "runMain distributed.DistributedBaseline --train $ML25Mr2train --test $ML25Mr2test --separator , --json $RUN/distributed-25m-4.json --master $SPARKMASTER" 2>&1 >>$LOGS
echo "------------------- PERSONALIZED --------------------" >> $LOGS
sbt "runMain predict.Personalized --train $ML100Ku2base --test $ML100Ku2test --json $RUN/personalized-100k.json" 2>&1 >>$LOGS
echo "------------------- KNN -----------------------------" >> $LOGS
sbt "runMain predict.kNN --train $ML100Ku2base --test $ML100Ku2test --json $RUN/knn-100k.json" 2>&1 >>$LOGS
echo "------------------- RECOMMEND -----------------------" >> $LOGS
sbt "runMain recommend.Recommender --data $ML100Kudata --personal data/personal.csv --json $RUN/recommender-100k.json" 2>&1 >>$LOGS
echo "------------------- OPTIMIZING ---------------------" >> $LOGS
sbt "runMain scaling.Optimizing --train $ML100Ku2base --test $ML100Ku2test --json $RUN/optimizing-100k.json --users 943 --movies 1682 --master local[1]" 2>&1 >>$LOGS
echo "------------------- DISTRIBUTED EXACT ---------------------" >> $LOGS
sbt "runMain distributed.Exact --train $ML100Ku2base --test $ML100Ku2test --json $RUN/exact-100k-4.json --k 10 --master local[4] --users 943 --movies 1682" 2>&1 >>$LOGS
sbt "runMain distributed.Exact --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --json $RUN/exact-1m-4.json --k 300 --master local[4] --users 6040 --movies 3952" 2>&1 >>$LOGS
echo "------------------- DISTRIBUTED APPROXIMATE ---------------------" >> $LOGS
sbt "runMain distributed.Approximate --train $ML100Ku2base --test $ML100Ku2test --json $RUN/approximate-100k-4-k10-r2.json --k 10 --master local[4] --users 943 --movies 1682 --partitions 10 --replication 2" 2>&1 >>$LOGS;
for R in 1 2 3 4 6 8; do
sbt "runMain distributed.Approximate --train $ML100Ku2base --test $ML100Ku2test --json $RUN/approximate-100k-4-k300-r$R.json --k 300 --master local[4] --users 943 --movies 1682 --partitions 10 --replication $R" 2>&1 >>$LOGS;
done
sbt "runMain distributed.Approximate --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --json $RUN/approximate-1m-4.json --k 300 --master local[4] --users 6040 --movies 3952 --partitions 8 --replication 1" 2>&1 >>$LOGS
echo "------------------- ECONOMICS -----------------------------------" >> $LOGS
sbt "runMain economics.Economics --json $RUN/economics.json" 2>&1 >>$LOGS
import org.rogach.scallop._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import ujson._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import shared.predictions._
package distributed {
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val k = opt[Int]()
val json = opt[String]()
val users = opt[Int]()
val movies = opt[Int]()
val separator = opt[String](default=Some("\t"))
val replication = opt[Int](default=Some(1))
val partitions = opt[Int](default=Some(1))
val master = opt[String]()
val num_measurements = opt[Int](default=Some(1))
verify()
}
object Approximate {
def main(args: Array[String]) {
var conf = new Conf(args)
// Remove these lines if encountering/debugging Spark
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = conf.master.toOption match {
case None => SparkSession.builder().getOrCreate();
case Some(master) => SparkSession.builder().master(master).getOrCreate();
}
val sc = spark.sparkContext
println("")
println("******************************************************")
// conf object is not serializable, extract values that
// will be serialized with the parallelize implementations
val conf_users = conf.users()
val conf_movies = conf.movies()
val conf_k = conf.k()
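// Illustrative note (not part of the original template): inside RDD closures, use the
// extracted local values (conf_users, conf_movies, conf_k) rather than conf.users(),
// otherwise Spark would attempt to serialize the non-serializable Conf object, e.g.:
// sc.parallelize(0 until conf_users).map(u => u % conf_movies)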
println("Loading training data")
val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
var knn : CSCMatrix[Double] = null
println("Partitioning users")
var partitionedUsers : Seq[Set[Int]] = partitionUsers(
conf.users(),
conf.partitions(),
conf.replication()
)
val measurements = (1 to scala.math.max(1,conf.num_measurements()))
.map(_ => timingInMs( () => {
// Use partitionedUsers here
0.0
}))
val mae = measurements(0)._1
val timings = measurements.map(_._2)
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"train" -> ujson.Str(conf.train()),
"test" -> ujson.Str(conf.test()),
"k" -> ujson.Num(conf.k()),
"users" -> ujson.Num(conf.users()),
"movies" -> ujson.Num(conf.movies()),
"master" -> ujson.Str(sc.getConf.get("spark.master")),
"num-executors" -> ujson.Str(if (sc.getConf.contains("spark.executor.instances"))
sc.getConf.get("spark.executor.instances")
else
""),
"num_measurements" -> ujson.Num(conf.num_measurements()),
"partitions" -> ujson.Num(conf.partitions()),
"replication" -> ujson.Num(conf.replication())
),
"AK.1" -> ujson.Obj(
"knn_u1v1" -> ujson.Num(0.0),
"knn_u1v864" -> ujson.Num(0.0),
"knn_u1v344" -> ujson.Num(0.0),
"knn_u1v16" -> ujson.Num(0.0),
"knn_u1v334" -> ujson.Num(0.0),
"knn_u1v2" -> ujson.Num(0.0)
),
"AK.2" -> ujson.Obj(
"mae" -> ujson.Num(mae)
),
"AK.3" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)),
"stddev (ms)" -> ujson.Num(std(timings))
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
spark.stop()
}
}
}
package distributed
import org.rogach.scallop._
import org.apache.spark.rdd.RDD
import ujson._
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import scala.math
import shared.predictions._
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val separator = opt[String](default=Some("\t"))
val master = opt[String](default=Some(""))
val num_measurements = opt[Int](default=Some(0))
val json = opt[String]()
verify()
}
object DistributedBaseline extends App {
var conf = new Conf(args)
// Remove these lines if encountering/debugging Spark
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = if (conf.master() != "") {
SparkSession.builder().master(conf.master()).getOrCreate()
} else {
SparkSession.builder().getOrCreate()
}
spark.sparkContext.setLogLevel("ERROR")
println("")
println("******************************************************")
println("Loading training data from: " + conf.train())
val train = load(spark, conf.train(), conf.separator())
println("Loading test data from: " + conf.test())
val test = load(spark, conf.test(), conf.separator())
val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
Thread.sleep(1000) // Do everything here from train and test
42 // Output answer as last value
}))
val timings = measurements.map(t => t._2) // Retrieve the timing measurements
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"1.Train" -> conf.train(),
"2.Test" -> conf.test(),
"3.Master" -> conf.master(),
"4.Measurements" -> conf.num_measurements()
),
"D.1" -> ujson.Obj(
"1.GlobalAvg" -> ujson.Num(0.0), // Datatype of answer: Double
"2.User1Avg" -> ujson.Num(0.0), // Datatype of answer: Double
"3.Item1Avg" -> ujson.Num(0.0), // Datatype of answer: Double
"4.Item1AvgDev" -> ujson.Num(0.0), // Datatype of answer: Double,
"5.PredUser1Item1" -> ujson.Num(0.0), // Datatype of answer: Double
"6.Mae" -> ujson.Num(0.0) // Datatype of answer: Double
),
"D.2" -> ujson.Obj(
"1.DistributedBaseline" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
)
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
spark.close()
}
import org.rogach.scallop._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import ujson._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import shared.predictions._
package distributed {
class ExactConf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val k = opt[Int](default=Some(10))
val json = opt[String]()
val users = opt[Int]()
val movies = opt[Int]()
val separator = opt[String](default=Some("\t"))
val master = opt[String]()
val num_measurements = opt[Int](default=Some(1))
verify()
}
object Exact {
def main(args: Array[String]) {
var conf = new ExactConf(args)
// Remove these lines if encountering/debugging Spark
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = conf.master.toOption match {
case None => SparkSession.builder().getOrCreate();
case Some(master) => SparkSession.builder().master(master).getOrCreate();
}
spark.sparkContext.setLogLevel("ERROR")
val sc = spark.sparkContext
println("")
println("******************************************************")
// conf object is not serializable, extract values that
// will be serialized with the parallelize implementations
val conf_users = conf.users()
val conf_movies = conf.movies()
val conf_k = conf.k()
println("Loading training data from: " + conf.train())
val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
val measurements = (1 to scala.math.max(1,conf.num_measurements())).map(_ => timingInMs( () => {
0.0
}))
val timings = measurements.map(_._2)
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"train" -> ujson.Str(conf.train()),
"test" -> ujson.Str(conf.test()),
"k" -> ujson.Num(conf.k()),
"users" -> ujson.Num(conf.users()),
"movies" -> ujson.Num(conf.movies()),
"master" -> ujson.Str(sc.getConf.get("spark.master")),
"num-executors" -> ujson.Str(if (sc.getConf.contains("spark.executor.instances"))
sc.getConf.get("spark.executor.instances")
else
""),
"num_measurements" -> ujson.Num(conf.num_measurements())
),
"EK.1" -> ujson.Obj(
"1.knn_u1v1" -> ujson.Num(0.0),
"2.knn_u1v864" -> ujson.Num(0.0),
"3.knn_u1v886" -> ujson.Num(0.0),
"4.PredUser1Item1" -> ujson.Num(0.0),
"5.PredUser327Item2" -> ujson.Num(0.0),
"6.Mae" -> ujson.Num(0.0)
),
"EK.2" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
spark.stop()
}
}
}
import org.rogach.scallop._
import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import ujson._
package economics {
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val json = opt[String]()
verify()
}
object Economics {
def main(args: Array[String]) {
println("")
println("******************************************************")
var conf = new Conf(args)
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"E.1" -> ujson.Obj(
"MinRentingDays" -> ujson.Num(0.0) // Datatype of answer: Double
),
"E.2" -> ujson.Obj(
"ContainerDailyCost" -> ujson.Num(0.0),
"4RPisDailyCostIdle" -> ujson.Num(0.0),
"4RPisDailyCostComputing" -> ujson.Num(0.0),
"MinRentingDaysIdleRPiPower" -> ujson.Num(0.0),
"MinRentingDaysComputingRPiPower" -> ujson.Num(0.0)
),
"E.3" -> ujson.Obj(
"NbRPisEqBuyingICCM7" -> ujson.Num(0.0),
"RatioRAMRPisVsICCM7" -> ujson.Num(0.0),
"RatioComputeRPisVsICCM7" -> ujson.Num(0.0)
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
}
}
}
import org.rogach.scallop._
import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import ujson._
import shared.predictions._
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
package scaling {
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val json = opt[String]()
val users = opt[Int]()
val movies = opt[Int]()
val separator = opt[String](default=Some("\t"))
val master = opt[String]()
val num_measurements = opt[Int](default=Some(1))
verify()
}
object Optimizing extends App {
var conf = new Conf(args)
// conf object is not serializable, extract values that
// will be serialized with the parallelize implementations
val conf_users = conf.users()
val conf_movies = conf.movies()
// Remove these lines if encountering/debugging Spark
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = conf.master.toOption match {
case None => SparkSession.builder().getOrCreate();
case Some(master) => SparkSession.builder().master(master).getOrCreate();
}
spark.sparkContext.setLogLevel("ERROR")
val sc = spark.sparkContext
println("Loading training data from: " + conf.train())
val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
0.0
}))
val timings = measurements.map(t => t._2)
val mae = measurements(0)._1
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"train" -> ujson.Str(conf.train()),
"test" -> ujson.Str(conf.test()),
"users" -> ujson.Num(conf.users()),
"movies" -> ujson.Num(conf.movies()),
"master" -> ujson.Str(conf.master()),
"num_measurements" -> ujson.Num(conf.num_measurements())
),
"BR.1" -> ujson.Obj(
"1.k10u1v1" -> ujson.Num(0.0),
"2.k10u1v864" -> ujson.Num(0.0),
"3.k10u1v886" -> ujson.Num(0.0),
"4.PredUser1Item1" -> ujson.Num(0.0),
"5.PredUser327Item2" -> ujson.Num(0.0),
"6.Mae" -> ujson.Num(0.0)
),
"BR.2" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
}
}
package predict
import org.rogach.scallop._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import scala.math
import shared.predictions._
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val separator = opt[String](default=Some("\t"))
val num_measurements = opt[Int](default=Some(0))
val json = opt[String]()
verify()
}
object Baseline extends App {
// Remove these lines if encountering/debugging Spark
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = SparkSession.builder()
.master("local[1]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
println("")
println("******************************************************")
var conf = new Conf(args)
// For these questions, data is collected in a scala Array
// to not depend on Spark
println("Loading training data from: " + conf.train())
val train = load(spark, conf.train(), conf.separator()).collect()
println("Loading test data from: " + conf.test())
val test = load(spark, conf.test(), conf.separator()).collect()
val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
Thread.sleep(1000) // Do everything here from train and test
42 // Output answer as last value
}))
val timings = measurements.map(t => t._2) // Retrieve the timing measurements
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
var answers = ujson.Obj(
"Meta" -> ujson.Obj(
"1.Train" -> ujson.Str(conf.train()),
"2.Test" -> ujson.Str(conf.test()),
"3.Measurements" -> ujson.Num(conf.num_measurements())
),
"B.1" -> ujson.Obj(
"1.GlobalAvg" -> ujson.Num(0.0), // Datatype of answer: Double
"2.User1Avg" -> ujson.Num(0.0), // Datatype of answer: Double
"3.Item1Avg" -> ujson.Num(0.0), // Datatype of answer: Double
"4.Item1AvgDev" -> ujson.Num(0.0), // Datatype of answer: Double
"5.PredUser1Item1" -> ujson.Num(0.0) // Datatype of answer: Double
),
"B.2" -> ujson.Obj(
"1.GlobalAvgMAE" -> ujson.Num(0.0), // Datatype of answer: Double
"2.UserAvgMAE" -> ujson.Num(0.0), // Datatype of answer: Double
"3.ItemAvgMAE" -> ujson.Num(0.0), // Datatype of answer: Double
"4.BaselineMAE" -> ujson.Num(0.0) // Datatype of answer: Double
),
"B.3" -> ujson.Obj(
"1.GlobalAvg" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
),
"2.UserAvg" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
),
"3.ItemAvg" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
),
"4.Baseline" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
)
)
)
val json = ujson.write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json.toString, jsonFile)
}
}
println("")
spark.close()
}
package predict
import org.rogach.scallop._
import org.apache.spark.rdd.RDD
import ujson._
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import scala.math
import shared.predictions._
class PersonalizedConf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val separator = opt[String](default=Some("\t"))
val num_measurements = opt[Int](default=Some(0))
val json = opt[String]()
verify()
}
object Personalized extends App {
// Remove these lines if encountering/debugging Spark
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = SparkSession.builder()
.master("local[1]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
println("")
println("******************************************************")
var conf = new PersonalizedConf(args)
println("Loading training data from: " + conf.train())
val train = load(spark, conf.train(), conf.separator()).collect()
println("Loading test data from: " + conf.test())
val test = load(spark, conf.test(), conf.separator()).collect()
// Compute here
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"1.Train" -> ujson.Str(conf.train()),
"2.Test" -> ujson.Str(conf.test()),
"3.Measurements" -> ujson.Num(conf.num_measurements())
),
"P.1" -> ujson.Obj(
"1.PredUser1Item1" -> ujson.Num(0.0), // Prediction of item 1 for user 1 (similarity 1 between users)
"2.OnesMAE" -> ujson.Num(0.0) // MAE when using similarities of 1 between all users
),
"P.2" -> ujson.Obj(
"1.AdjustedCosineUser1User2" -> ujson.Num(0.0), // Similarity between user 1 and user 2 (adjusted Cosine)
"2.PredUser1Item1" -> ujson.Num(0.0), // Prediction item 1 for user 1 (adjusted cosine)
"3.AdjustedCosineMAE" -> ujson.Num(0.0) // MAE when using adjusted cosine similarity
),
"P.3" -> ujson.Obj(
"1.JaccardUser1User2" -> ujson.Num(0.0), // Similarity between user 1 and user 2 (jaccard similarity)
"2.PredUser1Item1" -> ujson.Num(0.0), // Prediction item 1 for user 1 (jaccard)
"3.JaccardPersonalizedMAE" -> ujson.Num(0.0) // MAE when using jaccard similarity
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
spark.close()
}
package predict
import org.rogach.scallop._
import org.apache.spark.rdd.RDD
import ujson._
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import scala.math
import shared.predictions._
class kNNConf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val separator = opt[String](default=Some("\t"))
val num_measurements = opt[Int](default=Some(0))
val json = opt[String]()
verify()
}
object kNN extends App {
// Remove these lines if encountering/debugging Spark
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = SparkSession.builder()
.master("local[1]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
println("")
println("******************************************************")
var conf = new PersonalizedConf(args)
println("Loading training data from: " + conf.train())
val train = load(spark, conf.train(), conf.separator()).collect()
println("Loading test data from: " + conf.test())
val test = load(spark, conf.test(), conf.separator()).collect()
val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
Thread.sleep(1000) // Do everything here from train and test
42 // Output answer as last value
}))
val timings = measurements.map(t => t._2) // Retrieve the timing measurements
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"1.Train" -> conf.train(),
"2.Test" -> conf.test(),
"3.Measurements" -> conf.num_measurements()
),
"N.1" -> ujson.Obj(
"1.k10u1v1" -> ujson.Num(0.0), // Similarity between user 1 and user 1 (k=10)
"2.k10u1v864" -> ujson.Num(0.0), // Similarity between user 1 and user 864 (k=10)
"3.k10u1v886" -> ujson.Num(0.0), // Similarity between user 1 and user 886 (k=10)
"4.PredUser1Item1" -> ujson.Num(0.0) // Prediction of item 1 for user 1 (k=10)
),
"N.2" -> ujson.Obj(
"1.kNN-Mae" -> List(10,30,50,100,200,300,400,800,943).map(k =>
List(
k,
0.0 // Compute MAE
)
).toList
),
"N.3" -> ujson.Obj(
"1.kNN" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)),
"stddev (ms)" -> ujson.Num(std(timings))
)
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
spark.close()
}
package recommend
import org.rogach.scallop._
import org.apache.spark.rdd.RDD
import ujson._
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import shared.predictions._
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val data = opt[String](required = true)
val personal = opt[String](required = true)
val separator = opt[String](default = Some("\t"))
val json = opt[String]()
verify()
}
object Recommender extends App {
// Remove these lines if encountering/debugging Spark
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = SparkSession.builder()
.master("local[1]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
println("")
println("******************************************************")
var conf = new Conf(args)
println("Loading data from: " + conf.data())
val data = load(spark, conf.data(), conf.separator()).collect()
assert(data.length == 100000, "Invalid data")
println("Loading personal data from: " + conf.personal())
val personalFile = spark.sparkContext.textFile(conf.personal())
val personal = personalFile.map(l => {
val cols = l.split(",").map(_.trim)
if (cols(0) == "id")
Rating(944,0,0.0)
else
if (cols.length < 3)
Rating(944, cols(0).toInt, 0.0)
else
Rating(944, cols(0).toInt, cols(2).toDouble)
}).filter(r => r.rating != 0).collect()
val movieNames = personalFile.map(l => {
val cols = l.split(",").map(_.trim)
if (cols(0) == "id") (0, "header")
else (cols(0).toInt, cols(1).toString)
}).collect().toMap
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"data" -> conf.data(),
"personal" -> conf.personal()
),
"R.1" -> ujson.Obj(
"PredUser1Item1" -> ujson.Num(0.0) // Prediction for user 1 of item 1
),
// IMPORTANT: To break ties and ensure reproducibility of results,
// please report the top-3 recommendations that have the smallest
// movie identifier.
"R.2" -> List((254, 0.0), (338, 0.0), (615, 0.0)).map(x => ujson.Arr(x._1, movieNames(x._1), x._2))
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
spark.close()
}
package shared
import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
package object predictions
{
// ------------------------ For template
case class Rating(user: Int, item: Int, rating: Double)
def timingInMs(f : ()=>Double ) : (Double, Double) = {
@@ -11,36 +18,81 @@ package object predictions
return (output, (end-start)/1000000.0)
}
def toInt(s: String): Option[Int] = {
try {
Some(s.toInt)
} catch {
case e: Exception => None
}
}
def mean(s :Seq[Double]): Double = if (s.size > 0) s.reduce(_+_) / s.length else 0.0
def std(s :Seq[Double]): Double = {
if (s.size == 0) 0.0
else {
val m = mean(s)
scala.math.sqrt(s.map(x => scala.math.pow(m-x, 2)).sum / s.length.toDouble)
}
}
def load(path : String, sep : String, nbUsers : Int, nbMovies : Int) : CSCMatrix[Double] = {
val file = Source.fromFile(path)
val builder = new CSCMatrix.Builder[Double](rows=nbUsers, cols=nbMovies)
for (line <- file.getLines) {
val cols = line.split(sep).map(_.trim)
toInt(cols(0)) match {
case Some(_) => builder.add(cols(0).toInt-1, cols(1).toInt-1, cols(2).toDouble)
case None => None
}
}
file.close
builder.result()
}
def loadSpark(sc : org.apache.spark.SparkContext, path : String, sep : String, nbUsers : Int, nbMovies : Int) : CSCMatrix[Double] = {
val file = sc.textFile(path)
val ratings = file
.map(l => {
val cols = l.split(sep).map(_.trim)
toInt(cols(0)) match {
case Some(_) => Some(((cols(0).toInt-1, cols(1).toInt-1), cols(2).toDouble))
case None => None
}
})
.filter({ case Some(_) => true
case None => false })
.map({ case Some(x) => x
case None => ((-1, -1), -1) }).collect()
val builder = new CSCMatrix.Builder[Double](rows=nbUsers, cols=nbMovies)
for ((k,v) <- ratings) {
v match {
case d: Double => {
val u = k._1
val i = k._2
builder.add(u, i, d)
}
}
}
return builder.result
}
def load(spark : org.apache.spark.sql.SparkSession, path : String, sep : String) : org.apache.spark.rdd.RDD[Rating] = {
val file = spark.sparkContext.textFile(path)
return file
.map(l => {
val cols = l.split(sep).map(_.trim)
toInt(cols(0)) match {
case Some(_) => Some(Rating(cols(0).toInt, cols(1).toInt, cols(2).toDouble))
case None => None
}
})
.filter({ case Some(_) => true
case None => false })
.map({ case Some(x) => x
case None => Rating(-1, -1, -1)})
def partitionUsers (nbUsers : Int, nbPartitions : Int, replication : Int) : Seq[Set[Int]] = {
val r = new scala.util.Random(1337)
val bins : Map[Int, collection.mutable.ListBuffer[Int]] = (0 to (nbPartitions-1))
.map(p => (p -> collection.mutable.ListBuffer[Int]())).toMap
(0 to (nbUsers-1)).foreach(u => {
val assignedBins = r.shuffle(0 to (nbPartitions-1)).take(replication)
for (b <- assignedBins) {
bins(b) += u
}
})
bins.values.toSeq.map(_.toSet)
}
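// Illustrative usage (not part of the original template): split 943 users into 10 bins,
// each user replicated in 2 bins, as done in distributed.Approximate. Each user is
// assigned to exactly `replication` distinct bins, so the bins hold
// replication * nbUsers user entries in total.
// val bins: Seq[Set[Int]] = partitionUsers(943, 10, 2)
// assert(bins.map(_.size).sum == 2 * 943)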
}
@@ -3,14 +3,12 @@ package test
import org.scalatest._
import funsuite._
import test.optimizing._
import test.distributed._
import test.predict._
class AllTests extends Sequential(
new test.predict.BaselineTests,
new test.distributed.DistributedBaselineTests,
new test.predict.PersonalizedTests,
new test.predict.kNNTests,
new test.recommend.RecommenderTests
new OptimizingTests,
new ExactTests,
new ApproximateTests
)
package test.predict
import org.scalatest._
import funsuite._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
package test.distributed
import breeze.linalg._
import breeze.numerics._
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.BeforeAndAfterAll
import shared.predictions._
import tests.shared.helpers._
import ujson._
class PersonalizedTests extends AnyFunSuite with BeforeAndAfterAll {
import test.shared.helpers._
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
class ApproximateTests extends AnyFunSuite with BeforeAndAfterAll {
val separator = "\t"
var spark : org.apache.spark.sql.SparkSession = _
val train2Path = "data/ml-100k/u2.base"
val test2Path = "data/ml-100k/u2.test"
var train2 : Array[shared.predictions.Rating] = null
var test2 : Array[shared.predictions.Rating] = null
var train2 : CSCMatrix[Double] = null
var test2 : CSCMatrix[Double] = null
var sc : SparkContext = null
override def beforeAll {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
spark = SparkSession.builder()
.master("local[1]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
// For these questions, train and test are collected in a scala Array
// to not depend on Spark
train2 = load(spark, train2Path, separator).collect()
test2 = load(spark, test2Path, separator).collect()
}
train2 = load(train2Path, separator, 943, 1682)
test2 = load(test2Path, separator, 943, 1682)
// All the functions definitions for the tests below (and the tests in other suites)
// should be in a single library, 'src/main/scala/shared/predictions.scala'.
val spark = SparkSession.builder().master("local[2]").getOrCreate();
spark.sparkContext.setLogLevel("ERROR")
sc = spark.sparkContext
}
// Provide tests to show how to call your code to do the following tasks.
// Ensure you use the same function calls to produce the JSON outputs in
// src/main/scala/predict/Baseline.scala.
// the corresponding application.
// Add assertions with the answer you expect from your code, up to the 4th
// decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
test("Test uniform unary similarities") {
// Create predictor with uniform similarities
// Compute personalized prediction for user 1 on item 1
assert(within(1.0, 0.0, 0.0001))
test("Approximate kNN predictor with 10 partitions and replication of 2") {
var partitionedUsers : Seq[Set[Int]] = partitionUsers(
943,
10,
2
)
// MAE
// Similarity between user 1 and itself
assert(within(1.0, 0.0, 0.0001))
}
test("Test ajusted cosine similarity") {
// Create predictor with adjusted cosine similarities
// Similarity between user 1 and user 2
// Similarity between user 1 and 864
assert(within(1.0, 0.0, 0.0001))
// Compute personalized prediction for user 1 on item 1
// Similarity between user 1 and 344
assert(within(1.0, 0.0, 0.0001))
// MAE
// Similarity between user 1 and 16
assert(within(1.0, 0.0, 0.0001))
}
test("Test jaccard similarity") {
// Create predictor with jaccard similarities
// Similarity between user 1 and user 2
// Similarity between user 1 and 334
assert(within(1.0, 0.0, 0.0001))
// Compute personalized prediction for user 1 on item 1
// Similarity between user 1 and 2
assert(within(1.0, 0.0, 0.0001))
// MAE
// MAE on test
assert(within(1.0, 0.0, 0.0001))
}
}
}