Compare revisions: sacs/cs-449-sds-public/project/cs449-template-m2-2022 and hlanfran/cs449-template-m2-2022
package test.distributed
import org.scalatest._
import funsuite._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import shared.predictions._
import test.shared.helpers._
class DistributedBaselineTests extends AnyFunSuite with BeforeAndAfterAll {
val separator = "\t"
var spark : org.apache.spark.sql.SparkSession = _
val train2Path = "data/ml-100k/u2.base"
val test2Path = "data/ml-100k/u2.test"
var train2 : org.apache.spark.rdd.RDD[shared.predictions.Rating] = null
var test2 : org.apache.spark.rdd.RDD[shared.predictions.Rating] = null
override def beforeAll {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
spark = SparkSession.builder()
.master("local[1]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
train2 = load(spark, train2Path, separator)
test2 = load(spark, test2Path, separator)
}
// All the function definitions for the tests below (and the tests in other suites)
// should be in a single library, 'src/main/scala/shared/predictions.scala'.
// Provide tests to show how to call your code to do the following tasks (each in its own test):
// each method should be invoked with a single function call.
// Ensure you use the same function calls to produce the JSON outputs in
// src/main/scala/predict/Baseline.scala.
// Add assertions with the answer you expect from your code, up to the 4th
// decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
test("Compute global average") { assert(within(1.0, 0.0, 0.0001)) }
test("Compute user 1 average") { assert(within(1.0, 0.0, 0.0001)) }
test("Compute item 1 average") { assert(within(1.0, 0.0, 0.0001)) }
test("Compute item 1 average deviation") { assert(within(1.0, 0.0, 0.0001)) }
test("Compute baseline prediction for user 1 on item 1") { assert(within(1.0, 0.0, 0.0001)) }
// Show how to compute the MAE on all four non-personalized methods:
// 1. There should be four different functions, one for each method, to create a predictor
// with the following signature: ````predictor: (train: Seq[shared.predictions.Rating]) => ((u: Int, i: Int) => Double)````;
// 2. There should be a single reusable function to compute the MAE on the test set, given a predictor;
// 3. There should be invocations of both to show they work on the following datasets.
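// Note: the Seq-based signature quoted above is the one used by the in-memory suites; in
// this suite train2 and test2 are RDDs. A minimal sketch of the reusable MAE in RDD form
// (the name `maeOnTest` is illustrative, not part of the template, and assumes Rating's
// fields are (user, item, rating); in the project such helpers belong in
// src/main/scala/shared/predictions.scala rather than in this suite):
def maeOnTest(testRatings: RDD[Rating], predictor: (Int, Int) => Double): Double =
  // Mean absolute error of `predictor` over the test ratings.
  testRatings.map(r => (predictor(r.user, r.item) - r.rating).abs).mean()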
test("MAE on all four non-personalized methods on data/ml-100k/u2.base and data/ml-100k/u2.test") {
assert(within(1.0, 0.0, 0.0001))
assert(within(1.0, 0.0, 0.0001))
assert(within(1.0, 0.0, 0.0001))
assert(within(1.0, 0.0, 0.0001))
}
}
package test.distributed
import breeze.linalg._
import breeze.numerics._
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.BeforeAndAfterAll
import shared.predictions._
import test.shared.helpers._
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
class ExactTests extends AnyFunSuite with BeforeAndAfterAll {
val separator = "\t"
val train2Path = "data/ml-100k/u2.base"
val test2Path = "data/ml-100k/u2.test"
var train2 : CSCMatrix[Double] = null
var test2 : CSCMatrix[Double] = null
var sc : SparkContext = null
override def beforeAll {
train2 = load(train2Path, separator, 943, 1682)
test2 = load(test2Path, separator, 943, 1682)
val spark = SparkSession.builder().master("local[2]").getOrCreate();
spark.sparkContext.setLogLevel("ERROR")
sc = spark.sparkContext
}
// Provide tests to show how to call your code to do the following tasks.
// Ensure you use the same function calls to produce the JSON outputs in
// the corresponding application.
// Add assertions with the answer you expect from your code, up to the 4th
// decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
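// A small sketch of a reusable MAE helper over the sparse test matrix, given any
// (user, item) => Double predictor. The name `maeOnMatrix` is illustrative only; the
// indices are the 0-based row/column indices of the CSCMatrix loaded above.
def maeOnMatrix(testMatrix: CSCMatrix[Double], predictor: (Int, Int) => Double): Double = {
  // Iterate only over the stored (non-zero) test ratings.
  val errors = testMatrix.activeIterator.map { case ((u, i), r) => (predictor(u, i) - r).abs }.toSeq
  errors.sum / errors.size
}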
test("kNN predictor with k=10") {
// Similarity between user 1 and itself
assert(within(1.0, 0.0, 0.0001))
// Similarity between user 1 and 864
assert(within(1.0, 0.0, 0.0001))
// Similarity between user 1 and 886
assert(within(1.0, 0.0, 0.0001))
// Prediction user 1 and item 1
assert(within(1.0, 0.0, 0.0001))
// Prediction user 327 and item 2
assert(within(1.0, 0.0, 0.0001))
// MAE on test
assert(within(1.0, 0.0, 0.0001))
}
}
package test.optimizing
import breeze.linalg._
import breeze.numerics._
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.BeforeAndAfterAll
import shared.predictions._
import test.shared.helpers._
import ujson._
class OptimizingTests extends AnyFunSuite with BeforeAndAfterAll {
val separator = "\t"
val train2Path = "data/ml-100k/u2.base"
val test2Path = "data/ml-100k/u2.test"
var train2 : CSCMatrix[Double] = null
var test2 : CSCMatrix[Double] = null
override def beforeAll {
train2 = load(train2Path, separator, 943, 1682)
test2 = load(test2Path, separator, 943, 1682)
}
// Provide tests to show how to call your code to do the following tasks.
// Ensure you use the same function calls to produce the JSON outputs in
// the corresponding application.
// Add assertions with the answer you expect from your code, up to the 4th
// decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
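// One possible building block, sketched here for illustration: given one user's row of a
// dense user-user similarity matrix, keep only the k largest similarities, excluding the
// user itself. The name `topkFor` and the use of a DenseVector row are assumptions, not
// something prescribed by the template.
def topkFor(simRow: DenseVector[Double], user: Int, k: Int): Seq[(Int, Double)] =
  simRow.toArray.zipWithIndex
    .collect { case (s, v) if v != user => (v, s) } // (neighbour, similarity), self excluded
    .sortBy { case (_, s) => -s }                   // most similar first
    .take(k)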
test("kNN predictor with k=10") {
// Create predictor on train2
// Similarity between user 1 and itself
assert(within(1.0, 0.0, 0.0001))
......@@ -61,14 +41,10 @@ class kNNTests extends AnyFunSuite with BeforeAndAfterAll {
// Prediction user 1 and item 1
assert(within(1.0, 0.0, 0.0001))
// MAE on test2
// Prediction user 327 and item 2
assert(within(1.0, 0.0, 0.0001))
}
test("kNN Mae") {
// Compute MAE for k around the baseline MAE
// Ensure the MAEs are indeed lower/higher than baseline
assert(1.0 < 0.0)
}
// MAE on test2
assert(within(1.0, 0.0, 0.0001))
}
}
package test.predict
import org.scalatest._
import funsuite._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import shared.predictions._
import test.shared.helpers._
import ujson._
class BaselineTests extends AnyFunSuite with BeforeAndAfterAll {
val separator = "\t"
var spark : org.apache.spark.sql.SparkSession = _
val train2Path = "data/ml-100k/u2.base"
val test2Path = "data/ml-100k/u2.test"
var train2 : Array[shared.predictions.Rating] = null
var test2 : Array[shared.predictions.Rating] = null
override def beforeAll {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
spark = SparkSession.builder()
.master("local[1]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
// For these questions, train and test are collected in a scala Array
// to not depend on Spark
train2 = load(spark, train2Path, separator).collect()
test2 = load(spark, test2Path, separator).collect()
}
// All the function definitions for the tests below (and the tests in other suites)
// should be in a single library, 'src/main/scala/shared/predictions.scala'.
// Provide tests to show how to call your code to do the following tasks (each in its own test):
// each method should be invoked with a single function call.
// Ensure you use the same function calls to produce the JSON outputs in
// src/main/scala/predict/Baseline.scala.
// Add assertions with the answer you expect from your code, up to the 4th
// decimal after the (floating) point, on data/ml-100k/u2.base (as loaded above).
test("Compute global average") { assert(within(1.0, 0.0, 0.0001)) }
test("Compute user 1 average") { assert(within(1.0, 0.0, 0.0001)) }
test("Compute item 1 average") { assert(within(1.0, 0.0, 0.0001)) }
test("Compute item 1 average deviation") { assert(within(1.0, 0.0, 0.0001)) }
test("Compute baseline prediction for user 1 on item 1") { assert(within(1.0, 0.0, 0.0001)) }
// Show how to compute the MAE on all four non-personalized methods:
// 1. There should be four different functions, one for each method, to create a predictor
// with the following signature: ````predictor: (train: Seq[shared.predictions.Rating]) => ((u: Int, i: Int) => Double)````;
// 2. There should be a single reusable function to compute the MAE on the test set, given a predictor;
// 3. There should be invocations of both to show they work on the following datasets.
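// A minimal sketch of points (1) and (2) above, using the simplest of the four methods
// (the global average) as the example predictor. The names `globalAvgPredictor` and
// `maeOnTest` are illustrative and assume Rating's fields are (user, item, rating); in
// the project these definitions belong in src/main/scala/shared/predictions.scala.
def globalAvgPredictor(train: Seq[Rating]): (Int, Int) => Double = {
  // Predict the same value, the global average rating, for every (user, item) pair.
  val avg = train.map(_.rating).sum / train.size
  (u: Int, i: Int) => avg
}
def maeOnTest(testRatings: Seq[Rating], predictor: (Int, Int) => Double): Double =
  testRatings.map(r => (predictor(r.user, r.item) - r.rating).abs).sum / testRatings.size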
test("MAE on all four non-personalized methods on data/ml-100k/u2.base and data/ml-100k/u2.test") {
assert(within(1.0, 0.0, 0.0001))
assert(within(1.0, 0.0, 0.0001))
assert(within(1.0, 0.0, 0.0001))
assert(within(1.0, 0.0, 0.0001))
}
}
package test.recommend
import org.scalatest._
import funsuite._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import shared.predictions._
import test.shared.helpers._
import ujson._
class RecommenderTests extends AnyFunSuite with BeforeAndAfterAll {
val separator = "\t"
var spark : org.apache.spark.sql.SparkSession = _
val dataPath = "data/ml-100k/u.data"
val personalPath = "data/personal.csv"
var data : Array[shared.predictions.Rating] = null
var personal : Array[shared.predictions.Rating] = null
var train : Array[shared.predictions.Rating] = null
var predictor : (Int, Int) => Double = null
override def beforeAll {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
spark = SparkSession.builder()
.master("local[1]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
data = load(spark, dataPath, separator).collect()
println("Loading personal data from: " + personalPath)
val personalFile = spark.sparkContext.textFile(personalPath)
personal = personalFile.map(l => {
val cols = l.split(",").map(_.trim)
// The header row and movies without a personal rating are mapped to a rating of 0.0,
// which the filter below removes; rated movies are attributed to user id 944.
if (cols(0) == "id")
Rating(944, 0, 0.0)
else if (cols.length < 3)
Rating(944, cols(0).toInt, 0.0)
else
Rating(944, cols(0).toInt, cols(2).toDouble)
}).filter(r => r.rating != 0).collect()
// TODO: Create predictor
}
// All the function definitions for the tests below (and the tests in other suites)
// should be in a single library, 'src/main/scala/shared/predictions.scala'.
//
test("Prediction for user 1 of item 1") {
assert(within(1.0, 0.0, 0.0001))
}
test("Top 3 recommendations for user 944") {
val recommendations = List((1,0.0), (2,0.0), (3,0.0))
assert(recommendations(0)._1 == 4)
assert(within(recommendations(0)._2, 5.0, 0.0001))
// Idem recommendation 2 and 3
}
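// A possible shape for the recommendation helper exercised above; the name `recommend`
// and its parameters are illustrative, not prescribed by the template, and Rating's
// fields are assumed to be (user, item, rating). It ranks every item the user has not
// rated by predicted rating, breaking ties by the smaller item id, and keeps the n best.
def recommend(user: Int, n: Int, rated: Set[Int], items: Seq[Int],
              pred: (Int, Int) => Double): List[(Int, Double)] =
  items.filterNot(rated.contains)
    .map(i => (i, pred(user, i)))
    .sortBy { case (i, p) => (-p, i) }
    .take(n)
    .toList
// For instance: recommend(944, 3, personal.map(_.item).toSet, 1 to 1682, predictor)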
}
package test.shared
package object helpers {
def within(actual: Double, expected: Double, interval: Double): Boolean =
actual >= (expected - interval) && actual <= (expected + interval)
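// For example, within(3.00004, 3.0, 0.0001) is true: the tolerance is inclusive on both ends.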
}
#!/usr/bin/env bash
# If your default java install does not work, explicitly
# provide the path to the JDK 1.8 installation. On OSX
# with homebrew:
# export JAVA_HOME=/usr/local/Cellar/openjdk@8/1.8.0+282; ./run.sh
export JAVA_OPTS="-Xmx8G";
RUN=./logs/timecluster-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
mkdir -p $RUN
LOGS=$RUN/log.txt
source ./config.sh
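# config.sh is not included here; it must define the variables used by this script and by
# the timing script further below. A plausible sketch (the 25M/100K paths mirror the ones
# written out explicitly in the other scripts; SPARKMASTER and the ML-1M split locations
# depend on your own setup and are left as assumptions):
#   SPARKMASTER=...                      # e.g. local[4] or your cluster's master URL
#   ML25Mr2train=data/ml-25m/r2.train
#   ML25Mr2test=data/ml-25m/r2.test
#   ML100Ku2base=data/ml-100k/u2.base
#   ML100Ku2test=data/ml-100k/u2.test
#   ML1Mrbtrain=...                      # ML-1M train split ("::"-separated)
#   ML1Mrbtest=...                       # ML-1M test split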
echo "------------------- DISTRIBUTED ---------------------" >> $LOGS
sbt assembly
# 1 Executor
spark-submit --class distributed.DistributedBaseline --master $SPARKMASTER --num-executors 1 target/scala-2.11/m1_yourid-assembly-1.0.jar --train $ML25Mr2train --test $ML25Mr2test --separator , --json $RUN/distributed-25m-1.json --num_measurements 3 2>&1 >>$LOGS
# 4 Executors
spark-submit --class distributed.DistributedBaseline --master $SPARKMASTER --num-executors 4 target/scala-2.11/m1_yourid-assembly-1.0.jar --train $ML25Mr2train --test $ML25Mr2test --separator , --json $RUN/distributed-25m-4.json --num_measurements 3 2>&1 >>$LOGS
#!/usr/bin/env bash
# If your default java install does not work, explicitly
# provide the path to the JDK 1.8 installation. On OSX
# with homebrew:
# export JAVA_HOME=/usr/local/Cellar/openjdk@8/1.8.0+282;
export JAVA_OPTS="-Xmx8G";
RUN=./logs/timeOthers-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
mkdir -p $RUN
LOGS=$RUN/log.txt
echo "------------------- BASELINE ---------------------" >> $LOGS
sbt "runMain predict.Baseline --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json $RUN/baseline-100k.json --num_measurements 3" 2>&1 >>$LOGS
echo "------------------- DISTRIBUTED ---------------------" >> $LOGS
sbt "runMain predict.Baseline --train data/ml-25m/r2.train --test data/ml-25m/r2.test --separator , --json $RUN/baseline-25m.json --num_measurements 3" 2>&1 >> $LOGS
sbt "runMain distributed.DistributedBaseline --train data/ml-25m/r2.train --test data/ml-25m/r2.test --separator , --json $RUN/distributed-25m-1.json --num_measurements 3 --master local[1]" 2>&1 >>$LOGS
sbt "runMain distributed.DistributedBaseline --train data/ml-25m/r2.train --test data/ml-25m/r2.test --separator , --json $RUN/distributed-25m-4.json --num_measurements 3 --master local[4]" 2>&1 >>$LOGS
#!/usr/bin/env bash
# If your default java install does not work, explicitly
# provide the path to the JDK 1.8 installation. On OSX
# with homebrew:
# export JAVA_HOME=/usr/local/Cellar/openjdk@8/1.8.0+282;
export JAVA_OPTS="-Xmx8G";
RUN=./logs/timetrials-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
mkdir -p $RUN
LOGS=$RUN/log.txt
echo "------------------- KNN -----------------------------" >> $LOGS
sbt "runMain predict.kNN --train data/ml-100k/u2.base --test data/ml-100k/u2.test --json $RUN/knn-100k.json --num_measurements 3" 2>&1 >>$LOGS
#!/usr/bin/env bash
# If your default java install does not work, explicitly
# provide the path to the JDK 1.8 installation. On OSX
# with homebrew:
# export JAVA_HOME=/usr/local/Cellar/openjdk@8/1.8.0+282; ./run.sh
export JAVA_OPTS="-Xmx8G";
RUN=./logs/timing-$(date "+%Y-%m-%d-%H:%M:%S")-$(hostname)
mkdir -p $RUN
LOGS=$RUN/log.txt
source ./config.sh
echo "------------------- OPTIMIZING ---------------------" >> $LOGS
sbt "runMain scaling.Optimizing --train $ML100Ku2base --test $ML100Ku2test --json $RUN/optimized-100k.json --users 943 --movies 1682 --num_measurements 3" 2>&1 >>$LOGS
echo "------------------- DISTRIBUTED EXACT ---------------------" >> $LOGS
for W in 1 2 4; do
sbt "runMain distributed.Exact --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --json $RUN/exact-1m-$W.json --k 300 --master local[$W] --users 6040 --movies 3952 --num_measurements 3" 2>&1 >>$LOGS;
done
echo "------------------- APPROXIMATE EXACT ---------------------" >> $LOGS
for W in 1 2 4; do
sbt "runMain distributed.Approximate --train $ML1Mrbtrain --test $ML1Mrbtest --separator :: --json $RUN/approximate-1m-$W.json --k 300 --master local[$W] --users 6040 --movies 3952 --num_measurements 3" 2>&1 >>$LOGS;
done