package concpar22final02.instrumentation
import scala.annotation.tailrec
import concpar22final02.*
import scala.collection.mutable.ArrayBuffer
class SchedulableBarrier(val scheduler: Scheduler, size: Int)
extends Barrier(size)
with MockedMonitor
class SchedulableProblem2(
val scheduler: Scheduler,
imageSize: Int,
threadCount: Int,
numFilters: Int
) extends Problem2(imageSize, threadCount, numFilters):
self =>
override val barrier =
ArrayBuffer.fill(numFilters)(SchedulableBarrier(scheduler, threadCount))
package concpar22final02.instrumentation
import java.util.concurrent.*
import scala.concurrent.duration.*
import scala.collection.mutable.*
import Stats.*
import java.util.concurrent.atomic.AtomicInteger
sealed abstract class Result
case class RetVal(rets: List[Any]) extends Result
case class Except(msg: String, stackTrace: Array[StackTraceElement])
extends Result
case class Timeout(msg: String) extends Result
/** A class that maintains schedule and a set of thread ids. The schedules are
* advanced after an operation of a SchedulableBuffer is performed. Note: the
* real schedule that is executed may deviate from the input schedule due to
* the adjustments that had to be made for locks
class Scheduler(sched: List[Int]):
val maxOps =
500 // a limit on the maximum number of operations the code is allowed to perform
var waitCount: AtomicInteger = new AtomicInteger(0)
var notifyCount: AtomicInteger = new AtomicInteger(0)
var notifyAllCount: AtomicInteger = new AtomicInteger(0)
private var schedule = sched
var numThreads = 0
private val realToFakeThreadId = Map[Long, Int]()
private val opLog =
ListBuffer[String]() // a mutable list (used for efficient concat)
private val threadStates = Map[Int, ThreadState]()
/** Runs a set of operations in parallel as per the schedule. Each operation
* may consist of many primitive operations like reads or writes to shared
* data structure each of which should be executed using the function `exec`.
* @timeout
* in milliseconds
* @return
* true - all threads completed on time, false -some tests timed out.
def runInParallel(timeout: Long, ops: List[() => Any]): Result =
numThreads = ops.length
val threadRes = Array.fill(numThreads) { None: Any }
var exception: Option[(Throwable, Int)] = None
val syncObject = new Object()
var completed = new AtomicInteger(0)
// create threads
val threads = { case (op, i) =>
new Thread(
new Runnable():
def run(): Unit =
val fakeId = i + 1
val res = op()
threadRes(i) = res
// notify the main thread if all threads have completed
if completed.incrementAndGet() == ops.length then
syncObject.synchronized { syncObject.notifyAll() }
case e: Throwable
if exception != None => // do nothing here and silently fail
case e: Throwable =>
log(s"throw ${e.toString}")
exception = Some((e, fakeId))
syncObject.synchronized { syncObject.notifyAll() }
// println(s"$fakeId: ${e.toString}")
// Runtime.getRuntime().halt(0) //exit the JVM and all running threads (no other way to kill other threads)
// start all threads
// wait for all threads to complete, or for an exception to be thrown, or for the time out to expire
var remTime = timeout
syncObject.synchronized {
timed {
if completed.get() != ops.length then syncObject.wait(timeout)
} { time =>
remTime -= time
if exception.isDefined then
s"Thread ${exception.get._2} crashed on the following schedule: \n" + opLog.mkString(
else if remTime <= 1
then // timeout ? using 1 instead of zero to allow for some errors
// every thing executed normally
// Updates the state of the current thread
def updateThreadState(state: ThreadState): Unit =
val tid = threadId
synchronized {
threadStates(tid) = state
state match
case Sync(lockToAquire, locks) =>
if locks.indexOf(lockToAquire) < 0 then waitForTurn
// Re-aqcuiring the same lock
updateThreadState(Running(lockToAquire +: locks))
case Start => waitStart()
case End => removeFromSchedule(tid)
case Running(_) =>
case _ => waitForTurn // Wait, SyncUnique, VariableReadWrite
def waitStart(): Unit =
// while (threadStates.size < numThreads) {
// Thread.sleep(1)
// }
synchronized {
if threadStates.size < numThreads then wait()
else notifyAll()
def threadLocks =
synchronized {
def threadState =
synchronized {
def mapOtherStates(f: ThreadState => ThreadState) =
val exception = threadId
synchronized {
for k <- threadStates.keys if k != exception do
threadStates(k) = f(threadStates(k))
def log(str: String) =
if (realToFakeThreadId contains Thread.currentThread().getId()) then
val space = (" " * ((threadId - 1) * 2))
val s =
space + threadId + ":" + "\n".r.replaceAllIn(str, "\n" + space + " ")
opLog += s
/** Executes a read or write operation to a global data structure as per the
* given schedule
* @param msg
* a message corresponding to the operation that will be logged
def exec[T](primop: => T)(
msg: => String,
postMsg: => Option[T => String] = None
): T =
if !(realToFakeThreadId contains Thread.currentThread().getId()) then primop
val m = msg
if m != "" then log(m)
if opLog.size > maxOps then
throw new Exception(
s"Total number of reads/writes performed by threads exceed $maxOps. A possible deadlock!"
val res = primop
postMsg match
case Some(m) => log(m(res))
case None =>
private def setThreadId(fakeId: Int) = synchronized {
realToFakeThreadId(Thread.currentThread.getId) = fakeId
def threadId =
try realToFakeThreadId(Thread.currentThread().getId())
case e: NoSuchElementException =>
throw new Exception(
"You are accessing shared variables in the constructor. This is not allowed. The variables are already initialized!"
private def isTurn(tid: Int) = synchronized {
(!schedule.isEmpty && schedule.head != tid)
def canProceed(): Boolean =
val tid = threadId
canContinue match
case Some((i, state)) if i == tid =>
// println(s"$tid: Runs ! Was in state $state")
canContinue = None
state match
case Sync(lockToAquire, locks) =>
updateThreadState(Running(lockToAquire +: locks))
case SyncUnique(lockToAquire, locks) =>
mapOtherStates {
_ match
case SyncUnique(lockToAquire2, locks2)
if lockToAquire2 == lockToAquire =>
Wait(lockToAquire2, locks2)
case e => e
updateThreadState(Running(lockToAquire +: locks))
case VariableReadWrite(locks) => updateThreadState(Running(locks))
case Some((i, state)) =>
// println(s"$tid: not my turn but $i !")
case None =>
var threadPreference =
0 // In the case the schedule is over, which thread should have the preference to execute.
/** returns true if the thread can continue to execute, and false otherwise */
def decide(): Option[(Int, ThreadState)] =
if !threadStates.isEmpty
then // The last thread who enters the decision loop takes the decision.
// println(s"$threadId: I'm taking a decision")
if threadStates.values.forall {
case e: Wait => true
case _ => false
val waiting =", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
throw new Exception(
s"Deadlock: Thread$s $waiting $are waiting but all others have ended and cannot notify them."
// Threads can be in Wait, Sync, SyncUnique, and VariableReadWrite mode.
// Let's determine which ones can continue.
val notFree = threadStates.collect { case (id, state) =>
val threadsNotBlocked = threadStates.toSeq.filter {
case (id, v: VariableReadWrite) => true
case (id, v: CanContinueIfAcquiresLock) =>
!notFree(v.lockToAquire) || (v.locks contains v.lockToAquire)
case _ => false
if threadsNotBlocked.isEmpty then
val waiting =", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
val whoHasLock = threadStates.toSeq.flatMap { case (id, state) => => (lock, id))
val reason = threadStates
.collect {
case (id, state: CanContinueIfAcquiresLock)
if !notFree(state.lockToAquire) =>
s"Thread $id is waiting on lock ${state.lockToAquire} held by thread ${whoHasLock(state.lockToAquire)}"
throw new Exception(
s"Deadlock: Thread$s $waiting are interlocked. Indeed:\n$reason"
else if threadsNotBlocked.size == 1
then // Do not consume the schedule if only one thread can execute.
val next =
schedule.indexWhere(t =>
threadsNotBlocked.exists { case (id, state) => id == t }
if next != -1 then
// println(s"$threadId: schedule is $schedule, next chosen is ${schedule(next)}")
val chosenOne =
schedule(next) // TODO: Make schedule a mutable list.
schedule = schedule.take(next) ++ schedule.drop(next + 1)
Some((chosenOne, threadStates(chosenOne)))
threadPreference = (threadPreference + 1) % threadsNotBlocked.size
val chosenOne =
threadsNotBlocked(threadPreference) // Maybe another strategy
// threadsNotBlocked.indexOf(threadId) >= 0
val tnb =",")
val s = if (schedule.isEmpty) "empty" else schedule.mkString(",")
val only = if (schedule.isEmpty) "" else " only"
throw new Exception(s"The schedule is $s but$only threads ${tnb} can continue")*/
else canContinue
/** This will be called before a schedulable operation begins. This should not
* use synchronized
var numThreadsWaiting = new AtomicInteger(0)
// var waitingForDecision = Map[Int, Option[Int]]() // Mapping from thread ids to a number indicating who is going to make the choice.
var canContinue: Option[(Int, ThreadState)] =
None // The result of the decision thread Id of the thread authorized to continue.
private def waitForTurn =
synchronized {
if numThreadsWaiting.incrementAndGet() == threadStates.size then
canContinue = decide()
// waitingForDecision(threadId) = Some(numThreadsWaiting)
// println(s"$threadId Entering waiting with ticket number $numThreadsWaiting/${waitingForDecision.size}")
while !canProceed() do wait()
/** To be invoked when a thread is about to complete
private def removeFromSchedule(fakeid: Int) = synchronized {
// println(s"$fakeid: I'm taking a decision because I finished")
schedule = schedule.filterNot(_ == fakeid)
threadStates -= fakeid
if numThreadsWaiting.get() == threadStates.size then
canContinue = decide()
def getOperationLog() = opLog
package concpar22final02.instrumentation
import scala.util.Random
import scala.collection.mutable.{Map as MutableMap}
import Stats.*
object TestHelper:
val noOfSchedules = 10000 // set this to 100k during deployment
val readWritesPerThread =
20 // maximum number of read/writes possible in one thread
val contextSwitchBound = 10
val testTimeout = 240 // the total time out for a test in seconds
val schedTimeout =
15 // the total time out for execution of a schedule in secs
// Helpers
/*def testManySchedules(op1: => Any): Unit = testManySchedules(List(() => op1))
def testManySchedules(op1: => Any, op2: => Any): Unit = testManySchedules(List(() => op1, () => op2))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any, op4: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3, () => op4))*/
def testSequential[T](ops: Scheduler => Any)(assertions: T => (
)) =
(sched: Scheduler) =>
List(() => ops(sched)),
(res: List[Any]) => assertions(res.head.asInstanceOf[T])
/** @numThreads
* number of threads
* @ops
* operations to be executed, one per thread
* @assertion
* as condition that will executed after all threads have completed
* (without exceptions) the arguments are the results of the threads
def testManySchedules(
numThreads: Int,
ops: Scheduler => (
List[() => Any], // Threads
List[Any] => (Boolean, String)
) // Assertion
) =
var timeout = testTimeout * 1000L
val threadIds = (1 to numThreads)
// (1 to scheduleLength).flatMap(_ => threadIds).toList.permutations.take(noOfSchedules).foreach {
val schedules = (new ScheduleGenerator(numThreads)).schedules()
var schedsExplored = 0
schedules.takeWhile(_ =>
schedsExplored <= noOfSchedules && timeout > 0
).foreach {
// case _ if timeout <= 0 => // break
case schedule =>
schedsExplored += 1
val schedr = new Scheduler(schedule)
// println("Exploring Sched: "+schedule)
val (threadOps, assertion) = ops(schedr)
if threadOps.size != numThreads then
throw new IllegalStateException(
s"Number of threads: $numThreads, do not match operations of threads: $threadOps"
timed { schedr.runInParallel(schedTimeout * 1000, threadOps) } { t =>
timeout -= t
} match
case Timeout(msg) =>
throw new java.lang.AssertionError(
"assertion failed\n" + "The schedule took too long to complete. A possible deadlock! \n" + msg
case Except(msg, stkTrace) =>
val traceStr =
"Thread Stack trace: \n" +
" at " + _.toString
throw new java.lang.AssertionError(
"assertion failed\n" + msg + "\n" + traceStr
case RetVal(threadRes) =>
// check the assertion
val (success, custom_msg) = assertion(threadRes)
if !success then
val msg =
"The following schedule resulted in wrong results: \n" + custom_msg + "\n" + schedr
throw new java.lang.AssertionError("Assertion failed: " + msg)
if timeout <= 0 then
throw new java.lang.AssertionError(
"Test took too long to complete! Cannot check all schedules as your code is too slow!"
/** A schedule generator that is based on the context bound
class ScheduleGenerator(numThreads: Int):
val scheduleLength = readWritesPerThread * numThreads
val rands =
(1 to scheduleLength).map(i =>
new Random(0xcafe * i)
) // random numbers for choosing a thread at each position
def schedules(): LazyList[List[Int]] =
var contextSwitches = 0
var contexts =
List[Int]() // a stack of thread ids in the order of context-switches
val remainingOps = MutableMap[Int, Int]()
remainingOps ++= (1 to numThreads).map(i =>
(i, readWritesPerThread)
) // num ops remaining in each thread
val liveThreads = (1 to numThreads).toSeq.toBuffer
/** Updates remainingOps and liveThreads once a thread is chosen for a
* position in the schedule
def updateState(tid: Int): Unit =
val remOps = remainingOps(tid)
if remOps == 0 then liveThreads -= tid
else remainingOps += (tid -> (remOps - 1))
val schedule = rands.foldLeft(List[Int]()) {
case (acc, r) if contextSwitches < contextSwitchBound =>
val tid = liveThreads(r.nextInt(liveThreads.size))
contexts match
case prev :: tail
if prev != tid => // we have a new context switch here
contexts +:= tid
contextSwitches += 1
case prev :: tail =>
case _ => // init case
contexts +:= tid
acc :+ tid
case (
) => // here context-bound has been reached so complete the schedule without any more context switches
if !contexts.isEmpty then
contexts = contexts.dropWhile(remainingOps(_) == 0)
val tid = contexts match
case top :: tail => top
case _ =>
) // here, there has to be threads that have not even started
acc :+ tid
schedule #:: schedules()
package concpar22final02.instrumentation
import scala.concurrent.*
import scala.concurrent.duration.*
object TestUtils:
def failsOrTimesOut[T](action: => T): Boolean =
val asyncAction = Future {
try Await.result(asyncAction, 2000.millisecond)
catch case _: Throwable => return true
return false
package concpar22final03
import scala.concurrent.Future
import scala.concurrent.Future
import scala.util.Random
trait EconomicsTest extends Economics:
val ownedCards: collection.mutable.Set[Card] = collection.mutable.Set[Card]()
def owned(c: Card): Boolean = ownedCards(c)
def isMine(c: Card): Boolean = ownedCards(c)
override def valueOf(cardName: String): Int = List(1, cardName.length).max
/** This is a container for an exact amount of money that can be hold, spent,
* or put in the bank
val moneyInMoneyBag = collection.mutable.Map[MoneyBag, Int]()
def moneyIn(m: MoneyBag): Int = moneyInMoneyBag.getOrElse(m, 0)
/** If you sell a card, at some point in the future you will get some money
* (in a bag).
def sellCard(c: Card): Future[MoneyBag] =
Future {
if owned(c) then
throw Exception(
"This card doesn't belong to you or has already been sold, you can't sell it."
/** You can buy any "Scala: The Programming" card by providing a bag of money
* with the appropriate amount and waiting for the transaction to take place
def buyCard(bag: MoneyBag, name: String): Future[Card] =
Future {
synchronized {
if moneyIn(bag) != valueOf(name) then
throw Exception(
"You didn't provide the exact amount of money necessary to buy this card."
else moneyInMoneyBag.update(bag, 0)
/** This simple bank account hold money for you. You can bring a money bag to
* increase your account, or withdraw a money bag of any size not greater
* than your account's balance.
private var balance_ = initialBalance()
def balance: Int = balance_
def withdraw(amount: Int): Future[MoneyBag] =
Future {
if balance_ >= amount then
balance_ -= amount
throw new Exception(
"You try to withdraw more money than you have on your account"
def deposit(bag: MoneyBag): Future[Unit] =
Future {
synchronized {
if moneyInMoneyBag(bag) == 0 then
throw new Exception("You are depositing en empty bag!")
balance_ += moneyIn(bag)
moneyInMoneyBag.update(bag, 0)
def sellWaitTime(): Int
def buyWaitTime(): Int
def withdrawWaitTime(): Int
def depositWaitTime(): Int
def initialBalance(): Int
def getMoneyBag(i: Int) =
val m = MoneyBag()
synchronized(moneyInMoneyBag.update(m, i))
def getCard(n: String): Card =
val c = Card(n)
synchronized(ownedCards.update(c, true))
package concpar22final03
import scala.concurrent.duration.*
import scala.concurrent.{Await, Future}
import scala.util.{Try, Success, Failure}
class Problem3Suite extends munit.FunSuite:
trait Prob3Test extends Problem3:
override val economics: EconomicsTest
class Test1 extends Prob3Test:
override val economics: EconomicsTest = new EconomicsTest:
override def sellWaitTime() = 10
override def buyWaitTime() = 20
override def depositWaitTime() = 30
override def withdrawWaitTime() = 40
override def initialBalance() = 0
class Test2 extends Prob3Test:
override val economics: EconomicsTest = new EconomicsTest:
override def sellWaitTime() = 100
override def buyWaitTime() = 5
override def depositWaitTime() = 50
override def withdrawWaitTime() = 5
override def initialBalance() = 0
class Test3 extends Prob3Test:
override val economics: EconomicsTest = new EconomicsTest:
val rgen = new scala.util.Random(666)
override def sellWaitTime() = rgen.nextInt(100)
override def buyWaitTime() = rgen.nextInt(100)
override def depositWaitTime() = rgen.nextInt(100)
override def withdrawWaitTime() = rgen.nextInt(100)
override def initialBalance() = 0
class Test4 extends Prob3Test:
var counter = 5
def next(): Int =
counter = counter + 5 % 119
override val economics: EconomicsTest = new EconomicsTest:
override def sellWaitTime() = next()
override def buyWaitTime() = next()
override def depositWaitTime() = next()
override def withdrawWaitTime() = next()
override def initialBalance() = next()
def testCases = List(new Test1, new Test2)
def unevenTestCases = List(new Test3, new Test4)
def tot(cards: List[String]): Int =[Int]((n: String) => n.length).sum
def testOk(
t: Prob3Test,
money: Int,
sold: List[String],
wanted: List[String]
): Unit =
import t.*
import economics.*
val f = orderDeck(getMoneyBag(money),, wanted)
val r = Await.ready(f, 3.seconds).value.get
r match
case Success(d) =>
assertEquals(, wanted.sorted)
assertEquals(d.length, wanted.length)
assertEquals(isMine(d.head), true)
case Failure(e) => ()
def testFailure(
t: Prob3Test,
money: Int,
sold: List[String],
wanted: List[String]
): Unit =
import t.*
import economics.*
val f = orderDeck(getMoneyBag(money),, wanted)
val r = Await.ready(f, 3.seconds).value.get
r match
case Failure(e: NotEnoughMoneyException) => ()
case _ => fail(
"Should have thrown a NotEnoughMoneyException exception, but did not"
// --- Without sold cards ---
"Should work correctly when a single card is asked with enough money (no card sold) (20pts)"
) {
testCases.foreach(t => testOk(t, 7, Nil, List("Tefeiri")))
"Should work correctly when a single card is asked with enough money (no card sold, uneven waiting time) (10pts)"
) {
unevenTestCases.foreach(t => testOk(t, 7, Nil, List("Tefeiri")))
"Should work correctly when multiple cards are asked with enough money (no card sold) (20pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
testCases.foreach(t => testOk(t, tot(cards), Nil, cards))
"Should work correctly when multiple cards are asked with enough money (no card sold, uneven waiting time) (10pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
unevenTestCases.foreach(t => testOk(t, tot(cards), Nil, cards))
"Should work correctly when asked duplicates of cards, with enough money (no card sold) (20pts)"
) {
val cards = List("aaaa", "aaaa", "aaaa", "dd", "dd", "dd", "dd")
testCases.foreach(t => testOk(t, tot(cards), Nil, cards))
"Should work correctly when asked duplicates of cards, with enough money (no card sold, uneven waiting time) (10pts)"
) {
val cards = List("aaaa", "aaaa", "aaaa", "dd", "dd", "dd", "dd")
unevenTestCases.foreach(t => testOk(t, tot(cards), Nil, cards))
// --- With sold cards ---
"Should work correctly when a single card is bought and a single of the same price is sold (20pts)"
) {
testCases.foreach(t => testOk(t, 0, List("Chandra"), List("Tefeiri")))
"Should work correctly when a single card is bought and a single of the same price is sold (uneven waiting time) (10pts)"
) {
unevenTestCases.foreach(t => testOk(t, 0, List("Chandra"), List("Tefeiri")))
"Should work correctly when multiple cards are asked and multiple of matching values are sold (20pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold = List("1111111", "2", "3333", "44", "55555", "666", "7777")
testCases.foreach(t => testOk(t, 0, sold, cards))
"Should work correctly when multiple cards are asked and multiple of matching values are sold (uneven waiting time) (10pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold = List("1111111", "2", "3333", "44", "55555", "666", "7777")
unevenTestCases.foreach(t => testOk(t, 0, sold, cards))
"Should work correctly when multiple cards are asked and multiple of the same total value are sold (20pts)"
) {
val cards2 = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold2 = List("111111111", "22", "3", "44", "555555", "666", "777")
assert(tot(sold2) == tot(cards2))
testCases.foreach(t => testOk(t, 0, sold2, cards2))
"Should work correctly when multiple cards are asked and multiple of the same total value are sold (uneven waiting time) (10pts)"
) {
val cards2 = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold2 = List("111111111", "22", "3", "44", "555555", "666", "777")
assert(tot(sold2) == tot(cards2))
unevenTestCases.foreach(t => testOk(t, 0, sold2, cards2))
"Should work correctly when given money and sold cards are sufficient for the wanted cards (20pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold = List("11111", "2", "33", "44", "5555", "666", "777")
val bagMoney = tot(cards) - tot(sold)
testCases.foreach(t => testOk(t, bagMoney, sold, cards))
"Should work correctly when given money and sold cards are sufficient for the wanted cards (uneven waiting time) (10pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold = List("11111", "2", "33", "44", "5555", "666", "777")
val bagMoney = tot(cards) - tot(sold)
unevenTestCases.foreach(t => testOk(t, bagMoney, sold, cards))
// --- Failures ---
"Should return a failure when too little money is provided (no card sold) (20pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
testCases.foreach(t => testFailure(t, tot(cards) - 1, Nil, cards))
testCases.foreach(t => testFailure(t, tot(cards) - 50, Nil, cards))
"Should return a failure when too little money or sold cards are provided (20pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold = List("11111", "2", "33", "44", "5555", "666", "777")
val bagMoney = tot(cards) - tot(sold)
testCases.foreach(t => testFailure(t, bagMoney - 2, sold, cards))
package concpar22final04
import akka.testkit.*
import akka.pattern.*
import akka.util.Timeout
import concurrent.duration.*
import User.Protocol.*
import User.Responses.*
import SongsStore.Protocol.*
import SongsStore.Responses.*
import scala.util.{Try, Success, Failure}
import com.typesafe.config.ConfigFactory
import java.util.Date
import scala.util.Random
class Problem4Suite extends munit.FunSuite:
test("after receiving GetInfo, should answer with Info (20pts)") {
new MyTestKit:
def tests() =
ada ! GetInfo
expectMsg(Info("1", "Ada"))
"after receiving GetHomepageData, should answer with the correct HomepageData when there is no liked songs and no activity items (30pts)"
) {
new MyTestKit:
def tests() =
ada ! GetHomepageData
expectMsg(HomepageData(List(), List()))
"after receiving Like(1), should add 1 to the list of liked songs (20pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
ada ! GetHomepageData
expectMsg(HomepageData(List(1), List()))
"after receiving Like(1) and then Like(2), the list of liked songs should start with List(2, 1) (20pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
ada ! Like(2)
ada ! GetHomepageData
expectMsg(HomepageData(List(2, 1), List()))
"after receiving Like(1) and then Like(1), song 1 should be in the list of liked songs only once (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
ada ! Like(1)
ada ! GetHomepageData
expectMsg(HomepageData(List(1), List()))
"after receiving Like(1), Like(2) and then Like(1), the list of liked songs should start with List(2, 1) (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
ada ! Like(2)
ada ! Like(1)
ada ! GetHomepageData
expectMsg(HomepageData(List(2, 1), List()))
"after receiving Like(1), Unlike(1) and then Unlike(1), the list of liked songs should not contain song 1 (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
ada ! Like(2)
ada ! Like(1)
ada ! GetHomepageData
expectMsg(HomepageData(List(2, 1), List()))
"after receiving Subscribe(aUser) and then Play(5), should send AddActivity(Activity(\"1\", 5)) to aUser (20pts)"
) {
new MyTestKit:
def tests() =
ada ! Subscribe(self)
ada ! Play(5)
expectMsg(AddActivity(Activity("1", "Ada", 5)))
"after receiving Subscribe(aUser), Subscribe(bUser) and then Play(5), should send AddActivity(Activity(\"1\", 5)) to aUser (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Subscribe(self)
val donald = new TestProbe(system)
ada ! Subscribe(donald.ref)
ada ! Play(5)
expectMsg(AddActivity(Activity("1", "Ada", 5)))
donald.expectMsg(AddActivity(Activity("1", "Ada", 5)))
"after receiving Subscribe(aUser), Subscribe(aUser) and then Play(5), should send AddActivity(Activity(\"1\", 5)) to aUser only once (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Subscribe(self)
ada ! Subscribe(self)
ada ! Play(5)
expectMsg(AddActivity(Activity("1", "Ada", 5)))
"after receiving Subscribe(aUser), Unsubscribe(aUser) and then Play(5), should not send AddActivity(Activity(\"1\", 5)) to aUser (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Subscribe(self)
ada ! Play(5)
expectMsg(AddActivity(Activity("1", "Ada", 5)))
ada ! Unsubscribe(self)
ada ! Play(5)
"after receiving AddActivity(Activity(\"1\", 5)), Activity(\"1\", 5) should be in the activity feed (10pts)"
) {
new MyTestKit:
def tests() =
ada ! AddActivity(Activity("0", "Self", 5))
ada ! GetHomepageData
expectMsg(HomepageData(List(), List(Activity("0", "Self", 5))))
"after receiving AddActivity(Activity(\"1\", 5)) and AddActivity(Activity(\"1\", 6)), Activity(\"1\", 6) should be in the activity feed and Activity(\"1\", 5) should not (10pts)"
) {
new MyTestKit:
def tests() =
ada ! AddActivity(Activity("0", "Self", 5))
ada ! AddActivity(Activity("0", "Self", 6))
ada ! GetHomepageData
expectMsg(HomepageData(List(), List(Activity("0", "Self", 6))))
"after receiving GetHomepageText, should answer with a result containing \"Howdy $name!\" where $name is the user's name (10pts)"
) {
new MyTestKit:
def tests() =
val name = Random.alphanumeric.take(5).mkString
val randomUser =
system.actorOf(Props(classOf[User], "5", name, songsStore), "user-5")
randomUser ! GetHomepageText
expectMsgClass(classOf[HomepageText]).result.contains(f"Howdy $name!")
"after receiving GetHomepageText, should answer with the correct names of liked songs (1) (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(8)
ada ! Like(3)
ada ! Like(2)
ada ! GetHomepageText
|Liked Songs:
|* Sunny by Boney M.
|* J'irai où tu iras by Céline Dion & Jean-Jacques Goldman
|* Hold the line by TOTO
"after receiving GetHomepageText, should answer with the correct names of liked songs (2) (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(9)
ada ! Like(7)
ada ! GetHomepageText
|Liked Songs:
|* Straight Edge by Minor Threat
|* Anarchy in the UK by Sex Pistols
"after receiving GetHomepageText, should answer with the correct activity feed (1) (10pts)"
) {
new MyTestKit:
def tests() =
bob ! Subscribe(ada)
carol ! Subscribe(ada)
donald ! Subscribe(ada)
bob ! Play(3)
carol ! Play(8)
ada ! GetHomepageText
|Activity Feed:
|* Carol is listening to Hold the line by TOTO
|* Bob is listening to J'irai où tu iras by Céline Dion & Jean-Jacques Goldman
"after receiving GetHomepageText, should answer with the correct activity feed (2) (10pts)"
) {
new MyTestKit:
def tests() =
bob ! Subscribe(ada)
carol ! Subscribe(ada)
donald ! Subscribe(ada)
bob ! Play(9)
carol ! Play(10)
donald ! Play(6)
bob ! Play(7)
ada ! GetHomepageText
|Activity Feed:
|* Bob is listening to Straight Edge by Minor Threat
|* Donald is listening to Désenchantée by Mylène Farmer
|* Carol is listening to Breakfast in America by Supertramp
"after receiving GetHomepageText, should answer with the correct text (full test) (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
ada ! Like(2)
bob ! Subscribe(ada)
carol ! Subscribe(ada)
donald ! Subscribe(ada)
donald ! Play(3)
bob ! Play(4)
carol ! Play(5)
ada ! GetHomepageText
|Howdy Ada!
|Liked Songs:
|* Sunny by Boney M.
|* High Hopes by Pink Floyd
|Activity Feed:
|* Carol is listening to Strobe by deadmau5
|* Bob is listening to Ce monde est cruel by Vald
|* Donald is listening to J'irai où tu iras by Céline Dion & Jean-Jacques Goldman
abstract class MyTestKit
extends TestKit(ActorSystem("TestSystem"))
with ImplicitSender:
val songsStore = system.actorOf(Props(MockSongsStore()), "songsStore")
def makeAda() =
system.actorOf(Props(classOf[User], "1", "Ada", songsStore), "user-1")
val ada = makeAda()
val bob =
system.actorOf(Props(classOf[User], "2", "Bob", songsStore), "user-2")
val carol =
system.actorOf(Props(classOf[User], "3", "Carol", songsStore), "user-3")
val donald =
system.actorOf(Props(classOf[User], "4", "Donald", songsStore), "user-4")
def tests(): Unit
try tests()
finally shutdown(system)
package instrumentation
trait MockedMonitor extends Monitor:
def scheduler: Scheduler
// Can be overriden.
override def waitDefault() =
scheduler updateThreadState Wait(this, scheduler.threadLocks.tail)
override def synchronizedDefault[T](toExecute: => T): T =
scheduler.log("synchronized check")
val prevLocks = scheduler.threadLocks
scheduler updateThreadState Sync(
) // If this belongs to prevLocks, should just continue.
scheduler.log("synchronized -> enter")
scheduler updateThreadState Running(prevLocks)
scheduler.log("synchronized -> out")
override def notifyDefault() =
scheduler mapOtherStates {
state =>
state match
case Wait(lockToAquire, locks) if lockToAquire == this =>
SyncUnique(this, state.locks)
case e => e
override def notifyAllDefault() =
scheduler mapOtherStates {
state =>
state match
case Wait(lockToAquire, locks) if lockToAquire == this =>
Sync(this, state.locks)
case SyncUnique(lockToAquire, locks) if lockToAquire == this =>
Sync(this, state.locks)
case e => e
abstract class ThreadState:
def locks: Seq[AnyRef]
trait CanContinueIfAcquiresLock extends ThreadState:
def lockToAquire: AnyRef
case object Start extends ThreadState:
def locks: Seq[AnyRef] = Seq.empty
case object End extends ThreadState:
def locks: Seq[AnyRef] = Seq.empty
case class Wait(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState
case class SyncUnique(lockToAquire: AnyRef, locks: Seq[AnyRef])
extends ThreadState with CanContinueIfAcquiresLock
case class Sync(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState
with CanContinueIfAcquiresLock
case class Running(locks: Seq[AnyRef]) extends ThreadState
case class VariableReadWrite(locks: Seq[AnyRef]) extends ThreadState
package instrumentation
import java.util.concurrent.*;
import scala.concurrent.duration.*
import scala.collection.mutable.*
import Stats.*
import java.util.concurrent.atomic.AtomicInteger
sealed abstract class Result
case class RetVal(rets: List[Any]) extends Result
case class Except(msg: String, stackTrace: Array[StackTraceElement])
extends Result
case class Timeout(msg: String) extends Result
/** A class that maintains schedule and a set of thread ids. The schedules are
* advanced after an operation of a SchedulableBuffer is performed. Note: the
* real schedule that is executed may deviate from the input schedule due to
* the adjustments that had to be made for locks
class Scheduler(sched: List[Int]):
val maxOps =
500 // a limit on the maximum number of operations the code is allowed to perform
private var schedule = sched
private var numThreads = 0
private val realToFakeThreadId = Map[Long, Int]()
private val opLog =
ListBuffer[String]() // a mutable list (used for efficient concat)
private val threadStates = Map[Int, ThreadState]()
/** Runs a set of operations in parallel as per the schedule. Each operation
* may consist of many primitive operations like reads or writes to shared
* data structure each of which should be executed using the function `exec`.
* @timeout
* in milliseconds
* @return
* true - all threads completed on time, false -some tests timed out.
def runInParallel(timeout: Long, ops: List[() => Any]): Result =
numThreads = ops.length
val threadRes = Array.fill(numThreads) { None: Any }
var exception: Option[Except] = None
val syncObject = new Object()
var completed = new AtomicInteger(0)
// create threads
val threads = {
case (op, i) =>
new Thread(new Runnable():
def run(): Unit =
val fakeId = i + 1
val res = op()
threadRes(i) = res
// notify the main thread if all threads have completed
if completed.incrementAndGet() == ops.length then
syncObject.synchronized { syncObject.notifyAll() }
case e: Throwable
if exception != None => // do nothing here and silently fail
case e: Throwable =>
log(s"throw ${e.toString}")
exception = Some(Except(
s"Thread $fakeId crashed on the following schedule: \n" + opLog.mkString(
syncObject.synchronized { syncObject.notifyAll() }
// println(s"$fakeId: ${e.toString}")
// Runtime.getRuntime().halt(0) //exit the JVM and all running threads (no other way to kill other threads)
// start all threads
// wait for all threads to complete, or for an exception to be thrown, or for the time out to expire
var remTime = timeout
syncObject.synchronized {
timed { if completed.get() != ops.length then syncObject.wait(timeout) } {
time => remTime -= time
if exception.isDefined then
else if remTime <= 1
then // timeout ? using 1 instead of zero to allow for some errors
// every thing executed normally
// Updates the state of the current thread
def updateThreadState(state: ThreadState): Unit =
val tid = threadId
synchronized {
threadStates(tid) = state
state match
case Sync(lockToAquire, locks) =>
if locks.indexOf(lockToAquire) < 0 then waitForTurn
// Re-aqcuiring the same lock
updateThreadState(Running(lockToAquire +: locks))
case Start => waitStart()
case End => removeFromSchedule(tid)
case Running(_) =>
case _ => waitForTurn // Wait, SyncUnique, VariableReadWrite
def waitStart(): Unit =
// while (threadStates.size < numThreads) {
// Thread.sleep(1)
// }
synchronized {
if threadStates.size < numThreads then
def threadLocks =
synchronized {
def threadState =
synchronized {
def mapOtherStates(f: ThreadState => ThreadState) =
val exception = threadId
synchronized {
for k <- threadStates.keys if k != exception do
threadStates(k) = f(threadStates(k))
def log(str: String) =
if (realToFakeThreadId contains Thread.currentThread().getId()) then
val space = (" " * ((threadId - 1) * 2))
val s =
space + threadId + ":" + "\n".r.replaceAllIn(str, "\n" + space + " ")
// println(s)
opLog += s
/** Executes a read or write operation to a global data structure as per the
* given schedule
* @param msg
* a message corresponding to the operation that will be logged
def exec[T](primop: => T)(
msg: => String,
postMsg: => Option[T => String] = None
): T =
if !(realToFakeThreadId contains Thread.currentThread().getId()) then
val m = msg
if m != "" then log(m)
if opLog.size > maxOps then
throw new Exception(
s"Total number of reads/writes performed by threads exceed $maxOps. A possible deadlock!"
val res = primop
postMsg match
case Some(m) => log(m(res))
case None =>
private def setThreadId(fakeId: Int) = synchronized {
realToFakeThreadId(Thread.currentThread.getId) = fakeId
def threadId =
case e: NoSuchElementException =>
throw new Exception(
"You are accessing shared variables in the constructor. This is not allowed. The variables are already initialized!"
private def isTurn(tid: Int) = synchronized {
(!schedule.isEmpty && schedule.head != tid)
def canProceed(): Boolean =
val tid = threadId
canContinue match
case Some((i, state)) if i == tid =>
// println(s"$tid: Runs ! Was in state $state")
canContinue = None
state match
case Sync(lockToAquire, locks) =>
updateThreadState(Running(lockToAquire +: locks))
case SyncUnique(lockToAquire, locks) =>
mapOtherStates {
_ match
case SyncUnique(lockToAquire2, locks2)
if lockToAquire2 == lockToAquire =>
Wait(lockToAquire2, locks2)
case e => e
updateThreadState(Running(lockToAquire +: locks))
case VariableReadWrite(locks) => updateThreadState(Running(locks))
case Some((i, state)) =>
// println(s"$tid: not my turn but $i !")
case None =>
var threadPreference =
0 // In the case the schedule is over, which thread should have the preference to execute.
/** returns true if the thread can continue to execute, and false otherwise */
def decide(): Option[(Int, ThreadState)] =
if !threadStates.isEmpty
then // The last thread who enters the decision loop takes the decision.
// println(s"$threadId: I'm taking a decision")
if threadStates.values.forall {
case e: Wait => true
case _ => false
val waiting =", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
throw new Exception(
s"Deadlock: Thread$s $waiting $are waiting but all others have ended and cannot notify them."
// Threads can be in Wait, Sync, SyncUnique, and VariableReadWrite mode.
// Let's determine which ones can continue.
val notFree = threadStates.collect { case (id, state) =>
val threadsNotBlocked = threadStates.toSeq.filter {
case (id, v: VariableReadWrite) => true
case (id, v: CanContinueIfAcquiresLock) =>
!notFree(v.lockToAquire) || (v.locks contains v.lockToAquire)
case _ => false
if threadsNotBlocked.isEmpty then
val waiting =", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
val whoHasLock = threadStates.toSeq.flatMap { case (id, state) => => (lock, id))
val reason = threadStates.collect {
case (id, state: CanContinueIfAcquiresLock)
if !notFree(state.lockToAquire) =>
s"Thread $id is waiting on lock ${state.lockToAquire} held by thread ${whoHasLock(state.lockToAquire)}"
throw new Exception(
s"Deadlock: Thread$s $waiting are interlocked. Indeed:\n$reason"
else if threadsNotBlocked.size == 1
then // Do not consume the schedule if only one thread can execute.
val next = schedule.indexWhere(t =>
threadsNotBlocked.exists { case (id, state) => id == t }
if next != -1 then
// println(s"$threadId: schedule is $schedule, next chosen is ${schedule(next)}")
val chosenOne =
schedule(next) // TODO: Make schedule a mutable list.
schedule = schedule.take(next) ++ schedule.drop(next + 1)
Some((chosenOne, threadStates(chosenOne)))
threadPreference = (threadPreference + 1) % threadsNotBlocked.size
val chosenOne =
threadsNotBlocked(threadPreference) // Maybe another strategy
// threadsNotBlocked.indexOf(threadId) >= 0
val tnb =",")
val s = if (schedule.isEmpty) "empty" else schedule.mkString(",")
val only = if (schedule.isEmpty) "" else " only"
throw new Exception(s"The schedule is $s but$only threads ${tnb} can continue")*/
else canContinue
/** This will be called before a schedulable operation begins. This should not
* use synchronized
var numThreadsWaiting = new AtomicInteger(0)
// var waitingForDecision = Map[Int, Option[Int]]() // Mapping from thread ids to a number indicating who is going to make the choice.
var canContinue: Option[(Int, ThreadState)] =
None // The result of the decision thread Id of the thread authorized to continue.
private def waitForTurn =
synchronized {
if numThreadsWaiting.incrementAndGet() == threadStates.size then
canContinue = decide()
// waitingForDecision(threadId) = Some(numThreadsWaiting)
// println(s"$threadId Entering waiting with ticket number $numThreadsWaiting/${waitingForDecision.size}")
while !canProceed() do wait()
/** To be invoked when a thread is about to complete
private def removeFromSchedule(fakeid: Int) = synchronized {
// println(s"$fakeid: I'm taking a decision because I finished")
schedule = schedule.filterNot(_ == fakeid)
threadStates -= fakeid
if numThreadsWaiting.get() == threadStates.size then
canContinue = decide()
def getOperationLog() = opLog
package instrumentation
/** A collection of methods that can be used to collect run-time statistics */
object Stats:
def timed[T](code: => T)(cont: Long => Unit): T =
var t1 = System.currentTimeMillis()
val r = code
cont((System.currentTimeMillis() - t1))
def withTime[T](code: => T): (T, Long) =
var t1 = System.currentTimeMillis()
val r = code
(r, (System.currentTimeMillis() - t1))
package instrumentation
import scala.util.Random
import scala.collection.mutable.{Map as MutableMap}
import Stats.*
object TestHelper:
val noOfSchedules = 10000 // set this to 100k during deployment
val readWritesPerThread =
20 // maximum number of read/writes possible in one thread
val contextSwitchBound = 10
val testTimeout = 240 // the total time out for a test in seconds
val schedTimeout =
15 // the total time out for execution of a schedule in secs
// Helpers
/*def testManySchedules(op1: => Any): Unit = testManySchedules(List(() => op1))
def testManySchedules(op1: => Any, op2: => Any): Unit = testManySchedules(List(() => op1, () => op2))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any, op4: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3, () => op4))*/
def testSequential[T](ops: Scheduler => Any)(assertions: T => (
)) =
(sched: Scheduler) =>
List(() => ops(sched)),
(res: List[Any]) => assertions(res.head.asInstanceOf[T])
/** @numThreads
* number of threads
* @ops
* operations to be executed, one per thread
* @assertion
* as condition that will executed after all threads have completed
* (without exceptions) the arguments are the results of the threads
def testManySchedules(
numThreads: Int,
ops: Scheduler => (
List[() => Any], // Threads
List[Any] => (Boolean, String)
) // Assertion
) =
var timeout = testTimeout * 1000L
val threadIds = (1 to numThreads)
// (1 to scheduleLength).flatMap(_ => threadIds).toList.permutations.take(noOfSchedules).foreach {
val schedules = (new ScheduleGenerator(numThreads)).schedules()
var schedsExplored = 0
schedules.takeWhile(_ =>
schedsExplored <= noOfSchedules && timeout > 0
).foreach {
// case _ if timeout <= 0 => // break
case schedule =>
schedsExplored += 1
val schedr = new Scheduler(schedule)
// println("Exploring Sched: "+schedule)
val (threadOps, assertion) = ops(schedr)
if threadOps.size != numThreads then
throw new IllegalStateException(
s"Number of threads: $numThreads, do not match operations of threads: $threadOps"
timed { schedr.runInParallel(schedTimeout * 1000, threadOps) } { t =>
timeout -= t
} match
case Timeout(msg) =>
throw new java.lang.AssertionError(
"assertion failed\n" + "The schedule took too long to complete. A possible deadlock! \n" + msg
case Except(msg, stkTrace) =>
val traceStr = "Thread Stack trace: \n" +
" at " + _.toString
throw new java.lang.AssertionError(
"assertion failed\n" + msg + "\n" + traceStr
case RetVal(threadRes) =>
// check the assertion
val (success, custom_msg) = assertion(threadRes)
if !success then
val msg =
"The following schedule resulted in wrong results: \n" + custom_msg + "\n" + schedr.getOperationLog().mkString(
throw new java.lang.AssertionError("Assertion failed: " + msg)
if timeout <= 0 then
throw new java.lang.AssertionError(
"Test took too long to complete! Cannot check all schedules as your code is too slow!"
/** A schedule generator that is based on the context bound
class ScheduleGenerator(numThreads: Int):
val scheduleLength = readWritesPerThread * numThreads
val rands =
(1 to scheduleLength).map(i =>
new Random(0xcafe * i)
) // random numbers for choosing a thread at each position
def schedules(): LazyList[List[Int]] =
var contextSwitches = 0
var contexts =
List[Int]() // a stack of thread ids in the order of context-switches
val remainingOps = MutableMap[Int, Int]()
remainingOps ++= (1 to numThreads).map(i =>
(i, readWritesPerThread)
) // num ops remaining in each thread
val liveThreads = (1 to numThreads).toSeq.toBuffer
/** Updates remainingOps and liveThreads once a thread is chosen for a
* position in the schedule
def updateState(tid: Int): Unit =
val remOps = remainingOps(tid)
if remOps == 0 then
liveThreads -= tid
remainingOps += (tid -> (remOps - 1))
val schedule = rands.foldLeft(List[Int]()) {
case (acc, r) if contextSwitches < contextSwitchBound =>
val tid = liveThreads(r.nextInt(liveThreads.size))
contexts match
case prev :: tail
if prev != tid => // we have a new context switch here
contexts +:= tid
contextSwitches += 1
case prev :: tail =>
case _ => // init case
contexts +:= tid
acc :+ tid
case (
) => // here context-bound has been reached so complete the schedule without any more context switches
if !contexts.isEmpty then
contexts = contexts.dropWhile(remainingOps(_) == 0)
val tid = contexts match
case top :: tail => top
case _ =>
) // here, there has to be threads that have not even started
acc :+ tid
schedule #:: schedules()
package instrumentation
import scala.concurrent.*
import scala.concurrent.duration.*
import org.junit.Assert.*
object TestUtils:
def failsOrTimesOut[T](action: => T): Boolean =
val asyncAction = Future {
Await.result(asyncAction, 2000.millisecond)
case _: Throwable => return true
return false
def assertDeadlock[T](action: => T): Unit =
throw new AssertionError("No error detected.")
case e: AssertionError =>
assert(e.getMessage.contains("Deadlock"), "No deadlock detected.")
def assertMaybeDeadlock[T](action: => T): Unit =
throw new AssertionError("No error detected.")
case e: AssertionError =>
e.getMessage.contains("A possible deadlock!"),
"No deadlock detected."
package midterm22
import org.junit.*
import org.junit.Assert.*
import instrumentation.*
class Mock2Test:
def test() =
scheduler =>
val a = new ScheduledAccount(50, scheduler)
val b = new ScheduledAccount(70, scheduler)
() => a.transfer(b, 10),
() => b.transfer(a, 10)
results => (true, "")
class ScheduledAccount(n: Int, val scheduler: Scheduler)
extends Account(n)
with MockedMonitor
package midterm22
import org.junit.*
import org.junit.Assert.*
class Part1Test:
val testArray =
Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19)
def testQuestion1Pos() =
val tasksCreatedBefore = tasksCreated.get
assertEquals(Some(18), find(testArray, 18, 3))
assertEquals(10, tasksCreated.get - tasksCreatedBefore)
def testQuestion1Neg() =
assertEquals(find(testArray, 20, 3), None)
def testQuestion2Pos(): Unit =
assertEquals(findAggregated(testArray, 18), Some(18))
def testQuestion2Neg(): Unit =
assertEquals(findAggregated(testArray, 20), None)
package midterm22
import org.junit.*
import org.junit.Assert.*
class Part2Test:
val testArray2 = Array(0, 50, 7, 1, 28, 42)
val testList2 = List(0, 50, 7, 1, 28, 42)
def testQuestion4Pos() =
assert(contains(testArray2, 7))
def testQuestion4Neg() =
assert(!contains(testArray2, 8))
def testQuestion6Pos() =
assert(contains(testList2, 7))
def testQuestion6Neg() =
assert(!contains(testList2, 8))
package midterm22
import instrumentation.Monitor
import instrumentation.MockedMonitor
import org.junit.*
import org.junit.Assert.*
import instrumentation.*
import java.util.concurrent.atomic.AtomicInteger
class Part4Test:
// This test can result in a deadlock because locks can be called in any
// order. Here, Thread 1 locks Node 3 first and then Node 2, whereas Thread 2
// locks Node 2 first and then Node 3. This will lead to a deadlock.
def testQuestion9() =
scheduler =>
val allNodes =
(for i <- 0 to 6 yield ScheduledNode(i, scheduler)).toList
// Shared by all threads
var sum: Int = 0
def increment(e: Int) = sum += e
() =>
// Thread 1
var nodes: List[Node] =
List(allNodes(1), allNodes(3), allNodes(2), allNodes(4))
nodes = nodes
lockFun(nodes, increment)
() =>
// Thread 2
var nodes: List[Node] =
List(allNodes(5), allNodes(2), allNodes(3))
nodes = nodes
lockFun(nodes, increment),
results => (true, "")
// This will not lead to a deadlock because the lock acquire happens in a
// particular order. Thread 1 acquires locks in order 1->2->3->4, whereas
// Thread 2 acquires locks in order 2->3->5.
def testQuestion10() =
scheduler =>
val allNodes =
(for i <- 0 to 6 yield ScheduledNode(i, scheduler)).toList
// Shared by all threads
var sum: Int = 0
def increment(e: Int) = sum += e
() =>
// Thread 1
var nodes: List[Node] =
List(allNodes(1), allNodes(3), allNodes(2), allNodes(4))
nodes = nodes.sortWith((x, y) => x.guid > y.guid)
lockFun(nodes, increment)
() =>
// Thread 2
var nodes: List[Node] =
List(allNodes(5), allNodes(2), allNodes(3))
nodes = nodes.sortWith((x, y) => x.guid > y.guid)
lockFun(nodes, increment),
results => (true, "")
// This will not lead to a deadlock because the lock acquire happens in a
// particular order. Thread 1 acquires locks in order 4->3->2->1, whereas
// Thread 2 acquires locks in order 5->3->2.
def testQuestion11() =
scheduler =>
val allNodes =
(for i <- 0 to 6 yield ScheduledNode(i, scheduler)).toList
// Shared by all threads
var sum: Int = 0
def increment(e: Int) = sum += e
() =>
// Thread 1
var nodes: List[Node] =
List(allNodes(1), allNodes(3), allNodes(2), allNodes(4))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment)
() =>
// Thread 2
var nodes: List[Node] =
List(allNodes(5), allNodes(2), allNodes(3))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment),
results => (true, "")
// This test can result in a deadlock because locks are not called in any
// order. Thread 1 acquire order (3->2->4->1), Thread 2 acquire order
// (2->3->5). Thread 1 locks Node3 first and then Node2, whereas Thread 2
// locks Node 2 first and then Node3. This will lead to a deadlock.
def testQuestion12() =
scheduler =>
val allNodes =
(for i <- 0 to 6 yield ScheduledNode(i, scheduler)).toList
// Shared by all threads
var sum: Int = 0
def increment(e: Int) = sum += e
() =>
// Thread 1
var nodes: List[Node] =
List(allNodes(1), allNodes(3), allNodes(2), allNodes(4))
nodes = nodes.tail.appended(nodes(0))
lockFun(nodes, increment)
() =>
// Thread 2
var nodes: List[Node] =
List(allNodes(5), allNodes(2), allNodes(3))
nodes = nodes.tail.appended(nodes(0))
lockFun(nodes, increment),
results => (true, "")
// sum returns wrong answer because there is a data race on the sum variable.
@Test(expected = classOf[AssertionError])
def testQuestion13() =
scheduler =>
val allNodes =
(for i <- 0 to 6 yield ScheduledNode(i, scheduler)).toList
// Shared by all threads
var sum: Int = 0
def increment(e: Int) =
val previousSum = scheduler.exec { sum }("Get sum")
scheduler.exec { sum = previousSum + e }("Write sum")
() =>
// Thread 1
var nodes: List[Node] =
List(allNodes(1), allNodes(3), allNodes(2), allNodes(4))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment)
() =>
// Thread 2
var nodes: List[Node] =
List(allNodes(5), allNodes(2), allNodes(3))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment),
results =>
if sum != 20 then
(false, f"Wrong sum: expected 20 but got $sum")
(true, "")
// sum value will be correct here because "sum += e" is protected by a lock.
def testQuestion14() =
sched =>
val allNodes = (for i <- 0 to 6 yield ScheduledNode(i, sched)).toList
val monitor = new MockedMonitor: // Monitor is a type of a lock.
def scheduler = sched
// Shared by all threads
var sum: Int = 0
def increment(e: Int) =
monitor.synchronized { sum += e }
() =>
// Thread 1
var nodes: List[Node] =
List(allNodes(1), allNodes(3), allNodes(2), allNodes(4))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment)
() =>
// Thread 2
var nodes: List[Node] =
List(allNodes(5), allNodes(2), allNodes(3))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment),
results =>
if sum != 20 then
(false, f"Wrong sum: expected 20 but got $sum")
(true, "")
// total will give correct output here as it is an atomic instruction.
def testQuestion15() =
sched =>
val allNodes = (for i <- 0 to 6 yield ScheduledNode(i, sched)).toList
// Shared by all threads
var total: AtomicInteger = new AtomicInteger(0)
def increment(e: Int) =
() =>
// Thread 1
var nodes: List[Node] =
List(allNodes(1), allNodes(3), allNodes(2), allNodes(4))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment)
() =>
// Thread 2
var nodes: List[Node] =
List(allNodes(5), allNodes(2), allNodes(3))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment),
results =>
if total.get != 20 then
(false, f"Wrong total: expected 20 but got $total")
(true, "")
class ScheduledNode(value: Int, val scheduler: Scheduler) extends Node(value)
with MockedMonitor
package midterm22
import org.junit.*
import org.junit.Assert.*
import instrumentation.*
class Part6Test:
@Test(expected = classOf[AssertionError])
def testQuestion21() =
scheduler =>
val ticketsManager = ScheduledTicketsManager(1, scheduler)
() =>
// Thread 1
() =>
// Thread 2
results =>
if ticketsManager.remainingTickets < 0 then
(false, "Sold more tickets than available!")
else (true, "")
class ScheduledTicketsManager(totalTickets: Int, val scheduler: Scheduler)
extends TicketsManager(totalTickets)
with MockedMonitor
package midterm22
import org.junit.*
import org.junit.Assert.*
import instrumentation.*
class Part7Test:
def testNicManagerSequential() =
val nicsManager = NICManager(4)
assertEquals((0, 1), nicsManager.assignNICs())
assertEquals((2, 3), nicsManager.assignNICs())
def testQuestion22() =
testNicManagerParallel(2, 3)
def testQuestion23() =
val nicsManager = NICManager(2)
// Thread 1
assertEquals((0, 1), nicsManager.assignNICs())
nicsManager.nics(0).assigned = false
nicsManager.nics(1).assigned = false
// Thread 2
assertEquals((0, 1), nicsManager.assignNICs())
nicsManager.nics(0).assigned = false
nicsManager.nics(1).assigned = false
def testQuestion24() =
testNicManagerParallel(3, 2, true)
def testQuestion24NotLimitingRecvNICs() =
testNicManagerParallel(3, 2)
def testNicManagerParallel(
threads: Int,
nics: Int,
limitRecvNICs: Boolean = false
) =
scheduler =>
val nicsManager = ScheduledNicsManager(nics, scheduler)
val tasks =
for i <- 0 until threads yield () =>
// Thread i
val (recvNIC, sendNIC) = nicsManager.assignNICs(limitRecvNICs)
// Do something with NICs...
// Un-assign NICs
nicsManager.nics(recvNIC).assigned = false
nicsManager.nics(sendNIC).assigned = false
results =>
if nicsManager.nics.count(_.assigned) != 0 then
(false, f"All NICs should have been released.")
else (true, "")
class ScheduledNicsManager(n: Int, scheduler: Scheduler)
extends NICManager(n):
class ScheduledNIC(
_index: Int,
_assigned: Boolean,
val scheduler: Scheduler
) extends NIC(_index, _assigned)
with MockedMonitor:
override def index = scheduler.exec { super.index }(
Some(res => f"read NIC.index == $res")
override def assigned = scheduler.exec { super.assigned }(
Some(res => f"read NIC.assigned == $res")
override def assigned_=(v: Boolean) =
scheduler.exec { super.assigned = v }(
f"write NIC.assigned = $v"
override val nics =
(for i <- 0 until n yield ScheduledNIC(i, false, scheduler)).toList
package midterm22
import org.junit.*
import org.junit.Assert.*
import instrumentation.*
import annotation.nowarn
import scala.collection.concurrent.TrieMap
import scala.collection.concurrent.{TrieMap, Map}
class Part8Test:
def usage() =
val insta = Instagram()
assertEquals(1, insta.add())
assertEquals(2, insta.add())
insta.follow(1, 2)
assertEquals(insta.graph, Map(1 -> List(2), 2 -> List()))
insta.follow(2, 1)
insta.unfollow(1, 2)
assertEquals(insta.graph, Map(1 -> List(), 2 -> List(1)))
insta.follow(3, 1) // fails silently
assertEquals(insta.graph, Map(1 -> List(), 2 -> List(1)))
assertEquals(insta.graph, Map(2 -> List()))
insta.unfollow(1, 2) // fails silently
def testParallelFollowABRemoveA() =
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
val u1 = insta.add()
val u2 = insta.add()
() =>
// Thread 1
insta.follow(u1, u2),
() =>
// Thread 2
results =>
val size = insta.graph.size
if size != 1 then
(false, f"Wrong number of user: expected 1 but got ${size}")
else validateGraph(insta)
def testParallelFollowABRemoveB() =
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
val u1 = insta.add()
val u2 = insta.add()
() =>
// Thread 1
insta.follow(u1, u2),
() =>
// Thread 2
results =>
val size = insta.graph.size
if size != 1 then
(false, f"Wrong number of user: expected 1 but got ${size}")
else validateGraph(insta)
def testParallelFollowACRemoveB() =
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
val u1 = insta.add()
val u2 = insta.add()
val u3 = insta.add()
insta.follow(u1, u2)
() =>
// Thread 1
insta.follow(u1, u3),
() =>
// Thread 2
results =>
val size = insta.graph.size
if size != 2 then
(false, f"Wrong number of user: expected 2 but got ${size}")
else validateGraph(insta)
def testParallelFollow() =
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
val u1 = insta.add()
val u2 = insta.add()
val u3 = insta.add()
() =>
// Thread 1
insta.follow(u1, u2),
() =>
// Thread 2
insta.follow(u1, u3)
results =>
val u1FollowingSize = insta.graph(u1).size
if u1FollowingSize != 2 then
f"Wrong number of users followed by user 1: expected 2 but got ${u1FollowingSize}"
else validateGraph(insta)
def testParallelRemove() =
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
// Setup
val u1 = insta.add()
val u2 = insta.add()
val u3 = insta.add()
insta.follow(u1, u2)
insta.follow(u2, u1)
insta.follow(u2, u3)
insta.follow(u3, u1)
() =>
// Thread 1
() =>
// Thread 2
results =>
val size = insta.graph.size
if size != 1 then
(false, f"Wrong number of user: expected 1 but got ${size}")
else validateGraph(insta)
// We test wrong code here, so we expect an assertion error. You can replace
// the next line by `@Test` if you want to see the error with the failing
// schedule.
@Test(expected = classOf[AssertionError])
def testParallelWrongAdd() =
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
// This implementation of `add` is wrong, because two threads might
// allocate the same id.
// Consider the following schedule:
// T1: res = 1
// T2: res = 2
// T2: graph.update(2, Nil)
// T2: 2
// T1: graph.update(2, Nil)
// T1: 2
override def add(): Int =
val res = maxId.incrementAndGet
graph.update(maxId.get, Nil)
() =>
// Thread 1
() =>
// Thread 2
results =>
if results(0) != results(1) then
(false, f"Allocated twice id ${results(0)}")
else validateGraph(insta)
// We test wrong code here, so we expect an assertion error. You can replace
// the next line by `@Test` if you want to see the error with the failing
// schedule.
@Test(expected = classOf[AssertionError])
def testParallelWrongRemove() =
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
// This implementation of `remove` is wrong because we don't retry to
// call `graph.replace` when it fails. Therefore, user 1 might end up
// following user 2 that has been removed, or not following user 3
// which is concurrently followed.
override def remove(idToRemove: Int): Unit =
for (key, value) <- graph do
graph.replace(key, value, value.filter(_ != idToRemove))
// Note: writing `graph(key) = value.filter(_ != idToRemove)` would also
// be wrong because it does not check the previous value.
// Therefore, it could erase a concurrent update.
val u1 = insta.add()
val u2 = insta.add()
val u3 = insta.add()
insta.follow(u1, u2)
() =>
// Thread 1
insta.follow(u1, u3),
() =>
// Thread 2
results =>
val size = insta.graph.size
if insta.graph(u1).size != 1 then
f"Wrong number of users followed by 1: expected 1 but got ${insta.graph(u1)}"
else validateGraph(insta)
// We test wrong code here, so we expect an assertion error. You can replace
// the next line by `@Test` if you want to see the error with the failing
// schedule.
@Test(expected = classOf[AssertionError])
def testParallelWrongUnfollow() =
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
override def unfollow(a: Int, b: Int): Unit =
if !graph.contains(a) then return
val prev = graph(a) // Might throw java.util.NoSuchElementException
if !graph.replace(a, prev, prev.filter(_ != b)) then unfollow(a, b)
val u1 = insta.add()
val u2 = insta.add()
insta.follow(u1, u2)
() =>
// Thread 1
insta.unfollow(u1, u2),
() =>
// Thread 2
results =>
val size = insta.graph.size
if size != 1 then
(false, f"Wrong number of user: expected 1 but got ${size}")
else validateGraph(insta)
def validateGraph(insta: Instagram): (Boolean, String) =
for (a, following) <- insta.graph; b <- following do
if !insta.graph.contains(b) then
return (false, f"User $a follows non-existing user $b")
(true, "")
final class ScheduledIterator[T](
private val myIterator: Iterator[T],
private val scheduler: Scheduler
) extends Iterator[T]:
override def hasNext =
override def next() =
scheduler.exec("", Some(res => f" == $res"))
override def knownSize: Int =
final class ScheduledTrieMap[K, V](
private val myMap: Map[K, V],
private val scheduler: Scheduler
) extends Map[K, V]:
override def apply(key: K): V =
Some(res => f"TrieMap.apply($key) == $res")
override def contains(key: K): Boolean =
Some(res => f"TrieMap.contains($key) == $res")
override def get(key: K): Option[V] =
Some(res => f"TrieMap.get($key) == $res")
override def addOne(kv: (K, V)) =
override def subtractOne(k: K) =
override def iterator() =
ScheduledIterator(myMap.iterator, scheduler)
override def replace(k: K, v: V): Option[V] =
scheduler.exec(myMap.replace(k, v))(
Some(res => f"TrieMap.replace($k, $v) == $res")
override def replace(k: K, oldvalue: V, newvalue: V): Boolean =
scheduler.exec(myMap.replace(k, oldvalue, newvalue))(
Some(res => f"TrieMap.replace($k, $oldvalue, $newvalue) == $res")
override def putIfAbsent(k: K, v: V): Option[V] =
scheduler.exec(myMap.putIfAbsent(k, v))(
Some(res => f"TrieMap.putIfAbsent($k, $v)")
override def remove(k: K): Option[V] =
Some(res => f"TrieMap.remove($k)")
override def remove(k: K, v: V): Boolean =
scheduler.exec(myMap.remove(k, v))(
Some(res => f"TrieMap.remove($k, $v)")
package midterm23
import instrumentation.*
import java.util.concurrent.atomic.AtomicInteger
class BarberShopSolutionTest extends munit.FunSuite:
val implementations =
Map[String, (Int, Scheduler) => ScheduledBarberShopSolution](
"BarberShopSolution1" -> (new BarberShopSolution1(_)
with ScheduledBarberShopSolution(_)),
"BarberShopSolution2" -> (new BarberShopSolution2(_)
with ScheduledBarberShopSolution(_)),
"BarberShopSolution3" -> (new BarberShopSolution3(_)
with ScheduledBarberShopSolution(_))
for (name, makeShop) <- implementations do
for nCustomers <- 1 to 3 do
test(f"$name with $nCustomers customer(s)") {
testBarberShop(nCustomers, makeShop)