Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • lara/cs206-demos
  • gcharles/cs206-demos
  • gambhir/cs206-demos
3 results
Show changes
Showing
with 2644 additions and 0 deletions
package concpar22final02.instrumentation
import scala.annotation.tailrec
import concpar22final02.*
import scala.collection.mutable.ArrayBuffer
class SchedulableBarrier(val scheduler: Scheduler, size: Int)
extends Barrier(size)
with MockedMonitor
class SchedulableProblem2(
val scheduler: Scheduler,
imageSize: Int,
threadCount: Int,
numFilters: Int
) extends Problem2(imageSize, threadCount, numFilters):
self =>
override val barrier =
ArrayBuffer.fill(numFilters)(SchedulableBarrier(scheduler, threadCount))
package concpar22final02.instrumentation
import java.util.concurrent.*
import scala.concurrent.duration.*
import scala.collection.mutable.*
import Stats.*
import java.util.concurrent.atomic.AtomicInteger
sealed abstract class Result
case class RetVal(rets: List[Any]) extends Result
case class Except(msg: String, stackTrace: Array[StackTraceElement])
extends Result
case class Timeout(msg: String) extends Result
/** A class that maintains schedule and a set of thread ids. The schedules are
* advanced after an operation of a SchedulableBuffer is performed. Note: the
* real schedule that is executed may deviate from the input schedule due to
* the adjustments that had to be made for locks
*/
class Scheduler(sched: List[Int]):
val maxOps =
500 // a limit on the maximum number of operations the code is allowed to perform
var waitCount: AtomicInteger = new AtomicInteger(0)
var notifyCount: AtomicInteger = new AtomicInteger(0)
var notifyAllCount: AtomicInteger = new AtomicInteger(0)
private var schedule = sched
var numThreads = 0
private val realToFakeThreadId = Map[Long, Int]()
private val opLog =
ListBuffer[String]() // a mutable list (used for efficient concat)
private val threadStates = Map[Int, ThreadState]()
/** Runs a set of operations in parallel as per the schedule. Each operation
* may consist of many primitive operations like reads or writes to shared
* data structure each of which should be executed using the function `exec`.
* @timeout
* in milliseconds
* @return
* true - all threads completed on time, false -some tests timed out.
*/
def runInParallel(timeout: Long, ops: List[() => Any]): Result =
numThreads = ops.length
val threadRes = Array.fill(numThreads) { None: Any }
var exception: Option[(Throwable, Int)] = None
val syncObject = new Object()
var completed = new AtomicInteger(0)
// create threads
val threads = ops.zipWithIndex.map { case (op, i) =>
new Thread(
new Runnable():
def run(): Unit =
val fakeId = i + 1
setThreadId(fakeId)
try
updateThreadState(Start)
val res = op()
updateThreadState(End)
threadRes(i) = res
// notify the main thread if all threads have completed
if completed.incrementAndGet() == ops.length then
syncObject.synchronized { syncObject.notifyAll() }
catch
case e: Throwable
if exception != None => // do nothing here and silently fail
case e: Throwable =>
log(s"throw ${e.toString}")
exception = Some((e, fakeId))
syncObject.synchronized { syncObject.notifyAll() }
// println(s"$fakeId: ${e.toString}")
// Runtime.getRuntime().halt(0) //exit the JVM and all running threads (no other way to kill other threads)
)
}
// start all threads
threads.foreach(_.start())
// wait for all threads to complete, or for an exception to be thrown, or for the time out to expire
var remTime = timeout
syncObject.synchronized {
timed {
if completed.get() != ops.length then syncObject.wait(timeout)
} { time =>
remTime -= time
}
}
if exception.isDefined then
Except(
s"Thread ${exception.get._2} crashed on the following schedule: \n" + opLog.mkString(
"\n"
),
exception.get._1.getStackTrace
)
else if remTime <= 1
then // timeout ? using 1 instead of zero to allow for some errors
Timeout(opLog.mkString("\n"))
else
// every thing executed normally
RetVal(threadRes.toList)
// Updates the state of the current thread
def updateThreadState(state: ThreadState): Unit =
val tid = threadId
synchronized {
threadStates(tid) = state
}
state match
case Sync(lockToAquire, locks) =>
if locks.indexOf(lockToAquire) < 0 then waitForTurn
else
// Re-aqcuiring the same lock
updateThreadState(Running(lockToAquire +: locks))
case Start => waitStart()
case End => removeFromSchedule(tid)
case Running(_) =>
case _ => waitForTurn // Wait, SyncUnique, VariableReadWrite
def waitStart(): Unit =
// while (threadStates.size < numThreads) {
// Thread.sleep(1)
// }
synchronized {
if threadStates.size < numThreads then wait()
else notifyAll()
}
def threadLocks =
synchronized {
threadStates(threadId).locks
}
def threadState =
synchronized {
threadStates(threadId)
}
def mapOtherStates(f: ThreadState => ThreadState) =
val exception = threadId
synchronized {
for k <- threadStates.keys if k != exception do
threadStates(k) = f(threadStates(k))
}
def log(str: String) =
if (realToFakeThreadId contains Thread.currentThread().getId()) then
val space = (" " * ((threadId - 1) * 2))
val s =
space + threadId + ":" + "\n".r.replaceAllIn(str, "\n" + space + " ")
opLog += s
/** Executes a read or write operation to a global data structure as per the
* given schedule
* @param msg
* a message corresponding to the operation that will be logged
*/
def exec[T](primop: => T)(
msg: => String,
postMsg: => Option[T => String] = None
): T =
if !(realToFakeThreadId contains Thread.currentThread().getId()) then primop
else
updateThreadState(VariableReadWrite(threadLocks))
val m = msg
if m != "" then log(m)
if opLog.size > maxOps then
throw new Exception(
s"Total number of reads/writes performed by threads exceed $maxOps. A possible deadlock!"
)
val res = primop
postMsg match
case Some(m) => log(m(res))
case None =>
res
private def setThreadId(fakeId: Int) = synchronized {
realToFakeThreadId(Thread.currentThread.getId) = fakeId
}
def threadId =
try realToFakeThreadId(Thread.currentThread().getId())
catch
case e: NoSuchElementException =>
throw new Exception(
"You are accessing shared variables in the constructor. This is not allowed. The variables are already initialized!"
)
private def isTurn(tid: Int) = synchronized {
(!schedule.isEmpty && schedule.head != tid)
}
def canProceed(): Boolean =
val tid = threadId
canContinue match
case Some((i, state)) if i == tid =>
// println(s"$tid: Runs ! Was in state $state")
canContinue = None
state match
case Sync(lockToAquire, locks) =>
updateThreadState(Running(lockToAquire +: locks))
case SyncUnique(lockToAquire, locks) =>
mapOtherStates {
_ match
case SyncUnique(lockToAquire2, locks2)
if lockToAquire2 == lockToAquire =>
Wait(lockToAquire2, locks2)
case e => e
}
updateThreadState(Running(lockToAquire +: locks))
case VariableReadWrite(locks) => updateThreadState(Running(locks))
true
case Some((i, state)) =>
// println(s"$tid: not my turn but $i !")
false
case None =>
false
var threadPreference =
0 // In the case the schedule is over, which thread should have the preference to execute.
/** returns true if the thread can continue to execute, and false otherwise */
def decide(): Option[(Int, ThreadState)] =
if !threadStates.isEmpty
then // The last thread who enters the decision loop takes the decision.
// println(s"$threadId: I'm taking a decision")
if threadStates.values.forall {
case e: Wait => true
case _ => false
}
then
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
throw new Exception(
s"Deadlock: Thread$s $waiting $are waiting but all others have ended and cannot notify them."
)
else
// Threads can be in Wait, Sync, SyncUnique, and VariableReadWrite mode.
// Let's determine which ones can continue.
val notFree = threadStates.collect { case (id, state) =>
state.locks
}.flatten.toSet
val threadsNotBlocked = threadStates.toSeq.filter {
case (id, v: VariableReadWrite) => true
case (id, v: CanContinueIfAcquiresLock) =>
!notFree(v.lockToAquire) || (v.locks contains v.lockToAquire)
case _ => false
}
if threadsNotBlocked.isEmpty then
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
val whoHasLock = threadStates.toSeq.flatMap { case (id, state) =>
state.locks.map(lock => (lock, id))
}.toMap
val reason = threadStates
.collect {
case (id, state: CanContinueIfAcquiresLock)
if !notFree(state.lockToAquire) =>
s"Thread $id is waiting on lock ${state.lockToAquire} held by thread ${whoHasLock(state.lockToAquire)}"
}
.mkString("\n")
throw new Exception(
s"Deadlock: Thread$s $waiting are interlocked. Indeed:\n$reason"
)
else if threadsNotBlocked.size == 1
then // Do not consume the schedule if only one thread can execute.
Some(threadsNotBlocked(0))
else
val next =
schedule.indexWhere(t =>
threadsNotBlocked.exists { case (id, state) => id == t }
)
if next != -1 then
// println(s"$threadId: schedule is $schedule, next chosen is ${schedule(next)}")
val chosenOne =
schedule(next) // TODO: Make schedule a mutable list.
schedule = schedule.take(next) ++ schedule.drop(next + 1)
Some((chosenOne, threadStates(chosenOne)))
else
threadPreference = (threadPreference + 1) % threadsNotBlocked.size
val chosenOne =
threadsNotBlocked(threadPreference) // Maybe another strategy
Some(chosenOne)
// threadsNotBlocked.indexOf(threadId) >= 0
/*
val tnb = threadsNotBlocked.map(_._1).mkString(",")
val s = if (schedule.isEmpty) "empty" else schedule.mkString(",")
val only = if (schedule.isEmpty) "" else " only"
throw new Exception(s"The schedule is $s but$only threads ${tnb} can continue")*/
else canContinue
/** This will be called before a schedulable operation begins. This should not
* use synchronized
*/
var numThreadsWaiting = new AtomicInteger(0)
// var waitingForDecision = Map[Int, Option[Int]]() // Mapping from thread ids to a number indicating who is going to make the choice.
var canContinue: Option[(Int, ThreadState)] =
None // The result of the decision thread Id of the thread authorized to continue.
private def waitForTurn =
synchronized {
if numThreadsWaiting.incrementAndGet() == threadStates.size then
canContinue = decide()
notifyAll()
// waitingForDecision(threadId) = Some(numThreadsWaiting)
// println(s"$threadId Entering waiting with ticket number $numThreadsWaiting/${waitingForDecision.size}")
while !canProceed() do wait()
}
numThreadsWaiting.decrementAndGet()
/** To be invoked when a thread is about to complete
*/
private def removeFromSchedule(fakeid: Int) = synchronized {
// println(s"$fakeid: I'm taking a decision because I finished")
schedule = schedule.filterNot(_ == fakeid)
threadStates -= fakeid
if numThreadsWaiting.get() == threadStates.size then
canContinue = decide()
notifyAll()
}
def getOperationLog() = opLog
package concpar22final02.instrumentation
import scala.util.Random
import scala.collection.mutable.{Map as MutableMap}
import Stats.*
object TestHelper:
val noOfSchedules = 10000 // set this to 100k during deployment
val readWritesPerThread =
20 // maximum number of read/writes possible in one thread
val contextSwitchBound = 10
val testTimeout = 240 // the total time out for a test in seconds
val schedTimeout =
15 // the total time out for execution of a schedule in secs
// Helpers
/*def testManySchedules(op1: => Any): Unit = testManySchedules(List(() => op1))
def testManySchedules(op1: => Any, op2: => Any): Unit = testManySchedules(List(() => op1, () => op2))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any, op4: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3, () => op4))*/
def testSequential[T](ops: Scheduler => Any)(assertions: T => (
Boolean,
String
)) =
testManySchedules(
1,
(sched: Scheduler) =>
(
List(() => ops(sched)),
(res: List[Any]) => assertions(res.head.asInstanceOf[T])
)
)
/** @numThreads
* number of threads
* @ops
* operations to be executed, one per thread
* @assertion
* as condition that will executed after all threads have completed
* (without exceptions) the arguments are the results of the threads
*/
def testManySchedules(
numThreads: Int,
ops: Scheduler => (
List[() => Any], // Threads
List[Any] => (Boolean, String)
) // Assertion
) =
var timeout = testTimeout * 1000L
val threadIds = (1 to numThreads)
// (1 to scheduleLength).flatMap(_ => threadIds).toList.permutations.take(noOfSchedules).foreach {
val schedules = (new ScheduleGenerator(numThreads)).schedules()
var schedsExplored = 0
schedules.takeWhile(_ =>
schedsExplored <= noOfSchedules && timeout > 0
).foreach {
// case _ if timeout <= 0 => // break
case schedule =>
schedsExplored += 1
val schedr = new Scheduler(schedule)
// println("Exploring Sched: "+schedule)
val (threadOps, assertion) = ops(schedr)
if threadOps.size != numThreads then
throw new IllegalStateException(
s"Number of threads: $numThreads, do not match operations of threads: $threadOps"
)
timed { schedr.runInParallel(schedTimeout * 1000, threadOps) } { t =>
timeout -= t
} match
case Timeout(msg) =>
throw new java.lang.AssertionError(
"assertion failed\n" + "The schedule took too long to complete. A possible deadlock! \n" + msg
)
case Except(msg, stkTrace) =>
val traceStr =
"Thread Stack trace: \n" + stkTrace.map(
" at " + _.toString
).mkString("\n")
throw new java.lang.AssertionError(
"assertion failed\n" + msg + "\n" + traceStr
)
case RetVal(threadRes) =>
// check the assertion
val (success, custom_msg) = assertion(threadRes)
if !success then
val msg =
"The following schedule resulted in wrong results: \n" + custom_msg + "\n" + schedr
.getOperationLog()
.mkString("\n")
throw new java.lang.AssertionError("Assertion failed: " + msg)
}
if timeout <= 0 then
throw new java.lang.AssertionError(
"Test took too long to complete! Cannot check all schedules as your code is too slow!"
)
/** A schedule generator that is based on the context bound
*/
class ScheduleGenerator(numThreads: Int):
val scheduleLength = readWritesPerThread * numThreads
val rands =
(1 to scheduleLength).map(i =>
new Random(0xcafe * i)
) // random numbers for choosing a thread at each position
def schedules(): LazyList[List[Int]] =
var contextSwitches = 0
var contexts =
List[Int]() // a stack of thread ids in the order of context-switches
val remainingOps = MutableMap[Int, Int]()
remainingOps ++= (1 to numThreads).map(i =>
(i, readWritesPerThread)
) // num ops remaining in each thread
val liveThreads = (1 to numThreads).toSeq.toBuffer
/** Updates remainingOps and liveThreads once a thread is chosen for a
* position in the schedule
*/
def updateState(tid: Int): Unit =
val remOps = remainingOps(tid)
if remOps == 0 then liveThreads -= tid
else remainingOps += (tid -> (remOps - 1))
val schedule = rands.foldLeft(List[Int]()) {
case (acc, r) if contextSwitches < contextSwitchBound =>
val tid = liveThreads(r.nextInt(liveThreads.size))
contexts match
case prev :: tail
if prev != tid => // we have a new context switch here
contexts +:= tid
contextSwitches += 1
case prev :: tail =>
case _ => // init case
contexts +:= tid
updateState(tid)
acc :+ tid
case (
acc,
_
) => // here context-bound has been reached so complete the schedule without any more context switches
if !contexts.isEmpty then
contexts = contexts.dropWhile(remainingOps(_) == 0)
val tid = contexts match
case top :: tail => top
case _ =>
liveThreads(
0
) // here, there has to be threads that have not even started
updateState(tid)
acc :+ tid
}
schedule #:: schedules()
package concpar22final02.instrumentation
import scala.concurrent.*
import scala.concurrent.duration.*
import scala.concurrent.ExecutionContext.Implicits.global
object TestUtils:
def failsOrTimesOut[T](action: => T): Boolean =
val asyncAction = Future {
action
}
try Await.result(asyncAction, 2000.millisecond)
catch case _: Throwable => return true
return false
package concpar22final03
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Random
trait EconomicsTest extends Economics:
val ownedCards: collection.mutable.Set[Card] = collection.mutable.Set[Card]()
def owned(c: Card): Boolean = ownedCards(c)
def isMine(c: Card): Boolean = ownedCards(c)
override def valueOf(cardName: String): Int = List(1, cardName.length).max
/** This is a container for an exact amount of money that can be hold, spent,
* or put in the bank
*/
val moneyInMoneyBag = collection.mutable.Map[MoneyBag, Int]()
def moneyIn(m: MoneyBag): Int = moneyInMoneyBag.getOrElse(m, 0)
/** If you sell a card, at some point in the future you will get some money
* (in a bag).
*/
def sellCard(c: Card): Future[MoneyBag] =
Future {
Thread.sleep(sellWaitTime())
synchronized(
if owned(c) then
ownedCards.remove(c)
getMoneyBag(valueOf(c.name))
else
throw Exception(
"This card doesn't belong to you or has already been sold, you can't sell it."
)
)
}
/** You can buy any "Scala: The Programming" card by providing a bag of money
* with the appropriate amount and waiting for the transaction to take place
*/
def buyCard(bag: MoneyBag, name: String): Future[Card] =
Future {
Thread.sleep(buyWaitTime())
synchronized {
if moneyIn(bag) != valueOf(name) then
throw Exception(
"You didn't provide the exact amount of money necessary to buy this card."
)
else moneyInMoneyBag.update(bag, 0)
getCard(name)
}
}
/** This simple bank account hold money for you. You can bring a money bag to
* increase your account, or withdraw a money bag of any size not greater
* than your account's balance.
*/
private var balance_ = initialBalance()
def balance: Int = balance_
def withdraw(amount: Int): Future[MoneyBag] =
Future {
Thread.sleep(withdrawWaitTime())
synchronized(
if balance_ >= amount then
balance_ -= amount
getMoneyBag(amount)
else
throw new Exception(
"You try to withdraw more money than you have on your account"
)
)
}
def deposit(bag: MoneyBag): Future[Unit] =
Future {
Thread.sleep(depositWaitTime())
synchronized {
if moneyInMoneyBag(bag) == 0 then
throw new Exception("You are depositing en empty bag!")
else
balance_ += moneyIn(bag)
moneyInMoneyBag.update(bag, 0)
}
}
def sellWaitTime(): Int
def buyWaitTime(): Int
def withdrawWaitTime(): Int
def depositWaitTime(): Int
def initialBalance(): Int
def getMoneyBag(i: Int) =
val m = MoneyBag()
synchronized(moneyInMoneyBag.update(m, i))
m
def getCard(n: String): Card =
val c = Card(n)
synchronized(ownedCards.update(c, true))
c
package concpar22final03
import scala.concurrent.duration.*
import scala.concurrent.{Await, Future}
import scala.util.{Try, Success, Failure}
import scala.concurrent.ExecutionContext.Implicits.global
class Problem3Suite extends munit.FunSuite:
trait Prob3Test extends Problem3:
override val economics: EconomicsTest
class Test1 extends Prob3Test:
override val economics: EconomicsTest = new EconomicsTest:
override def sellWaitTime() = 10
override def buyWaitTime() = 20
override def depositWaitTime() = 30
override def withdrawWaitTime() = 40
override def initialBalance() = 0
class Test2 extends Prob3Test:
override val economics: EconomicsTest = new EconomicsTest:
override def sellWaitTime() = 100
override def buyWaitTime() = 5
override def depositWaitTime() = 50
override def withdrawWaitTime() = 5
override def initialBalance() = 0
class Test3 extends Prob3Test:
override val economics: EconomicsTest = new EconomicsTest:
val rgen = new scala.util.Random(666)
override def sellWaitTime() = rgen.nextInt(100)
override def buyWaitTime() = rgen.nextInt(100)
override def depositWaitTime() = rgen.nextInt(100)
override def withdrawWaitTime() = rgen.nextInt(100)
override def initialBalance() = 0
class Test4 extends Prob3Test:
var counter = 5
def next(): Int =
counter = counter + 5 % 119
counter
override val economics: EconomicsTest = new EconomicsTest:
override def sellWaitTime() = next()
override def buyWaitTime() = next()
override def depositWaitTime() = next()
override def withdrawWaitTime() = next()
override def initialBalance() = next()
def testCases = List(new Test1, new Test2)
def unevenTestCases = List(new Test3, new Test4)
def tot(cards: List[String]): Int =
cards.map[Int]((n: String) => n.length).sum
def testOk(
t: Prob3Test,
money: Int,
sold: List[String],
wanted: List[String]
): Unit =
import t.*
import economics.*
val f = orderDeck(getMoneyBag(money), sold.map(getCard), wanted)
val r = Await.ready(f, 3.seconds).value.get
assert(r.isSuccess)
r match
case Success(d) =>
assertEquals(d.map(_.name).sorted, wanted.sorted)
assertEquals(d.length, wanted.length)
assertEquals(isMine(d.head), true)
case Failure(e) => ()
def testFailure(
t: Prob3Test,
money: Int,
sold: List[String],
wanted: List[String]
): Unit =
import t.*
import economics.*
val f = orderDeck(getMoneyBag(money), sold.map(getCard), wanted)
val r = Await.ready(f, 3.seconds).value.get
assert(r.isFailure)
r match
case Failure(e: NotEnoughMoneyException) => ()
case _ => fail(
"Should have thrown a NotEnoughMoneyException exception, but did not"
)
// --- Without sold cards ---
test(
"Should work correctly when a single card is asked with enough money (no card sold) (20pts)"
) {
testCases.foreach(t => testOk(t, 7, Nil, List("Tefeiri")))
}
test(
"Should work correctly when a single card is asked with enough money (no card sold, uneven waiting time) (10pts)"
) {
unevenTestCases.foreach(t => testOk(t, 7, Nil, List("Tefeiri")))
}
test(
"Should work correctly when multiple cards are asked with enough money (no card sold) (20pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
testCases.foreach(t => testOk(t, tot(cards), Nil, cards))
}
test(
"Should work correctly when multiple cards are asked with enough money (no card sold, uneven waiting time) (10pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
unevenTestCases.foreach(t => testOk(t, tot(cards), Nil, cards))
}
test(
"Should work correctly when asked duplicates of cards, with enough money (no card sold) (20pts)"
) {
val cards = List("aaaa", "aaaa", "aaaa", "dd", "dd", "dd", "dd")
testCases.foreach(t => testOk(t, tot(cards), Nil, cards))
}
test(
"Should work correctly when asked duplicates of cards, with enough money (no card sold, uneven waiting time) (10pts)"
) {
val cards = List("aaaa", "aaaa", "aaaa", "dd", "dd", "dd", "dd")
unevenTestCases.foreach(t => testOk(t, tot(cards), Nil, cards))
}
// --- With sold cards ---
test(
"Should work correctly when a single card is bought and a single of the same price is sold (20pts)"
) {
testCases.foreach(t => testOk(t, 0, List("Chandra"), List("Tefeiri")))
}
test(
"Should work correctly when a single card is bought and a single of the same price is sold (uneven waiting time) (10pts)"
) {
unevenTestCases.foreach(t => testOk(t, 0, List("Chandra"), List("Tefeiri")))
}
test(
"Should work correctly when multiple cards are asked and multiple of matching values are sold (20pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold = List("1111111", "2", "3333", "44", "55555", "666", "7777")
testCases.foreach(t => testOk(t, 0, sold, cards))
}
test(
"Should work correctly when multiple cards are asked and multiple of matching values are sold (uneven waiting time) (10pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold = List("1111111", "2", "3333", "44", "55555", "666", "7777")
unevenTestCases.foreach(t => testOk(t, 0, sold, cards))
}
test(
"Should work correctly when multiple cards are asked and multiple of the same total value are sold (20pts)"
) {
val cards2 = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold2 = List("111111111", "22", "3", "44", "555555", "666", "777")
assert(tot(sold2) == tot(cards2))
testCases.foreach(t => testOk(t, 0, sold2, cards2))
}
test(
"Should work correctly when multiple cards are asked and multiple of the same total value are sold (uneven waiting time) (10pts)"
) {
val cards2 = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold2 = List("111111111", "22", "3", "44", "555555", "666", "777")
assert(tot(sold2) == tot(cards2))
unevenTestCases.foreach(t => testOk(t, 0, sold2, cards2))
}
test(
"Should work correctly when given money and sold cards are sufficient for the wanted cards (20pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold = List("11111", "2", "33", "44", "5555", "666", "777")
val bagMoney = tot(cards) - tot(sold)
testCases.foreach(t => testOk(t, bagMoney, sold, cards))
}
test(
"Should work correctly when given money and sold cards are sufficient for the wanted cards (uneven waiting time) (10pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold = List("11111", "2", "33", "44", "5555", "666", "777")
val bagMoney = tot(cards) - tot(sold)
unevenTestCases.foreach(t => testOk(t, bagMoney, sold, cards))
}
// --- Failures ---
test(
"Should return a failure when too little money is provided (no card sold) (20pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
testCases.foreach(t => testFailure(t, tot(cards) - 1, Nil, cards))
testCases.foreach(t => testFailure(t, tot(cards) - 50, Nil, cards))
}
test(
"Should return a failure when too little money or sold cards are provided (20pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold = List("11111", "2", "33", "44", "5555", "666", "777")
val bagMoney = tot(cards) - tot(sold)
testCases.foreach(t => testFailure(t, bagMoney - 2, sold, cards))
}
package concpar22final04
import akka.actor.*
import akka.testkit.*
import akka.pattern.*
import akka.util.Timeout
import concurrent.duration.*
import User.Protocol.*
import User.Responses.*
import SongsStore.Protocol.*
import SongsStore.Responses.*
import scala.util.{Try, Success, Failure}
import com.typesafe.config.ConfigFactory
import java.util.Date
import scala.util.Random
class Problem4Suite extends munit.FunSuite:
//---
Random.setSeed(42178263)
/*+++
Random.setSeed(42)
++*/
test("after receiving GetInfo, should answer with Info (20pts)") {
new MyTestKit:
def tests() =
ada ! GetInfo
expectMsg(Info("1", "Ada"))
}
test(
"after receiving GetHomepageData, should answer with the correct HomepageData when there is no liked songs and no activity items (30pts)"
) {
new MyTestKit:
def tests() =
ada ! GetHomepageData
expectMsg(HomepageData(List(), List()))
}
test(
"after receiving Like(1), should add 1 to the list of liked songs (20pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
expectNoMessage()
ada ! GetHomepageData
expectMsg(HomepageData(List(1), List()))
}
test(
"after receiving Like(1) and then Like(2), the list of liked songs should start with List(2, 1) (20pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
expectNoMessage()
ada ! Like(2)
expectNoMessage()
ada ! GetHomepageData
expectMsg(HomepageData(List(2, 1), List()))
}
test(
"after receiving Like(1) and then Like(1), song 1 should be in the list of liked songs only once (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
expectNoMessage()
ada ! Like(1)
expectNoMessage()
ada ! GetHomepageData
expectMsg(HomepageData(List(1), List()))
}
test(
"after receiving Like(1), Like(2) and then Like(1), the list of liked songs should start with List(2, 1) (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
expectNoMessage()
ada ! Like(2)
expectNoMessage()
ada ! Like(1)
expectNoMessage()
ada ! GetHomepageData
expectMsg(HomepageData(List(2, 1), List()))
}
test(
"after receiving Like(1), Unlike(1) and then Unlike(1), the list of liked songs should not contain song 1 (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
expectNoMessage()
ada ! Like(2)
expectNoMessage()
ada ! Like(1)
expectNoMessage()
ada ! GetHomepageData
expectMsg(HomepageData(List(2, 1), List()))
}
test(
"after receiving Subscribe(aUser) and then Play(5), should send AddActivity(Activity(\"1\", 5)) to aUser (20pts)"
) {
new MyTestKit:
def tests() =
ada ! Subscribe(self)
expectNoMessage()
ada ! Play(5)
expectMsg(AddActivity(Activity("1", "Ada", 5)))
}
test(
"after receiving Subscribe(aUser), Subscribe(bUser) and then Play(5), should send AddActivity(Activity(\"1\", 5)) to aUser (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Subscribe(self)
expectNoMessage()
val donald = new TestProbe(system)
ada ! Subscribe(donald.ref)
expectNoMessage()
ada ! Play(5)
expectMsg(AddActivity(Activity("1", "Ada", 5)))
donald.expectMsg(AddActivity(Activity("1", "Ada", 5)))
}
test(
"after receiving Subscribe(aUser), Subscribe(aUser) and then Play(5), should send AddActivity(Activity(\"1\", 5)) to aUser only once (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Subscribe(self)
expectNoMessage()
ada ! Subscribe(self)
expectNoMessage()
ada ! Play(5)
expectMsg(AddActivity(Activity("1", "Ada", 5)))
expectNoMessage()
}
test(
"after receiving Subscribe(aUser), Unsubscribe(aUser) and then Play(5), should not send AddActivity(Activity(\"1\", 5)) to aUser (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Subscribe(self)
expectNoMessage()
ada ! Play(5)
expectMsg(AddActivity(Activity("1", "Ada", 5)))
ada ! Unsubscribe(self)
expectNoMessage()
ada ! Play(5)
expectNoMessage()
}
test(
"after receiving AddActivity(Activity(\"1\", 5)), Activity(\"1\", 5) should be in the activity feed (10pts)"
) {
new MyTestKit:
def tests() =
ada ! AddActivity(Activity("0", "Self", 5))
expectNoMessage()
ada ! GetHomepageData
expectMsg(HomepageData(List(), List(Activity("0", "Self", 5))))
}
test(
"after receiving AddActivity(Activity(\"1\", 5)) and AddActivity(Activity(\"1\", 6)), Activity(\"1\", 6) should be in the activity feed and Activity(\"1\", 5) should not (10pts)"
) {
new MyTestKit:
def tests() =
ada ! AddActivity(Activity("0", "Self", 5))
expectNoMessage()
ada ! AddActivity(Activity("0", "Self", 6))
expectNoMessage()
ada ! GetHomepageData
expectMsg(HomepageData(List(), List(Activity("0", "Self", 6))))
}
test(
"after receiving GetHomepageText, should answer with a result containing \"Howdy $name!\" where $name is the user's name (10pts)"
) {
new MyTestKit:
def tests() =
val name = Random.alphanumeric.take(5).mkString
val randomUser =
system.actorOf(Props(classOf[User], "5", name, songsStore), "user-5")
randomUser ! GetHomepageText
expectMsgClass(classOf[HomepageText]).result.contains(f"Howdy $name!")
}
test(
"after receiving GetHomepageText, should answer with the correct names of liked songs (1) (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(8)
expectNoMessage()
ada ! Like(3)
expectNoMessage()
ada ! Like(2)
expectNoMessage()
ada ! GetHomepageText
assertEquals(
expectMsgClass(classOf[HomepageText]).result.linesIterator
.drop(2)
.take(4)
.mkString("\n")
.trim,
"""
|Liked Songs:
|* Sunny by Boney M.
|* J'irai où tu iras by Céline Dion & Jean-Jacques Goldman
|* Hold the line by TOTO
""".stripMargin.trim
)
}
test(
"after receiving GetHomepageText, should answer with the correct names of liked songs (2) (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(9)
expectNoMessage()
ada ! Like(7)
expectNoMessage()
ada ! GetHomepageText
assertEquals(
expectMsgClass(classOf[HomepageText]).result.linesIterator
.drop(2)
.take(3)
.mkString("\n")
.trim,
"""
|Liked Songs:
|* Straight Edge by Minor Threat
|* Anarchy in the UK by Sex Pistols
""".stripMargin.trim
)
}
test(
"after receiving GetHomepageText, should answer with the correct activity feed (1) (10pts)"
) {
new MyTestKit:
def tests() =
bob ! Subscribe(ada)
expectNoMessage()
carol ! Subscribe(ada)
expectNoMessage()
donald ! Subscribe(ada)
expectNoMessage()
bob ! Play(3)
expectNoMessage()
carol ! Play(8)
expectNoMessage()
ada ! GetHomepageText
assertEquals(
expectMsgClass(classOf[HomepageText]).result.linesIterator
.drop(4)
.take(10)
.mkString("\n")
.trim,
"""
|Activity Feed:
|* Carol is listening to Hold the line by TOTO
|* Bob is listening to J'irai où tu iras by Céline Dion & Jean-Jacques Goldman
""".stripMargin.trim
)
}
test(
"after receiving GetHomepageText, should answer with the correct activity feed (2) (10pts)"
) {
new MyTestKit:
def tests() =
bob ! Subscribe(ada)
expectNoMessage()
carol ! Subscribe(ada)
expectNoMessage()
donald ! Subscribe(ada)
expectNoMessage()
bob ! Play(9)
expectNoMessage()
carol ! Play(10)
expectNoMessage()
donald ! Play(6)
expectNoMessage()
bob ! Play(7)
expectNoMessage()
ada ! GetHomepageText
assertEquals(
expectMsgClass(classOf[HomepageText]).result.linesIterator
.drop(4)
.take(10)
.mkString("\n")
.trim,
"""
|Activity Feed:
|* Bob is listening to Straight Edge by Minor Threat
|* Donald is listening to Désenchantée by Mylène Farmer
|* Carol is listening to Breakfast in America by Supertramp
""".stripMargin.trim
)
}
test(
"after receiving GetHomepageText, should answer with the correct text (full test) (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
expectNoMessage()
ada ! Like(2)
expectNoMessage()
bob ! Subscribe(ada)
expectNoMessage()
carol ! Subscribe(ada)
expectNoMessage()
donald ! Subscribe(ada)
expectNoMessage()
donald ! Play(3)
expectNoMessage()
bob ! Play(4)
expectNoMessage()
carol ! Play(5)
expectNoMessage()
ada ! GetHomepageText
assertEquals(
expectMsgClass(classOf[HomepageText]).result.linesIterator
.mkString("\n")
.trim,
"""
|Howdy Ada!
|
|Liked Songs:
|* Sunny by Boney M.
|* High Hopes by Pink Floyd
|
|Activity Feed:
|* Carol is listening to Strobe by deadmau5
|* Bob is listening to Ce monde est cruel by Vald
|* Donald is listening to J'irai où tu iras by Céline Dion & Jean-Jacques Goldman
""".stripMargin.trim
)
}
abstract class MyTestKit
extends TestKit(ActorSystem("TestSystem"))
with ImplicitSender:
val songsStore = system.actorOf(Props(MockSongsStore()), "songsStore")
def makeAda() =
system.actorOf(Props(classOf[User], "1", "Ada", songsStore), "user-1")
val ada = makeAda()
val bob =
system.actorOf(Props(classOf[User], "2", "Bob", songsStore), "user-2")
val carol =
system.actorOf(Props(classOf[User], "3", "Carol", songsStore), "user-3")
val donald =
system.actorOf(Props(classOf[User], "4", "Donald", songsStore), "user-4")
def tests(): Unit
try tests()
finally shutdown(system)
package instrumentation
trait MockedMonitor extends Monitor:
def scheduler: Scheduler
// Can be overriden.
override def waitDefault() =
scheduler.log("wait")
scheduler updateThreadState Wait(this, scheduler.threadLocks.tail)
override def synchronizedDefault[T](toExecute: => T): T =
scheduler.log("synchronized check")
val prevLocks = scheduler.threadLocks
scheduler updateThreadState Sync(
this,
prevLocks
) // If this belongs to prevLocks, should just continue.
scheduler.log("synchronized -> enter")
try
toExecute
finally
scheduler updateThreadState Running(prevLocks)
scheduler.log("synchronized -> out")
override def notifyDefault() =
scheduler mapOtherStates {
state =>
state match
case Wait(lockToAquire, locks) if lockToAquire == this =>
SyncUnique(this, state.locks)
case e => e
}
scheduler.log("notify")
override def notifyAllDefault() =
scheduler mapOtherStates {
state =>
state match
case Wait(lockToAquire, locks) if lockToAquire == this =>
Sync(this, state.locks)
case SyncUnique(lockToAquire, locks) if lockToAquire == this =>
Sync(this, state.locks)
case e => e
}
scheduler.log("notifyAll")
abstract class ThreadState:
def locks: Seq[AnyRef]
trait CanContinueIfAcquiresLock extends ThreadState:
def lockToAquire: AnyRef
case object Start extends ThreadState:
def locks: Seq[AnyRef] = Seq.empty
case object End extends ThreadState:
def locks: Seq[AnyRef] = Seq.empty
case class Wait(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState
case class SyncUnique(lockToAquire: AnyRef, locks: Seq[AnyRef])
extends ThreadState with CanContinueIfAcquiresLock
case class Sync(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState
with CanContinueIfAcquiresLock
case class Running(locks: Seq[AnyRef]) extends ThreadState
case class VariableReadWrite(locks: Seq[AnyRef]) extends ThreadState
package instrumentation
import java.util.concurrent.*;
import scala.concurrent.duration.*
import scala.collection.mutable.*
import Stats.*
import java.util.concurrent.atomic.AtomicInteger
sealed abstract class Result
case class RetVal(rets: List[Any]) extends Result
case class Except(msg: String, stackTrace: Array[StackTraceElement])
extends Result
case class Timeout(msg: String) extends Result
/** A class that maintains schedule and a set of thread ids. The schedules are
* advanced after an operation of a SchedulableBuffer is performed. Note: the
* real schedule that is executed may deviate from the input schedule due to
* the adjustments that had to be made for locks
*/
class Scheduler(sched: List[Int]):
val maxOps =
500 // a limit on the maximum number of operations the code is allowed to perform
private var schedule = sched
private var numThreads = 0
private val realToFakeThreadId = Map[Long, Int]()
private val opLog =
ListBuffer[String]() // a mutable list (used for efficient concat)
private val threadStates = Map[Int, ThreadState]()
/** Runs a set of operations in parallel as per the schedule. Each operation
* may consist of many primitive operations like reads or writes to shared
* data structure each of which should be executed using the function `exec`.
* @timeout
* in milliseconds
* @return
* true - all threads completed on time, false -some tests timed out.
*/
def runInParallel(timeout: Long, ops: List[() => Any]): Result =
numThreads = ops.length
val threadRes = Array.fill(numThreads) { None: Any }
var exception: Option[Except] = None
val syncObject = new Object()
var completed = new AtomicInteger(0)
// create threads
val threads = ops.zipWithIndex.map {
case (op, i) =>
new Thread(new Runnable():
def run(): Unit =
val fakeId = i + 1
setThreadId(fakeId)
try
updateThreadState(Start)
val res = op()
updateThreadState(End)
threadRes(i) = res
// notify the main thread if all threads have completed
if completed.incrementAndGet() == ops.length then
syncObject.synchronized { syncObject.notifyAll() }
catch
case e: Throwable
if exception != None => // do nothing here and silently fail
case e: Throwable =>
log(s"throw ${e.toString}")
exception = Some(Except(
s"Thread $fakeId crashed on the following schedule: \n" + opLog.mkString(
"\n"
),
e.getStackTrace
))
syncObject.synchronized { syncObject.notifyAll() }
// println(s"$fakeId: ${e.toString}")
// Runtime.getRuntime().halt(0) //exit the JVM and all running threads (no other way to kill other threads)
)
}
// start all threads
threads.foreach(_.start())
// wait for all threads to complete, or for an exception to be thrown, or for the time out to expire
var remTime = timeout
syncObject.synchronized {
timed { if completed.get() != ops.length then syncObject.wait(timeout) } {
time => remTime -= time
}
}
if exception.isDefined then
exception.get
else if remTime <= 1
then // timeout ? using 1 instead of zero to allow for some errors
Timeout(opLog.mkString("\n"))
else
// every thing executed normally
RetVal(threadRes.toList)
// Updates the state of the current thread
def updateThreadState(state: ThreadState): Unit =
val tid = threadId
synchronized {
threadStates(tid) = state
}
state match
case Sync(lockToAquire, locks) =>
if locks.indexOf(lockToAquire) < 0 then waitForTurn
else
// Re-aqcuiring the same lock
updateThreadState(Running(lockToAquire +: locks))
case Start => waitStart()
case End => removeFromSchedule(tid)
case Running(_) =>
case _ => waitForTurn // Wait, SyncUnique, VariableReadWrite
def waitStart(): Unit =
// while (threadStates.size < numThreads) {
// Thread.sleep(1)
// }
synchronized {
if threadStates.size < numThreads then
wait()
else
notifyAll()
}
def threadLocks =
synchronized {
threadStates(threadId).locks
}
def threadState =
synchronized {
threadStates(threadId)
}
def mapOtherStates(f: ThreadState => ThreadState) =
val exception = threadId
synchronized {
for k <- threadStates.keys if k != exception do
threadStates(k) = f(threadStates(k))
}
def log(str: String) =
if (realToFakeThreadId contains Thread.currentThread().getId()) then
val space = (" " * ((threadId - 1) * 2))
val s =
space + threadId + ":" + "\n".r.replaceAllIn(str, "\n" + space + " ")
// println(s)
opLog += s
/** Executes a read or write operation to a global data structure as per the
* given schedule
* @param msg
* a message corresponding to the operation that will be logged
*/
def exec[T](primop: => T)(
msg: => String,
postMsg: => Option[T => String] = None
): T =
if !(realToFakeThreadId contains Thread.currentThread().getId()) then
primop
else
updateThreadState(VariableReadWrite(threadLocks))
val m = msg
if m != "" then log(m)
if opLog.size > maxOps then
throw new Exception(
s"Total number of reads/writes performed by threads exceed $maxOps. A possible deadlock!"
)
val res = primop
postMsg match
case Some(m) => log(m(res))
case None =>
res
private def setThreadId(fakeId: Int) = synchronized {
realToFakeThreadId(Thread.currentThread.getId) = fakeId
}
def threadId =
try
realToFakeThreadId(Thread.currentThread().getId())
catch
case e: NoSuchElementException =>
throw new Exception(
"You are accessing shared variables in the constructor. This is not allowed. The variables are already initialized!"
)
private def isTurn(tid: Int) = synchronized {
(!schedule.isEmpty && schedule.head != tid)
}
def canProceed(): Boolean =
val tid = threadId
canContinue match
case Some((i, state)) if i == tid =>
// println(s"$tid: Runs ! Was in state $state")
canContinue = None
state match
case Sync(lockToAquire, locks) =>
updateThreadState(Running(lockToAquire +: locks))
case SyncUnique(lockToAquire, locks) =>
mapOtherStates {
_ match
case SyncUnique(lockToAquire2, locks2)
if lockToAquire2 == lockToAquire =>
Wait(lockToAquire2, locks2)
case e => e
}
updateThreadState(Running(lockToAquire +: locks))
case VariableReadWrite(locks) => updateThreadState(Running(locks))
true
case Some((i, state)) =>
// println(s"$tid: not my turn but $i !")
false
case None =>
false
var threadPreference =
0 // In the case the schedule is over, which thread should have the preference to execute.
/** returns true if the thread can continue to execute, and false otherwise */
def decide(): Option[(Int, ThreadState)] =
if !threadStates.isEmpty
then // The last thread who enters the decision loop takes the decision.
// println(s"$threadId: I'm taking a decision")
if threadStates.values.forall {
case e: Wait => true
case _ => false
}
then
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
throw new Exception(
s"Deadlock: Thread$s $waiting $are waiting but all others have ended and cannot notify them."
)
else
// Threads can be in Wait, Sync, SyncUnique, and VariableReadWrite mode.
// Let's determine which ones can continue.
val notFree = threadStates.collect { case (id, state) =>
state.locks
}.flatten.toSet
val threadsNotBlocked = threadStates.toSeq.filter {
case (id, v: VariableReadWrite) => true
case (id, v: CanContinueIfAcquiresLock) =>
!notFree(v.lockToAquire) || (v.locks contains v.lockToAquire)
case _ => false
}
if threadsNotBlocked.isEmpty then
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
val whoHasLock = threadStates.toSeq.flatMap { case (id, state) =>
state.locks.map(lock => (lock, id))
}.toMap
val reason = threadStates.collect {
case (id, state: CanContinueIfAcquiresLock)
if !notFree(state.lockToAquire) =>
s"Thread $id is waiting on lock ${state.lockToAquire} held by thread ${whoHasLock(state.lockToAquire)}"
}.mkString("\n")
throw new Exception(
s"Deadlock: Thread$s $waiting are interlocked. Indeed:\n$reason"
)
else if threadsNotBlocked.size == 1
then // Do not consume the schedule if only one thread can execute.
Some(threadsNotBlocked(0))
else
val next = schedule.indexWhere(t =>
threadsNotBlocked.exists { case (id, state) => id == t }
)
if next != -1 then
// println(s"$threadId: schedule is $schedule, next chosen is ${schedule(next)}")
val chosenOne =
schedule(next) // TODO: Make schedule a mutable list.
schedule = schedule.take(next) ++ schedule.drop(next + 1)
Some((chosenOne, threadStates(chosenOne)))
else
threadPreference = (threadPreference + 1) % threadsNotBlocked.size
val chosenOne =
threadsNotBlocked(threadPreference) // Maybe another strategy
Some(chosenOne)
// threadsNotBlocked.indexOf(threadId) >= 0
/*
val tnb = threadsNotBlocked.map(_._1).mkString(",")
val s = if (schedule.isEmpty) "empty" else schedule.mkString(",")
val only = if (schedule.isEmpty) "" else " only"
throw new Exception(s"The schedule is $s but$only threads ${tnb} can continue")*/
else canContinue
/** This will be called before a schedulable operation begins. This should not
* use synchronized
*/
var numThreadsWaiting = new AtomicInteger(0)
// var waitingForDecision = Map[Int, Option[Int]]() // Mapping from thread ids to a number indicating who is going to make the choice.
var canContinue: Option[(Int, ThreadState)] =
None // The result of the decision thread Id of the thread authorized to continue.
private def waitForTurn =
synchronized {
if numThreadsWaiting.incrementAndGet() == threadStates.size then
canContinue = decide()
notifyAll()
// waitingForDecision(threadId) = Some(numThreadsWaiting)
// println(s"$threadId Entering waiting with ticket number $numThreadsWaiting/${waitingForDecision.size}")
while !canProceed() do wait()
}
numThreadsWaiting.decrementAndGet()
/** To be invoked when a thread is about to complete
*/
private def removeFromSchedule(fakeid: Int) = synchronized {
// println(s"$fakeid: I'm taking a decision because I finished")
schedule = schedule.filterNot(_ == fakeid)
threadStates -= fakeid
if numThreadsWaiting.get() == threadStates.size then
canContinue = decide()
notifyAll()
}
def getOperationLog() = opLog
package instrumentation
import java.lang.management.*
/** A collection of methods that can be used to collect run-time statistics */
object Stats:
def timed[T](code: => T)(cont: Long => Unit): T =
var t1 = System.currentTimeMillis()
val r = code
cont((System.currentTimeMillis() - t1))
r
def withTime[T](code: => T): (T, Long) =
var t1 = System.currentTimeMillis()
val r = code
(r, (System.currentTimeMillis() - t1))
package instrumentation
import scala.util.Random
import scala.collection.mutable.{Map as MutableMap}
import Stats.*
object TestHelper:
val noOfSchedules = 10000 // set this to 100k during deployment
val readWritesPerThread =
20 // maximum number of read/writes possible in one thread
val contextSwitchBound = 10
val testTimeout = 240 // the total time out for a test in seconds
val schedTimeout =
15 // the total time out for execution of a schedule in secs
// Helpers
/*def testManySchedules(op1: => Any): Unit = testManySchedules(List(() => op1))
def testManySchedules(op1: => Any, op2: => Any): Unit = testManySchedules(List(() => op1, () => op2))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any, op4: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3, () => op4))*/
def testSequential[T](ops: Scheduler => Any)(assertions: T => (
Boolean,
String
)) =
testManySchedules(
1,
(sched: Scheduler) =>
(
List(() => ops(sched)),
(res: List[Any]) => assertions(res.head.asInstanceOf[T])
)
)
/** @numThreads
* number of threads
* @ops
* operations to be executed, one per thread
* @assertion
* as condition that will executed after all threads have completed
* (without exceptions) the arguments are the results of the threads
*/
def testManySchedules(
numThreads: Int,
ops: Scheduler => (
List[() => Any], // Threads
List[Any] => (Boolean, String)
) // Assertion
) =
var timeout = testTimeout * 1000L
val threadIds = (1 to numThreads)
// (1 to scheduleLength).flatMap(_ => threadIds).toList.permutations.take(noOfSchedules).foreach {
val schedules = (new ScheduleGenerator(numThreads)).schedules()
var schedsExplored = 0
schedules.takeWhile(_ =>
schedsExplored <= noOfSchedules && timeout > 0
).foreach {
// case _ if timeout <= 0 => // break
case schedule =>
schedsExplored += 1
val schedr = new Scheduler(schedule)
// println("Exploring Sched: "+schedule)
val (threadOps, assertion) = ops(schedr)
if threadOps.size != numThreads then
throw new IllegalStateException(
s"Number of threads: $numThreads, do not match operations of threads: $threadOps"
)
timed { schedr.runInParallel(schedTimeout * 1000, threadOps) } { t =>
timeout -= t
} match
case Timeout(msg) =>
throw new java.lang.AssertionError(
"assertion failed\n" + "The schedule took too long to complete. A possible deadlock! \n" + msg
)
case Except(msg, stkTrace) =>
val traceStr = "Thread Stack trace: \n" + stkTrace.map(
" at " + _.toString
).mkString("\n")
throw new java.lang.AssertionError(
"assertion failed\n" + msg + "\n" + traceStr
)
case RetVal(threadRes) =>
// check the assertion
val (success, custom_msg) = assertion(threadRes)
if !success then
val msg =
"The following schedule resulted in wrong results: \n" + custom_msg + "\n" + schedr.getOperationLog().mkString(
"\n"
)
throw new java.lang.AssertionError("Assertion failed: " + msg)
}
if timeout <= 0 then
throw new java.lang.AssertionError(
"Test took too long to complete! Cannot check all schedules as your code is too slow!"
)
/** A schedule generator that is based on the context bound
*/
class ScheduleGenerator(numThreads: Int):
val scheduleLength = readWritesPerThread * numThreads
val rands =
(1 to scheduleLength).map(i =>
new Random(0xcafe * i)
) // random numbers for choosing a thread at each position
def schedules(): LazyList[List[Int]] =
var contextSwitches = 0
var contexts =
List[Int]() // a stack of thread ids in the order of context-switches
val remainingOps = MutableMap[Int, Int]()
remainingOps ++= (1 to numThreads).map(i =>
(i, readWritesPerThread)
) // num ops remaining in each thread
val liveThreads = (1 to numThreads).toSeq.toBuffer
/** Updates remainingOps and liveThreads once a thread is chosen for a
* position in the schedule
*/
def updateState(tid: Int): Unit =
val remOps = remainingOps(tid)
if remOps == 0 then
liveThreads -= tid
else
remainingOps += (tid -> (remOps - 1))
val schedule = rands.foldLeft(List[Int]()) {
case (acc, r) if contextSwitches < contextSwitchBound =>
val tid = liveThreads(r.nextInt(liveThreads.size))
contexts match
case prev :: tail
if prev != tid => // we have a new context switch here
contexts +:= tid
contextSwitches += 1
case prev :: tail =>
case _ => // init case
contexts +:= tid
updateState(tid)
acc :+ tid
case (
acc,
_
) => // here context-bound has been reached so complete the schedule without any more context switches
if !contexts.isEmpty then
contexts = contexts.dropWhile(remainingOps(_) == 0)
val tid = contexts match
case top :: tail => top
case _ =>
liveThreads(
0
) // here, there has to be threads that have not even started
updateState(tid)
acc :+ tid
}
schedule #:: schedules()
package instrumentation
import scala.concurrent.*
import scala.concurrent.duration.*
import scala.concurrent.ExecutionContext.Implicits.global
import org.junit.Assert.*
object TestUtils:
def failsOrTimesOut[T](action: => T): Boolean =
val asyncAction = Future {
action
}
try
Await.result(asyncAction, 2000.millisecond)
catch
case _: Throwable => return true
return false
def assertDeadlock[T](action: => T): Unit =
try
action
throw new AssertionError("No error detected.")
catch
case e: AssertionError =>
assert(e.getMessage.contains("Deadlock"), "No deadlock detected.")
def assertMaybeDeadlock[T](action: => T): Unit =
try
action
throw new AssertionError("No error detected.")
catch
case e: AssertionError =>
assert(
e.getMessage.contains("A possible deadlock!"),
"No deadlock detected."
)
package midterm22
import org.junit.*
import org.junit.Assert.*
import instrumentation.*
class Mock2Test:
@Test
def test() =
TestUtils.assertDeadlock(
TestHelper.testManySchedules(
2,
scheduler =>
val a = new ScheduledAccount(50, scheduler)
val b = new ScheduledAccount(70, scheduler)
(
List(
() => a.transfer(b, 10),
() => b.transfer(a, 10)
),
results => (true, "")
)
)
)
class ScheduledAccount(n: Int, val scheduler: Scheduler)
extends Account(n)
with MockedMonitor
package midterm22
import org.junit.*
import org.junit.Assert.*
class Part1Test:
val testArray =
Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19)
@Test
def testQuestion1Pos() =
val tasksCreatedBefore = tasksCreated.get
assertEquals(Some(18), find(testArray, 18, 3))
assertEquals(10, tasksCreated.get - tasksCreatedBefore)
@Test
def testQuestion1Neg() =
assertEquals(find(testArray, 20, 3), None)
@Test
def testQuestion2Pos(): Unit =
assertEquals(findAggregated(testArray, 18), Some(18))
@Test
def testQuestion2Neg(): Unit =
assertEquals(findAggregated(testArray, 20), None)
package midterm22
import org.junit.*
import org.junit.Assert.*
class Part2Test:
val testArray2 = Array(0, 50, 7, 1, 28, 42)
val testList2 = List(0, 50, 7, 1, 28, 42)
@Test
def testQuestion4Pos() =
assert(contains(testArray2, 7))
@Test
def testQuestion4Neg() =
assert(!contains(testArray2, 8))
@Test
def testQuestion6Pos() =
assert(contains(testList2, 7))
@Test
def testQuestion6Neg() =
assert(!contains(testList2, 8))
package midterm22
import instrumentation.Monitor
import instrumentation.MockedMonitor
import org.junit.*
import org.junit.Assert.*
import instrumentation.*
import java.util.concurrent.atomic.AtomicInteger
class Part4Test:
// This test can result in a deadlock because locks can be called in any
// order. Here, Thread 1 locks Node 3 first and then Node 2, whereas Thread 2
// locks Node 2 first and then Node 3. This will lead to a deadlock.
@Test
def testQuestion9() =
TestUtils.assertDeadlock(
TestHelper.testManySchedules(
2,
scheduler =>
val allNodes =
(for i <- 0 to 6 yield ScheduledNode(i, scheduler)).toList
// Shared by all threads
var sum: Int = 0
def increment(e: Int) = sum += e
(
List(
() =>
// Thread 1
var nodes: List[Node] =
List(allNodes(1), allNodes(3), allNodes(2), allNodes(4))
nodes = nodes
lockFun(nodes, increment)
,
() =>
// Thread 2
var nodes: List[Node] =
List(allNodes(5), allNodes(2), allNodes(3))
nodes = nodes
lockFun(nodes, increment),
),
results => (true, "")
)
)
)
// This will not lead to a deadlock because the lock acquire happens in a
// particular order. Thread 1 acquires locks in order 1->2->3->4, whereas
// Thread 2 acquires locks in order 2->3->5.
@Test
def testQuestion10() =
TestHelper.testManySchedules(
2,
scheduler =>
val allNodes =
(for i <- 0 to 6 yield ScheduledNode(i, scheduler)).toList
// Shared by all threads
var sum: Int = 0
def increment(e: Int) = sum += e
(
List(
() =>
// Thread 1
var nodes: List[Node] =
List(allNodes(1), allNodes(3), allNodes(2), allNodes(4))
nodes = nodes.sortWith((x, y) => x.guid > y.guid)
lockFun(nodes, increment)
,
() =>
// Thread 2
var nodes: List[Node] =
List(allNodes(5), allNodes(2), allNodes(3))
nodes = nodes.sortWith((x, y) => x.guid > y.guid)
lockFun(nodes, increment),
),
results => (true, "")
)
)
// This will not lead to a deadlock because the lock acquire happens in a
// particular order. Thread 1 acquires locks in order 4->3->2->1, whereas
// Thread 2 acquires locks in order 5->3->2.
@Test
def testQuestion11() =
TestHelper.testManySchedules(
2,
scheduler =>
val allNodes =
(for i <- 0 to 6 yield ScheduledNode(i, scheduler)).toList
// Shared by all threads
var sum: Int = 0
def increment(e: Int) = sum += e
(
List(
() =>
// Thread 1
var nodes: List[Node] =
List(allNodes(1), allNodes(3), allNodes(2), allNodes(4))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment)
,
() =>
// Thread 2
var nodes: List[Node] =
List(allNodes(5), allNodes(2), allNodes(3))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment),
),
results => (true, "")
)
)
// This test can result in a deadlock because locks are not called in any
// order. Thread 1 acquire order (3->2->4->1), Thread 2 acquire order
// (2->3->5). Thread 1 locks Node3 first and then Node2, whereas Thread 2
// locks Node 2 first and then Node3. This will lead to a deadlock.
@Test
def testQuestion12() =
TestUtils.assertDeadlock(
TestHelper.testManySchedules(
2,
scheduler =>
val allNodes =
(for i <- 0 to 6 yield ScheduledNode(i, scheduler)).toList
// Shared by all threads
var sum: Int = 0
def increment(e: Int) = sum += e
(
List(
() =>
// Thread 1
var nodes: List[Node] =
List(allNodes(1), allNodes(3), allNodes(2), allNodes(4))
nodes = nodes.tail.appended(nodes(0))
lockFun(nodes, increment)
,
() =>
// Thread 2
var nodes: List[Node] =
List(allNodes(5), allNodes(2), allNodes(3))
nodes = nodes.tail.appended(nodes(0))
lockFun(nodes, increment),
),
results => (true, "")
)
)
)
// sum returns wrong answer because there is a data race on the sum variable.
@Test(expected = classOf[AssertionError])
def testQuestion13() =
TestHelper.testManySchedules(
2,
scheduler =>
val allNodes =
(for i <- 0 to 6 yield ScheduledNode(i, scheduler)).toList
// Shared by all threads
var sum: Int = 0
def increment(e: Int) =
val previousSum = scheduler.exec { sum }("Get sum")
scheduler.exec { sum = previousSum + e }("Write sum")
(
List(
() =>
// Thread 1
var nodes: List[Node] =
List(allNodes(1), allNodes(3), allNodes(2), allNodes(4))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment)
,
() =>
// Thread 2
var nodes: List[Node] =
List(allNodes(5), allNodes(2), allNodes(3))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment),
),
results =>
if sum != 20 then
(false, f"Wrong sum: expected 20 but got $sum")
else
(true, "")
)
)
// sum value will be correct here because "sum += e" is protected by a lock.
@Test
def testQuestion14() =
TestHelper.testManySchedules(
2,
sched =>
val allNodes = (for i <- 0 to 6 yield ScheduledNode(i, sched)).toList
val monitor = new MockedMonitor: // Monitor is a type of a lock.
def scheduler = sched
// Shared by all threads
var sum: Int = 0
def increment(e: Int) =
monitor.synchronized { sum += e }
(
List(
() =>
// Thread 1
var nodes: List[Node] =
List(allNodes(1), allNodes(3), allNodes(2), allNodes(4))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment)
,
() =>
// Thread 2
var nodes: List[Node] =
List(allNodes(5), allNodes(2), allNodes(3))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment),
),
results =>
if sum != 20 then
(false, f"Wrong sum: expected 20 but got $sum")
else
(true, "")
)
)
// total will give correct output here as it is an atomic instruction.
@Test
def testQuestion15() =
TestHelper.testManySchedules(
2,
sched =>
val allNodes = (for i <- 0 to 6 yield ScheduledNode(i, sched)).toList
// Shared by all threads
var total: AtomicInteger = new AtomicInteger(0)
def increment(e: Int) =
total.addAndGet(e)
(
List(
() =>
// Thread 1
var nodes: List[Node] =
List(allNodes(1), allNodes(3), allNodes(2), allNodes(4))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment)
,
() =>
// Thread 2
var nodes: List[Node] =
List(allNodes(5), allNodes(2), allNodes(3))
nodes = nodes.sortWith((x, y) => x.guid < y.guid)
lockFun(nodes, increment),
),
results =>
if total.get != 20 then
(false, f"Wrong total: expected 20 but got $total")
else
(true, "")
)
)
class ScheduledNode(value: Int, val scheduler: Scheduler) extends Node(value)
with MockedMonitor
package midterm22
import org.junit.*
import org.junit.Assert.*
import instrumentation.*
class Part6Test:
@Test(expected = classOf[AssertionError])
def testQuestion21() =
TestHelper.testManySchedules(
2,
scheduler =>
val ticketsManager = ScheduledTicketsManager(1, scheduler)
(
List(
() =>
// Thread 1
ticketsManager.getTicket(),
() =>
// Thread 2
ticketsManager.getTicket()
),
results =>
if ticketsManager.remainingTickets < 0 then
(false, "Sold more tickets than available!")
else (true, "")
)
)
class ScheduledTicketsManager(totalTickets: Int, val scheduler: Scheduler)
extends TicketsManager(totalTickets)
with MockedMonitor
package midterm22
import org.junit.*
import org.junit.Assert.*
import instrumentation.*
class Part7Test:
@Test
def testNicManagerSequential() =
val nicsManager = NICManager(4)
assertEquals((0, 1), nicsManager.assignNICs())
assertEquals((2, 3), nicsManager.assignNICs())
@Test
def testQuestion22() =
testNicManagerParallel(2, 3)
@Test
def testQuestion23() =
val nicsManager = NICManager(2)
// Thread 1
assertEquals((0, 1), nicsManager.assignNICs())
nicsManager.nics(0).assigned = false
nicsManager.nics(1).assigned = false
// Thread 2
assertEquals((0, 1), nicsManager.assignNICs())
nicsManager.nics(0).assigned = false
nicsManager.nics(1).assigned = false
@Test
def testQuestion24() =
testNicManagerParallel(3, 2, true)
@Test
def testQuestion24NotLimitingRecvNICs() =
TestUtils.assertMaybeDeadlock(
testNicManagerParallel(3, 2)
)
def testNicManagerParallel(
threads: Int,
nics: Int,
limitRecvNICs: Boolean = false
) =
TestHelper.testManySchedules(
threads,
scheduler =>
val nicsManager = ScheduledNicsManager(nics, scheduler)
val tasks =
for i <- 0 until threads yield () =>
// Thread i
val (recvNIC, sendNIC) = nicsManager.assignNICs(limitRecvNICs)
// Do something with NICs...
// Un-assign NICs
nicsManager.nics(recvNIC).assigned = false
nicsManager.nics(sendNIC).assigned = false
(
tasks.toList,
results =>
if nicsManager.nics.count(_.assigned) != 0 then
(false, f"All NICs should have been released.")
else (true, "")
)
)
class ScheduledNicsManager(n: Int, scheduler: Scheduler)
extends NICManager(n):
class ScheduledNIC(
_index: Int,
_assigned: Boolean,
val scheduler: Scheduler
) extends NIC(_index, _assigned)
with MockedMonitor:
override def index = scheduler.exec { super.index }(
"",
Some(res => f"read NIC.index == $res")
)
override def assigned = scheduler.exec { super.assigned }(
"",
Some(res => f"read NIC.assigned == $res")
)
override def assigned_=(v: Boolean) =
scheduler.exec { super.assigned = v }(
f"write NIC.assigned = $v"
)
override val nics =
(for i <- 0 until n yield ScheduledNIC(i, false, scheduler)).toList
package midterm22
import org.junit.*
import org.junit.Assert.*
import instrumentation.*
import annotation.nowarn
import scala.collection.concurrent.TrieMap
import scala.collection.concurrent.{TrieMap, Map}
class Part8Test:
@Test
def usage() =
val insta = Instagram()
assertEquals(1, insta.add())
assertEquals(2, insta.add())
insta.follow(1, 2)
assertEquals(insta.graph, Map(1 -> List(2), 2 -> List()))
insta.follow(2, 1)
insta.unfollow(1, 2)
assertEquals(insta.graph, Map(1 -> List(), 2 -> List(1)))
insta.follow(3, 1) // fails silently
assertEquals(insta.graph, Map(1 -> List(), 2 -> List(1)))
insta.remove(1)
assertEquals(insta.graph, Map(2 -> List()))
insta.unfollow(1, 2) // fails silently
@Test
def testParallelFollowABRemoveA() =
TestHelper.testManySchedules(
2,
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
val u1 = insta.add()
val u2 = insta.add()
(
List(
() =>
// Thread 1
insta.follow(u1, u2),
() =>
// Thread 2
insta.remove(u1)
),
results =>
val size = insta.graph.size
if size != 1 then
(false, f"Wrong number of user: expected 1 but got ${size}")
else validateGraph(insta)
)
)
@Test
def testParallelFollowABRemoveB() =
TestHelper.testManySchedules(
2,
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
val u1 = insta.add()
val u2 = insta.add()
(
List(
() =>
// Thread 1
insta.follow(u1, u2),
() =>
// Thread 2
insta.remove(u2)
),
results =>
val size = insta.graph.size
if size != 1 then
(false, f"Wrong number of user: expected 1 but got ${size}")
else validateGraph(insta)
)
)
@Test
def testParallelFollowACRemoveB() =
TestHelper.testManySchedules(
2,
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
val u1 = insta.add()
val u2 = insta.add()
val u3 = insta.add()
insta.follow(u1, u2)
(
List(
() =>
// Thread 1
insta.follow(u1, u3),
() =>
// Thread 2
insta.remove(u2)
),
results =>
val size = insta.graph.size
if size != 2 then
(false, f"Wrong number of user: expected 2 but got ${size}")
else validateGraph(insta)
)
)
@Test
def testParallelFollow() =
TestHelper.testManySchedules(
2,
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
val u1 = insta.add()
val u2 = insta.add()
val u3 = insta.add()
(
List(
() =>
// Thread 1
insta.follow(u1, u2),
() =>
// Thread 2
insta.follow(u1, u3)
),
results =>
val u1FollowingSize = insta.graph(u1).size
if u1FollowingSize != 2 then
(
false,
f"Wrong number of users followed by user 1: expected 2 but got ${u1FollowingSize}"
)
else validateGraph(insta)
)
)
@Test
def testParallelRemove() =
TestHelper.testManySchedules(
2,
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
// Setup
val u1 = insta.add()
val u2 = insta.add()
val u3 = insta.add()
insta.follow(u1, u2)
insta.follow(u2, u1)
insta.follow(u2, u3)
insta.follow(u3, u1)
(
List(
() =>
// Thread 1
insta.remove(u2),
() =>
// Thread 2
insta.remove(u3)
),
results =>
val size = insta.graph.size
if size != 1 then
(false, f"Wrong number of user: expected 1 but got ${size}")
else validateGraph(insta)
)
)
// We test wrong code here, so we expect an assertion error. You can replace
// the next line by `@Test` if you want to see the error with the failing
// schedule.
@Test(expected = classOf[AssertionError])
def testParallelWrongAdd() =
TestHelper.testManySchedules(
2,
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
// This implementation of `add` is wrong, because two threads might
// allocate the same id.
// Consider the following schedule:
// T1: res = 1
// T2: res = 2
// T2: graph.update(2, Nil)
// T2: 2
// T1: graph.update(2, Nil)
// T1: 2
override def add(): Int =
val res = maxId.incrementAndGet
graph.update(maxId.get, Nil)
res
(
List(
() =>
// Thread 1
insta.add(),
() =>
// Thread 2
insta.add()
),
results =>
if results(0) != results(1) then
(false, f"Allocated twice id ${results(0)}")
else validateGraph(insta)
)
)
// We test wrong code here, so we expect an assertion error. You can replace
// the next line by `@Test` if you want to see the error with the failing
// schedule.
@Test(expected = classOf[AssertionError])
def testParallelWrongRemove() =
TestHelper.testManySchedules(
2,
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
// This implementation of `remove` is wrong because we don't retry to
// call `graph.replace` when it fails. Therefore, user 1 might end up
// following user 2 that has been removed, or not following user 3
// which is concurrently followed.
override def remove(idToRemove: Int): Unit =
graph.remove(idToRemove)
for (key, value) <- graph do
graph.replace(key, value, value.filter(_ != idToRemove))
// Note: writing `graph(key) = value.filter(_ != idToRemove)` would also
// be wrong because it does not check the previous value.
// Therefore, it could erase a concurrent update.
val u1 = insta.add()
val u2 = insta.add()
val u3 = insta.add()
insta.follow(u1, u2)
(
List(
() =>
// Thread 1
insta.follow(u1, u3),
() =>
// Thread 2
insta.remove(u2)
),
results =>
val size = insta.graph.size
if insta.graph(u1).size != 1 then
(
false,
f"Wrong number of users followed by 1: expected 1 but got ${insta.graph(u1)}"
)
else validateGraph(insta)
)
)
// We test wrong code here, so we expect an assertion error. You can replace
// the next line by `@Test` if you want to see the error with the failing
// schedule.
@Test(expected = classOf[AssertionError])
def testParallelWrongUnfollow() =
TestHelper.testManySchedules(
2,
scheduler =>
val insta = new Instagram:
override val graph =
ScheduledTrieMap(TrieMap[Int, List[Int]](), scheduler)
override def unfollow(a: Int, b: Int): Unit =
if !graph.contains(a) then return
val prev = graph(a) // Might throw java.util.NoSuchElementException
if !graph.replace(a, prev, prev.filter(_ != b)) then unfollow(a, b)
val u1 = insta.add()
val u2 = insta.add()
insta.follow(u1, u2)
(
List(
() =>
// Thread 1
insta.unfollow(u1, u2),
() =>
// Thread 2
insta.remove(u1)
),
results =>
val size = insta.graph.size
if size != 1 then
(false, f"Wrong number of user: expected 1 but got ${size}")
else validateGraph(insta)
)
)
@nowarn
def validateGraph(insta: Instagram): (Boolean, String) =
for (a, following) <- insta.graph; b <- following do
if !insta.graph.contains(b) then
return (false, f"User $a follows non-existing user $b")
(true, "")
final class ScheduledIterator[T](
private val myIterator: Iterator[T],
private val scheduler: Scheduler
) extends Iterator[T]:
override def hasNext =
myIterator.hasNext
override def next() =
scheduler.exec(myIterator.next)("", Some(res => f"Iterator.next == $res"))
override def knownSize: Int =
myIterator.knownSize
final class ScheduledTrieMap[K, V](
private val myMap: Map[K, V],
private val scheduler: Scheduler
) extends Map[K, V]:
override def apply(key: K): V =
scheduler.exec(myMap(key))(
"",
Some(res => f"TrieMap.apply($key) == $res")
)
override def contains(key: K): Boolean =
scheduler.exec(myMap.contains(key))(
"",
Some(res => f"TrieMap.contains($key) == $res")
)
override def get(key: K): Option[V] =
scheduler.exec(myMap.get(key))(
"",
Some(res => f"TrieMap.get($key) == $res")
)
override def addOne(kv: (K, V)) =
scheduler.exec(myMap.addOne(kv))(f"TrieMap.addOne($kv)")
this
override def subtractOne(k: K) =
scheduler.exec(myMap.subtractOne(k))(f"TrieMap.subtractOne($k)")
this
override def iterator() =
scheduler.log("TrieMap.iterator")
ScheduledIterator(myMap.iterator, scheduler)
override def replace(k: K, v: V): Option[V] =
scheduler.exec(myMap.replace(k, v))(
"",
Some(res => f"TrieMap.replace($k, $v) == $res")
)
override def replace(k: K, oldvalue: V, newvalue: V): Boolean =
scheduler.exec(myMap.replace(k, oldvalue, newvalue))(
"",
Some(res => f"TrieMap.replace($k, $oldvalue, $newvalue) == $res")
)
override def putIfAbsent(k: K, v: V): Option[V] =
scheduler.exec(myMap.putIfAbsent(k, v))(
"",
Some(res => f"TrieMap.putIfAbsent($k, $v)")
)
override def remove(k: K): Option[V] =
scheduler.exec(myMap.remove(k))(
"",
Some(res => f"TrieMap.remove($k)")
)
override def remove(k: K, v: V): Boolean =
scheduler.exec(myMap.remove(k, v))(
"",
Some(res => f"TrieMap.remove($k, $v)")
)
package midterm23
import instrumentation.*
import java.util.concurrent.atomic.AtomicInteger
class BarberShopSolutionTest extends munit.FunSuite:
val implementations =
Map[String, (Int, Scheduler) => ScheduledBarberShopSolution](
"BarberShopSolution1" -> (new BarberShopSolution1(_)
with ScheduledBarberShopSolution(_)),
"BarberShopSolution2" -> (new BarberShopSolution2(_)
with ScheduledBarberShopSolution(_)),
"BarberShopSolution3" -> (new BarberShopSolution3(_)
with ScheduledBarberShopSolution(_))
)
for (name, makeShop) <- implementations do
for nCustomers <- 1 to 3 do
test(f"$name with $nCustomers customer(s)") {
testBarberShop(nCustomers, makeShop)
}