.DS_Store
.bsp
**/*.swp
data/.DS_Store
data/ml-100k
data/ml-25m
project/project
project/target
src/main/scala/project/
src/main/scala/target/
target/
logs/
\documentclass{article}
\usepackage{hyperref}
\usepackage{algorithm}
\usepackage{algpseudocode}
\usepackage{dsfont}
\usepackage{amsmath}
\usepackage{filemod}
\usepackage{ulem}
\usepackage{graphicx}
\usepackage{todonotes}
\input{Milestone-2-questions.sty}
% If you use BibTeX in apalike style, activate the following line:
\bibliographystyle{acm}
\title{CS-449 Project Milestone 2: Optimizing, Scaling, and Economics}
\author{
\textbf{Name}: xxx\\
\textbf{Sciper}: xxx\\
\textbf{Email:} xxx\\
\textbf{Name}: xxx\\
\textbf{Sciper}: xxx\\
\textbf{Email:} xxx\\
}
\begin{document}
\maketitle
\section{Optimizing with Breeze, a Linear Algebra Library}
\label{section:optimization}
\begin{itemize}
\item[\textbf{BR.1}] \BROne
\item [\textbf{BR.2}] \BRTwoOne
\BRTwoTwo
\BRTwoThree
\end{itemize}
\section{Parallel k-NN Computations with Replicated Ratings}
\begin{enumerate}
\item [\textbf{EK.1}] \EKOne
\item [\textbf{EK.2}] \EKTwo
\end{enumerate}
\section{Distributed Approximate k-NN}
\begin{enumerate}
\item [\textbf{AK.1}] \AKOne
\item [\textbf{AK.2}] \AKTwo
\item [\textbf{AK.3}] \AKThree
\end{enumerate}
\section{Economics}
\textit{Implement the computations for the different answers in the Economics.scala file. You don't need to provide unit tests for this question, nor written answers for these questions in your report.}
\end{document}
\ProvidesPackage{m2questions}[2022/03/11 v1.0]
% Breeze
\newcommand{\BROne}{
Reimplement the kNN predictor of Milestone 1 using the Breeze library and without using Spark. Using $k=10$ and \texttt{data/ml-100k/u2.base} for training, output the similarities between: (1) user $1$ and itself; (2) user $1$ and user $864$; (3) user $1$ and user $886$. Still using $k=10$, output the prediction for user 1 and item 1 ($p_{1,1}$), the prediction for user 327 and item 2 ($p_{327,2}$), and make sure that you obtain an MAE of $0.8287 \pm 0.0001$ on \texttt{data/ml-100k/u2.test}.
}
\newcommand{\BRTwoOne}{
Try making your implementation as fast as possible, both for computing all k-nearest neighbours and for computing the predictions and MAE on a test set. Your implementation should be based around \texttt{CSCMatrix}, but may involve conversions for individual operations. We will test your implementation on a secret test set. The teams with both a correct answer and the shortest time will receive more points.
}
\newcommand{\BRTwoTwo}{
Using $k=300$, compare the time for predicting all values and computing the MAE of \texttt{ml-100k/u2.test} to the one you obtained in Milestone 1. What is the speedup of your new implementation (as a ratio of $\frac{\textit{average time}_{old}}{\textit{average time}_{new}}$)? Use the same machine to measure the time for both versions and provide the answer in your report.
}
\newcommand{\BRTwoThree}{
Also ensure your implementation works with \texttt{data/ml-1m/rb.train} and \texttt{data/ml-1m/rb.test} since you will reuse it in the next questions.
}
% Parallel Exact Knn
\newcommand{\EKOne}{
Test your parallel implementation of k-NN for correctness with two workers. Using $k=10$ and \texttt{data/ml-100k/u2.base} for training, output the similarities between: (1) user $1$ and itself; (2) user $1$ and user $864$; (3) user $1$ and user $886$. Still using $k=10$, output the prediction for user 1 and item 1 ($p_{1,1}$), the prediction for user 327 and item 2 ($p_{327,2}$), and make sure that you obtain an MAE of $0.8287 \pm 0.0001$ on \texttt{data/ml-100k/u2.test}.
}
\newcommand{\EKTwo}{
Measure and report the combined \textit{k-NN} and \textit{prediction} time when using 1, 2, 4 workers, $k=300$, and \texttt{ml-1m/rb.train} for training and \texttt{ml-1m/rb.test} for test, on the cluster (or a machine with at least 4 physical cores). Perform 3 measurements for each experiment and report the average and standard deviation of the total time, including training, making predictions, and computing the MAE. Do you observe a speedup? Does this speedup grow linearly with the number of executors, i.e. is the running time $X$ times faster when using $X$ executors compared to using a single executor? Answer both questions in your report.
}
% Approximate Knn
\newcommand{\AKOne}{
Implement the approximate k-NN using your previous Breeze implementation and Spark's RDDs. Using the partitioner of the template with 10 partitions and 2 replications, $k=10$, and \texttt{data/ml-100k/u2.base} for training, output the similarities of the approximate k-NN between user $1$ and the following users: $1,864,344,16,334,2$.
}
\newcommand{\AKTwo}{
Vary the number of partitions in which a given user appears. For the \texttt{data/ml-100k/u2.base} training set, partitioned equally between 10 workers, report the relationship between the level of replication (1,2,3,4,6,8) and the MAE you obtain on the \texttt{data/ml-100k/u2.test} test set. What is the minimum level of replication such that the MAE is still lower than the baseline predictor of Milestone 1 (MAE of 0.7604), when using $k=300$? Does this reduce the number of similarity computations compared to an exact k-NN? What is the ratio? Answer both questions in your report.
}
\newcommand{\AKThree}{
Measure and report the time required by your approximate \textit{k-NN} implementation, including both training on \texttt{data/ml-1m/rb.train} and computing the MAE on the test set \texttt{data/ml-1m/rb.test}, using $k=300$ on 8 partitions with a replication factor of 1 when using 1, 2, 4 workers. Perform each experiment 3 times and report the average and standard deviation. Do you observe a speedup compared to the parallel (exact) k-NN with replicated ratings for the same number of workers?
}
% Economics
\newcommand{\EOne}{
What is the minimum number of days of renting to make buying the ICC.M7 less expensive, excluding any operating costs such as electricity and maintenance? Round up to the nearest integer.
}
\newcommand{\ETwoOne}{
After how many days of renting a container does the cost exceed that of buying and running 4 Raspberry Pis? Compute this (1) assuming, optimistically, no maintenance at minimum power usage for the RPis, and (2) assuming no maintenance at maximum power usage for the RPis, to obtain a likely range. (Round up to the nearest integer in each case.)
}
\newcommand{\ETwoTwo}{
Assume a single processor for the container and a total amount of RAM equivalent to that of the 4 Raspberry Pis. Also provide unrounded intermediary results for (1) Container Daily Cost, (2) 4 RPis (Idle) Daily Electricity Cost, (3) 4 RPis (Computing) Daily Electricity Cost.
}
\newcommand{\EThree}{
For the same buying price as an ICC.M7, how many Raspberry Pis can you get (floor the result to remove the decimal)? Assuming perfect scaling, would you obtain a larger overall throughput and RAM from these? If so, by how much? Compute the ratios using the previous floored number of RPis, but do not round the final results.
}
# Milestone Description
[Milestone-2.pdf](./Milestone-2.pdf)
Note: Section 'Updates' lists the updates since the original release of the Milestone.
Mu has prepared a report template for your convenience here: [Report Template](./Milestone-2-QA-template.tex).
# Dependencies
````
sbt >= 1.4.7
openjdk@8
````
These should be available by default on ````iccluster028.iccluster.epfl.ch````. Otherwise, refer to each project's installation instructions. Prefer working locally on your own machine: your measurements will suffer less interference from other students.
If you work on ````iccluster028.iccluster.epfl.ch````, you need to extend the default PATH by adding the following line to ````~/.bashrc````:
````
export PATH=$PATH:/opt/sbt/sbt/bin
````
If you have multiple installations of openjdk, you need to specify the one to use as JAVA_HOME, e.g. on OSX with
openjdk@8 installed through Homebrew, you would do:
````
export JAVA_HOME="/usr/local/Cellar/openjdk@8/1.8.0+282";
````
# Dataset
Download [data-m2.zip](https://gitlab.epfl.ch/sacs/cs-449-sds-public/project/dataset/-/raw/main/data-m2.zip).
Unzip:
````
> unzip data-m2.zip
````
It should unzip into ````data/```` by default. If not, manually move ````ml-100k```` and ````ml-1m```` into ````data/````.
# Repository Structure
````src/main/scala/shared/predictions.scala````:
All the functionality of your code, for all questions, should be defined there.
This code is then used by the applications and tests listed below.
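For instance, a helper defined once in the ````shared.predictions```` package object can be called identically from an application and from a unit test. A minimal sketch, using a hypothetical ````computeMae```` helper that is not part of the template:
````
// Hypothetical addition to src/main/scala/shared/predictions.scala (illustration only):
def computeMae(predicted: Seq[Double], actual: Seq[Double]): Double =
  if (actual.isEmpty) 0.0
  else predicted.zip(actual).map { case (p, r) => scala.math.abs(p - r) }.sum / actual.length
````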
## Applications
1. ````src/main/scala/optimizing/Optimizing.scala````: Output answers to questions **BR.X**.
2. ````src/main/scala/distributed/Exact.scala````: Output answers to questions **EK.X**.
3. ````src/main/scala/distributed/Approximate.scala````: Output answers to questions **AK.X**.
4. ````src/main/scala/economics/Economics.scala````: Output answers to questions **E.X**.
Applications are kept separate from tests to make it easier to test with different
inputs, and to allow outputting your answers and timings in JSON format for easier
grading.
## Unit Tests
Corresponding unit tests for each application (except Economics.scala):
````
src/test/scala/optimizing/OptimizingTests.scala
src/test/scala/distributed/ExactTests.scala
src/test/scala/distributed/ApproximateTests.scala
````
Your tests should demonstrate how to call your code to obtain the answers of
the applications, and should make exactly the same calls as the applications
above. This structure intentionally encourages you to put as little
functionality as possible in the applications themselves. It also gives the TA
a clear and regular structure against which to check correctness.
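For illustration only, reusing the hypothetical ````computeMae```` helper sketched above (not part of the template), a test mirroring an application call could look like:
````
package test.optimizing

import org.scalatest.funsuite._
import shared.predictions._

class OptimizingTests extends AnyFunSuite {
  test("MAE is computed as in the application") {
    // Exactly the same call the application would make, here on toy data.
    val mae = computeMae(Seq(3.0, 4.0), Seq(3.5, 4.5))
    assert(scala.math.abs(mae - 0.5) < 1e-4)
  }
}
````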
# Usage
## Execute unit tests
````
sbt "testOnly test.AllTests"
````
You should fill in all tests and ensure they all succeed prior to submission.
## Run applications
### Optimizing
````
sbt "runMain scaling.Optimizing --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json optimizing-100k.json --master local[1] --users 943 --movies 1682"
````
### Parallel Exact KNN
````
sbt "runMain distributed.Exact --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json exact-100k-4.json --k 10 --master local[4] --users 943 --movies 1682"
````
### Approximate KNN
````
sbt "runMain distributed.Approximate --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json approximate-100k-4-k10-r2.json --k 10 --master local[4] --users 943 --movies 1682 --partitions 10 --replication 2"
````
### Economics
````
sbt "runMain economics.Economics --json economics.json"
````
## Time applications
For all the previous applications, you can set the number of measurements for timings by adding the following option ````--num_measurements X```` where X is an integer. The default value is ````0````.
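Internally, the applications collect timings with the ````timingInMs````, ````mean````, and ````std```` helpers from ````shared/predictions.scala````. A minimal sketch of this aggregation, with a placeholder body inside the timed closure:
````
import shared.predictions._

// Each call to timingInMs returns (result, elapsed milliseconds).
val numMeasurements = 3  // e.g. passed via --num_measurements 3
val measurements = (1 to numMeasurements).map(_ => timingInMs(() => {
  0.0 // placeholder: compute predictions and return the MAE here
}))
val timings = measurements.map(_._2)
println(s"average (ms): ${mean(timings)}, stddev (ms): ${std(timings)}")
````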
## IC Cluster
Test your application locally as much as possible and only test on the iccluster
once everything works, to keep the cluster and the driver node maximally available
for other students.
### Assemble Application for Spark Submit
````sbt clean````: clean up temporary files and previous assembly packages.
````sbt assembly````: create a new jar
````target/scala-2.11/m2_yourid-assembly-1.0.jar```` that can be used with
````spark-submit````.
Prefer packaging your application locally and uploading the assembled jar
before running on the cluster.
### Upload jar on Cluster
````
scp target/scala-2.11/m2_yourid-assembly-1.0.jar <username>@iccluster028.iccluster.epfl.ch:~
````
### Run on Cluster
See [config.sh](./config.sh) for the HDFS paths of the pre-uploaded train and test datasets, to be used in place of TRAIN and TEST, as in the example commands below:
#### When using ML-100k
````
spark-submit --class distributed.Exact --master yarn --conf "spark.dynamicAllocation.enabled=false" --num-executors 1 m2_yourid-assembly-1.0.jar --json exact-100k-1.json --train $ML100Ku2base --test $ML100Ku2test
````
#### When using ML-1m
````
spark-submit --class distributed.Exact --master yarn --conf "spark.dynamicAllocation.enabled=false" --num-executors 1 m2_yourid-assembly-1.0.jar --json exact-1m-1.json --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --k 300 --users 6040 --movies 3952
````
To keep results obtained with different parameters in separate .json files, simply adjust the ```--json``` argument accordingly. For instance, with ```--num-executors 4```, use ```--json exact-1m-4.json```.
Note that when switching from ML-100k to ML-1M, the parameter ```--separator ::``` should be added, and the numbers of users and movies should be adjusted.
## Grading scripts
We will use the following scripts to grade your submission:
1. ````./test.sh````: Run all unit tests.
2. ````./run.sh````: Run all applications without timing measurements.
3. ````./time.sh````: Run all timing measurements.
All scripts will produce execution logs in the ````logs````
directory, including answers produced in JSON format. Log directories are
named ````logs/<scriptname>-<datetime>-<machine>/```` and include at
least an execution log ````log.txt````, as well as any JSON outputs from the
applications.
Ensure all scripts run correctly locally before submitting.
## Submission
Steps:
1. Create a zip archive with all your code within ````src/````, as well as your report: ````zip sciper1-sciper2.zip -r src/ report.pdf````
2. Submit ````sciper1-sciper2.zip```` to the TA for grading on
https://cs449-submissions.epfl.ch:8083/m2 using the passcode you have previously received by email.
# References
Essential sbt: https://www.scalawilliam.com/essential-sbt/
Explore Spark Interactively (supports autocompletion with tabs!): https://spark.apache.org/docs/latest/quick-start.html
Scallop Argument Parsing: https://github.com/scallop/scallop/wiki
Spark Resilient Distributed Dataset (RDD): https://spark.apache.org/docs/3.0.1/api/scala/org/apache/spark/rdd/RDD.html
# Credits
Erick Lavoie (Design, Implementation, Tests)
Athanasios Xygkis (Requirements, Tests)
name := "m2_yourid"
version := "1.0"
libraryDependencies += "org.rogach" %% "scallop" % "4.0.2"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.7"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.7"
libraryDependencies += "org.scalanlp" %% "breeze" % "0.13.2"
libraryDependencies += "org.scalanlp" %% "breeze-natives" % "0.13.2"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.0" % Test
libraryDependencies += "com.lihaoyi" %% "ujson" % "1.5.0"
scalaVersion in ThisBuild := "2.11.12"
enablePlugins(JavaAppPackaging)
logBuffered in Test := false
test in assembly := {}
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
if [ $(hostname) == 'iccluster028' ];
then
ICCLUSTER=hdfs://iccluster028.iccluster.epfl.ch:8020
export ML100Ku2base=$ICCLUSTER/cs449/data/ml-100k/u2.base;
export ML100Ku2test=$ICCLUSTER/cs449/data/ml-100k/u2.test;
export ML100Kudata=$ICCLUSTER/cs449/data/ml-100k/u.data;
export ML1Mrbtrain=$ICCLUSTER/cs449/data/ml-1m/rb.train;
export ML1Mrbtest=$ICCLUSTER/cs449/data/ml-1m/rb.test;
export SPARKMASTER='yarn'
else
export ML100Ku2base=data/ml-100k/u2.base;
export ML100Ku2test=data/ml-100k/u2.test;
export ML100Kudata=data/ml-100k/u.data;
export ML1Mrbtrain=data/ml-1m/rb.train;
export ML1Mrbtest=data/ml-1m/rb.test;
export SPARKMASTER='local[4]'
fi;
This directory should hold execution results.
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.7.4")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")
#!/usr/bin/env bash
# If your default java install does not work, explicitly
# provide the path to the JDK 1.8 installation. On OSX
# with homebrew:
# export JAVA_HOME=/usr/local/Cellar/openjdk@8/1.8.0+282; ./run.sh
export JAVA_OPTS="-Xmx8G";
RUN=./logs/run-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
mkdir -p $RUN
LOGS=$RUN/log.txt
source ./config.sh
echo "------------------- OPTIMIZING ---------------------" >> $LOGS
sbt "runMain scaling.Optimizing --train $ML100Ku2base --test $ML100Ku2test --json $RUN/optimizing-100k.json --users 943 --movies 1682 --master local[1]" 2>&1 >>$LOGS
echo "------------------- DISTRIBUTED EXACT ---------------------" >> $LOGS
sbt "runMain distributed.Exact --train $ML100Ku2base --test $ML100Ku2test --json $RUN/exact-100k-4.json --k 10 --master local[4] --users 943 --movies 1682" 2>&1 >>$LOGS
sbt "runMain distributed.Exact --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --json $RUN/exact-1m-4.json --k 300 --master local[4] --users 6040 --movies 3952" 2>&1 >>$LOGS
echo "------------------- DISTRIBUTED APPROXIMATE ---------------------" >> $LOGS
sbt "runMain distributed.Approximate --train $ML100Ku2base --test $ML100Ku2test --json $RUN/approximate-100k-4-k10-r2.json --k 10 --master local[4] --users 943 --movies 1682 --partitions 10 --replication 2" 2>&1 >>$LOGS;
for R in 1 2 3 4 6 8; do
sbt "runMain distributed.Approximate --train $ML100Ku2base --test $ML100Ku2test --json $RUN/approximate-100k-4-k300-r$R.json --k 300 --master local[4] --users 943 --movies 1682 --partitions 10 --replication $R" 2>&1 >>$LOGS;
done
sbt "runMain distributed.Approximate --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --json $RUN/approximate-1m-4.json --k 300 --master local[4] --users 6040 --movies 3952 --partitions 8 --replication 1" 2>&1 >>$LOGS
echo "------------------- ECONOMICS -----------------------------------" >> $LOGS
sbt "runMain economics.Economics --json $RUN/economics.json" 2>&1 >>$LOGS
import org.rogach.scallop._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import ujson._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import shared.predictions._
package distributed {
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val k = opt[Int]()
val json = opt[String]()
val users = opt[Int]()
val movies = opt[Int]()
val separator = opt[String](default=Some("\t"))
val replication = opt[Int](default=Some(1))
val partitions = opt[Int](default=Some(1))
val master = opt[String]()
val num_measurements = opt[Int](default=Some(1))
verify()
}
object Approximate {
def main(args: Array[String]) {
var conf = new Conf(args)
// Remove these lines if you need Spark's log output for debugging
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = conf.master.toOption match {
case None => SparkSession.builder().getOrCreate();
case Some(master) => SparkSession.builder().master(master).getOrCreate();
}
val sc = spark.sparkContext
println("")
println("******************************************************")
// conf object is not serializable, extract values that
// will be serialized with the parallelize implementations
val conf_users = conf.users()
val conf_movies = conf.movies()
val conf_k = conf.k()
println("Loading training data")
val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
var knn : CSCMatrix[Double] = null
println("Partitioning users")
var partitionedUsers : Seq[Set[Int]] = partitionUsers(
conf.users(),
conf.partitions(),
conf.replication()
)
val measurements = (1 to scala.math.max(1,conf.num_measurements()))
.map(_ => timingInMs( () => {
// TODO: compute the approximate k-NN and predictions using partitionedUsers,
// then return the MAE (0.0 is a placeholder)
0.0
}))
val mae = measurements(0)._1
val timings = measurements.map(_._2)
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"train" -> ujson.Str(conf.train()),
"test" -> ujson.Str(conf.test()),
"k" -> ujson.Num(conf.k()),
"users" -> ujson.Num(conf.users()),
"movies" -> ujson.Num(conf.movies()),
"master" -> ujson.Str(sc.getConf.get("spark.master")),
"num-executors" -> ujson.Str(if (sc.getConf.contains("spark.executor.instances"))
sc.getConf.get("spark.executor.instances")
else
""),
"num_measurements" -> ujson.Num(conf.num_measurements()),
"partitions" -> ujson.Num(conf.partitions()),
"replication" -> ujson.Num(conf.replication())
),
"AK.1" -> ujson.Obj(
"knn_u1v1" -> ujson.Num(0.0),
"knn_u1v864" -> ujson.Num(0.0),
"knn_u1v344" -> ujson.Num(0.0),
"knn_u1v16" -> ujson.Num(0.0),
"knn_u1v334" -> ujson.Num(0.0),
"knn_u1v2" -> ujson.Num(0.0)
),
"AK.2" -> ujson.Obj(
"mae" -> ujson.Num(mae)
),
"AK.3" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)),
"stddev (ms)" -> ujson.Num(std(timings))
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
spark.stop()
}
}
}
import org.rogach.scallop._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import ujson._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import shared.predictions._
package distributed {
class ExactConf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val k = opt[Int](default=Some(10))
val json = opt[String]()
val users = opt[Int]()
val movies = opt[Int]()
val separator = opt[String](default=Some("\t"))
val master = opt[String]()
val num_measurements = opt[Int](default=Some(1))
verify()
}
object Exact {
def main(args: Array[String]) {
var conf = new ExactConf(args)
// Remove these lines if you need Spark's log output for debugging
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = conf.master.toOption match {
case None => SparkSession.builder().getOrCreate();
case Some(master) => SparkSession.builder().master(master).getOrCreate();
}
spark.sparkContext.setLogLevel("ERROR")
val sc = spark.sparkContext
println("")
println("******************************************************")
// conf object is not serializable, extract values that
// will be serialized with the parallelize implementations
val conf_users = conf.users()
val conf_movies = conf.movies()
val conf_k = conf.k()
println("Loading training data from: " + conf.train())
val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
val measurements = (1 to scala.math.max(1,conf.num_measurements())).map(_ => timingInMs( () => {
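// TODO: compute the exact k-NN, predictions, and MAE here (0.0 is a placeholder)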
0.0
}))
val timings = measurements.map(_._2)
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"train" -> ujson.Str(conf.train()),
"test" -> ujson.Str(conf.test()),
"k" -> ujson.Num(conf.k()),
"users" -> ujson.Num(conf.users()),
"movies" -> ujson.Num(conf.movies()),
"master" -> ujson.Str(sc.getConf.get("spark.master")),
"num-executors" -> ujson.Str(if (sc.getConf.contains("spark.executor.instances"))
sc.getConf.get("spark.executor.instances")
else
""),
"num_measurements" -> ujson.Num(conf.num_measurements())
),
"EK.1" -> ujson.Obj(
"1.knn_u1v1" -> ujson.Num(0.0),
"2.knn_u1v864" -> ujson.Num(0.0),
"3.knn_u1v886" -> ujson.Num(0.0),
"4.PredUser1Item1" -> ujson.Num(0.0),
"5.PredUser327Item2" -> ujson.Num(0.0),
"6.Mae" -> ujson.Num(0.0)
),
"EK.2" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
spark.stop()
}
}
}
import org.rogach.scallop._
import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import ujson._
package economics {
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val json = opt[String]()
verify()
}
object Economics {
def main(args: Array[String]) {
println("")
println("******************************************************")
var conf = new Conf(args)
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"E.1" -> ujson.Obj(
"MinRentingDays" -> ujson.Num(0.0) // Datatype of answer: Double
),
"E.2" -> ujson.Obj(
"ContainerDailyCost" -> ujson.Num(0.0),
"4RPisDailyCostIdle" -> ujson.Num(0.0),
"4RPisDailyCostComputing" -> ujson.Num(0.0),
"MinRentingDaysIdleRPiPower" -> ujson.Num(0.0),
"MinRentingDaysComputingRPiPower" -> ujson.Num(0.0)
),
"E.3" -> ujson.Obj(
"NbRPisEqBuyingICCM7" -> ujson.Num(0.0),
"RatioRAMRPisVsICCM7" -> ujson.Num(0.0),
"RatioComputeRPisVsICCM7" -> ujson.Num(0.0)
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
}
}
}
import org.rogach.scallop._
import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import ujson._
import shared.predictions._
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
package scaling {
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val train = opt[String](required = true)
val test = opt[String](required = true)
val json = opt[String]()
val users = opt[Int]()
val movies = opt[Int]()
val separator = opt[String](default=Some("\t"))
val master = opt[String]()
val num_measurements = opt[Int](default=Some(1))
verify()
}
object Optimizing extends App {
var conf = new Conf(args)
// conf object is not serializable, extract values that
// will be serialized with the parallelize implementations
val conf_users = conf.users()
val conf_movies = conf.movies()
// Remove these lines if you need Spark's log output for debugging
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = conf.master.toOption match {
case None => SparkSession.builder().getOrCreate();
case Some(master) => SparkSession.builder().master(master).getOrCreate();
}
spark.sparkContext.setLogLevel("ERROR")
val sc = spark.sparkContext
println("Loading training data from: " + conf.train())
val train = loadSpark(sc, conf.train(), conf.separator(), conf.users(), conf.movies())
val test = loadSpark(sc, conf.test(), conf.separator(), conf.users(), conf.movies())
val measurements = (1 to scala.math.max(1, conf.num_measurements())).map(x => timingInMs(() => {
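// TODO: compute the Breeze-based k-NN predictions and MAE here (0.0 is a placeholder)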
0.0
}))
val timings = measurements.map(t => t._2)
val mae = measurements(0)._1
// Save answers as JSON
def printToFile(content: String,
location: String = "./answers.json") =
Some(new java.io.PrintWriter(location)).foreach{
f => try{
f.write(content)
} finally{ f.close }
}
conf.json.toOption match {
case None => ;
case Some(jsonFile) => {
val answers = ujson.Obj(
"Meta" -> ujson.Obj(
"train" -> ujson.Str(conf.train()),
"test" -> ujson.Str(conf.test()),
"users" -> ujson.Num(conf.users()),
"movies" -> ujson.Num(conf.movies()),
"master" -> ujson.Str(conf.master()),
"num_measurements" -> ujson.Num(conf.num_measurements())
),
"BR.1" -> ujson.Obj(
"1.k10u1v1" -> ujson.Num(0.0),
"2.k10u1v864" -> ujson.Num(0.0),
"3.k10u1v886" -> ujson.Num(0.0),
"4.PredUser1Item1" -> ujson.Num(0.0),
"5.PredUser327Item2" -> ujson.Num(0.0),
"6.Mae" -> ujson.Num(0.0)
),
"BR.2" -> ujson.Obj(
"average (ms)" -> ujson.Num(mean(timings)), // Datatype of answer: Double
"stddev (ms)" -> ujson.Num(std(timings)) // Datatype of answer: Double
)
)
val json = write(answers, 4)
println(json)
println("Saving answers in: " + jsonFile)
printToFile(json, jsonFile)
}
}
println("")
}
}
package predict
object Predictor extends App {
println("Computing predictions ...")
println("Done")
}
package recommend
object Recommender extends App {
println("Computing recommendations ...")
println("Done")
}
package shared
import breeze.linalg._
import breeze.numerics._
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
package object predictions
{
// ------------------------ For template
case class Rating(user: Int, item: Int, rating: Double)
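// Runs f once and returns its result together with the elapsed wall-clock time in milliseconds.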
def timingInMs(f : ()=>Double ) : (Double, Double) = {
val start = System.nanoTime()
val output = f()
val end = System.nanoTime()
return (output, (end-start)/1000000.0)
}
def toInt(s: String): Option[Int] = {
try {
Some(s.toInt)
} catch {
case e: Exception => None
}
}
def mean(s :Seq[Double]): Double = if (s.size > 0) s.reduce(_+_) / s.length else 0.0
def std(s :Seq[Double]): Double = {
if (s.size == 0) 0.0
else {
val m = mean(s)
scala.math.sqrt(s.map(x => scala.math.pow(m-x, 2)).sum / s.length.toDouble)
}
}
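// Reads a ratings file with lines of the form user<sep>item<sep>rating into a sparse (nbUsers x nbMovies) CSCMatrix, shifting 1-based ids to 0-based indices.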
def load(path : String, sep : String, nbUsers : Int, nbMovies : Int) : CSCMatrix[Double] = {
val file = Source.fromFile(path)
val builder = new CSCMatrix.Builder[Double](rows=nbUsers, cols=nbMovies)
for (line <- file.getLines) {
val cols = line.split(sep).map(_.trim)
toInt(cols(0)) match {
case Some(_) => builder.add(cols(0).toInt-1, cols(1).toInt-1, cols(2).toDouble)
case None => None
}
}
file.close
builder.result()
}
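// Same as load, but reads the file through Spark and collects the parsed ratings to the driver before building the CSCMatrix.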
def loadSpark(sc : org.apache.spark.SparkContext, path : String, sep : String, nbUsers : Int, nbMovies : Int) : CSCMatrix[Double] = {
val file = sc.textFile(path)
val ratings = file
.flatMap(l => {
val cols = l.split(sep).map(_.trim)
toInt(cols(0)) match {
case Some(_) => Some(((cols(0).toInt-1, cols(1).toInt-1), cols(2).toDouble))
case None => None
}
})
.collect()
val builder = new CSCMatrix.Builder[Double](rows=nbUsers, cols=nbMovies)
for (((u, i), r) <- ratings) {
builder.add(u, i, r)
}
return builder.result
}
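// Randomly assigns each user to `replication` distinct partitions (fixed seed for reproducibility); returns one set of user indices per partition.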
def partitionUsers (nbUsers : Int, nbPartitions : Int, replication : Int) : Seq[Set[Int]] = {
val r = new scala.util.Random(1337)
val bins : Map[Int, collection.mutable.ListBuffer[Int]] = (0 to (nbPartitions-1))
.map(p => (p -> collection.mutable.ListBuffer[Int]())).toMap
(0 to (nbUsers-1)).foreach(u => {
val assignedBins = r.shuffle(0 to (nbPartitions-1)).take(replication)
for (b <- assignedBins) {
bins(b) += u
}
})
bins.values.toSeq.map(_.toSet)
}
}
package test
import org.scalatest._
import funsuite._
import test.optimizing._
import test.distributed._
class AllTests extends Sequential(
new OptimizingTests,
new ExactTests,
new ApproximateTests
)
import org.scalatest.funsuite._
final class Prediction extends AnyFunSuite {
test("prediction works") {
assert(true)
}
}