Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (27)
Showing 745 additions and 601 deletions
File deleted
\documentclass{article}
\usepackage{hyperref}
\usepackage{algorithm}
\usepackage{algpseudocode}
\usepackage{dsfont}
\usepackage{amsmath}
\usepackage{filemod}
\usepackage{ulem}
\usepackage{graphicx}
\usepackage{todonotes}
\input{Milestone-2-questions.sty}
% If you use BibTeX in apalike style, activate the following line:
\bibliographystyle{acm}
\title{CS-449 Project Milestone 2: Optimizing, Scaling, and Economics}
\author{
\textbf{Name}: xxx\\
\textbf{Sciper}: xxx\\
\textbf{Email:} xxx\\
\textbf{Name}: xxx\\
\textbf{Sciper}: xxx\\
\textbf{Email:} xxx\\
}
\begin{document}
\maketitle
\section{Optimizing with Breeze, a Linear Algebra Library}
\label{section:optimization}
\begin{itemize}
\item[\textbf{BR.1}] \BROne
\item [\textbf{BR.2}] \BRTwoOne
\BRTwoTwo
\BRTwoThree
\end{itemize}
\section{Parallel k-NN Computations with Replicated Ratings}
\begin{enumerate}
\item [\textbf{EK.1}] \EKOne
\item [\textbf{EK.2}] \EKTwo
\end{enumerate}
\section{Distributed Approximate k-NN}
\begin{enumerate}
\item [\textbf{AK.1}] \AKOne
\item [\textbf{AK.2}] \AKTwo
\item [\textbf{AK.3}] \AKThree
\end{enumerate}
\section{Economics}
\textit{Implement the computations for the different answers in the Economics.scala file. You don't need to provide unit tests for this question, nor written answers for these questions in your report.}
\end{document}
\ProvidesPackage{m2questions}[2022/03/11 v1.0]
% Breeze
\newcommand{\BROne}{
Reimplement the kNN predictor of Milestone 1 using the Breeze library and without using Spark. Using $k=10$ and \texttt{data/ml-100k/u2.base} for training, output the similarities between: (1) user $1$ and itself; (2) user $1$ and user $864$; (3) user $1$ and user $886$. Still using $k=10$, output the prediction for user 1 and item 1 ($p_{1,1}$), the prediction for user 327 and item 2 ($p_{327,2}$), and make sure that you obtain an MAE of $0.8287 \pm 0.0001$ on \texttt{data/ml-100k/u2.test}.
}
\newcommand{\BRTwoOne}{
Try making your implementation as fast as possible, both for computing all k-nearest neighbours and for computing the predictions and MAE on a test set. Your implementation should be based around \texttt{CSCMatrix}, but may involve conversions for individual operations. We will test your implementation on a secret test set. The teams with both a correct answer and the shortest time will receive more points.
}
\newcommand{\BRTwoTwo}{
Using $k=300$, compare the time for predicting all values and computing the MAE of \texttt{ml-100k/u2.test} to the one you obtained in Milestone 1. What is the speedup of your new implementation (as a ratio of $\frac{\textit{average time}_{old}}{\textit{average time}_{new}}$)? Use the same machine to measure the time for both versions and provide the answer in your report.
}
\newcommand{\BRTwoThree}{
Also ensure your implementation works with \texttt{data/ml-1m/rb.train} and \texttt{data/ml-1m/rb.test} since you will reuse it in the next questions.
}
% Parallel Exact Knn
\newcommand{\EKOne}{
Test your parallel implementation of k-NN for correctness with two workers. Using $k=10$ and \texttt{data/ml-100k/u2.base} for training, output the similarities between: (1) user $1$ and itself; (2) user $1$ and user $864$; (3) user $1$ and user $886$. Still using $k=10$, output the prediction for user 1 and item 1 ($p_{1,1}$), the prediction for user 327 and item 2 ($p_{327,2}$), and make sure that you obtain an MAE of $0.8287 \pm 0.0001$ on \texttt{data/ml-100k/u2.test}.
}
\newcommand{\EKTwo}{
Measure and report the combined \textit{k-NN} and \textit{prediction} time when using 1, 2, 4 workers, $k=300$, and \texttt{ml-1m/rb.train} for training and \texttt{ml-1m/rb.test} for test, on the cluster (or a machine with at least 4 physical cores). Perform 3 measurements for each experiment and report the average and standard-deviation total time, including training, making predictions, and computing the MAE. Do you observe a speedup? Does this speedup grow linearly with the number of executors, i.e. is the running time $X$ times faster when using $X$ executors compared to using a single executor? Answer both questions in your report.
}
% Approximate Knn
\newcommand{\AKOne}{
Implement the approximate k-NN using your previous Breeze implementation and Spark's RDDs. Using the partitioner of the template with 10 partitions and 2 replications, $k=10$, and \texttt{data/ml-100k/u2.base} for training, output the similarities of the approximate k-NN between user $1$ and the following users: $1,864,344,16,334,2$.
}
\newcommand{\AKTwo}{
Vary the number of partitions in which a given user appears. For the \texttt{data/ml-100k/u2.base} training set, partitioned equally between 10 workers, report the relationship between the level of replication (1,2,3,4,6,8) and the MAE you obtain on the \texttt{data/ml-100k/u2.test} test set. What is the minimum level of replication such that the MAE is still lower than the baseline predictor of Milestone 1 (MAE of 0.7604), when using $k=300$? Does this reduce the number of similarity computations compared to an exact k-NN? What is the ratio? Answer both questions in your report.
}
\newcommand{\AKThree}{
Measure and report the time required by your approximate \textit{k-NN} implementation, including both training on \texttt{data/ml-1m/rb.train} and computing the MAE on the test set \texttt{data/ml-1m/rb.test}, using $k=300$ on 8 partitions with a replication factor of 1 when using 1, 2, 4 workers. Perform each experiment 3 times and report the average and standard-deviation. Do you observe a speedup compared to the parallel (exact) k-NN with replicated ratings for the same number of workers?
}
% Economics
\newcommand{\EOne}{
What is the minimum number of days of renting to make buying the ICC.M7 less expensive, excluding any operating costs such as electricity and maintenance? Round up to the nearest integer.
}
\newcommand{\ETwoOne}{
After how many days of renting a container does the cost exceed that of buying and running 4 Raspberry Pis? To obtain a likely range, compute this (1) optimistically assuming no maintenance and minimum power usage for the RPis, and (2) assuming no maintenance and maximum power usage for the RPis. (Round up to the nearest integer in each case.)
}
\newcommand{\ETwoTwo}{
Assume a single processor for the container and the same total amount of RAM as the 4 Raspberry Pis. Also provide unrounded intermediary results for (1) Container Daily Cost, (2) 4 RPis (Idle) Daily Electricity Cost, (3) 4 RPis (Computing) Daily Electricity Cost.
}
\newcommand{\EThree}{
For the same buying price as an ICC.M7, how many Raspberry Pis can you get (floor the result to remove the decimal)? Assuming perfect scaling, would you obtain a larger overall throughput and RAM from these? If so, by how much? Compute the ratios using the previous floored number of RPis, but do not round the final results.
}
\ No newline at end of file
File added
# Milestone Description

[Milestone-2.pdf](./Milestone-2.pdf)

Note: Section 'Updates' lists the updates since the original release of the Milestone.

Mu has prepared a report template for your convenience here: [Report Template](./Milestone-2-QA-template.tex).

# Dependencies

...
Should be available by default on ````iccluster028.iccluster.epfl.ch````. Otherwise, refer to each project's installation instructions. Prefer working locally on your own machine; you will have less interference in your measurements from other students.

If you work on ````iccluster028.iccluster.epfl.ch````, you need to extend the default PATH by adding the following line to ````~/.bashrc````:

````
export PATH=$PATH:/opt/sbt/sbt/bin
````

If you have multiple installations of openjdk, you need to specify the one to use as JAVA_HOME, e.g. on OSX with
openjdk@8 installed through Homebrew, you would do:
````
...
````

# Dataset

Download [data-m2.zip](https://gitlab.epfl.ch/sacs/cs-449-sds-public/project/dataset/-/raw/main/data-m2.zip).

Unzip:

````
> unzip data-m2.zip
````

It should unzip into ````data/```` by default. If not, manually move ````ml-100k```` and ````ml-1m```` into ````data/````.
# Personal Ratings
Additional personal ratings are provided in the ````data/personal.csv```` file in a
csv format with ````<movie>, <movie title>, <rating>```` to test your recommender.
You can copy this file and change the ratings, with values in [1,5], to obtain
recommendations more to your liking!
Entries with no rating are in the following format:
````
1,Toy Story (1995),
````
Entries with ratings are in the following format:
````
1,Toy Story (1995),5
````
# Repository Structure

````src/main/scala/shared/predictions.scala````:
...
This code should then be used in the following applications and tests.

## Applications

1. ````src/main/scala/optimizing/Optimizing.scala````: Output answers to questions **BR.X**.
2. ````src/main/scala/distributed/Exact.scala````: Output answers to questions **EK.X**.
3. ````src/main/scala/distributed/Approximate.scala````: Output answers to questions **AK.X**.
4. ````src/main/scala/economics/Economics.scala````: Output answers to questions **E.X**.

Applications are separate from tests to make it easier to test with different
inputs and permit outputting your answers and timings in JSON format for easier
grading.
## Unit Tests

Corresponding unit tests for each application (except Economics.scala):

````
src/test/scala/optimizing/OptimizingTests.scala
src/test/scala/distributed/ExactTests.scala
src/test/scala/distributed/ApproximateTests.scala
````

Your tests should demonstrate how to call your code to obtain the answers of
...
clear and regular structure to check its correctness.

## Execute unit tests

````
sbt "testOnly test.AllTests"
````
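While developing, you can also run a single suite instead of ````test.AllTests````, for example:

````
sbt "testOnly test.distributed.ExactTests"
````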
You should fill all tests and ensure they all succeed prior to submission.

## Run applications

### Optimizing

````
sbt "runMain scaling.Optimizing --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json optimizing-100k.json --master local[1] --users 943 --movies 1682"
````

### Parallel Exact KNN

````
sbt "runMain distributed.Exact --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json exact-100k-4.json --k 10 --master local[4] --users 943 --movies 1682"
````

### Approximate KNN

````
sbt "runMain distributed.Approximate --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json approximate-100k-4-k10-r2.json --k 10 --master local[4] --users 943 --movies 1682 --partitions 10 --replication 2"
````

### Economics

````
sbt "runMain economics.Economics --json economics.json"
````
## Time applications

...

````sbt clean````: clean up temporary files and previous assembly packages.

````sbt assembly````: create a new jar
````target/scala-2.11/m2_yourid-assembly-1.0.jar```` that can be used with
````spark-submit````.

Prefer packaging your application locally and uploading the tar archive of your
application before running on the cluster.

### Upload jar on Cluster

````
scp target/scala-2.11/m2_yourid-assembly-1.0.jar <username>@iccluster028.iccluster.epfl.ch:~
````
### Run on Cluster

See [config.sh](./config.sh) for the HDFS paths to the pre-uploaded train and test datasets used in the example commands below:

#### When using ML-100k

````
spark-submit --class distributed.Exact --master yarn --conf "spark.dynamicAllocation.enabled=false" --num-executors 1 m2_yourid-assembly-1.0.jar --json exact-100k-1.json --train $ML100Ku2base --test $ML100Ku2test
````

#### When using ML-1m

````
spark-submit --class distributed.Exact --master yarn --conf "spark.dynamicAllocation.enabled=false" --num-executors 1 m2_yourid-assembly-1.0.jar --json exact-1m-1.json --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --k 300 --users 6040 --movies 3952
````

You can vary the number of executors with ````--num-executors X```` and the number of measurements with ````--num_measurements Y````. To keep results obtained with different parameters in separate .json files, adjust the ````--json```` argument accordingly; for instance, with ````--num-executors 4````, use ````--json exact-1m-4.json````.

Note that when changing from ML-100k to ML-1M, the parameter ````--separator ::```` should be added, and the number of users and movies should be modified.
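For instance, the ML-1m command above with 4 executors would become:

````
spark-submit --class distributed.Exact --master yarn --conf "spark.dynamicAllocation.enabled=false" --num-executors 4 m2_yourid-assembly-1.0.jar --json exact-1m-4.json --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --k 300 --users 6040 --movies 3952
````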
## Grading scripts

We will use the following scripts to grade your submission:

1. ````./test.sh````: Run all unit tests.
2. ````./run.sh````: Run all applications without timing measurements.
3. ````./time.sh````: Run all timing measurements.
4. ````./timeCluster.sh````: Package and time applications on Spark Cluster.

All scripts will produce execution logs in the ````logs````
directory, including answers produced in the JSON format. Logs directories are
in the format ````logs/<scriptname>-<datetime>-<machine>/```` and include at
least an execution log ````log.txt```` as well as possible JSON outputs from
applications.

Ensure all scripts run correctly locally before submitting.
## Submission

Steps:

1. Create a zip archive with all your code within ````src/````, as well as your report: ````zip sciper1-sciper2.zip -r src/ report.pdf````
2. Submit ````sciper1-sciper2.zip```` to the TA for grading on
https://cs449-submissions.epfl.ch:8083/m2 using the passcode you have previously received by email.

# References
name := "m1_yourid" name := "m2_yourid"
version := "1.0" version := "1.0"
libraryDependencies += "org.rogach" %% "scallop" % "4.0.2" libraryDependencies += "org.rogach" %% "scallop" % "4.0.2"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.7" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.7"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.7" libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.7"
libraryDependencies += "org.scalanlp" %% "breeze" % "0.13.2"
libraryDependencies += "org.scalanlp" %% "breeze-natives" % "0.13.2"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.0" % Test libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.0" % Test
libraryDependencies += "com.lihaoyi" %% "ujson" % "1.5.0" libraryDependencies += "com.lihaoyi" %% "ujson" % "1.5.0"
......
if [ $(hostname) == 'iccluster028' ];
then
    ICCLUSTER=hdfs://iccluster028.iccluster.epfl.ch:8020
    export ML100Ku2base=$ICCLUSTER/cs449/data/ml-100k/u2.base;
    export ML100Ku2test=$ICCLUSTER/cs449/data/ml-100k/u2.test;
    export ML100Kudata=$ICCLUSTER/cs449/data/ml-100k/u.data;
    export ML1Mrbtrain=$ICCLUSTER/cs449/data/ml-1m/rb.train;
    export ML1Mrbtest=$ICCLUSTER/cs449/data/ml-1m/rb.test;
    export SPARKMASTER='yarn'
else
    export ML100Ku2base=data/ml-100k/u2.base;
    export ML100Ku2test=data/ml-100k/u2.test;
    export ML100Kudata=data/ml-100k/u.data;
    export ML1Mrbtrain=data/ml-1m/rb.train;
    export ML1Mrbtest=data/ml-1m/rb.test;
    export SPARKMASTER='local[4]'
fi;
...
RUN=./logs/run-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
mkdir -p $RUN
LOGS=$RUN/log.txt
source ./config.sh

echo "------------------- OPTIMIZING ---------------------" >> $LOGS
sbt "runMain scaling.Optimizing --train $ML100Ku2base --test $ML100Ku2test --json $RUN/optimizing-100k.json --users 943 --movies 1682 --master local[1]" 2>&1 >>$LOGS

echo "------------------- DISTRIBUTED EXACT ---------------------" >> $LOGS
sbt "runMain distributed.Exact --train $ML100Ku2base --test $ML100Ku2test --json $RUN/exact-100k-4.json --k 10 --master local[4] --users 943 --movies 1682" 2>&1 >>$LOGS
sbt "runMain distributed.Exact --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --json $RUN/exact-1m-4.json --k 300 --master local[4] --users 6040 --movies 3952" 2>&1 >>$LOGS

echo "------------------- DISTRIBUTED APPROXIMATE ---------------------" >> $LOGS
sbt "runMain distributed.Approximate --train $ML100Ku2base --test $ML100Ku2test --json $RUN/approximate-100k-4-k10-r2.json --k 10 --master local[4] --users 943 --movies 1682 --partitions 10 --replication 2" 2>&1 >>$LOGS;
for R in 1 2 3 4 6 8; do
    sbt "runMain distributed.Approximate --train $ML100Ku2base --test $ML100Ku2test --json $RUN/approximate-100k-4-k300-r$R.json --k 300 --master local[4] --users 943 --movies 1682 --partitions 10 --replication $R" 2>&1 >>$LOGS;
done
sbt "runMain distributed.Approximate --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --json $RUN/approximate-1m-4.json --k 300 --master local[4] --users 6040 --movies 3952 --partitions 8 --replication 1" 2>&1 >>$LOGS

echo "------------------- ECONOMICS -----------------------------------" >> $LOGS
sbt "runMain economics.Economics --json $RUN/economics.json" 2>&1 >>$LOGS
import org.rogach.scallop._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import ujson._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import shared.predictions._
package distributed {
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val k = opt[Int]()
val json = opt[String]()
val users = opt[Int]()
val movies = opt[Int]()
val separator = opt[String](default=Some("\t"))
val replication = opt[Int](default=Some(1))
val partitions = opt[Int](default=Some(1))
val master = opt[String]()
val num_measurements = opt[Int](default=Some(1))
verify()
}
object Approximate {
def main(args: Array[String]) {
var conf = new Conf(args)
// Remove these lines to see Spark's own log output when debugging
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = conf.master.toOption match {
case None => SparkSession.builder().getOrCreate();
case Some(master) => SparkSession.builder().master(master).getOrCreate();
}
val sc = spark.sparkContext
println("")
println("******************************************************")
// conf object is not serializable, extract values that
// will be serialized with the parallelize implementations
val conf_users = conf.users()
val conf_movies = conf.movies()
val conf_k = conf.k()
println("Loading training data")
val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
var knn : CSCMatrix[Double] = null
println("Partitioning users")
var partitionedUsers : Seq[Set[Int]] = partitionUsers(
conf.users(),
conf.partitions(),
conf.replication()
)
val measurements = (1 to scala.math.max(1,conf.num_measurements()))
.map(_ => timingInMs( () => {
// Use partitionedUsers here
0.0
}))
val mae = measurements(0)._1
val timings = measurements.map(_._2)
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"train" -> ujson.Str(conf.train()),
"test" -> ujson.Str(conf.test()),
"k" -> ujson.Num(conf.k()),
"users" -> ujson.Num(conf.users()),
"movies" -> ujson.Num(conf.movies()),
"master" -> ujson.Str(sc.getConf.get("spark.master")),
"num-executors" -> ujson.Str(if (sc.getConf.contains("spark.executor.instances"))
sc.getConf.get("spark.executor.instances")
else
""),
"num_measurements" -> ujson.Num(conf.num_measurements()),
"partitions" -> ujson.Num(conf.partitions()),
"replication" -> ujson.Num(conf.replication())
),
"AK.1" -> ujson.Obj(
"knn_u1v1" -> ujson.Num(0.0),
"knn_u1v864" -> ujson.Num(0.0),
"knn_u1v344" -> ujson.Num(0.0),
"knn_u1v16" -> ujson.Num(0.0),
"knn_u1v334" -> ujson.Num(0.0),
"knn_u1v2" -> ujson.Num(0.0)
),
"AK.2" -> ujson.Obj(
"mae" -> ujson.Num(mae)
),
"AK.3" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)),
"stddev (ms)" -> ujson.Num(std(timings))
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
spark.stop()
}
}
}
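A minimal sketch (an assumption, not part of the template) of how the ````partitionedUsers```` sequence above could drive the approximate k-NN computation, assuming a hypothetical helper ````topKSimilaritiesFor```` that you would add to ````shared/predictions.scala````:

````
// Hypothetical sketch: compute top-k similarities inside each partition,
// then keep the k best per user across partitions.
// topKSimilaritiesFor(ratings, users, k) is an assumed helper returning
// (u, v, similarity) triplets restricted to the given set of users.
val numPartitions = conf.partitions()
val brTrain = sc.broadcast(train)
val perPartitionTopK = sc.parallelize(partitionedUsers, numPartitions)
  .flatMap(users => topKSimilaritiesFor(brTrain.value, users, conf_k))
val approximateKnn = perPartitionTopK
  .map { case (u, v, s) => (u, (v, s)) }
  .groupByKey()
  .mapValues(_.toSeq.sortBy(-_._2).take(conf_k))
  .collectAsMap()
````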
package distributed
import org.rogach.scallop._
import org.apache.spark.rdd.RDD
import ujson._
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import scala.math
import shared.predictions._
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val separator = opt[String](default=Some("\t"))
val master = opt[String](default=Some(""))
val num_measurements = opt[Int](default=Some(0))
val json = opt[String]()
verify()
}
object DistributedBaseline extends App {
var conf = new Conf(args)
// Remove these lines if encountering/debugging Spark
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = if (conf.master() != "") {
SparkSession.builder().master(conf.master()).getOrCreate()
} else {
SparkSession.builder().getOrCreate()
}
spark.sparkContext.setLogLevel("ERROR")
println("")
println("******************************************************")
println("Loading training data from: " + conf.train())
val train = load(spark, conf.train(), conf.separator())
println("Loading test data from: " + conf.test())
val test = load(spark, conf.test(), conf.separator())
val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
Thread.sleep(1000) // Do everything here from train and test
42 // Output answer as last value
}))
val timings = measurements.map(t => t._2) // Retrieve the timing measurements
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"1.Train" -> conf.train(),
"2.Test" -> conf.test(),
"3.Master" -> conf.master(),
"4.Measurements" -> conf.num_measurements()
),
"D.1" -> ujson.Obj(
"1.GlobalAvg" -> ujson.Num(0.0), // Datatype of answer: Double
"2.User1Avg" -> ujson.Num(0.0), // Datatype of answer: Double
"3.Item1Avg" -> ujson.Num(0.0), // Datatype of answer: Double
"4.Item1AvgDev" -> ujson.Num(0.0), // Datatype of answer: Double,
"5.PredUser1Item1" -> ujson.Num(0.0), // Datatype of answer: Double
"6.Mae" -> ujson.Num(0.0) // Datatype of answer: Double
),
"D.2" -> ujson.Obj(
"1.DistributedBaseline" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
)
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
spark.close()
}
import org.rogach.scallop._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import ujson._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import shared.predictions._
package distributed {
class ExactConf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val k = opt[Int](default=Some(10))
val json = opt[String]()
val users = opt[Int]()
val movies = opt[Int]()
val separator = opt[String](default=Some("\t"))
val master = opt[String]()
val num_measurements = opt[Int](default=Some(1))
verify()
}
object Exact {
def main(args: Array[String]) {
var conf = new ExactConf(args)
// Remove these lines to see Spark's own log output when debugging
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = conf.master.toOption match {
case None => SparkSession.builder().getOrCreate();
case Some(master) => SparkSession.builder().master(master).getOrCreate();
}
spark.sparkContext.setLogLevel("ERROR")
val sc = spark.sparkContext
println("")
println("******************************************************")
// conf object is not serializable, extract values that
// will be serialized with the parallelize implementations
val conf_users = conf.users()
val conf_movies = conf.movies()
val conf_k = conf.k()
println("Loading training data from: " + conf.train())
val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
val measurements = (1 to scala.math.max(1,conf.num_measurements())).map(_ => timingInMs( () => {
0.0
}))
val timings = measurements.map(_._2)
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"train" -> ujson.Str(conf.train()),
"test" -> ujson.Str(conf.test()),
"k" -> ujson.Num(conf.k()),
"users" -> ujson.Num(conf.users()),
"movies" -> ujson.Num(conf.movies()),
"master" -> ujson.Str(sc.getConf.get("spark.master")),
"num-executors" -> ujson.Str(if (sc.getConf.contains("spark.executor.instances"))
sc.getConf.get("spark.executor.instances")
else
""),
"num_measurements" -> ujson.Num(conf.num_measurements())
),
"EK.1" -> ujson.Obj(
"1.knn_u1v1" -> ujson.Num(0.0),
"2.knn_u1v864" -> ujson.Num(0.0),
"3.knn_u1v886" -> ujson.Num(0.0),
"4.PredUser1Item1" -> ujson.Num(0.0),
"5.PredUser327Item2" -> ujson.Num(0.0),
"6.Mae" -> ujson.Num(0.0)
),
"EK.2" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
spark.stop()
}
}
}
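A minimal sketch (an assumption, not the reference solution) of how the exact k-NN could be spread over the available workers by parallelizing over users, assuming a hypothetical helper ````topKForUser```` in ````shared/predictions.scala```` that returns the k most similar neighbours of a single user together with their similarities:

````
// Hypothetical sketch: each worker computes the k nearest neighbours of a
// subset of the users against the broadcast training matrix.
val brTrain = sc.broadcast(train)
val knnEntries = sc.parallelize(0 until conf_users)
  .flatMap(u => topKForUser(brTrain.value, u, conf_k).map { case (v, s) => (u, v, s) })
  .collect()
// Assemble the similarities into a sparse user x user matrix.
val builder = new CSCMatrix.Builder[Double](rows = conf_users, cols = conf_users)
for ((u, v, s) <- knnEntries) builder.add(u, v, s)
val knn = builder.result()
````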
import org.rogach.scallop._
import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import ujson._
package economics {
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val json = opt[String]()
verify()
}
object Economics {
def main(args: Array[String]) {
println("")
println("******************************************************")
var conf = new Conf(args)
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"E.1" -> ujson.Obj(
"MinRentingDays" -> ujson.Num(0.0) // Datatype of answer: Double
),
"E.2" -> ujson.Obj(
"ContainerDailyCost" -> ujson.Num(0.0),
"4RPisDailyCostIdle" -> ujson.Num(0.0),
"4RPisDailyCostComputing" -> ujson.Num(0.0),
"MinRentingDaysIdleRPiPower" -> ujson.Num(0.0),
"MinRentingDaysComputingRPiPower" -> ujson.Num(0.0)
),
"E.3" -> ujson.Obj(
"NbRPisEqBuyingICCM7" -> ujson.Num(0.0),
"RatioRAMRPisVsICCM7" -> ujson.Num(0.0),
"RatioComputeRPisVsICCM7" -> ujson.Num(0.0)
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
}
}
}
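The Economics answers are plain arithmetic on the constants given in the Milestone description. A minimal sketch for E.1, with placeholder values that must be replaced by the actual figures from the Milestone description:

````
// Placeholder constants (NOT the real values): take them from the Milestone description.
val buyingPriceICCM7 = 40000.0  // hypothetical purchase price, CHF
val rentingDailyCost = 20.0     // hypothetical renting cost, CHF/day
// Buying becomes less expensive once the accumulated rent reaches the
// purchase price; round up to the nearest whole day.
val minRentingDays = math.ceil(buyingPriceICCM7 / rentingDailyCost).toInt
````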
import org.rogach.scallop._
import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import ujson._
import shared.predictions._
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
package scaling {
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val json = opt[String]()
val users = opt[Int]()
val movies = opt[Int]()
val separator = opt[String](default=Some("\t"))
val master = opt[String]()
val num_measurements = opt[Int](default=Some(1))
verify()
}
object Optimizing extends App {
var conf = new Conf(args)
// conf object is not serializable, extract values that
// will be serialized with the parallelize implementations
val conf_users = conf.users()
val conf_movies = conf.movies()
// Remove these lines to see Spark's own log output when debugging
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = conf.master.toOption match {
case None => SparkSession.builder().getOrCreate();
case Some(master) => SparkSession.builder().master(master).getOrCreate();
}
spark.sparkContext.setLogLevel("ERROR")
val sc = spark.sparkContext
println("Loading training data from: " + conf.train())
val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
0.0
}))
val timings = measurements.map(t => t._2)
val mae = measurements(0)._1
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"train" -> ujson.Str(conf.train()),
"test" -> ujson.Str(conf.test()),
"users" -> ujson.Num(conf.users()),
"movies" -> ujson.Num(conf.movies()),
"master" -> ujson.Str(conf.master()),
"num_measurements" -> ujson.Num(conf.num_measurements())
),
"BR.1" -> ujson.Obj(
"1.k10u1v1" -> ujson.Num(0.0),
"2.k10u1v864" -> ujson.Num(0.0),
"3.k10u1v886" -> ujson.Num(0.0),
"4.PredUser1Item1" -> ujson.Num(0.0),
"5.PredUser327Item2" -> ujson.Num(0.0),
"6.Mae" -> ujson.Num(0.0)
),
"BR.2" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
}
}
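As a starting point for BR.1, a minimal sketch (not the reference solution) of computing user-user similarities with Breeze, assuming the ratings have already been preprocessed (centered and normalized) into a ````CSCMatrix```` by code you write in ````shared/predictions.scala````. For ml-100k the dense 943 x 943 similarity matrix is small enough to materialize directly:

````
import breeze.linalg.{CSCMatrix, DenseMatrix}

// Hypothetical sketch: cosine similarities between all pairs of users,
// given an already preprocessed user x movie matrix `normalized`.
def userSimilarities(normalized: CSCMatrix[Double]): DenseMatrix[Double] =
  normalized.toDense * normalized.toDense.t

// k nearest neighbours of user u (self excluded), by decreasing similarity.
def kNearest(sims: DenseMatrix[Double], u: Int, k: Int): Seq[Int] =
  sims(u, ::).t.toArray.zipWithIndex
    .filter(_._2 != u)
    .sortBy(-_._1)
    .take(k)
    .map(_._2)
    .toSeq
````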
package predict
import org.rogach.scallop._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import scala.math
import shared.predictions._
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val separator = opt[String](default=Some("\t"))
val num_measurements = opt[Int](default=Some(0))
val json = opt[String]()
verify()
}
object Baseline extends App {
// Remove these lines if encountering/debugging Spark
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = SparkSession.builder()
.master("local[1]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
println("")
println("******************************************************")
var conf = new Conf(args)
// For these questions, data is collected in a scala Array
// to not depend on Spark
println("Loading training data from: " + conf.train())
val train = load(spark, conf.train(), conf.separator()).collect()
println("Loading test data from: " + conf.test())
val test = load(spark, conf.test(), conf.separator()).collect()
val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
Thread.sleep(1000) // Do everything here from train and test
42 // Output answer as last value
}))
val timings = measurements.map(t => t._2) // Retrieve the timing measurements
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
var answers = ujson.Obj(
"Meta" -> ujson.Obj(
"1.Train" -> ujson.Str(conf.train()),
"2.Test" -> ujson.Str(conf.test()),
"3.Measurements" -> ujson.Num(conf.num_measurements())
),
"B.1" -> ujson.Obj(
"1.GlobalAvg" -> ujson.Num(0.0), // Datatype of answer: Double
"2.User1Avg" -> ujson.Num(0.0), // Datatype of answer: Double
"3.Item1Avg" -> ujson.Num(0.0), // Datatype of answer: Double
"4.Item1AvgDev" -> ujson.Num(0.0), // Datatype of answer: Double
"5.PredUser1Item1" -> ujson.Num(0.0) // Datatype of answer: Double
),
"B.2" -> ujson.Obj(
"1.GlobalAvgMAE" -> ujson.Num(0.0), // Datatype of answer: Double
"2.UserAvgMAE" -> ujson.Num(0.0), // Datatype of answer: Double
"3.ItemAvgMAE" -> ujson.Num(0.0), // Datatype of answer: Double
"4.BaselineMAE" -> ujson.Num(0.0) // Datatype of answer: Double
),
"B.3" -> ujson.Obj(
"1.GlobalAvg" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
),
"2.UserAvg" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
),
"3.ItemAvg" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
),
"4.Baseline" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
)
)
)
val json = ujson.write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json.toString, jsonFile)
}
}
println("")
spark.close()
}
package predict
import org.rogach.scallop._
import org.apache.spark.rdd.RDD
import ujson._
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import scala.math
import shared.predictions._
class PersonalizedConf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val separator = opt[String](default=Some("\t"))
val num_measurements = opt[Int](default=Some(0))
val json = opt[String]()
verify()
}
object Personalized extends App {
// Remove these lines if encountering/debugging Spark
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = SparkSession.builder()
.master("local[1]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
println("")
println("******************************************************")
var conf = new PersonalizedConf(args)
println("Loading training data from: " + conf.train())
val train = load(spark, conf.train(), conf.separator()).collect()
println("Loading test data from: " + conf.test())
val test = load(spark, conf.test(), conf.separator()).collect()
// Compute here
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"1.Train" -> ujson.Str(conf.train()),
"2.Test" -> ujson.Str(conf.test()),
"3.Measurements" -> ujson.Num(conf.num_measurements())
),
"P.1" -> ujson.Obj(
"1.PredUser1Item1" -> ujson.Num(0.0), // Prediction of item 1 for user 1 (similarity 1 between users)
"2.OnesMAE" -> ujson.Num(0.0) // MAE when using similarities of 1 between all users
),
"P.2" -> ujson.Obj(
"1.AdjustedCosineUser1User2" -> ujson.Num(0.0), // Similarity between user 1 and user 2 (adjusted Cosine)
"2.PredUser1Item1" -> ujson.Num(0.0), // Prediction item 1 for user 1 (adjusted cosine)
"3.AdjustedCosineMAE" -> ujson.Num(0.0) // MAE when using adjusted cosine similarity
),
"P.3" -> ujson.Obj(
"1.JaccardUser1User2" -> ujson.Num(0.0), // Similarity between user 1 and user 2 (jaccard similarity)
"2.PredUser1Item1" -> ujson.Num(0.0), // Prediction item 1 for user 1 (jaccard)
"3.JaccardPersonalizedMAE" -> ujson.Num(0.0) // MAE when using jaccard similarity
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
spark.close()
}
package predict
import org.rogach.scallop._
import org.apache.spark.rdd.RDD
import ujson._
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import scala.math
import shared.predictions._
class kNNConf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val separator = opt[String](default=Some("\t"))
val num_measurements = opt[Int](default=Some(0))
val json = opt[String]()
verify()
}
object kNN extends App {
// Remove these lines if encountering/debugging Spark
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = SparkSession.builder()
.master("local[1]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
println("")
println("******************************************************")
var conf = new PersonalizedConf(args)
println("Loading training data from: " + conf.train())
val train = load(spark, conf.train(), conf.separator()).collect()
println("Loading test data from: " + conf.test())
val test = load(spark, conf.test(), conf.separator()).collect()
val measurements = (1 to conf.num_measurements()).map(x => timingInMs(() => {
Thread.sleep(1000) // Do everything here from train and test
42 // Output answer as last value
}))
val timings = measurements.map(t => t._2) // Retrieve the timing measurements
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"1.Train" -> conf.train(),
"2.Test" -> conf.test(),
"3.Measurements" -> conf.num_measurements()
),
"N.1" -> ujson.Obj(
"1.k10u1v1" -> ujson.Num(0.0), // Similarity between user 1 and user 1 (k=10)
"2.k10u1v864" -> ujson.Num(0.0), // Similarity between user 1 and user 864 (k=10)
"3.k10u1v886" -> ujson.Num(0.0), // Similarity between user 1 and user 886 (k=10)
"4.PredUser1Item1" -> ujson.Num(0.0) // Prediction of item 1 for user 1 (k=10)
),
"N.2" -> ujson.Obj(
"1.kNN-Mae" -> List(10,30,50,100,200,300,400,800,943).map(k =>
List(
k,
0.0 // Compute MAE
)
).toList
),
"N.3" -> ujson.Obj(
"1.kNN" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)),
"stddev (ms)" -> ujson.Num(std(timings))
)
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
spark.close()
}
package recommend
import org.rogach.scallop._
import org.apache.spark.rdd.RDD
import ujson._
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import shared.predictions._
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val data = opt[String](required = true)
val personal = opt[String](required = true)
val separator = opt[String](default = Some("\t"))
val json = opt[String]()
verify()
}
object Recommender extends App {
// Remove these lines if encountering/debugging Spark
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = SparkSession.builder()
.master("local[1]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
println("")
println("******************************************************")
var conf = new Conf(args)
println("Loading data from: " + conf.data())
val data = load(spark, conf.data(), conf.separator()).collect()
assert(data.length == 100000, "Invalid data")
println("Loading personal data from: " + conf.personal())
val personalFile = spark.sparkContext.textFile(conf.personal())
val personal = personalFile.map(l => {
val cols = l.split(",").map(_.trim)
if (cols(0) == "id")
Rating(944,0,0.0)
else
if (cols.length < 3)
Rating(944, cols(0).toInt, 0.0)
else
Rating(944, cols(0).toInt, cols(2).toDouble)
}).filter(r => r.rating != 0).collect()
val movieNames = personalFile.map(l => {
val cols = l.split(",").map(_.trim)
if (cols(0) == "id") (0, "header")
else (cols(0).toInt, cols(1).toString)
}).collect().toMap
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"data" -> conf.data(),
"personal" -> conf.personal()
),
"R.1" -> ujson.Obj(
"PredUser1Item1" -> ujson.Num(0.0) // Prediction for user 1 of item 1
),
// IMPORTANT: To break ties and ensure reproducibility of results,
// please report the top-3 recommendations that have the smallest
// movie identifier.
"R.2" -> List((254, 0.0), (338, 0.0), (615, 0.0)).map(x => ujson.Arr(x._1, movieNames(x._1), x._2))
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
spark.close()
}
package shared

import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext

package object predictions
{
  // ------------------------ For template
  case class Rating(user: Int, item: Int, rating: Double)

  def timingInMs(f : ()=>Double ) : (Double, Double) = {
    // ...
    return (output, (end-start)/1000000.0)
  }

  def toInt(s: String): Option[Int] = {
    try {
      Some(s.toInt)
    } catch {
      case e: Exception => None
    }
  }

  def mean(s :Seq[Double]): Double = if (s.size > 0) s.reduce(_+_) / s.length else 0.0

  def std(s :Seq[Double]): Double = {
    if (s.size == 0) 0.0
    else {
      val m = mean(s)
      scala.math.sqrt(s.map(x => scala.math.pow(m-x, 2)).sum / s.length.toDouble)
    }
  }

  def load(path : String, sep : String, nbUsers : Int, nbMovies : Int) : CSCMatrix[Double] = {
    val file = Source.fromFile(path)
    val builder = new CSCMatrix.Builder[Double](rows=nbUsers, cols=nbMovies)
    for (line <- file.getLines) {
      val cols = line.split(sep).map(_.trim)
      toInt(cols(0)) match {
        case Some(_) => builder.add(cols(0).toInt-1, cols(1).toInt-1, cols(2).toDouble)
        case None => None
      }
    }
    file.close
    builder.result()
  }

  def loadSpark(sc : org.apache.spark.SparkContext, path : String, sep : String, nbUsers : Int, nbMovies : Int) : CSCMatrix[Double] = {
    val file = sc.textFile(path)
    val ratings = file
      .map(l => {
        val cols = l.split(sep).map(_.trim)
        toInt(cols(0)) match {
          case Some(_) => Some(((cols(0).toInt-1, cols(1).toInt-1), cols(2).toDouble))
          case None => None
        }
      })
      .filter({ case Some(_) => true
                case None => false })
      .map({ case Some(x) => x
             case None => ((-1, -1), -1) }).collect()

    val builder = new CSCMatrix.Builder[Double](rows=nbUsers, cols=nbMovies)
    for ((k,v) <- ratings) {
      v match {
        case d: Double => {
          val u = k._1
          val i = k._2
          builder.add(u, i, d)
        }
      }
    }
    return builder.result
  }

  def partitionUsers (nbUsers : Int, nbPartitions : Int, replication : Int) : Seq[Set[Int]] = {
    val r = new scala.util.Random(1337)
    val bins : Map[Int, collection.mutable.ListBuffer[Int]] = (0 to (nbPartitions-1))
       .map(p => (p -> collection.mutable.ListBuffer[Int]())).toMap
    (0 to (nbUsers-1)).foreach(u => {
      val assignedBins = r.shuffle(0 to (nbPartitions-1)).take(replication)
      for (b <- assignedBins) {
        bins(b) += u
      }
    })
    bins.values.toSeq.map(_.toSet)
  }
}
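For instance (hypothetical values), partitioning the 943 users of ml-100k into 10 partitions with a replication factor of 2 produces 10 user sets in which every user appears exactly twice:

````
val partitions: Seq[Set[Int]] = partitionUsers(943, 10, 2)
// Each user is assigned to `replication` distinct partitions,
// so the partition sizes sum to 943 * 2.
assert(partitions.map(_.size).sum == 943 * 2)
````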
package test

import org.scalatest._
import funsuite._

import test.optimizing._
import test.distributed._

class AllTests extends Sequential(
  new OptimizingTests,
  new ExactTests,
  new ApproximateTests
)
package test.distributed

import org.scalatest._
import funsuite._

import org.apache.spark.rdd.RDD
import org.apache.log4j.Logger
import org.apache.log4j.Level
import breeze.linalg._
import breeze.numerics._
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.BeforeAndAfterAll

import shared.predictions._
import test.shared.helpers._
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext

class ApproximateTests extends AnyFunSuite with BeforeAndAfterAll {

   val separator = "\t"
   val train2Path = "data/ml-100k/u2.base"
   val test2Path = "data/ml-100k/u2.test"
   var train2 : CSCMatrix[Double] = null
   var test2 : CSCMatrix[Double] = null
   var sc : SparkContext = null

   override def beforeAll {
     train2 = load(train2Path, separator, 943, 1682)
     test2 = load(test2Path, separator, 943, 1682)

     val spark = SparkSession.builder().master("local[2]").getOrCreate();
     spark.sparkContext.setLogLevel("ERROR")
     sc = spark.sparkContext
   }

   // Provide tests to show how to call your code to do the following tasks.
   // Ensure you use the same function calls to produce the JSON outputs in
   // the corresponding application.
   // Add assertions with the answer you expect from your code, up to the 4th
   // decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
   test("Approximate kNN predictor with 10 partitions and replication of 2") {
     var partitionedUsers : Seq[Set[Int]] = partitionUsers(
       943,
       10,
       2
     )

     // Similarity between user 1 and itself
     assert(within(1.0, 0.0, 0.0001))

     // Similarity between user 1 and 864
     assert(within(1.0, 0.0, 0.0001))

     // Similarity between user 1 and 344
     assert(within(1.0, 0.0, 0.0001))

     // Similarity between user 1 and 16
     assert(within(1.0, 0.0, 0.0001))

     // Similarity between user 1 and 334
     assert(within(1.0, 0.0, 0.0001))

     // Similarity between user 1 and 2
     assert(within(1.0, 0.0, 0.0001))

     // MAE on test
     assert(within(1.0, 0.0, 0.0001))
   }
}
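The test suites import ````test.shared.helpers._```` for the ````within```` assertion helper, which is not shown in this diff. A minimal sketch of what such a helper could look like (the file location and exact semantics are assumptions, e.g. ````src/test/scala/shared/helpers.scala````):

````
package test.shared

object helpers {
  // True when `actual` is within `epsilon` of `expected`.
  def within(actual: Double, expected: Double, epsilon: Double): Boolean =
    math.abs(actual - expected) <= epsilon
}
````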
} }