Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • lara/cs206-demos
  • gcharles/cs206-demos
  • gambhir/cs206-demos
3 results
Show changes
Showing
with 1219 additions and 16 deletions
......@@ -19,7 +19,7 @@ class ThreadReturning[A](toRun: => A) extends Thread:
join()
result
def thread[A](toRun: => A): ThreadReturning[A] =
def threadStart[A](toRun: => A): ThreadReturning[A] =
val t = ThreadReturning(toRun)
t.start()
t
......@@ -37,5 +37,5 @@ def countLots: Long =
// Run from the SBT shell with `runMain lecture1.scalaThreadWrapper`.
@main def scalaThreadWrapper: Unit =
val (x, y) = (thread(countLots), thread(countLots))
val (x, y) = (threadStart(countLots), threadStart(countLots))
println((x.joinMe, y.joinMe))
......@@ -7,5 +7,7 @@ package lecture1
trait Task[A]:
def join: A
def task[A](toRun: => A): Task[A] = new Task[A]:
override def join: A = thread(toRun).joinMe
def task[A](toRun: => A): Task[A] =
val t = threadStart(toRun)
new Task[A]:
override def join: A = t.joinMe
package lecture13
import concurrent.{ExecutionContext, Future, Promise}
import concurrent.duration.DurationInt
import akka.actor.{Actor, ActorContext, ActorRef, ActorSystem, Props}
import akka.event.LoggingReceive
import akka.pattern.pipe
import akka.testkit.{ImplicitSender, TestKit}
import akka.util.Timeout
/** This demonstrates a simplified implementation of the ask pattern.
*
* @param promise
* the promise to be completed when a message is received
*/
class AskMiniActor(promise: Promise[Any]) extends Actor:
def receive = LoggingReceive {
case msg => promise.success(msg)
}
extension (receiver: ActorRef)
def ?(msg: Any)(using
// In this simplified implementation, we don't use the timeout.
timeout: Timeout,
// This only used for logging purposes (and to get the actor system in the
// real implementation), but is not used otherwise in the implementation.
sender: ActorRef,
// In the real implementation, the actor system is retrieved differently,
// but here we pass it as an additional implicit parameter for simplicity.
context: ActorContext
): Future[Any] =
context.system.log.debug(
s"Create mini actor to ask $msg from $sender to $receiver"
)
// Create a `Promise` that will be completed when a message is received.
val promise = Promise[Any]()
// Create a mini actor that will complete the `Promise` when it receives a
// message.
val miniActor = context.system.actorOf(Props(AskMiniActor(promise)))
// Send the message to the mini actor, *using the mini actor as the sender*.
// This part is important as it is the mini actor that needs to receive the
// response.
receiver.tell(msg, miniActor)
// Using the `Promise` from the Scala standard library, the corresponding
// `Future` can be retrieved with the method `future`.
promise.future
object Person:
enum Protocol:
case GetName
case GetAge
enum Response:
case Name(name: String)
case Age(age: Int)
case UnknownMessage
class Person(name: String, age: Int) extends Actor:
import Person.Protocol.*
import Person.Response.*
def receive = LoggingReceive {
case GetName => sender() ! Name(name)
case GetAge => sender() ! Age(age)
case _ => sender() ! UnknownMessage
}
object PersonsDatabase:
enum Protocol:
case CreatePerson(name: String, age: Int)
case GetPersonNames(ids: List[Int])
enum Response:
case PersonCreated(id: Int)
case PersonNames(names: List[String])
class PersonsDatabase extends Actor:
import Person.Protocol.*
import Person.Response.*
import PersonsDatabase.Protocol.*
import PersonsDatabase.Response.*
var persons: Map[Int, ActorRef] = Map.empty
var maxId = 0
given ExecutionContext = context.system.dispatcher
given Timeout = Timeout(200.millis)
def receive = LoggingReceive {
case CreatePerson(name, age) =>
val personRef = context.actorOf(Props(Person(name, age)))
persons = persons + (maxId -> personRef)
sender() ! PersonCreated(maxId)
maxId += 1
case GetPersonNames(ids) =>
// We ask every person for their name using the ask pattern. This
// gives us a list of `Future`s that will each be completed with a
// `Name` message.
val rawResponsesFutures: List[Future[Any]] =
ids.map(id => (persons(id) ? GetName))
// We first map each `Future[Any]` to a `Future[Name]` using the
// `mapTo` method. Then, we map each `Future[Name]` to a
// `Future[String]` using the `map` method.
val namesFutures: List[Future[String]] =
rawResponsesFutures.map(_.mapTo[Name].map(_.name))
// We use the `Future.sequence` method to convert a
// `List[Future[String]]` to a `Future[List[String]]`. The resulting
// future will be completed once all the futures in the input list
// are completed.
val futureOfNames: Future[List[String]] = Future.sequence(namesFutures)
// Finally, map the `Future[List[String]]` to a `Future[PersonNames]` and
// pipe it to the sender of the `GetPersonNames` message.
futureOfNames.map(PersonNames.apply).pipeTo(sender())
}
@main def askPatternDemo() =
new TestKit(ActorSystem("DebugSystem")) with ImplicitSender:
import Person.Protocol.*
import Person.Response.*
import PersonsDatabase.Protocol.*
import PersonsDatabase.Response.*
try
val personsDb = system.actorOf(Props(PersonsDatabase()))
personsDb ! CreatePerson("Anita", 20)
expectMsg(PersonCreated(0))
personsDb ! CreatePerson("Berta", 30)
expectMsg(PersonCreated(1))
personsDb ! CreatePerson("Cecilia", 40)
expectMsg(PersonCreated(2))
personsDb ! GetPersonNames(List(0, 1, 2))
expectMsg(PersonNames(List("Anita", "Berta", "Cecilia")))
finally shutdown(system)
package lecture13
import akka.actor.{Actor, ActorContext, ActorRef, ActorSystem, Props}
import akka.event.LoggingReceive
import akka.testkit.{ImplicitSender, TestKit}
class CounterRight extends Actor:
def counter(n: Int): Receive = {
case "increment" => context.become(counter(n + 1), discardOld = false)
// ^^^^^^^^^^^^^^^^^^
// This is needed.
case "get" => sender() ! n
case "decrement" => if n != 0 then context.unbecome()
}
def receive = counter(0)
class CounterWrong extends Actor:
def counter(n: Int): Receive = {
case "increment" => context.become(counter(n + 1))
case "get" => sender() ! n
case "decrement" => if n != 0 then context.unbecome()
}
def receive = counter(0)
@main def becomeDemo() =
new TestKit(ActorSystem("DebugSystem")) with ImplicitSender:
try
val counter = system.actorOf(Props(CounterRight()), "counter")
counter ! "increment"
counter ! "get"
expectMsg(1)
counter ! "increment"
counter ! "get"
expectMsg(2)
counter ! "decrement"
counter ! "get"
expectMsg(1)
// This is wrong if we use CounterWrong because it doesn't set the
// discardOld parameter to false. Therefore, it will not remember the
// previous state and reset the behavior to the initial one, which is
// `counter(0)`.
counter ! "decrement"
counter ! "get"
expectMsg(0)
finally shutdown(system)
......@@ -17,9 +17,9 @@ def parIntersectionWrong(a: ParSet[Int], b: ParSet[Int]) =
result
@main def intersectionWrong =
val a = (0 until 1000).toSet
val b = (0 until 1000 by 4).toSet
val a = (0 until 10000).toSet
val b = (0 until 10000 by 4).toSet
val seqRes = a.intersect(b)
val parRes = parIntersectionWrong(a.par.toSet, b.par.toSet)
println(s"Sequential result: ${seqRes.size}")
println(s"Parallel result: ${parRes.size}")
assert(seqRes.size == 2500)
assert(parRes.size != 2500)
......@@ -18,9 +18,9 @@ def parIntersectionCorrect(a: ParSet[Int], b: ParSet[Int]) =
result
@main def intersectionCorrect =
val a = (0 until 1000).toSet
val b = (0 until 1000 by 4).toSet
val a = (0 until 10000).toSet
val b = (0 until 10000 by 4).toSet
val seqRes = a.intersect(b)
val parRes = parIntersectionCorrect(a.par.toSet, b.par.toSet)
println(s"Sequential result: ${seqRes.size}")
println(s"Parallel result: ${parRes.size}")
assert(seqRes.size == 2500)
assert(parRes.size == 2500)
......@@ -15,9 +15,9 @@ def intersectionParNoSideEffect(a: ParSet[Int], b: ParSet[Int]) =
else b.filter(a(_))
@main def intersectionNoSideEffect =
val a = (0 until 1000).toSet
val b = (0 until 1000 by 4).toSet
val a = (0 until 10000).toSet
val b = (0 until 10000 by 4).toSet
val seqRes = a.intersect(b)
val parRes = intersectionParNoSideEffect(a.par.toSet, b.par.toSet)
println(s"Sequential result: ${seqRes.size}")
println(s"Parallel result: ${parRes.size}")
assert(seqRes.size == 2500)
assert(parRes.size == 2500)
package lecture6
import java.util.concurrent.ForkJoinPool
// Run using `running lecture6.ExecutorsCreate`.
@main def ExecutorsCreate =
val executor = new ForkJoinPool
executor.execute(new Runnable:
override def run() = log("This task is run asynchronously.")
)
Thread.sleep(500)
package lecture6
import scala.concurrent.ExecutionContext
@main def ExecutionContextCreate =
val ectx = ExecutionContext.global
ectx.execute(new Runnable:
override def run() = log("This task is run asynchronously.")
)
Thread.sleep(500)
package lecture6
import scala.concurrent.ExecutionContext
def execute(body: => Unit) =
ExecutionContext.global.execute(new Runnable:
override def run() = body
)
@main def SimpleExecute =
execute { log("This task is run asynchronously.") }
Thread.sleep(500)
package lecture6
import scala.concurrent.ExecutionContext
import lecture1.threadStart
class OnePlaceBuffer[Elem]:
var elem: Elem = _
var full = false
def put(e: Elem): Unit = this.synchronized {
while full do wait()
elem = e
full = true
log(f"Put $e in the buffer")
notifyAll()
}
def get(): Elem = this.synchronized {
while !full do wait()
full = false
notifyAll()
log(f"Got $elem from the buffer")
elem
}
@main def OnePlaceBufferDemo =
val buffer = OnePlaceBuffer[Int]()
val getThreads = for i <- 0 until 10 yield threadStart { buffer.get() }
val putThreads = for i <- 0 until 10 yield threadStart { buffer.put(i) }
putThreads.foreach(_.join())
getThreads.foreach(_.join())
package lecture6
class Fork(var index: Int)
def philosopherTurn(i: Int, l: Fork, r: Fork): Unit =
Thread.sleep(10)
var (left, right) =
if i % 2 == 0 then (l, r)
else (r, l)
// if l.index < r.index then (l, r) else (r, l)
left.synchronized {
log(f"Philosopher $i picks up left fork ${left.index}")
right.synchronized {
log(f"Philosopher $i picks up right fork ${right.index} - eating")
Thread.sleep(100)
log(f"Philosopher $i puts down right fork ${right.index}")
}
log(f"Philosopher $i puts down left fork ${left.index}")
}
@main def run() =
val n = 5
val k: Int = 3
val forks = new Array[Fork](n)
val philosophers = new Array[Thread](n)
for p <- 0 to n - 1 do
forks(p) = Fork(p)
for p <- 0 to n - 1 do
philosophers(p) = new Thread:
override def run() =
for _ <- 0 until k do
philosopherTurn(p, forks(p % n), forks((p + 1) % n))
philosophers(p).start
for p <- 0 to n - 1 do
philosophers(p).join()
package lecture6
val log = println(_)
package midterm23
import instrumentation.Monitor
import scala.annotation.tailrec
/** Runs one of the three provided solutions to the barber shop problem.
*
* Run using `sbt "runMain midterm23.barberSolution 2 3"` for example.
*
* @param solutionNumber
* One of the 3 provided solutions to run: 1, 2, or 3.
* @param nCustomers
* The number of customers to simulate.
*/
@main def barberSolution(solutionNumber: Int, nCustomers: Int): Unit =
val barberShop: AbstractBarberShopSolution =
solutionNumber match
case 1 => BarberShopSolution1(nCustomers)
case 2 => BarberShopSolution2(nCustomers)
case 3 => BarberShopSolution3(nCustomers)
barberSolutionMain(barberShop, solutionNumber, nCustomers)
def barberSolutionMain(
barberShop: AbstractBarberShopSolution,
solutionNumber: Int,
nCustomers: Int
): Unit =
val bthread = new Thread:
override def run() =
barberShop.barber()
bthread.start()
val customerThreads =
for i <- 1 to nCustomers yield
val cthread = new Thread:
override def run() =
barberShop.customer(i)
cthread.start()
cthread
bthread.join()
customerThreads.foreach(_.join())
abstract class AbstractBarberShopSolution:
// var notifyBarber: Boolean = false
// var notifyCustomer: Boolean = false
// val lockObj = Object()
// In order to be able to mock the three attributes above in the tests, we
// replace them with three private attributes and public getters and setters.
// Without overrides, this is equivalent to the definitions above.
private var _notifyBarber: Boolean = false
private var _notifyCustomer: Boolean = false
private val _lockObject: Monitor = new Monitor {}
def notifyBarber = _notifyBarber
def notifyBarber_=(v: Boolean) = _notifyBarber = v
def notifyCustomer = _notifyCustomer
def notifyCustomer_=(v: Boolean) = _notifyCustomer = v
def lockObj: Monitor = _lockObject
def hairCut(): Unit =
log("start hair cut")
Thread.sleep(1000)
log("finish hair cut")
def customer(n: Int): Unit
def barber(): Unit
/** Logs a message to the console.
*
* Overridden in the tests to avoid printing gigabytes of output.
*
* @param s
*/
def log(s: String) = println(s)
/** This is the solution we expected for the `customer` method.
*
* However, now that you have learned about `@volatile`, you can see that the
* `customer` method is not thread-safe. Can you spot a problem with this
* solution?
*
* Note that the `log` calls were added to help you understand what is going
* on. They are not part of the expected solution.
*/
trait BarberShopCustomerSolution1 extends AbstractBarberShopSolution:
override def customer(n: Int) =
lockObj.synchronized {
log(f"customer ${n} sits on the chair and waits for the barber")
notifyBarber = true
while !notifyCustomer do {}
notifyCustomer = false
log(f"customer ${n} leaves the chair")
}
/** This is the solution we expected for the `barber` method.
*
* Same question as above: can you spot a problem with this solution?
*
* @param customersNumber
* The number of customers to simulate.
*/
trait BarberShopBarberSolution1(customersNumber: Int)
extends AbstractBarberShopSolution:
override def barber() =
// while true do
// We replace the infinite loop from the instructions with a loop doing a
// limited number of iterations so that we can test the code.
for _ <- 1 to customersNumber do
log("barber waits for a customer")
while !notifyBarber do {}
notifyBarber = false
hairCut()
notifyCustomer = true
log("barber stops working")
class BarberShopSolution1(customersNumber: Int)
extends BarberShopCustomerSolution1,
BarberShopBarberSolution1(customersNumber)
/** The problem with BarberShopSolution1 is that the `notifyBarber` and
* `notifyCustomer` are accessed from multiple threads without any
* synchronization. This means that the compiler is free to reorder the
* instructions in the `customer` and `barber` methods, which can lead to
* unexpected behaviors like infinite loops!
*
* To solve this problem, the solution below redefines the two attributes as
* `@volatile`.
*
* @param customersNumber
* The number of customers to simulate.
*/
class BarberShopSolution2(customersNumber: Int)
extends BarberShopSolution1(customersNumber):
@volatile private var _notifyBarber: Boolean = false
@volatile private var _notifyCustomer: Boolean = false
override def notifyBarber = _notifyBarber
override def notifyBarber_=(v: Boolean) = _notifyBarber = v
override def notifyCustomer = _notifyCustomer
override def notifyCustomer_=(v: Boolean) = _notifyCustomer = v
/** The solution below uses a different approach to solve the problem of
* BarberShopSolution1. Instead of using `@volatile`, it uses the lock object
* to synchronize the accesses to the two attributes. Note how each access to
* the attributes is now wrapped in a `lockObj.synchronized` block.
*
* @param customersNumber
* The number of customers to simulate.
*/
class BarberShopSolution3(customersNumber: Int)
extends AbstractBarberShopSolution:
override def customer(n: Int) =
getHairCut(n)
@tailrec
private def getHairCut(n: Int): Unit =
val sited = lockObj.synchronized {
if notifyBarber then
false
else if notifyCustomer then
false
else
log(f"customer ${n} sits on the chair and waits for the barber")
notifyBarber = true
true
}
if sited then
while !lockObj.synchronized { notifyCustomer } do {}
lockObj.synchronized {
log(f"customer ${n} leaves the chair")
notifyCustomer = false
}
else
getHairCut(n)
override def barber() =
for _ <- 1 to customersNumber do
log("barber waits for a customer")
while !lockObj.synchronized { notifyBarber } do {}
hairCut()
lockObj.synchronized {
notifyCustomer = true
notifyBarber = false
}
log("barber stops working")
package midterm23
import java.util.concurrent.atomic.AtomicInteger
/** This is an example `BarberShopStudentAnswer`, like the one we made for your
* own solution and that you can find on Moodle. The solution below is boringly
* correct as it use the same implementation as the expected solution. See
* BarberShopSolution.scala for explanations and comments.
*
* @param customersNumber
* The number of customers to simulate.
*/
class BarberShopStudentAnswer333(customersNumber: Int)
extends AbstractBarberShopSolution:
/** This attribute is used to count the number of hair cuts. It is needed for
* testing because we do not want to loop forever in the `barber` method!
*/
val hairCutsCounter = AtomicInteger(customersNumber)
override def customer(n: Int) =
lockObj.synchronized {
notifyBarber = true
while !notifyCustomer do {}
notifyCustomer = false
}
override def barber() =
// while true
// We replace the infinite loop from the instructions with a loop doing a
// limited number of iterations so that we can test the code.
while hairCutsCounter.get() != 0 do
while !notifyBarber do {}
notifyBarber = false
hairCut()
hairCutsCounter.decrementAndGet() // You can ignore this line. It has been added for testing.
notifyCustomer = true
/** Version of `BarberShopStudentAnswer333` with the methods `customer`
* overridden to use the expected implementation.
*
* @param n
* The number of customers to simulate.
*/
class BarberShopStudentAnswer333WithCorrectCustomer(n: Int)
extends BarberShopStudentAnswer333(n)
with BarberShopCustomerSolution1
/** Version of `BarberShopStudentAnswer333` with the methods `barber` overridden
* to use the expected implementation.
*
* @param n
* The number of customers to simulate.
*/
class BarberShopStudentAnswer333WithCorrectBarber(n: Int)
extends BarberShopStudentAnswer333(n)
with BarberShopBarberSolution1(n)
// To grade, we ran tests with each of these classes.
import lecture1.parallel
def numContributeTo(input: Array[Int], kernel: Array[Int], output: Array[Int], i: Int): Int =
if i < 0 || i >= output.length then
0
else if (i - kernel.length < 0) then
i+1
else if (i >= input.length) then
output.length - i
else kernel.length
def sequentialConvolve(
input: Array[Int],
kernel: Array[Int],
output: Array[Int],
from: Int,
until: Int
): Unit = {
// Approach A, as described in the clarification announcement for this
// exercise, where we treat `from` and `until` as indices on `output`
// instead of `input` as in the given code.
var iOutput = from
while iOutput < until do
var iKernel = math.max(0, iOutput - input.length + 1)
while iKernel < kernel.length && iKernel <= iOutput do
// `output` is only ever written to between the indices `from` and
// `until`, the range of `iOutput`. The indices for `input` and
// `kernel` are computed accordingly.
output(iOutput) += input(iOutput - iKernel) * kernel(iKernel)
iKernel += 1
iOutput += 1
// ALTERNATE SOLUTION: Approach B, as described in the clarification
// announcement for this exercise, which is unchanged from the given
// code, i.e. we treat `from` and `until` as indices on `input`.
var iInput = from
while iInput < until do
var iKernel = 0
while iKernel < kernel.length do
output(iInput + iKernel) += input(iInput) * kernel(iKernel)
iKernel += 1
iInput += 1
}
def parallelConvolve(
input: Array[Int],
kernel: Array[Int],
output: Array[Int],
from: Int,
until: Int,
threshold: Int
): Unit =
// Approach A, as described in the clarification announcement for this
// exercise, where we treat `from` and `until` as indices on `output`
// instead of `input` as in the given code. This does not require us to
// change anything in this function. Only receives full credit if used
// together with Approach A for `sequentialConvolve`.
if (until - from) <= threshold then
sequentialConvolve(input, kernel, output, from, until)
else
val mid = from + (until - from) / 2
parallel(
parallelConvolve(input, kernel, output, from, mid, threshold),
parallelConvolve(input, kernel, output, mid, until, threshold)
)
// ALTERNATE SOLUTION: Approach B, as described in the clarification
// announcement for this exercise, where we treat `from` and `until` as
// indices on `input` as in the given code. This requires up to leave a
// gap in the parallel calls to be filled in sequentially as described
// below. Only receives full credit if used together with Approach B for
// `sequentialConvolve`.
if (until - from) <= threshold then
sequentialConvolve(input, kernel, output, from, until)
else
val mid = from + (until - from) / 2
val gap = numContributeTo(input, kernel, output, mid)
// Leave a gap large enough that accesses to `output` from the first
// parallel call will not overlap with accesses from the second. This
// gap is of size equal to the number of elements of `input` that will
// contribute to the computation of the result at the lowest index of
// `output` in the second parallel call, namely, at index `mid`.
// However, we are careful not to access indices lower than `from`.
val gapBeforeMid = Math.max(from, mid - gap + 1)
parallel(
parallelConvolve(input, kernel, output, from, gapBeforeMid, threshold),
parallelConvolve(input, kernel, output, mid, until, threshold)
)
// Fill in the gap sequentially.
sequentialConvolve(input, kernel, output, gapBeforeMid, mid)
object Original_WithOutputRace:
def sequentialConvolve(
input: Array[Int],
kernel: Array[Int],
output: Array[Int],
from: Int,
until: Int): Unit =
var iInput = from
while iInput < until do
var iKernel = 0
while iKernel < kernel.length do
output(iInput + iKernel) += input(iInput) * kernel(iKernel)
iKernel += 1
iInput += 1
def parallelConvolve(
input: Array[Int],
kernel: Array[Int],
output: Array[Int],
from: Int,
until: Int,
threshold: Int): Unit =
if (until - from) <= threshold then
sequentialConvolve(input, kernel, output, from, until)
else
val mid = from + (until - from)/2
parallel(
parallelConvolve(input, kernel, output, from, mid, threshold),
parallelConvolve(input, kernel, output, mid, until, threshold))
import lecture1.parallel
import collection.concurrent.TrieMap
import collection.parallel.CollectionConverters.*
// Example run of `groupBy`:
case class Course(semester: String, id: String)
val cs210 = Course("BA3", "CS210")
val cs250 = Course("BA3", "CS250")
val cs206 = Course("BA4", "CS206")
val courses = Vector(cs210, cs250, cs206)
assertEquals(
courses.groupBy(_.semester),
Map(
"BA3" -> Vector(cs210, cs250),
"BA4" -> Vector(cs206)
)
)
/** Asymptotic run time: O(1) */
def appendAtKey[K, V](m: Map[K, Vector[V]], k: K, v: V): Map[K, Vector[V]] =
val prev = m.getOrElse(k, Vector.empty)
m.updated(k, prev :+ v)
// Example runs of `appendAtKey`:
assertEquals(
appendAtKey(Map(), 0, 1),
Map(0 -> Vector(1))
)
assertEquals(
appendAtKey(Map("BA3" -> Vector(cs210)), "BA3", cs250),
Map("BA3" -> List(cs210, cs250))
)
/** Asymptotic run time: O(1) */
def mergeMaps[K, V](a: Map[K, Vector[V]], b: Map[K, Vector[V]]) =
var res = a
for (k, vs) <- b do // O(1) iterations (we assume the number of keys to be O(1))
val prev = res.getOrElse(k, Vector.empty) // O(1) lookup
res = res.updated(k, prev ++ vs) // O(1) update, O(1) concatenation
res
// Example run of `mergeMaps`:
val m1 = Map(1 -> Vector(5, 7))
val m2 = Map(1 -> Vector(6), 2 -> Vector(3))
assertEquals(
mergeMaps(m1, m2),
Map(
1 -> Vector(5, 7, 6),
2 -> Vector(3)
)
)
def parGroupBy1[K, V](vs: Vector[V], key: V => K): Map[K, Vector[V]] =
if vs.size == 1 then
Map(key(vs.head) -> Vector(vs.head))
else
val mid = vs.size / 2
val (m1, m2) = parallel(
parGroupBy1(vs.slice(0, mid), key),
parGroupBy1(vs.slice(mid, vs.size), key)
)
mergeMaps(m1, m2)
assertEquals(
parGroupBy1(courses, c => c.semester),
Map(
"BA3" -> List(cs210, cs250),
"BA4" -> List(cs206)
)
)
assertEquals(
parGroupBy1(courses, c => c.semester),
Map(
"BA3" -> List(cs210, cs250),
"BA4" -> List(cs206)
)
)
def parGroupBy2[K, V](vs: Vector[V], key: V => K): Map[K, Vector[V]] =
if vs.size == 1 then
Map(key(vs.head) -> Vector(vs.head))
else
val (m1, m2) = parallel(
parGroupBy2(Vector(vs.head), key),
parGroupBy2(vs.tail, key)
)
mergeMaps(m1, m2)
assertEquals(
parGroupBy2(courses, c => c.semester),
Map(
"BA3" -> List(cs210, cs250),
"BA4" -> List(cs206)
)
)
assertEquals(
parGroupBy2(courses, c => c.semester),
Map(
"BA3" -> List(cs210, cs250),
"BA4" -> List(cs206)
)
)
def parGroupBy3[K, V](vs: Vector[V], key: V => K): Map[K, Vector[V]] =
val lock = Object()
var res = Map[K, Vector[V]]()
for v <- vs.par do
lock.synchronized { res = appendAtKey(res, key(v), v) }
res
var values3 = Set[Map[String, Vector[Course]]]()
for _ <- 1 to 100000 do
values3 += parGroupBy3(courses, _.semester)
assertEquals(values3.size, 2)
values3
def parGroupBy4[K, V](vs: Vector[V], key: V => K): Map[K, Vector[V]] =
val res = TrieMap[K, Vector[V]]()
for v <- vs.par do
val k = key(v)
val prev = res.getOrElse(k, Vector.empty)
res.update(k, prev :+ v)
res.toMap
var values4 = Set[Map[String, Vector[Course]]]()
for _ <- 1 to 100000 do
values4 += parGroupBy4(courses, _.semester)
assertEquals(values4.size, 4)
values4
def parGroupBySolution[K, V](vs: Vector[V], key: V => K): Map[K, Vector[V]] =
vs.par.aggregate(Map.empty)(
(m, v) => appendAtKey(m, key(v), v),
mergeMaps
)
var valuesSol = Set[Map[String, Vector[Course]]]()
for _ <- 1 to 100000 do
valuesSol += parGroupBySolution(courses, _.semester)
assertEquals(valuesSol.size, 1)
valuesSol
def assertEquals(a: Any, b: Any) = assert(a == b)
package concpar21final01
import scala.concurrent.duration.*
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
class Problem1Suite extends munit.FunSuite:
test(
"Retrieves grades at the end of the exam (everyone pushed something) (10pts)"
) {
class Problem1Done extends Problem1:
override def getGrade(sciper: Int): Future[Option[Grade]] =
Future {
Thread.sleep(100)
Some(Grade(sciper, sciper))
}
override def getScipers(): Future[List[Int]] =
Future {
Thread.sleep(100)
List(1, 2, 3, 4)
}
val expected: List[Grade] =
List(Grade(1, 1.0), Grade(2, 2.0), Grade(3, 3.0), Grade(4, 4.0))
(new Problem1Done).leaderboard().map { grades =>
assertEquals(grades.toSet, expected.toSet)
}
}
test("Retrieves grades mid exam (some students didn't push yet) (10pts)") {
class Problem1Partial extends Problem1:
override def getGrade(sciper: Int): Future[Option[Grade]] =
Future {
Thread.sleep(100)
if sciper % 2 == 0 then None
else Some(Grade(sciper, sciper))
}
override def getScipers(): Future[List[Int]] =
Future {
Thread.sleep(100)
List(1, 2, 3, 4)
}
val expected: List[Grade] =
List(Grade(1, 1.0), Grade(3, 3.0))
(new Problem1Partial).leaderboard().map { grades =>
assertEquals(grades.toSet, expected.toSet)
}
}
test("The output list is sorted by grade (10pts)") {
(new Problem1MockData).leaderboard().map { grades =>
assert(grades.size >= 176)
assert(grades.zipWithIndex.forall { case (g, i) =>
grades.drop(i).forall(x => g.grade >= x.grade)
})
}
}
test("GitLab API calls are done in parallel (2pts)") {
var inParallel: Boolean = false
class Problem1Par extends Problem1MockData:
var in: Boolean = false
override def getGrade(sciper: Int): Future[Option[Grade]] =
Future {
if in then inParallel = true
in = true
val out = super.getGrade(sciper)
in = false
concurrent.Await.result(out, Duration(10, SECONDS))
}
(new Problem1Par).leaderboard().map { grades =>
assert(grades.size >= 176)
assert(inParallel)
}
}
test("The IS-academia API is called exactly once (2pts)") {
var called: Int = 0
class Problem1Once extends Problem1MockData:
override def getScipers(): Future[List[Int]] =
called += 1
super.getScipers()
(new Problem1Once).leaderboard().map { grades =>
assert(grades.size >= 176)
assert(called == 1)
}
}
package concpar21final02
import akka.actor.*
import akka.testkit.*
import scala.collection.mutable
import concurrent.duration.*
import Problem2.*
class Problem2Suite extends munit.FunSuite:
import NotificationService.Protocol.*
import NotificationService.Responses.*
import DiscordChannel.Protocol.*
import DiscordChannel.Responses.*
test("Notification register (1pts)") {
new MyTestKit:
def tests() =
val actor = system.actorOf(Props[NotificationService]())
actor ! Register
expectMsg(Registered(true))
}
test("Notification register and un-register (1pts)") {
new MyTestKit:
def tests() =
val actor = system.actorOf(Props[NotificationService]())
actor ! Register
expectMsg(Registered(true))
actor ! UnRegister
expectMsg(Registered(false))
actor ! UnRegister
expectMsg(Registered(false))
actor ! Register
expectMsg(Registered(true))
actor ! UnRegister
expectMsg(Registered(false))
}
test("Notification notify (1pts)") {
new MyTestKit:
def tests() =
val actor = system.actorOf(Props[NotificationService]())
actor ! Register
expectMsg(Registered(true))
actor ! NotifyAll
expectMsg(Notification)
actor ! NotifyAll
expectMsg(Notification)
actor ! UnRegister
expectMsg(Registered(false))
actor ! NotifyAll
expectNoMessage()
actor ! Register
expectMsg(Registered(true))
actor ! NotifyAll
expectMsg(Notification)
actor ! UnRegister
expectMsg(Registered(false))
actor ! NotifyAll
expectNoMessage()
}
test("NotifyAll from other actor (1pts)") {
new MyTestKit:
def tests() =
val actor = system.actorOf(Props[NotificationService]())
val otherActor = system.actorOf(Props[DummyActor]())
def notifyFormAllFromOtherActor() =
given ActorRef = otherActor
actor ! NotifyAll
expectNoMessage()
actor ! Register
expectMsg(Registered(true))
notifyFormAllFromOtherActor()
expectMsg(Notification)
}
test("Channel init (1pts)") {
new MyTestKit:
def tests() =
val notificationService = system.actorOf(Props[NotificationService]())
val channel = system.actorOf(Props[DiscordChannel]())
channel ! Init(notificationService)
expectMsg(Active)
}
test("Channel post and get post (1pts)") {
new MyTestKit:
def tests() =
val notificationService = system.actorOf(Props[NotificationService]())
val channel = system.actorOf(Props[DiscordChannel]())
channel ! Init(notificationService)
expectMsg(Active)
channel ! Post("hello")
channel ! GetLastPosts(1)
expectMsg(Posts(List("hello")))
channel ! GetLastPosts(10)
expectMsg(Posts(List("hello")))
channel ! GetLastPosts(0)
expectMsg(Posts(Nil))
}
test("Channel multiple posts (1pts)") {
new MyTestKit:
def tests() =
val notificationService = system.actorOf(Props[NotificationService]())
val channel = system.actorOf(Props[DiscordChannel]())
channel ! Init(notificationService)
expectMsg(Active)
channel ! Post("hello")
channel ! Post("world")
channel ! GetLastPosts(2)
channel ! GetLastPosts(1)
channel ! Post("!")
channel ! GetLastPosts(3)
expectMsg(Posts(List("world", "hello")))
expectMsg(Posts(List("world")))
expectMsg(Posts(List("!", "world", "hello")))
}
test("Channel posts and notify (1pts)") {
new MyTestKit:
def tests() =
val notificationService = system.actorOf(Props[NotificationService]())
val channel = system.actorOf(Props[DiscordChannel]())
channel ! Init(notificationService)
expectMsg(Active)
notificationService ! Register
expectMsg(Registered(true))
channel ! Post("hello")
channel ! Post("world")
expectMsg(Notification)
expectMsg(Notification)
}
test("Channel init twice (1pts)") {
new MyTestKit:
def tests() =
val notificationService = system.actorOf(Props[NotificationService]())
val channel = system.actorOf(Props[DiscordChannel]())
channel ! Init(notificationService)
expectMsg(Active)
channel ! Init(notificationService)
expectMsg(AlreadyActive)
channel ! Init(notificationService)
expectMsg(AlreadyActive)
}
test("Channel not active (1pts)") {
new MyTestKit:
def tests() =
val channel1 = system.actorOf(Props[DiscordChannel]())
channel1 ! Post("hello")
expectMsg(NotActive)
val channel2 = system.actorOf(Props[DiscordChannel]())
channel2 ! GetLastPosts(0)
expectMsg(NotActive)
}
abstract class MyTestKit
extends TestKit(ActorSystem("TestSystem"))
with ImplicitSender:
def tests(): Unit
try tests()
finally shutdown(system)
class DummyActor extends Actor:
def receive: Receive = { case _ =>
()
}
package concpar21final03
import scala.annotation.tailrec
import scala.concurrent.*
import scala.concurrent.duration.*
import scala.collection.mutable.HashMap
import scala.util.Random
import instrumentation.*
import instrumentation.TestHelper.*
import instrumentation.TestUtils.*
class Problem3Suite extends munit.FunSuite:
test("Part 1: ThreadMap (3pts)") {
testManySchedules(
4,
sched =>
val tmap = new SchedulableThreadMap[Int](sched)
def writeThread(): Unit =
tmap.setCurrentThreadValue(0)
tmap.setCurrentThreadValue(-1)
val readBack = tmap.currentThreadValue
assertEquals(readBack, Some(-1))
def writeAndDeleteThread(): Unit =
tmap.setCurrentThreadValue(42)
tmap.deleteCurrentThreadValue()
@tailrec
def waitThread(): Unit =
tmap.waitForall(_ < 0)
val all = tmap.allValues
if all != List(-1) then waitThread()
val threads = List(
() => writeThread(),
() => writeAndDeleteThread(),
() => waitThread(),
() => waitThread()
)
(threads, _ => (true, ""))
)
}
test("Part 2: RCU (5pts)") {
testManySchedules(
3,
sched =>
val rcu = new SchedulableRCU(sched)
case class State(
value: Int,
isDeleted: AtomicLong = SchedulableAtomicLong(0, sched, "isDeleted")
)
val sharedState =
SchedulableAtomicReference(State(0), sched, "sharedState")
def readThread(): Unit =
rcu.startRead()
val state = sharedState.get
val stateWasDeleted = state.isDeleted.get != 0
assert(
!stateWasDeleted,
"RCU shared state deleted in the middle of a read."
)
rcu.stopRead()
def writeThread(): Unit =
val oldState = sharedState.get
sharedState.set(State(oldState.value + 1))
rcu.waitForOldReads()
oldState.isDeleted.set(1)
val threads = List(
() => readThread(),
() => readThread(),
() => writeThread()
)
(threads, _ => (true, ""))
)
}
test("Part 3: UpdateServer (2pts)") {
testManySchedules(
3,
sched =>
val fs = SchedulableInMemoryFileSystem(sched)
val server = new SchedulableUpdateServer(sched, fs)
def writeThread(): Unit =
server.newUpdate("update1.bin", "Update 1")
server.newUpdate("update2.bin", "Update 2")
assertEquals(fs.fsMap.toSet, Set("update2.bin" -> "Update 2"))
def fetchThread(): Unit =
val res = server.fetchUpdate()
assert(
List(None, Some("Update 1"), Some("Update 2")).contains(res),
s"fetchUpdate returned unexpected value $res"
)
val threads = List(
() => writeThread(),
() => fetchThread(),
() => fetchThread()
)
(threads, _ => (true, ""))
)
}
import scala.concurrent.duration.*
override val munitTimeout = 200.seconds
end Problem3Suite