with 1219 additions and 16 deletions
......@@ -19,7 +19,7 @@ class ThreadReturning[A](toRun: => A) extends Thread:
def thread[A](toRun: => A): ThreadReturning[A] =
def threadStart[A](toRun: => A): ThreadReturning[A] =
val t = ThreadReturning(toRun)
......@@ -37,5 +37,5 @@ def countLots: Long =
// Run from the SBT shell with `runMain lecture1.scalaThreadWrapper`.
@main def scalaThreadWrapper: Unit =
val (x, y) = (thread(countLots), thread(countLots))
val (x, y) = (threadStart(countLots), threadStart(countLots))
println((x.joinMe, y.joinMe))
......@@ -7,5 +7,7 @@ package lecture1
trait Task[A]:
def join: A
def task[A](toRun: => A): Task[A] = new Task[A]:
override def join: A = thread(toRun).joinMe
def task[A](toRun: => A): Task[A] =
val t = threadStart(toRun)
new Task[A]:
override def join: A = t.joinMe
package lecture13
import concurrent.{ExecutionContext, Future, Promise}
import concurrent.duration.DurationInt
import{Actor, ActorContext, ActorRef, ActorSystem, Props}
import akka.event.LoggingReceive
import akka.pattern.pipe
import akka.testkit.{ImplicitSender, TestKit}
import akka.util.Timeout
/** This demonstrates a simplified implementation of the ask pattern.
* @param promise
* the promise to be completed when a message is received
class AskMiniActor(promise: Promise[Any]) extends Actor:
def receive = LoggingReceive {
case msg => promise.success(msg)
extension (receiver: ActorRef)
def ?(msg: Any)(using
// In this simplified implementation, we don't use the timeout.
timeout: Timeout,
// This only used for logging purposes (and to get the actor system in the
// real implementation), but is not used otherwise in the implementation.
sender: ActorRef,
// In the real implementation, the actor system is retrieved differently,
// but here we pass it as an additional implicit parameter for simplicity.
context: ActorContext
): Future[Any] =
s"Create mini actor to ask $msg from $sender to $receiver"
// Create a `Promise` that will be completed when a message is received.
val promise = Promise[Any]()
// Create a mini actor that will complete the `Promise` when it receives a
// message.
val miniActor = context.system.actorOf(Props(AskMiniActor(promise)))
// Send the message to the mini actor, *using the mini actor as the sender*.
// This part is important as it is the mini actor that needs to receive the
// response.
receiver.tell(msg, miniActor)
// Using the `Promise` from the Scala standard library, the corresponding
// `Future` can be retrieved with the method `future`.
object Person:
enum Protocol:
case GetName
case GetAge
enum Response:
case Name(name: String)
case Age(age: Int)
case UnknownMessage
class Person(name: String, age: Int) extends Actor:
import Person.Protocol.*
import Person.Response.*
def receive = LoggingReceive {
case GetName => sender() ! Name(name)
case GetAge => sender() ! Age(age)
case _ => sender() ! UnknownMessage
object PersonsDatabase:
enum Protocol:
case CreatePerson(name: String, age: Int)
case GetPersonNames(ids: List[Int])
enum Response:
case PersonCreated(id: Int)
case PersonNames(names: List[String])
class PersonsDatabase extends Actor:
import Person.Protocol.*
import Person.Response.*
import PersonsDatabase.Protocol.*
import PersonsDatabase.Response.*
var persons: Map[Int, ActorRef] = Map.empty
var maxId = 0
given ExecutionContext = context.system.dispatcher
given Timeout = Timeout(200.millis)
def receive = LoggingReceive {
case CreatePerson(name, age) =>
val personRef = context.actorOf(Props(Person(name, age)))
persons = persons + (maxId -> personRef)
sender() ! PersonCreated(maxId)
maxId += 1
case GetPersonNames(ids) =>
// We ask every person for their name using the ask pattern. This
// gives us a list of `Future`s that will each be completed with a
// `Name` message.
val rawResponsesFutures: List[Future[Any]] = => (persons(id) ? GetName))
// We first map each `Future[Any]` to a `Future[Name]` using the
// `mapTo` method. Then, we map each `Future[Name]` to a
// `Future[String]` using the `map` method.
val namesFutures: List[Future[String]] =[Name].map(
// We use the `Future.sequence` method to convert a
// `List[Future[String]]` to a `Future[List[String]]`. The resulting
// future will be completed once all the futures in the input list
// are completed.
val futureOfNames: Future[List[String]] = Future.sequence(namesFutures)
// Finally, map the `Future[List[String]]` to a `Future[PersonNames]` and
// pipe it to the sender of the `GetPersonNames` message.
@main def askPatternDemo() =
new TestKit(ActorSystem("DebugSystem")) with ImplicitSender:
import Person.Protocol.*
import Person.Response.*
import PersonsDatabase.Protocol.*
import PersonsDatabase.Response.*
val personsDb = system.actorOf(Props(PersonsDatabase()))
personsDb ! CreatePerson("Anita", 20)
personsDb ! CreatePerson("Berta", 30)
personsDb ! CreatePerson("Cecilia", 40)
personsDb ! GetPersonNames(List(0, 1, 2))
expectMsg(PersonNames(List("Anita", "Berta", "Cecilia")))
finally shutdown(system)
package lecture13
import{Actor, ActorContext, ActorRef, ActorSystem, Props}
import akka.event.LoggingReceive
import akka.testkit.{ImplicitSender, TestKit}
class CounterRight extends Actor:
def counter(n: Int): Receive = {
case "increment" => context.become(counter(n + 1), discardOld = false)
// ^^^^^^^^^^^^^^^^^^
// This is needed.
case "get" => sender() ! n
case "decrement" => if n != 0 then context.unbecome()
def receive = counter(0)
class CounterWrong extends Actor:
def counter(n: Int): Receive = {
case "increment" => context.become(counter(n + 1))
case "get" => sender() ! n
case "decrement" => if n != 0 then context.unbecome()
def receive = counter(0)
@main def becomeDemo() =
new TestKit(ActorSystem("DebugSystem")) with ImplicitSender:
val counter = system.actorOf(Props(CounterRight()), "counter")
counter ! "increment"
counter ! "get"
counter ! "increment"
counter ! "get"
counter ! "decrement"
counter ! "get"
// This is wrong if we use CounterWrong because it doesn't set the
// discardOld parameter to false. Therefore, it will not remember the
// previous state and reset the behavior to the initial one, which is
// `counter(0)`.
counter ! "decrement"
counter ! "get"
finally shutdown(system)
......@@ -17,9 +17,9 @@ def parIntersectionWrong(a: ParSet[Int], b: ParSet[Int]) =
@main def intersectionWrong =
val a = (0 until 1000).toSet
val b = (0 until 1000 by 4).toSet
val a = (0 until 10000).toSet
val b = (0 until 10000 by 4).toSet
val seqRes = a.intersect(b)
val parRes = parIntersectionWrong(a.par.toSet, b.par.toSet)
println(s"Sequential result: ${seqRes.size}")
println(s"Parallel result: ${parRes.size}")
assert(seqRes.size == 2500)
assert(parRes.size != 2500)
......@@ -18,9 +18,9 @@ def parIntersectionCorrect(a: ParSet[Int], b: ParSet[Int]) =
@main def intersectionCorrect =
val a = (0 until 1000).toSet
val b = (0 until 1000 by 4).toSet
val a = (0 until 10000).toSet
val b = (0 until 10000 by 4).toSet
val seqRes = a.intersect(b)
val parRes = parIntersectionCorrect(a.par.toSet, b.par.toSet)
println(s"Sequential result: ${seqRes.size}")
println(s"Parallel result: ${parRes.size}")
assert(seqRes.size == 2500)
assert(parRes.size == 2500)
......@@ -15,9 +15,9 @@ def intersectionParNoSideEffect(a: ParSet[Int], b: ParSet[Int]) =
else b.filter(a(_))
@main def intersectionNoSideEffect =
val a = (0 until 1000).toSet
val b = (0 until 1000 by 4).toSet
val a = (0 until 10000).toSet
val b = (0 until 10000 by 4).toSet
val seqRes = a.intersect(b)
val parRes = intersectionParNoSideEffect(a.par.toSet, b.par.toSet)
println(s"Sequential result: ${seqRes.size}")
println(s"Parallel result: ${parRes.size}")
assert(seqRes.size == 2500)
assert(parRes.size == 2500)
package lecture6
import java.util.concurrent.ForkJoinPool
// Run using `running lecture6.ExecutorsCreate`.
@main def ExecutorsCreate =
val executor = new ForkJoinPool
executor.execute(new Runnable:
override def run() = log("This task is run asynchronously.")
package lecture6
import scala.concurrent.ExecutionContext
@main def ExecutionContextCreate =
val ectx =
ectx.execute(new Runnable:
override def run() = log("This task is run asynchronously.")
package lecture6
import scala.concurrent.ExecutionContext
def execute(body: => Unit) = Runnable:
override def run() = body
@main def SimpleExecute =
execute { log("This task is run asynchronously.") }
package lecture6
import scala.concurrent.ExecutionContext
import lecture1.threadStart
class OnePlaceBuffer[Elem]:
var elem: Elem = _
var full = false
def put(e: Elem): Unit = this.synchronized {
while full do wait()
elem = e
full = true
log(f"Put $e in the buffer")
def get(): Elem = this.synchronized {
while !full do wait()
full = false
log(f"Got $elem from the buffer")
@main def OnePlaceBufferDemo =
val buffer = OnePlaceBuffer[Int]()
val getThreads = for i <- 0 until 10 yield threadStart { buffer.get() }
val putThreads = for i <- 0 until 10 yield threadStart { buffer.put(i) }
package lecture6
class Fork(var index: Int)
def philosopherTurn(i: Int, l: Fork, r: Fork): Unit =
var (left, right) =
if i % 2 == 0 then (l, r)
else (r, l)
// if l.index < r.index then (l, r) else (r, l)
left.synchronized {
log(f"Philosopher $i picks up left fork ${left.index}")
right.synchronized {
log(f"Philosopher $i picks up right fork ${right.index} - eating")
log(f"Philosopher $i puts down right fork ${right.index}")
log(f"Philosopher $i puts down left fork ${left.index}")
@main def run() =
val n = 5
val k: Int = 3
val forks = new Array[Fork](n)
val philosophers = new Array[Thread](n)
for p <- 0 to n - 1 do
forks(p) = Fork(p)
for p <- 0 to n - 1 do
philosophers(p) = new Thread:
override def run() =
for _ <- 0 until k do
philosopherTurn(p, forks(p % n), forks((p + 1) % n))
for p <- 0 to n - 1 do
package lecture6
val log = println(_)
package midterm23
import instrumentation.Monitor
import scala.annotation.tailrec
/** Runs one of the three provided solutions to the barber shop problem.
* Run using `sbt "runMain midterm23.barberSolution 2 3"` for example.
* @param solutionNumber
* One of the 3 provided solutions to run: 1, 2, or 3.
* @param nCustomers
* The number of customers to simulate.
@main def barberSolution(solutionNumber: Int, nCustomers: Int): Unit =
val barberShop: AbstractBarberShopSolution =
solutionNumber match
case 1 => BarberShopSolution1(nCustomers)
case 2 => BarberShopSolution2(nCustomers)
case 3 => BarberShopSolution3(nCustomers)
barberSolutionMain(barberShop, solutionNumber, nCustomers)
def barberSolutionMain(
barberShop: AbstractBarberShopSolution,
solutionNumber: Int,
nCustomers: Int
): Unit =
val bthread = new Thread:
override def run() =
val customerThreads =
for i <- 1 to nCustomers yield
val cthread = new Thread:
override def run() =
abstract class AbstractBarberShopSolution:
// var notifyBarber: Boolean = false
// var notifyCustomer: Boolean = false
// val lockObj = Object()
// In order to be able to mock the three attributes above in the tests, we
// replace them with three private attributes and public getters and setters.
// Without overrides, this is equivalent to the definitions above.
private var _notifyBarber: Boolean = false
private var _notifyCustomer: Boolean = false
private val _lockObject: Monitor = new Monitor {}
def notifyBarber = _notifyBarber
def notifyBarber_=(v: Boolean) = _notifyBarber = v
def notifyCustomer = _notifyCustomer
def notifyCustomer_=(v: Boolean) = _notifyCustomer = v
def lockObj: Monitor = _lockObject
def hairCut(): Unit =
log("start hair cut")
log("finish hair cut")
def customer(n: Int): Unit
def barber(): Unit
/** Logs a message to the console.
* Overridden in the tests to avoid printing gigabytes of output.
* @param s
def log(s: String) = println(s)
/** This is the solution we expected for the `customer` method.
* However, now that you have learned about `@volatile`, you can see that the
* `customer` method is not thread-safe. Can you spot a problem with this
* solution?
* Note that the `log` calls were added to help you understand what is going
* on. They are not part of the expected solution.
trait BarberShopCustomerSolution1 extends AbstractBarberShopSolution:
override def customer(n: Int) =
lockObj.synchronized {
log(f"customer ${n} sits on the chair and waits for the barber")
notifyBarber = true
while !notifyCustomer do {}
notifyCustomer = false
log(f"customer ${n} leaves the chair")
/** This is the solution we expected for the `barber` method.
* Same question as above: can you spot a problem with this solution?
* @param customersNumber
* The number of customers to simulate.
trait BarberShopBarberSolution1(customersNumber: Int)
extends AbstractBarberShopSolution:
override def barber() =
// while true do
// We replace the infinite loop from the instructions with a loop doing a
// limited number of iterations so that we can test the code.
for _ <- 1 to customersNumber do
log("barber waits for a customer")
while !notifyBarber do {}
notifyBarber = false
notifyCustomer = true
log("barber stops working")
class BarberShopSolution1(customersNumber: Int)
extends BarberShopCustomerSolution1,
/** The problem with BarberShopSolution1 is that the `notifyBarber` and
* `notifyCustomer` are accessed from multiple threads without any
* synchronization. This means that the compiler is free to reorder the
* instructions in the `customer` and `barber` methods, which can lead to
* unexpected behaviors like infinite loops!
* To solve this problem, the solution below redefines the two attributes as
* `@volatile`.
* @param customersNumber
* The number of customers to simulate.
class BarberShopSolution2(customersNumber: Int)
extends BarberShopSolution1(customersNumber):
@volatile private var _notifyBarber: Boolean = false
@volatile private var _notifyCustomer: Boolean = false
override def notifyBarber = _notifyBarber
override def notifyBarber_=(v: Boolean) = _notifyBarber = v
override def notifyCustomer = _notifyCustomer
override def notifyCustomer_=(v: Boolean) = _notifyCustomer = v
/** The solution below uses a different approach to solve the problem of
* BarberShopSolution1. Instead of using `@volatile`, it uses the lock object
* to synchronize the accesses to the two attributes. Note how each access to
* the attributes is now wrapped in a `lockObj.synchronized` block.
* @param customersNumber
* The number of customers to simulate.
class BarberShopSolution3(customersNumber: Int)
extends AbstractBarberShopSolution:
override def customer(n: Int) =
private def getHairCut(n: Int): Unit =
val sited = lockObj.synchronized {
if notifyBarber then
else if notifyCustomer then
log(f"customer ${n} sits on the chair and waits for the barber")
notifyBarber = true
if sited then
while !lockObj.synchronized { notifyCustomer } do {}
lockObj.synchronized {
log(f"customer ${n} leaves the chair")
notifyCustomer = false
override def barber() =
for _ <- 1 to customersNumber do
log("barber waits for a customer")
while !lockObj.synchronized { notifyBarber } do {}
lockObj.synchronized {
notifyCustomer = true
notifyBarber = false
log("barber stops working")
package midterm23
import java.util.concurrent.atomic.AtomicInteger
/** This is an example `BarberShopStudentAnswer`, like the one we made for your
* own solution and that you can find on Moodle. The solution below is boringly
* correct as it use the same implementation as the expected solution. See
* BarberShopSolution.scala for explanations and comments.
* @param customersNumber
* The number of customers to simulate.
class BarberShopStudentAnswer333(customersNumber: Int)
extends AbstractBarberShopSolution:
/** This attribute is used to count the number of hair cuts. It is needed for
* testing because we do not want to loop forever in the `barber` method!
val hairCutsCounter = AtomicInteger(customersNumber)
override def customer(n: Int) =
lockObj.synchronized {
notifyBarber = true
while !notifyCustomer do {}
notifyCustomer = false
override def barber() =
// while true
// We replace the infinite loop from the instructions with a loop doing a
// limited number of iterations so that we can test the code.
while hairCutsCounter.get() != 0 do
while !notifyBarber do {}
notifyBarber = false
hairCutsCounter.decrementAndGet() // You can ignore this line. It has been added for testing.
notifyCustomer = true
/** Version of `BarberShopStudentAnswer333` with the methods `customer`
* overridden to use the expected implementation.
* @param n
* The number of customers to simulate.
class BarberShopStudentAnswer333WithCorrectCustomer(n: Int)
extends BarberShopStudentAnswer333(n)
with BarberShopCustomerSolution1
/** Version of `BarberShopStudentAnswer333` with the methods `barber` overridden
* to use the expected implementation.
* @param n
* The number of customers to simulate.
class BarberShopStudentAnswer333WithCorrectBarber(n: Int)
extends BarberShopStudentAnswer333(n)
with BarberShopBarberSolution1(n)
// To grade, we ran tests with each of these classes.
import lecture1.parallel
def numContributeTo(input: Array[Int], kernel: Array[Int], output: Array[Int], i: Int): Int =
if i < 0 || i >= output.length then
else if (i - kernel.length < 0) then
else if (i >= input.length) then
output.length - i
else kernel.length
def sequentialConvolve(
input: Array[Int],
kernel: Array[Int],
output: Array[Int],
from: Int,
until: Int
): Unit = {
// Approach A, as described in the clarification announcement for this
// exercise, where we treat `from` and `until` as indices on `output`
// instead of `input` as in the given code.
var iOutput = from
while iOutput < until do
var iKernel = math.max(0, iOutput - input.length + 1)
while iKernel < kernel.length && iKernel <= iOutput do
// `output` is only ever written to between the indices `from` and
// `until`, the range of `iOutput`. The indices for `input` and
// `kernel` are computed accordingly.
output(iOutput) += input(iOutput - iKernel) * kernel(iKernel)
iKernel += 1
iOutput += 1
// ALTERNATE SOLUTION: Approach B, as described in the clarification
// announcement for this exercise, which is unchanged from the given
// code, i.e. we treat `from` and `until` as indices on `input`.
var iInput = from
while iInput < until do
var iKernel = 0
while iKernel < kernel.length do
output(iInput + iKernel) += input(iInput) * kernel(iKernel)
iKernel += 1
iInput += 1
def parallelConvolve(
input: Array[Int],
kernel: Array[Int],
output: Array[Int],
from: Int,
until: Int,
threshold: Int
): Unit =
// Approach A, as described in the clarification announcement for this
// exercise, where we treat `from` and `until` as indices on `output`
// instead of `input` as in the given code. This does not require us to
// change anything in this function. Only receives full credit if used
// together with Approach A for `sequentialConvolve`.
if (until - from) <= threshold then
sequentialConvolve(input, kernel, output, from, until)
val mid = from + (until - from) / 2
parallelConvolve(input, kernel, output, from, mid, threshold),
parallelConvolve(input, kernel, output, mid, until, threshold)
// ALTERNATE SOLUTION: Approach B, as described in the clarification
// announcement for this exercise, where we treat `from` and `until` as
// indices on `input` as in the given code. This requires up to leave a
// gap in the parallel calls to be filled in sequentially as described
// below. Only receives full credit if used together with Approach B for
// `sequentialConvolve`.
if (until - from) <= threshold then
sequentialConvolve(input, kernel, output, from, until)
val mid = from + (until - from) / 2
val gap = numContributeTo(input, kernel, output, mid)
// Leave a gap large enough that accesses to `output` from the first
// parallel call will not overlap with accesses from the second. This
// gap is of size equal to the number of elements of `input` that will
// contribute to the computation of the result at the lowest index of
// `output` in the second parallel call, namely, at index `mid`.
// However, we are careful not to access indices lower than `from`.
val gapBeforeMid = Math.max(from, mid - gap + 1)
parallelConvolve(input, kernel, output, from, gapBeforeMid, threshold),
parallelConvolve(input, kernel, output, mid, until, threshold)
// Fill in the gap sequentially.
sequentialConvolve(input, kernel, output, gapBeforeMid, mid)
object Original_WithOutputRace:
def sequentialConvolve(
input: Array[Int],
kernel: Array[Int],
output: Array[Int],
from: Int,
until: Int): Unit =
var iInput = from
while iInput < until do
var iKernel = 0
while iKernel < kernel.length do
output(iInput + iKernel) += input(iInput) * kernel(iKernel)
iKernel += 1
iInput += 1
def parallelConvolve(
input: Array[Int],
kernel: Array[Int],
output: Array[Int],
from: Int,
until: Int,
threshold: Int): Unit =
if (until - from) <= threshold then
sequentialConvolve(input, kernel, output, from, until)
val mid = from + (until - from)/2
parallelConvolve(input, kernel, output, from, mid, threshold),
parallelConvolve(input, kernel, output, mid, until, threshold))
import lecture1.parallel
import collection.concurrent.TrieMap
import collection.parallel.CollectionConverters.*
// Example run of `groupBy`:
case class Course(semester: String, id: String)
val cs210 = Course("BA3", "CS210")
val cs250 = Course("BA3", "CS250")
val cs206 = Course("BA4", "CS206")
val courses = Vector(cs210, cs250, cs206)
"BA3" -> Vector(cs210, cs250),
"BA4" -> Vector(cs206)
/** Asymptotic run time: O(1) */
def appendAtKey[K, V](m: Map[K, Vector[V]], k: K, v: V): Map[K, Vector[V]] =
val prev = m.getOrElse(k, Vector.empty)
m.updated(k, prev :+ v)
// Example runs of `appendAtKey`:
appendAtKey(Map(), 0, 1),
Map(0 -> Vector(1))
appendAtKey(Map("BA3" -> Vector(cs210)), "BA3", cs250),
Map("BA3" -> List(cs210, cs250))
/** Asymptotic run time: O(1) */
def mergeMaps[K, V](a: Map[K, Vector[V]], b: Map[K, Vector[V]]) =
var res = a
for (k, vs) <- b do // O(1) iterations (we assume the number of keys to be O(1))
val prev = res.getOrElse(k, Vector.empty) // O(1) lookup
res = res.updated(k, prev ++ vs) // O(1) update, O(1) concatenation
// Example run of `mergeMaps`:
val m1 = Map(1 -> Vector(5, 7))
val m2 = Map(1 -> Vector(6), 2 -> Vector(3))
mergeMaps(m1, m2),
1 -> Vector(5, 7, 6),
2 -> Vector(3)
def parGroupBy1[K, V](vs: Vector[V], key: V => K): Map[K, Vector[V]] =
if vs.size == 1 then
Map(key(vs.head) -> Vector(vs.head))
val mid = vs.size / 2
val (m1, m2) = parallel(
parGroupBy1(vs.slice(0, mid), key),
parGroupBy1(vs.slice(mid, vs.size), key)
mergeMaps(m1, m2)
parGroupBy1(courses, c => c.semester),
"BA3" -> List(cs210, cs250),
"BA4" -> List(cs206)
parGroupBy1(courses, c => c.semester),
"BA3" -> List(cs210, cs250),
"BA4" -> List(cs206)
def parGroupBy2[K, V](vs: Vector[V], key: V => K): Map[K, Vector[V]] =
if vs.size == 1 then
Map(key(vs.head) -> Vector(vs.head))
val (m1, m2) = parallel(
parGroupBy2(Vector(vs.head), key),
parGroupBy2(vs.tail, key)
mergeMaps(m1, m2)
parGroupBy2(courses, c => c.semester),
"BA3" -> List(cs210, cs250),
"BA4" -> List(cs206)
parGroupBy2(courses, c => c.semester),
"BA3" -> List(cs210, cs250),
"BA4" -> List(cs206)
def parGroupBy3[K, V](vs: Vector[V], key: V => K): Map[K, Vector[V]] =
val lock = Object()
var res = Map[K, Vector[V]]()
for v <- vs.par do
lock.synchronized { res = appendAtKey(res, key(v), v) }
var values3 = Set[Map[String, Vector[Course]]]()
for _ <- 1 to 100000 do
values3 += parGroupBy3(courses, _.semester)
assertEquals(values3.size, 2)
def parGroupBy4[K, V](vs: Vector[V], key: V => K): Map[K, Vector[V]] =
val res = TrieMap[K, Vector[V]]()
for v <- vs.par do
val k = key(v)
val prev = res.getOrElse(k, Vector.empty)
res.update(k, prev :+ v)
var values4 = Set[Map[String, Vector[Course]]]()
for _ <- 1 to 100000 do
values4 += parGroupBy4(courses, _.semester)
assertEquals(values4.size, 4)
def parGroupBySolution[K, V](vs: Vector[V], key: V => K): Map[K, Vector[V]] =
(m, v) => appendAtKey(m, key(v), v),
var valuesSol = Set[Map[String, Vector[Course]]]()
for _ <- 1 to 100000 do
valuesSol += parGroupBySolution(courses, _.semester)
assertEquals(valuesSol.size, 1)
def assertEquals(a: Any, b: Any) = assert(a == b)
package concpar21final01
import scala.concurrent.duration.*
import scala.concurrent.Future
class Problem1Suite extends munit.FunSuite:
"Retrieves grades at the end of the exam (everyone pushed something) (10pts)"
) {
class Problem1Done extends Problem1:
override def getGrade(sciper: Int): Future[Option[Grade]] =
Future {
Some(Grade(sciper, sciper))
override def getScipers(): Future[List[Int]] =
Future {
List(1, 2, 3, 4)
val expected: List[Grade] =
List(Grade(1, 1.0), Grade(2, 2.0), Grade(3, 3.0), Grade(4, 4.0))
(new Problem1Done).leaderboard().map { grades =>
assertEquals(grades.toSet, expected.toSet)
test("Retrieves grades mid exam (some students didn't push yet) (10pts)") {
class Problem1Partial extends Problem1:
override def getGrade(sciper: Int): Future[Option[Grade]] =
Future {
if sciper % 2 == 0 then None
else Some(Grade(sciper, sciper))
override def getScipers(): Future[List[Int]] =
Future {
List(1, 2, 3, 4)
val expected: List[Grade] =
List(Grade(1, 1.0), Grade(3, 3.0))
(new Problem1Partial).leaderboard().map { grades =>
assertEquals(grades.toSet, expected.toSet)
test("The output list is sorted by grade (10pts)") {
(new Problem1MockData).leaderboard().map { grades =>
assert(grades.size >= 176)
assert(grades.zipWithIndex.forall { case (g, i) =>
grades.drop(i).forall(x => g.grade >= x.grade)
test("GitLab API calls are done in parallel (2pts)") {
var inParallel: Boolean = false
class Problem1Par extends Problem1MockData:
var in: Boolean = false
override def getGrade(sciper: Int): Future[Option[Grade]] =
Future {
if in then inParallel = true
in = true
val out = super.getGrade(sciper)
in = false
concurrent.Await.result(out, Duration(10, SECONDS))
(new Problem1Par).leaderboard().map { grades =>
assert(grades.size >= 176)
test("The IS-academia API is called exactly once (2pts)") {
var called: Int = 0
class Problem1Once extends Problem1MockData:
override def getScipers(): Future[List[Int]] =
called += 1
(new Problem1Once).leaderboard().map { grades =>
assert(grades.size >= 176)
assert(called == 1)
package concpar21final02
import akka.testkit.*
import scala.collection.mutable
import concurrent.duration.*
import Problem2.*
class Problem2Suite extends munit.FunSuite:
import NotificationService.Protocol.*
import NotificationService.Responses.*
import DiscordChannel.Protocol.*
import DiscordChannel.Responses.*
test("Notification register (1pts)") {
new MyTestKit:
def tests() =
val actor = system.actorOf(Props[NotificationService]())
actor ! Register
test("Notification register and un-register (1pts)") {
new MyTestKit:
def tests() =
val actor = system.actorOf(Props[NotificationService]())
actor ! Register
actor ! UnRegister
actor ! UnRegister
actor ! Register
actor ! UnRegister
test("Notification notify (1pts)") {
new MyTestKit:
def tests() =
val actor = system.actorOf(Props[NotificationService]())
actor ! Register
actor ! NotifyAll
actor ! NotifyAll
actor ! UnRegister
actor ! NotifyAll
actor ! Register
actor ! NotifyAll
actor ! UnRegister
actor ! NotifyAll
test("NotifyAll from other actor (1pts)") {
new MyTestKit:
def tests() =
val actor = system.actorOf(Props[NotificationService]())
val otherActor = system.actorOf(Props[DummyActor]())
def notifyFormAllFromOtherActor() =
given ActorRef = otherActor
actor ! NotifyAll
actor ! Register
test("Channel init (1pts)") {
new MyTestKit:
def tests() =
val notificationService = system.actorOf(Props[NotificationService]())
val channel = system.actorOf(Props[DiscordChannel]())
channel ! Init(notificationService)
test("Channel post and get post (1pts)") {
new MyTestKit:
def tests() =
val notificationService = system.actorOf(Props[NotificationService]())
val channel = system.actorOf(Props[DiscordChannel]())
channel ! Init(notificationService)
channel ! Post("hello")
channel ! GetLastPosts(1)
channel ! GetLastPosts(10)
channel ! GetLastPosts(0)
test("Channel multiple posts (1pts)") {
new MyTestKit:
def tests() =
val notificationService = system.actorOf(Props[NotificationService]())
val channel = system.actorOf(Props[DiscordChannel]())
channel ! Init(notificationService)
channel ! Post("hello")
channel ! Post("world")
channel ! GetLastPosts(2)
channel ! GetLastPosts(1)
channel ! Post("!")
channel ! GetLastPosts(3)
expectMsg(Posts(List("world", "hello")))
expectMsg(Posts(List("!", "world", "hello")))
test("Channel posts and notify (1pts)") {
new MyTestKit:
def tests() =
val notificationService = system.actorOf(Props[NotificationService]())
val channel = system.actorOf(Props[DiscordChannel]())
channel ! Init(notificationService)
notificationService ! Register
channel ! Post("hello")
channel ! Post("world")
test("Channel init twice (1pts)") {
new MyTestKit:
def tests() =
val notificationService = system.actorOf(Props[NotificationService]())
val channel = system.actorOf(Props[DiscordChannel]())
channel ! Init(notificationService)
channel ! Init(notificationService)
channel ! Init(notificationService)
test("Channel not active (1pts)") {
new MyTestKit:
def tests() =
val channel1 = system.actorOf(Props[DiscordChannel]())
channel1 ! Post("hello")
val channel2 = system.actorOf(Props[DiscordChannel]())
channel2 ! GetLastPosts(0)
abstract class MyTestKit
extends TestKit(ActorSystem("TestSystem"))
with ImplicitSender:
def tests(): Unit
try tests()
finally shutdown(system)
class DummyActor extends Actor:
def receive: Receive = { case _ =>
package concpar21final03
import scala.annotation.tailrec
import scala.concurrent.*
import scala.concurrent.duration.*
import scala.collection.mutable.HashMap
import scala.util.Random
import instrumentation.*
import instrumentation.TestHelper.*
import instrumentation.TestUtils.*
class Problem3Suite extends munit.FunSuite:
test("Part 1: ThreadMap (3pts)") {
sched =>
val tmap = new SchedulableThreadMap[Int](sched)
def writeThread(): Unit =
val readBack = tmap.currentThreadValue
assertEquals(readBack, Some(-1))
def writeAndDeleteThread(): Unit =
def waitThread(): Unit =
tmap.waitForall(_ < 0)
val all = tmap.allValues
if all != List(-1) then waitThread()
val threads = List(
() => writeThread(),
() => writeAndDeleteThread(),
() => waitThread(),
() => waitThread()
(threads, _ => (true, ""))
test("Part 2: RCU (5pts)") {
sched =>
val rcu = new SchedulableRCU(sched)
case class State(
value: Int,
isDeleted: AtomicLong = SchedulableAtomicLong(0, sched, "isDeleted")
val sharedState =
SchedulableAtomicReference(State(0), sched, "sharedState")
def readThread(): Unit =
val state = sharedState.get
val stateWasDeleted = state.isDeleted.get != 0
"RCU shared state deleted in the middle of a read."
def writeThread(): Unit =
val oldState = sharedState.get
sharedState.set(State(oldState.value + 1))
val threads = List(
() => readThread(),
() => readThread(),
() => writeThread()
(threads, _ => (true, ""))
test("Part 3: UpdateServer (2pts)") {
sched =>
val fs = SchedulableInMemoryFileSystem(sched)
val server = new SchedulableUpdateServer(sched, fs)
def writeThread(): Unit =
server.newUpdate("update1.bin", "Update 1")
server.newUpdate("update2.bin", "Update 2")
assertEquals(fs.fsMap.toSet, Set("update2.bin" -> "Update 2"))
def fetchThread(): Unit =
val res = server.fetchUpdate()
List(None, Some("Update 1"), Some("Update 2")).contains(res),
s"fetchUpdate returned unexpected value $res"
val threads = List(
() => writeThread(),
() => fetchThread(),
() => fetchThread()
(threads, _ => (true, ""))
import scala.concurrent.duration.*
override val munitTimeout = 200.seconds
end Problem3Suite