Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • lara/cs206-demos
  • gcharles/cs206-demos
  • gambhir/cs206-demos
3 results
Show changes
Showing
with 3040 additions and 104 deletions
package concpar21final03.instrumentation
class AtomicReference[T](initial: T):
private val atomic =
new java.util.concurrent.atomic.AtomicReference[T](initial)
def get: T = atomic.get()
def set(value: T): Unit = atomic.set(value)
package concpar21final03.instrumentation
trait MockedMonitor extends Monitor:
def scheduler: Scheduler
// Can be overriden.
override def waitDefault() =
scheduler.log("wait")
scheduler updateThreadState Wait(this, scheduler.threadLocks.tail)
override def synchronizedDefault[T](toExecute: => T): T =
scheduler.log("synchronized check")
val prevLocks = scheduler.threadLocks
scheduler updateThreadState Sync(
this,
prevLocks
) // If this belongs to prevLocks, should just continue.
scheduler.log("synchronized -> enter")
try toExecute
finally
scheduler updateThreadState Running(prevLocks)
scheduler.log("synchronized -> out")
override def notifyDefault() =
scheduler mapOtherStates { state =>
state match
case Wait(lockToAquire, locks) if lockToAquire == this =>
SyncUnique(this, state.locks)
case e => e
}
scheduler.log("notify")
override def notifyAllDefault() =
scheduler mapOtherStates { state =>
state match
case Wait(lockToAquire, locks) if lockToAquire == this =>
Sync(this, state.locks)
case SyncUnique(lockToAquire, locks) if lockToAquire == this =>
Sync(this, state.locks)
case e => e
}
scheduler.log("notifyAll")
trait LockFreeMonitor extends Monitor:
override def waitDefault() =
throw new Exception("Please use lock-free structures and do not use wait()")
override def synchronizedDefault[T](toExecute: => T): T =
throw new Exception(
"Please use lock-free structures and do not use synchronized()"
)
override def notifyDefault() =
throw new Exception(
"Please use lock-free structures and do not use notify()"
)
override def notifyAllDefault() =
throw new Exception(
"Please use lock-free structures and do not use notifyAll()"
)
abstract class ThreadState:
def locks: Seq[AnyRef]
trait CanContinueIfAcquiresLock extends ThreadState:
def lockToAquire: AnyRef
case object Start extends ThreadState:
def locks: Seq[AnyRef] = Seq.empty
case object End extends ThreadState:
def locks: Seq[AnyRef] = Seq.empty
case class Wait(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState
case class SyncUnique(lockToAquire: AnyRef, locks: Seq[AnyRef])
extends ThreadState
with CanContinueIfAcquiresLock
case class Sync(lockToAquire: AnyRef, locks: Seq[AnyRef])
extends ThreadState
with CanContinueIfAcquiresLock
case class Running(locks: Seq[AnyRef]) extends ThreadState
case class VariableReadWrite(locks: Seq[AnyRef]) extends ThreadState
package concpar21final03.instrumentation
import java.util.concurrent.*;
import scala.concurrent.duration.*
import scala.collection.mutable.*
import Stats.*
import java.util.concurrent.atomic.AtomicInteger
sealed abstract class Result
case class RetVal(rets: List[Any]) extends Result
case class Except(msg: String, stackTrace: Array[StackTraceElement])
extends Result
case class Timeout(msg: String) extends Result
/** A class that maintains schedule and a set of thread ids. The schedules are
* advanced after an operation of a SchedulableBuffer is performed. Note: the
* real schedule that is executed may deviate from the input schedule due to
* the adjustments that had to be made for locks
*/
class Scheduler(sched: List[Int]):
val maxOps =
500 // a limit on the maximum number of operations the code is allowed to perform
private var schedule = sched
private var numThreads = 0
private val realToFakeThreadId = Map[Long, Int]()
private val opLog =
ListBuffer[String]() // a mutable list (used for efficient concat)
private val threadStates = Map[Int, ThreadState]()
/** Runs a set of operations in parallel as per the schedule. Each operation
* may consist of many primitive operations like reads or writes to shared
* data structure each of which should be executed using the function `exec`.
* @timeout
* in milliseconds
* @return
* true - all threads completed on time, false -some tests timed out.
*/
def runInParallel(timeout: Long, ops: List[() => Any]): Result =
numThreads = ops.length
val threadRes = Array.fill(numThreads) { None: Any }
var exception: Option[Except] = None
val syncObject = new Object()
var completed = new AtomicInteger(0)
// create threads
val threads = ops.zipWithIndex.map { case (op, i) =>
new Thread(
new Runnable():
def run(): Unit =
val fakeId = i + 1
setThreadId(fakeId)
try
updateThreadState(Start)
val res = op()
updateThreadState(End)
threadRes(i) = res
// notify the master thread if all threads have completed
if completed.incrementAndGet() == ops.length then
syncObject.synchronized { syncObject.notifyAll() }
catch
case e: Throwable
if exception != None => // do nothing here and silently fail
case e: Throwable =>
log(s"throw ${e.toString}")
exception = Some(
Except(
s"Thread $fakeId crashed on the following schedule: \n" + opLog
.mkString("\n"),
e.getStackTrace
)
)
syncObject.synchronized { syncObject.notifyAll() }
// println(s"$fakeId: ${e.toString}")
// Runtime.getRuntime().halt(0) //exit the JVM and all running threads (no other way to kill other threads)
)
}
// start all threads
threads.foreach(_.start())
// wait for all threads to complete, or for an exception to be thrown, or for the time out to expire
var remTime = timeout
syncObject.synchronized {
timed { if completed.get() != ops.length then syncObject.wait(timeout) } {
time => remTime -= time
}
}
if exception.isDefined then exception.get
else if remTime <= 1
then // timeout ? using 1 instead of zero to allow for some errors
Timeout(opLog.mkString("\n"))
else
// every thing executed normally
RetVal(threadRes.toList)
// Updates the state of the current thread
def updateThreadState(state: ThreadState): Unit =
val tid = threadId
synchronized {
threadStates(tid) = state
}
state match
case Sync(lockToAquire, locks) =>
if locks.indexOf(lockToAquire) < 0 then waitForTurn
else
// Re-aqcuiring the same lock
updateThreadState(Running(lockToAquire +: locks))
case Start => waitStart()
case End => removeFromSchedule(tid)
case Running(_) =>
case _ => waitForTurn // Wait, SyncUnique, VariableReadWrite
def waitStart(): Unit =
// while (threadStates.size < numThreads) {
// Thread.sleep(1)
// }
synchronized {
if threadStates.size < numThreads then wait()
else notifyAll()
}
def threadLocks =
synchronized {
threadStates(threadId).locks
}
def threadState =
synchronized {
threadStates(threadId)
}
def mapOtherStates(f: ThreadState => ThreadState) =
val exception = threadId
synchronized {
for k <- threadStates.keys if k != exception do
threadStates(k) = f(threadStates(k))
}
def log(str: String) =
if (realToFakeThreadId contains Thread.currentThread().getId()) then
val space = (" " * ((threadId - 1) * 2))
val s =
space + threadId + ":" + "\n".r.replaceAllIn(str, "\n" + space + " ")
opLog += s
/** Executes a read or write operation to a global data structure as per the
* given schedule
* @param msg
* a message corresponding to the operation that will be logged
*/
def exec[T](
primop: => T
)(msg: => String, postMsg: => Option[T => String] = None): T =
if !(realToFakeThreadId contains Thread.currentThread().getId()) then primop
else
updateThreadState(VariableReadWrite(threadLocks))
val m = msg
if m != "" then log(m)
if opLog.size > maxOps then
throw new Exception(
s"Total number of reads/writes performed by threads exceed $maxOps. A possible deadlock!"
)
val res = primop
postMsg match
case Some(m) => log(m(res))
case None =>
res
private def setThreadId(fakeId: Int) = synchronized {
realToFakeThreadId(Thread.currentThread.getId) = fakeId
}
def threadId =
try realToFakeThreadId(Thread.currentThread().getId())
catch
case e: NoSuchElementException =>
throw new Exception(
"You are accessing shared variables in the constructor. This is not allowed. The variables are already initialized!"
)
private def isTurn(tid: Int) = synchronized {
(!schedule.isEmpty && schedule.head != tid)
}
def canProceed(): Boolean =
val tid = threadId
canContinue match
case Some((i, state)) if i == tid =>
// println(s"$tid: Runs ! Was in state $state")
canContinue = None
state match
case Sync(lockToAquire, locks) =>
updateThreadState(Running(lockToAquire +: locks))
case SyncUnique(lockToAquire, locks) =>
mapOtherStates {
_ match
case SyncUnique(lockToAquire2, locks2)
if lockToAquire2 == lockToAquire =>
Wait(lockToAquire2, locks2)
case e => e
}
updateThreadState(Running(lockToAquire +: locks))
case VariableReadWrite(locks) => updateThreadState(Running(locks))
true
case Some((i, state)) =>
// println(s"$tid: not my turn but $i !")
false
case None =>
false
var threadPreference =
0 // In the case the schedule is over, which thread should have the preference to execute.
/** returns true if the thread can continue to execute, and false otherwise */
def decide(): Option[(Int, ThreadState)] =
if !threadStates.isEmpty
then // The last thread who enters the decision loop takes the decision.
// println(s"$threadId: I'm taking a decision")
if threadStates.values.forall {
case e: Wait => true
case _ => false
}
then
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
throw new Exception(
s"Deadlock: Thread$s $waiting $are waiting but all others have ended and cannot notify them."
)
else
// Threads can be in Wait, Sync, SyncUnique, and VariableReadWrite mode.
// Let's determine which ones can continue.
val notFree =
threadStates.collect { case (id, state) => state.locks }.flatten.toSet
val threadsNotBlocked = threadStates.toSeq.filter {
case (id, v: VariableReadWrite) => true
case (id, v: CanContinueIfAcquiresLock) =>
!notFree(v.lockToAquire) || (v.locks contains v.lockToAquire)
case _ => false
}
if threadsNotBlocked.isEmpty then
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
val whoHasLock = threadStates.toSeq.flatMap { case (id, state) =>
state.locks.map(lock => (lock, id))
}.toMap
val reason = threadStates
.collect {
case (id, state: CanContinueIfAcquiresLock)
if !notFree(state.lockToAquire) =>
s"Thread $id is waiting on lock ${state.lockToAquire} held by thread ${whoHasLock(state.lockToAquire)}"
}
.mkString("\n")
throw new Exception(
s"Deadlock: Thread$s $waiting are interlocked. Indeed:\n$reason"
)
else if threadsNotBlocked.size == 1
then // Do not consume the schedule if only one thread can execute.
Some(threadsNotBlocked(0))
else
val next = schedule.indexWhere(t =>
threadsNotBlocked.exists { case (id, state) => id == t }
)
if next != -1 then
// println(s"$threadId: schedule is $schedule, next chosen is ${schedule(next)}")
val chosenOne = schedule(
next
) // TODO: Make schedule a mutable list.
schedule = schedule.take(next) ++ schedule.drop(next + 1)
Some((chosenOne, threadStates(chosenOne)))
else
threadPreference = (threadPreference + 1) % threadsNotBlocked.size
val chosenOne = threadsNotBlocked(
threadPreference
) // Maybe another strategy
Some(chosenOne)
// threadsNotBlocked.indexOf(threadId) >= 0
/*
val tnb = threadsNotBlocked.map(_._1).mkString(",")
val s = if (schedule.isEmpty) "empty" else schedule.mkString(",")
val only = if (schedule.isEmpty) "" else " only"
throw new Exception(s"The schedule is $s but$only threads ${tnb} can continue")*/
else canContinue
/** This will be called before a schedulable operation begins. This should not
* use synchronized
*/
var numThreadsWaiting = new AtomicInteger(0)
// var waitingForDecision = Map[Int, Option[Int]]() // Mapping from thread ids to a number indicating who is going to make the choice.
var canContinue: Option[(Int, ThreadState)] =
None // The result of the decision thread Id of the thread authorized to continue.
private def waitForTurn =
synchronized {
if numThreadsWaiting.incrementAndGet() == threadStates.size then
canContinue = decide()
notifyAll()
// waitingForDecision(threadId) = Some(numThreadsWaiting)
// println(s"$threadId Entering waiting with ticket number $numThreadsWaiting/${waitingForDecision.size}")
while !canProceed() do wait()
}
numThreadsWaiting.decrementAndGet()
/** To be invoked when a thread is about to complete
*/
private def removeFromSchedule(fakeid: Int) = synchronized {
// println(s"$fakeid: I'm taking a decision because I finished")
schedule = schedule.filterNot(_ == fakeid)
threadStates -= fakeid
if numThreadsWaiting.get() == threadStates.size then
canContinue = decide()
notifyAll()
}
def getOperationLog() = opLog
/* Copyright 2009-2015 EPFL, Lausanne */
package concpar21final03.instrumentation
import java.lang.management.*
/** A collection of methods that can be used to collect run-time statistics */
object Stats:
def timed[T](code: => T)(cont: Long => Unit): T =
var t1 = System.currentTimeMillis()
val r = code
cont((System.currentTimeMillis() - t1))
r
def withTime[T](code: => T): (T, Long) =
var t1 = System.currentTimeMillis()
val r = code
(r, (System.currentTimeMillis() - t1))
package concpar21final03.instrumentation
import scala.util.Random
import scala.collection.mutable.{Map as MutableMap}
import Stats.*
object TestHelper:
val noOfSchedules = 10000 // set this to 100k during deployment
val readWritesPerThread =
20 // maximum number of read/writes possible in one thread
val contextSwitchBound = 10
val testTimeout = 150 // the total time out for a test in seconds
val schedTimeout =
15 // the total time out for execution of a schedule in secs
// Helpers
/*def testManySchedules(op1: => Any): Unit = testManySchedules(List(() => op1))
def testManySchedules(op1: => Any, op2: => Any): Unit = testManySchedules(List(() => op1, () => op2))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any, op4: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3, () => op4))*/
def testSequential[T](
ops: Scheduler => Any
)(assertions: T => (Boolean, String)) =
testManySchedules(
1,
(sched: Scheduler) =>
(
List(() => ops(sched)),
(res: List[Any]) => assertions(res.head.asInstanceOf[T])
)
)
/** @numThreads
* number of threads
* @ops
* operations to be executed, one per thread
* @assertion
* as condition that will executed after all threads have completed
* (without exceptions) the arguments are the results of the threads
*/
def testManySchedules(
numThreads: Int,
ops: Scheduler => (
List[() => Any], // Threads
List[Any] => (Boolean, String)
) // Assertion
) =
var timeout = testTimeout * 1000L
val threadIds = (1 to numThreads)
// (1 to scheduleLength).flatMap(_ => threadIds).toList.permutations.take(noOfSchedules).foreach {
val schedules = (new ScheduleGenerator(numThreads)).schedules()
var schedsExplored = 0
schedules
.takeWhile(_ => schedsExplored <= noOfSchedules && timeout > 0)
.foreach {
// case _ if timeout <= 0 => // break
case schedule =>
schedsExplored += 1
val schedr = new Scheduler(schedule)
// println("Exploring Sched: "+schedule)
val (threadOps, assertion) = ops(schedr)
if threadOps.size != numThreads then
throw new IllegalStateException(
s"Number of threads: $numThreads, do not match operations of threads: $threadOps"
)
timed { schedr.runInParallel(schedTimeout * 1000, threadOps) } { t =>
timeout -= t
} match
case Timeout(msg) =>
throw new java.lang.AssertionError(
"assertion failed\n" + "The schedule took too long to complete. A possible deadlock! \n" + msg
)
case Except(msg, stkTrace) =>
val traceStr = "Thread Stack trace: \n" + stkTrace
.map(" at " + _.toString)
.mkString("\n")
throw new java.lang.AssertionError(
"assertion failed\n" + msg + "\n" + traceStr
)
case RetVal(threadRes) =>
// check the assertion
val (success, custom_msg) = assertion(threadRes)
if !success then
val msg =
"The following schedule resulted in wrong results: \n" + custom_msg + "\n" + schedr
.getOperationLog()
.mkString("\n")
throw new java.lang.AssertionError("Assertion failed: " + msg)
}
if timeout <= 0 then
throw new java.lang.AssertionError(
"Test took too long to complete! Cannot check all schedules as your code is too slow!"
)
/** A schedule generator that is based on the context bound
*/
class ScheduleGenerator(numThreads: Int):
val scheduleLength = readWritesPerThread * numThreads
val rands =
(1 to scheduleLength).map(i =>
new Random(0xcafe * i)
) // random numbers for choosing a thread at each position
def schedules(): LazyList[List[Int]] =
var contextSwitches = 0
var contexts =
List[Int]() // a stack of thread ids in the order of context-switches
val remainingOps = MutableMap[Int, Int]()
remainingOps ++= (1 to numThreads).map(i =>
(i, readWritesPerThread)
) // num ops remaining in each thread
val liveThreads = (1 to numThreads).toSeq.toBuffer
/** Updates remainingOps and liveThreads once a thread is chosen for a
* position in the schedule
*/
def updateState(tid: Int): Unit =
val remOps = remainingOps(tid)
if remOps == 0 then liveThreads -= tid
else remainingOps += (tid -> (remOps - 1))
val schedule = rands.foldLeft(List[Int]()) {
case (acc, r) if contextSwitches < contextSwitchBound =>
val tid = liveThreads(r.nextInt(liveThreads.size))
contexts match
case prev :: tail
if prev != tid => // we have a new context switch here
contexts +:= tid
contextSwitches += 1
case prev :: tail =>
case _ => // init case
contexts +:= tid
updateState(tid)
acc :+ tid
case (
acc,
_
) => // here context-bound has been reached so complete the schedule without any more context switches
if !contexts.isEmpty then
contexts = contexts.dropWhile(remainingOps(_) == 0)
val tid = contexts match
case top :: tail => top
case _ =>
liveThreads(
0
) // here, there has to be threads that have not even started
updateState(tid)
acc :+ tid
}
schedule #:: schedules()
package concpar21final03.instrumentation
import scala.concurrent.*
import scala.concurrent.duration.*
import scala.concurrent.ExecutionContext.Implicits.global
object TestUtils:
def failsOrTimesOut[T](action: => T): Boolean =
val asyncAction = Future {
action
}
try Await.result(asyncAction, 2000.millisecond)
catch case _: Throwable => return true
return false
package concpar21final03
import instrumentation.*
class SchedulableThreadMap[A](val scheduler: Scheduler)
extends ThreadMap[A]
with MockedMonitor:
override def currentThreadHasValue: Boolean = scheduler.exec {
super.currentThreadHasValue
}("", Some(res => s"currentThreadHasValue is $res"))
override def currentThreadValue: Option[A] = scheduler.exec {
super.currentThreadValue
}("", Some(res => s"currentThreadValue is $res"))
override def setCurrentThreadValue(value: A): Unit = scheduler.exec {
super.setCurrentThreadValue(value)
}(s"setCurrentThreadValue($value)")
override def deleteCurrentThreadValue(): Unit = scheduler.exec {
super.deleteCurrentThreadValue()
}("deleteCurrentThreadValue()")
override def waitForall(predicate: A => Boolean): Unit = scheduler.exec {
super.waitForall(predicate)
}("waitForall")
def allValues: List[A] = synchronized {
theMap.values.toList
}
end SchedulableThreadMap
class SchedulableRCU(scheduler: Scheduler) extends RCU with LockFreeMonitor:
override protected val latestVersion =
SchedulableAtomicLong(0, scheduler, "latestVersion")
override protected val readersVersion: ThreadMap[Long] = SchedulableThreadMap(
scheduler
)
class SchedulableInMemoryFileSystem(scheduler: Scheduler)
extends InMemoryFileSystem:
override def createFile(file: FileName, content: String): Unit =
scheduler.exec {
super.createFile(file, content)
}(s"createFile($file)")
override def readFile(file: FileName): String = scheduler.exec {
super.readFile(file)
}(s"readFile($file)")
override def deleteFile(file: FileName): Unit = scheduler.exec {
super.deleteFile(file)
}(s"deleteFile($file)")
class SchedulableUpdateServer(scheduler: Scheduler, fs: InMemoryFileSystem)
extends UpdateServer(fs)
with LockFreeMonitor:
override val rcu = SchedulableRCU(scheduler)
class SchedulableAtomicLong(initial: Long, scheduler: Scheduler, name: String)
extends AtomicLong(initial):
override def get: Long = scheduler.exec {
super.get
}(s"", Some(res => s"$name: get $res"))
override def set(value: Long): Unit = scheduler.exec {
super.set(value)
}(s"$name: set $value", None)
override def incrementAndGet(): Long = scheduler.exec {
super.incrementAndGet()
}(s"", Some(res => s"$name: incrementAndGet $res"))
override def getAndIncrement(): Long = scheduler.exec {
super.getAndIncrement()
}(s"", Some(res => s"$name: getandIncrement $res"))
override def compareAndSet(expected: Long, newValue: Long): Boolean =
scheduler.exec {
super.compareAndSet(expected, newValue)
}(
s"$name: compareAndSet(expected = $expected, newValue = $newValue)",
Some(res => s"$name: Did it set? $res")
)
end SchedulableAtomicLong
class SchedulableAtomicReference[T](
initial: T,
scheduler: Scheduler,
name: String
) extends AtomicReference(initial):
override def get: T = scheduler.exec {
super.get
}(s"", Some(res => s"$name: get $res"))
override def set(value: T): Unit = scheduler.exec {
super.set(value)
}(s"$name: set $value", None)
package concpar22final01
import java.util.concurrent.*
import scala.util.DynamicVariable
class Problem1Suite extends AbstractProblem1Suite:
test("[Public] fetch simple result without combining (2pts)") {
val combiner1 = new DLLCombinerTest
combiner1 += 7
combiner1 += 2
combiner1 += 3
combiner1 += 8
combiner1 += 1
combiner1 += 2
combiner1 += 3
combiner1 += 8
val result = combiner1.result()
val array = Array(7, 2, 3, 8, 1, 2, 3, 8)
assert(Range(0, array.size).forall(i => array(i) == result(i)))
}
test("[Public] fetch result without combining (2pts)") {
val combiner1 = new DLLCombinerTest
combiner1 += 7
combiner1 += 2
combiner1 += 3
combiner1 += 8
combiner1 += 1
val result = combiner1.result()
val array = Array(7, 2, 3, 8, 1)
assert(Range(0, array.size).forall(i => array(i) == result(i)))
}
test("[Public] fetch result after simple combining (2pts)") {
val combiner1 = new DLLCombinerTest
combiner1 += 7
combiner1 += 2
val combiner2 = new DLLCombinerTest
combiner2 += 3
combiner2 += 8
val combiner3 = new DLLCombinerTest
combiner3 += 1
combiner3 += 9
val combiner4 = new DLLCombinerTest
combiner4 += 3
combiner4 += 2
val result = combiner1.combine(combiner2).combine(combiner3).combine(
combiner4
).result()
val array = Array(7, 2, 3, 8, 1, 9, 3, 2)
assert(Range(0, array.size).forall(i => array(i) == result(i)))
}
test("[Public] fetch result - small combiner (2pts)") {
val combiner1 = new DLLCombinerTest
combiner1 += 4
combiner1 += 2
combiner1 += 6
val result = combiner1.result()
val array = Array(4, 2, 6)
assert(Range(0, array.size).forall(i => array(i) == result(i)))
}
// (25+) 15 / 250 points for correct implementation, don't check parallelism
test("[Correctness] fetch result - simple combiners (2pts)") {
assertCorrectnessSimple()
}
test("[Correctness] fetch result - small combiners (3pts)") {
assertCorrectnessBasic()
}
test("[Correctness] fetch result - small combiners after combining (5pts)") {
assertCorrectnessCombined()
}
test("[Correctness] fetch result - large combiners (5pts)") {
assertCorrectnessLarge()
}
def assertCorrectnessSimple() =
simpleCombiners.foreach(elem => assert(compare(elem._1, elem._2)))
def assertCorrectnessBasic() =
basicCombiners.foreach(elem => assert(compare(elem._1, elem._2)))
def assertCorrectnessCombined() =
combinedCombiners.foreach(elem => assert(compare(elem._1, elem._2)))
def assertCorrectnessLarge() =
largeCombiners.foreach(elem => assert(compare(elem._1, elem._2)))
// (25+15+) 25 / 250 points for correct parallel implementation, don't check if it's exactly 1/4 of the array per task
private var count = 0
private val expected = 3
override def task[T](body: => T): ForkJoinTask[T] =
count += 1
scheduler.value.schedule(body)
test("[TaskCount] number of newly created tasks should be 3 (5pts)") {
assertTaskCountSimple()
}
test(
"[TaskCount] fetch result and check parallel - simple combiners (5pts)"
) {
assertTaskCountSimple()
assertCorrectnessSimple()
}
test("[TaskCount] fetch result and check parallel - small combiners (5pts)") {
assertTaskCountSimple()
assertCorrectnessBasic()
}
test(
"[TaskCount] fetch result and check parallel - small combiners after combining (5pts)"
) {
assertTaskCountSimple()
assertCorrectnessCombined()
}
test("[TaskCount] fetch result and check parallel - large combiners (5pts)") {
assertTaskCountSimple()
assertCorrectnessLarge()
}
def assertTaskCountSimple(): Unit =
simpleCombiners.foreach(elem => assertTaskCount(elem._1, elem._2))
def assertTaskCount(combiner: DLLCombinerTest, array: Array[Int]): Unit =
try
count = 0
build(combiner, array)
combiner.result()
assertEquals(
count,
expected, {
s"ERROR: Expected $expected instead of $count calls to `task(...)`"
}
)
finally count = 0
// (25+15+25+) 50 / 250 points for correct implementation that uses only next2 and previous2, and not next and previous
test(
"[Skip2] fetch parallel result and check skip2 - simple combiners (10pts)"
) {
assertTaskCountSimple()
assertSkipSimple()
assertCorrectnessSimple()
}
test("[Skip2] fetch result and check skip2 - simple combiners (10pts)") {
assertSkipSimple()
assertCorrectnessSimple()
}
test("[Skip2] fetch result and check skip2 - small combiners (10pts)") {
assertSkipSimple()
assertCorrectnessBasic()
}
test(
"[Skip2] fetch result and check skip2 - small combiners after combining (10pts)"
) {
assertSkipSimple()
assertCorrectnessCombined()
}
test("[Skip2] fetch result and check skip2 - large combiners (10pts)") {
assertSkipSimple()
assertCorrectnessLarge()
}
def assertSkipSimple(): Unit =
simpleCombiners.foreach(elem => assertSkip(elem._1, elem._2))
def assertSkip(combiner: DLLCombinerTest, array: Array[Int]): Unit =
build(combiner, array)
combiner.result()
assertEquals(
combiner.nonSkipped,
false, {
s"ERROR: Calls to 'next' and 'previous' are not allowed! You should only use 'next2` and 'previous2' in your solution."
}
)
// (25+15+25+50+) 75 / 250 points for correct parallel implementation, exactly 1/4 of the array per task
test("[TaskFairness] each task should compute 1/4 of the result (15pts)") {
assertTaskFairness(simpleCombiners.unzip._1)
}
test(
"[TaskFairness] each task should correctly compute 1/4 of the result - simple combiners (15pts)"
) {
assertTaskFairness(simpleCombiners.unzip._1)
assertCorrectnessSimple()
}
test(
"[TaskFairness] each task should correctly compute 1/4 of the result - small combiners (15pts)"
) {
assertTaskFairness(basicCombiners.unzip._1)
assertCorrectnessBasic()
}
test(
"[TaskFairness] each task should correctly compute 1/4 of the result - small combiners after combining (15pts)"
) {
assertTaskFairness(combinedCombiners.unzip._1)
assertCorrectnessCombined()
}
test(
"[TaskFairness] each task should correctly compute 1/4 of the result - large combiners (15pts)"
) {
assertTaskFairness(largeCombiners.unzip._1)
assertCorrectnessLarge()
}
def assertTaskFairness(combiners: List[DLLCombiner]): Unit =
def assertNewTaskFairness(
combiner: DLLCombiner,
task: ForkJoinTask[Unit],
data: Array[Int]
) =
var count = 0
var expected = combiner.size / 4
task.join
count = data.count(elem => elem != 0)
assert((count - expected).abs <= 1)
def assertMainTaskFairness(
combiner: DLLCombiner,
task: Unit,
data: Array[Int]
) =
var count = 0
var expected = combiner.size / 4
count = data.count(elem => elem != 0)
assert((count - expected).abs <= 1)
combiners.foreach { elem =>
var data = Array.fill(elem.size)(0)
assertNewTaskFairness(elem, elem.task1(data), data)
data = Array.fill(elem.size)(0)
assertNewTaskFairness(elem, elem.task2(data), data)
data = Array.fill(elem.size)(0)
assertNewTaskFairness(elem, elem.task3(data), data)
data = Array.fill(elem.size)(0)
assertMainTaskFairness(elem, elem.task4(data), data)
}
// (25+15+25+50+75+) 60 / 250 points for correct parallel implementation, exactly 1/4 of the array per task, exactly the specified quarter
test(
"[TaskPrecision] each task should compute specified 1/4 of the result - simple combiners (10pts)"
) {
assertTaskPrecision(simpleCombiners)
}
test(
"[TaskPrecision] task1 should compute specified 1/4 of the result - simple combiners (5pts)"
) {
assertTaskPrecision1(simpleCombiners)
}
test(
"[TaskPrecision] task2 should compute specified 1/4 of the result - simple combiners (5pts)"
) {
assertTaskPrecision2(simpleCombiners)
}
test(
"[TaskPrecision] task3 should compute specified 1/4 of the result - simple combiners (5pts)"
) {
assertTaskPrecision3(simpleCombiners)
}
test(
"[TaskPrecision] task4 should compute specified 1/4 of the result - simple combiners (5pts)"
) {
assertTaskPrecision4(simpleCombiners)
}
test(
"[TaskPrecision] each task should compute specified 1/4 of the result - other combiners (30pts)"
) {
assertTaskPrecision(basicCombiners)
assertTaskPrecision(combinedCombiners)
assertTaskPrecision(largeCombiners)
}
def assertTaskPrecision(combiners: List[(DLLCombiner, Array[Int])]): Unit =
assertTaskPrecision1(combiners)
assertTaskPrecision2(combiners)
assertTaskPrecision3(combiners)
assertTaskPrecision4(combiners)
def assertTaskPrecision1(combiners: List[(DLLCombiner, Array[Int])]): Unit =
combiners.foreach { elem =>
var data = Array.fill(elem._1.size)(0)
var ref = Array.fill(elem._1.size)(0)
val task1 = elem._1.task1(data)
task1.join
Range(0, elem._1.size).foreach(i =>
(if i < elem._1.size / 2 - 1 && i % 2 == 0 then ref(i) = elem._2(i))
)
assert(Range(0, elem._1.size / 2 - 1).forall(i => data(i) == ref(i)))
}
def assertTaskPrecision2(combiners: List[(DLLCombiner, Array[Int])]): Unit =
combiners.foreach { elem =>
var data = Array.fill(elem._1.size)(0)
var ref = Array.fill(elem._1.size)(0)
val task2 = elem._1.task2(data)
task2.join
Range(0, elem._1.size).foreach(i =>
(if i < elem._1.size / 2 - 1 && i % 2 == 1 then ref(i) = elem._2(i))
)
assert(Range(0, elem._1.size / 2 - 1).forall(i => data(i) == ref(i)))
}
def assertTaskPrecision3(combiners: List[(DLLCombiner, Array[Int])]): Unit =
combiners.foreach { elem =>
var data = Array.fill(elem._1.size)(0)
var ref = Array.fill(elem._1.size)(0)
val task3 = elem._1.task3(data)
task3.join
Range(0, elem._1.size).foreach(i =>
(if i > elem._1.size / 2 + 1 && i % 2 == elem._1.size % 2 then
ref(i) = elem._2(i))
)
assert(Range(elem._1.size / 2 + 2, elem._1.size).forall(i =>
data(i) == ref(i)
))
}
def assertTaskPrecision4(combiners: List[(DLLCombiner, Array[Int])]): Unit =
combiners.foreach { elem =>
var data = Array.fill(elem._1.size)(0)
var ref = Array.fill(elem._1.size)(0)
val task4 = elem._1.task4(data)
Range(0, elem._1.size).foreach(i =>
(if i > elem._1.size / 2 + 1 && i % 2 != elem._1.size % 2 then
ref(i) = elem._2(i))
)
assert(Range(elem._1.size / 2 + 2, elem._1.size).forall(i =>
data(i) == ref(i)
))
}
trait AbstractProblem1Suite extends munit.FunSuite with LibImpl:
def simpleCombiners = buildSimpleCombiners()
def basicCombiners = buildBasicCombiners()
def combinedCombiners = buildCombinedCombiners()
def largeCombiners = buildLargeCombiners()
def buildSimpleCombiners() =
val simpleCombiners = List(
(
new DLLCombinerTest,
Array(4, 2, 6, 1, 5, 4, 3, 5, 6, 3, 4, 5, 6, 3, 4, 5)
),
(
new DLLCombinerTest,
Array(7, 2, 2, 9, 3, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 2)
),
(new DLLCombinerTest, Array.fill(16)(5))
)
simpleCombiners.foreach(elem => build(elem._1, elem._2))
simpleCombiners
def buildBasicCombiners() =
val basicCombiners = List(
(new DLLCombinerTest, Array(4, 2, 6)),
(new DLLCombinerTest, Array(4, 1, 6)),
(
new DLLCombinerTest,
Array(7, 2, 2, 9, 3, 2, 11, 12, 5, 14, 15, 1, 17, 23)
),
(
new DLLCombinerTest,
Array(7, 2, 9, 9, 3, 2, 11, 12, 13, 14, 15, 16, 17, 22)
),
(new DLLCombinerTest, Array.fill(16)(7)),
(new DLLCombinerTest, Array.fill(16)(4)),
(new DLLCombinerTest, Array.fill(5)(3)),
(new DLLCombinerTest, Array.fill(5)(7)),
(new DLLCombinerTest, Array.fill(5)(4))
)
basicCombiners.foreach(elem => build(elem._1, elem._2))
basicCombiners
def buildCombinedCombiners() =
var combinedCombiners = List[(DLLCombiner, Array[Int])]()
Range(1, 10).foreach { n =>
val array = basicCombiners.filter(elem => elem._1.size == n).foldLeft(
Array[Int]()
) {
(acc, i) => acc ++ i._2
}
val empty: DLLCombiner = new DLLCombinerTest
val combiner = basicCombiners.filter(elem => elem._1.size == n).map(
_._1
).foldLeft(empty) {
(acc, c) => acc.combine(c)
}
combinedCombiners = combinedCombiners :+ (combiner, array)
}
combinedCombiners
def buildLargeCombiners() =
val largeCombiners = List(
(new DLLCombinerTest, Array.fill(1321)(4) ++ Array.fill(1322)(7)),
(new DLLCombinerTest, Array.fill(1341)(2) ++ Array.fill(1122)(5)),
(
new DLLCombinerTest,
Array.fill(1321)(4) ++ Array.fill(1322)(7) ++ Array.fill(321)(
4
) ++ Array.fill(322)(7)
),
(new DLLCombinerTest, Array.fill(992321)(4) ++ Array.fill(99322)(7)),
(new DLLCombinerTest, Array.fill(953211)(4) ++ Array.fill(999322)(1))
)
largeCombiners.foreach(elem => build(elem._1, elem._2))
largeCombiners
def build(combiner: DLLCombinerTest, array: Array[Int]): DLLCombinerTest =
array.foreach(elem => combiner += elem)
combiner
def compare(combiner: DLLCombiner, array: Array[Int]): Boolean =
val result = combiner.result()
Range(0, array.size).forall(i => array(i) == result(i))
def buildAndCompare(combiner: DLLCombinerTest, array: Array[Int]): Boolean =
array.foreach(elem => combiner += elem)
val result = combiner.result()
Range(0, array.size).forall(i => array(i) == result(i))
trait LibImpl extends Problem1:
val forkJoinPool = new ForkJoinPool
abstract class TaskScheduler:
def schedule[T](body: => T): ForkJoinTask[T]
class DefaultTaskScheduler extends TaskScheduler:
def schedule[T](body: => T): ForkJoinTask[T] =
val t = new RecursiveTask[T]:
def compute = body
Thread.currentThread match
case wt: ForkJoinWorkerThread =>
t.fork()
case _ =>
forkJoinPool.execute(t)
t
val scheduler = new DynamicVariable[TaskScheduler](new DefaultTaskScheduler)
def task[T](body: => T): ForkJoinTask[T] = scheduler.value.schedule(body)
class NodeTest(val v: Int, val myCombiner: DLLCombinerTest) extends Node(v):
override def getNext: Node =
myCombiner.nonSkipped = true
next
override def getNext2: Node = next2
override def getPrevious: Node =
myCombiner.nonSkipped = true
previous
override def getPrevious2: Node = previous2
override def setNext(n: Node): Unit = next = n
override def setNext2(n: Node): Unit = next2 = n
override def setPrevious(n: Node): Unit = previous = n
override def setPrevious2(n: Node): Unit = previous2 = n
class DLLCombinerTest extends DLLCombinerImplementation:
var nonSkipped = false
override def result(): Array[Int] =
nonSkipped = false
super.result()
override def +=(elem: Int): Unit =
val node = new NodeTest(elem, this)
if size == 0 then
first = node
last = node
size = 1
else
last.setNext(node)
node.setPrevious(last)
node.setPrevious2(last.getPrevious)
if size > 1 then last.getPrevious.setNext2(node)
else second = node
secondToLast = last
last = node
size += 1
override def combine(that: DLLCombiner): DLLCombiner =
if this.size == 0 then that
else if that.size == 0 then this
else
this.last.setNext(that.first)
this.last.setNext2(that.first.getNext)
if this.last.getPrevious != null then
this.last.getPrevious.setNext2(that.first) // important
that.first.setPrevious(this.last)
that.first.setPrevious2(this.last.getPrevious)
if that.first.getNext != null then
that.first.getNext.setPrevious2(this.last) // important
if this.size == 1 then second = that.first
this.size = this.size + that.size
this.last = that.last
this.secondToLast = that.secondToLast
this
package concpar22final02
import scala.concurrent.*
import scala.concurrent.duration.*
import scala.collection.mutable.HashMap
import scala.util.Random
import instrumentation.SchedulableProblem2
import instrumentation.TestHelper.*
import instrumentation.TestUtils.*
import scala.collection.mutable.ArrayBuffer
class Problem2Suite extends munit.FunSuite:
val imageSize = 5
val nThreads = 3
def rowsForThread(threadNumber: Int): Array[Int] =
val start: Int = (imageSize * threadNumber) / nThreads
val end: Int = (imageSize * (threadNumber + 1)) / nThreads
(start until end).toArray
test("Should work when barrier is called by a single thread (10pts)") {
testManySchedules(
1,
sched =>
val temp = new Problem2(imageSize, 1, 1)
(
List(() => temp.barrier(0).countDown()),
results =>
if sched.notifyCount == 0 && sched.notifyAllCount == 0 then
val notifyCount = sched.notifyCount
val notifyAllCount = sched.notifyAllCount
(false, s"No notify call $notifyCount $notifyAllCount")
else if temp.barrier(0).count != 0 then
val count = temp.barrier(0).count
(false, s"Barrier count not equal to zero: $count")
else (true, "")
)
)
}
test("Should work when a single thread processes a single filter (10pts)") {
val temp = new Problem2(imageSize, 1, 1)
val buf: ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer()
for i: Int <- 0 until imageSize do buf += ArrayBuffer.fill(5)(i)
temp.imageLib.init(buf)
temp.imagePipeline(
Array(temp.imageLib.Filter.Outline),
Array(0, 1, 2, 3, 4)
)
assertEquals(
temp.imageLib.buffer1,
ArrayBuffer(
ArrayBuffer(0, 0, 0, 0, 0),
ArrayBuffer(1, 1, 1, 1, 1),
ArrayBuffer(2, 2, 2, 2, 2),
ArrayBuffer(3, 3, 3, 3, 3),
ArrayBuffer(4, 4, 4, 4, 4)
)
)
assertEquals(
temp.imageLib.buffer2,
ArrayBuffer(
ArrayBuffer(-2, -3, -3, -3, -2),
ArrayBuffer(3, 0, 0, 0, 3),
ArrayBuffer(6, 0, 0, 0, 6),
ArrayBuffer(9, 0, 0, 0, 9),
ArrayBuffer(22, 15, 15, 15, 22)
)
)
}
test("Should work when a single thread processes a 2 same filters (15pts)") {
val temp = new Problem2(imageSize, 1, 2)
val buf: ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer()
for i: Int <- 0 until imageSize do buf += ArrayBuffer.fill(5)(i)
temp.imageLib.init(buf)
temp.imagePipeline(
Array(temp.imageLib.Filter.Identity, temp.imageLib.Filter.Identity),
Array(0, 1, 2, 3, 4)
)
assertEquals(
temp.imageLib.buffer1,
ArrayBuffer(
ArrayBuffer(0, 0, 0, 0, 0),
ArrayBuffer(1, 1, 1, 1, 1),
ArrayBuffer(2, 2, 2, 2, 2),
ArrayBuffer(3, 3, 3, 3, 3),
ArrayBuffer(4, 4, 4, 4, 4)
)
)
assertEquals(
temp.imageLib.buffer2,
ArrayBuffer(
ArrayBuffer(0, 0, 0, 0, 0),
ArrayBuffer(1, 1, 1, 1, 1),
ArrayBuffer(2, 2, 2, 2, 2),
ArrayBuffer(3, 3, 3, 3, 3),
ArrayBuffer(4, 4, 4, 4, 4)
)
)
}
test(
"Should work when a single thread processes a 2 different filters (15pts)"
) {
val temp = new Problem2(imageSize, 1, 2)
val buf: ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer()
for i: Int <- 0 until imageSize do buf += ArrayBuffer.fill(5)(i)
temp.imageLib.init(buf)
temp.imagePipeline(
Array(temp.imageLib.Filter.Identity, temp.imageLib.Filter.Outline),
Array(0, 1, 2, 3, 4)
)
assertEquals(
temp.imageLib.buffer1,
ArrayBuffer(
ArrayBuffer(-2, -3, -3, -3, -2),
ArrayBuffer(3, 0, 0, 0, 3),
ArrayBuffer(6, 0, 0, 0, 6),
ArrayBuffer(9, 0, 0, 0, 9),
ArrayBuffer(22, 15, 15, 15, 22)
)
)
assertEquals(
temp.imageLib.buffer2,
ArrayBuffer(
ArrayBuffer(0, 0, 0, 0, 0),
ArrayBuffer(1, 1, 1, 1, 1),
ArrayBuffer(2, 2, 2, 2, 2),
ArrayBuffer(3, 3, 3, 3, 3),
ArrayBuffer(4, 4, 4, 4, 4)
)
)
}
test("Should work when barrier is called by two threads (25pts)") {
testManySchedules(
2,
sched =>
val temp = new Problem2(imageSize, 2, 1)
(
List(
() =>
temp.barrier(0).countDown()
temp.barrier(0).awaitZero()
,
() =>
temp.barrier(0).countDown()
temp.barrier(0).awaitZero()
),
results =>
if sched.notifyCount == 0 && sched.notifyAllCount == 0 then
(false, s"No notify call")
else if sched.waitCount == 0 then (false, s"No wait call")
else if temp.barrier(0).count != 0 then
val count = temp.barrier(0).count
(false, s"Barrier count not equal to zero: $count")
else (true, "")
)
)
}
test("Should work when barrier is called by multiple threads (25pts)") {
testManySchedules(
nThreads,
sched =>
val temp = new Problem2(imageSize, nThreads, 1)
(
(for i <- 0 until nThreads yield () =>
temp.barrier(0).countDown()
temp.barrier(0).awaitZero()
).toList,
results =>
if sched.notifyCount == 0 && sched.notifyAllCount == 0 then
(false, s"No notify call")
else if sched.waitCount == 0 then (false, s"No wait call")
else if temp.barrier(0).count != 0 then
val count = temp.barrier(0).count
(false, s"Barrier count not equal to zero: $count")
else (true, "")
)
)
}
test(
"Should work when a single thread processes a multiple same filters (25pts)"
) {
val temp = new Problem2(imageSize, 1, 3)
val buf: ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer()
for i: Int <- 0 until imageSize do buf += ArrayBuffer.fill(5)(i)
temp.imageLib.init(buf)
temp.imagePipeline(
Array(
temp.imageLib.Filter.Outline,
temp.imageLib.Filter.Outline,
temp.imageLib.Filter.Outline
),
Array(0, 1, 2, 3, 4)
)
assertEquals(
temp.imageLib.buffer2,
ArrayBuffer(
ArrayBuffer(-128, -173, -107, -173, -128),
ArrayBuffer(205, -2, 172, -2, 205),
ArrayBuffer(322, -128, 208, -128, 322),
ArrayBuffer(55, -854, -428, -854, 55),
ArrayBuffer(1180, 433, 751, 433, 1180)
)
)
assertEquals(
temp.imageLib.buffer1,
ArrayBuffer(
ArrayBuffer(-16, -22, -18, -22, -16),
ArrayBuffer(23, -1, 9, -1, 23),
ArrayBuffer(36, -18, 0, -18, 36),
ArrayBuffer(29, -67, -45, -67, 29),
ArrayBuffer(152, 74, 90, 74, 152)
)
)
}
test("Should work when a single thread processes multiple filters (25pts)") {
val temp = new Problem2(imageSize, 1, 3)
val buf: ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer()
for i: Int <- 0 until imageSize do buf += ArrayBuffer.fill(5)(i)
temp.imageLib.init(buf)
temp.imagePipeline(
Array(
temp.imageLib.Filter.Identity,
temp.imageLib.Filter.Outline,
temp.imageLib.Filter.Sharpen
),
Array(0, 1, 2, 3, 4)
)
assertEquals(
temp.imageLib.buffer1,
ArrayBuffer(
ArrayBuffer(-2, -3, -3, -3, -2),
ArrayBuffer(3, 0, 0, 0, 3),
ArrayBuffer(6, 0, 0, 0, 6),
ArrayBuffer(9, 0, 0, 0, 9),
ArrayBuffer(22, 15, 15, 15, 22)
)
)
assertEquals(
temp.imageLib.buffer2,
ArrayBuffer(
ArrayBuffer(-10, -10, -9, -10, -10),
ArrayBuffer(11, 0, 3, 0, 11),
ArrayBuffer(18, -6, 0, -6, 18),
ArrayBuffer(17, -24, -15, -24, 17),
ArrayBuffer(86, 38, 45, 38, 86)
)
)
}
test("Should work when multiple thread processes a single filter (25pts)") {
testManySchedules(
nThreads,
sched =>
val temp = new SchedulableProblem2(sched, imageSize, nThreads, 1)
(
(for i <- 0 until nThreads
yield () =>
temp.imagePipeline(
Array(temp.imageLib.Filter.Outline),
rowsForThread(i)
)).toList,
results =>
val expected_buffer1 = ArrayBuffer(
ArrayBuffer(1, 1, 1, 1, 1),
ArrayBuffer(1, 1, 1, 1, 1),
ArrayBuffer(1, 1, 1, 1, 1),
ArrayBuffer(1, 1, 1, 1, 1),
ArrayBuffer(1, 1, 1, 1, 1)
)
val expected_buffer2 = ArrayBuffer(
ArrayBuffer(5, 3, 3, 3, 5),
ArrayBuffer(3, 0, 0, 0, 3),
ArrayBuffer(3, 0, 0, 0, 3),
ArrayBuffer(3, 0, 0, 0, 3),
ArrayBuffer(5, 3, 3, 3, 5)
)
val res_buffer1 = temp.imageLib.buffer1
val res_buffer2 = temp.imageLib.buffer2
if res_buffer1 != expected_buffer1 then
(false, s"Buffer1 expected: $expected_buffer1 , got $res_buffer1")
else if res_buffer2 != expected_buffer2 then
(false, s"Buffer2 expected: $expected_buffer2 , got $res_buffer2")
else (true, "")
)
)
}
test("Should work when multiple thread processes two filters (25pts)") {
testManySchedules(
nThreads,
sched =>
val temp = new SchedulableProblem2(sched, imageSize, nThreads, 2)
(
(for i <- 0 until nThreads
yield () =>
temp.imagePipeline(
Array(temp.imageLib.Filter.Outline, temp.imageLib.Filter.Sharpen),
rowsForThread(i)
)).toList,
results =>
val expected_buffer1 = ArrayBuffer(
ArrayBuffer(19, 7, 9, 7, 19),
ArrayBuffer(7, -6, -3, -6, 7),
ArrayBuffer(9, -3, 0, -3, 9),
ArrayBuffer(7, -6, -3, -6, 7),
ArrayBuffer(19, 7, 9, 7, 19)
)
val expected_buffer2 = ArrayBuffer(
ArrayBuffer(5, 3, 3, 3, 5),
ArrayBuffer(3, 0, 0, 0, 3),
ArrayBuffer(3, 0, 0, 0, 3),
ArrayBuffer(3, 0, 0, 0, 3),
ArrayBuffer(5, 3, 3, 3, 5)
)
val res_buffer1 = temp.imageLib.buffer1
val res_buffer2 = temp.imageLib.buffer2
if res_buffer1 != expected_buffer1 then
(false, s"Buffer1 expected: $expected_buffer1 , got $res_buffer1")
else if res_buffer2 != expected_buffer2 then
(false, s"Buffer2 expected: $expected_buffer2 , got $res_buffer2")
else (true, "")
)
)
}
test(
"Should work when multiple thread processes multiple same filters (25pts)"
) {
testManySchedules(
nThreads,
sched =>
val temp = new SchedulableProblem2(sched, imageSize, nThreads, 4)
val buf: ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer()
for i: Int <- 0 until imageSize do buf += ArrayBuffer.fill(5)(i)
temp.imageLib.init(buf)
(
(for i <- 0 until nThreads
yield () =>
temp.imagePipeline(
Array(
temp.imageLib.Filter.Identity,
temp.imageLib.Filter.Identity,
temp.imageLib.Filter.Identity,
temp.imageLib.Filter.Identity
),
rowsForThread(i)
)).toList,
results =>
val expected_buffer1 = ArrayBuffer(
ArrayBuffer(0, 0, 0, 0, 0),
ArrayBuffer(1, 1, 1, 1, 1),
ArrayBuffer(2, 2, 2, 2, 2),
ArrayBuffer(3, 3, 3, 3, 3),
ArrayBuffer(4, 4, 4, 4, 4)
)
val expected_buffer2 = ArrayBuffer(
ArrayBuffer(0, 0, 0, 0, 0),
ArrayBuffer(1, 1, 1, 1, 1),
ArrayBuffer(2, 2, 2, 2, 2),
ArrayBuffer(3, 3, 3, 3, 3),
ArrayBuffer(4, 4, 4, 4, 4)
)
val res_buffer1 = temp.imageLib.buffer1
val res_buffer2 = temp.imageLib.buffer2
if res_buffer1 != expected_buffer1 then
(false, s"Buffer1 expected: $expected_buffer1 , got $res_buffer1")
else if res_buffer2 != expected_buffer2 then
(false, s"Buffer2 expected: $expected_buffer2 , got $res_buffer2")
else (true, "")
)
)
}
test(
"Should work when multiple thread processes multiple different filters (25pts)"
) {
testManySchedules(
nThreads,
sched =>
val temp = new SchedulableProblem2(sched, imageSize, nThreads, 4)
val buf: ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer()
for i: Int <- 0 until imageSize do buf += ArrayBuffer.fill(5)(i)
temp.imageLib.init(buf)
(
(for i <- 0 until nThreads
yield () =>
temp.imagePipeline(
Array(
temp.imageLib.Filter.Outline,
temp.imageLib.Filter.Sharpen,
temp.imageLib.Filter.Identity,
temp.imageLib.Filter.Sharpen
),
rowsForThread(i)
)).toList,
results =>
val expected_buffer1 = ArrayBuffer(
ArrayBuffer(-51, -31, -28, -31, -51),
ArrayBuffer(47, 2, 24, 2, 47),
ArrayBuffer(68, -24, 24, -24, 68),
ArrayBuffer(5, -154, -72, -154, 5),
ArrayBuffer(375, 83, 164, 83, 375)
)
val expected_buffer2 = ArrayBuffer(
ArrayBuffer(-10, -10, -9, -10, -10),
ArrayBuffer(11, 0, 3, 0, 11),
ArrayBuffer(18, -6, 0, -6, 18),
ArrayBuffer(17, -24, -15, -24, 17),
ArrayBuffer(86, 38, 45, 38, 86)
)
val res_buffer1 = temp.imageLib.buffer1
val res_buffer2 = temp.imageLib.buffer2
if res_buffer1 != expected_buffer1 then
(false, s"Buffer1 expected: $expected_buffer1 , got $res_buffer1")
else if res_buffer2 != expected_buffer2 then
(false, s"Buffer2 expected: $expected_buffer2 , got $res_buffer2")
else (true, "")
)
)
}
package concpar22final02.instrumentation
trait MockedMonitor extends Monitor:
def scheduler: Scheduler
// Can be overriden.
override def waitDefault() =
scheduler.log("wait")
scheduler.waitCount.incrementAndGet()
scheduler updateThreadState Wait(this, scheduler.threadLocks.tail)
override def synchronizedDefault[T](toExecute: => T): T =
scheduler.log("synchronized check")
val prevLocks = scheduler.threadLocks
scheduler updateThreadState Sync(
this,
prevLocks
) // If this belongs to prevLocks, should just continue.
scheduler.log("synchronized -> enter")
try toExecute
finally
scheduler updateThreadState Running(prevLocks)
scheduler.log("synchronized -> out")
override def notifyDefault() =
scheduler mapOtherStates { state =>
state match
case Wait(lockToAquire, locks) if lockToAquire == this =>
SyncUnique(this, state.locks)
case e => e
}
scheduler.notifyCount.incrementAndGet()
scheduler.log("notify")
override def notifyAllDefault() =
scheduler mapOtherStates { state =>
state match
case Wait(lockToAquire, locks) if lockToAquire == this =>
Sync(this, state.locks)
case SyncUnique(lockToAquire, locks) if lockToAquire == this =>
Sync(this, state.locks)
case e => e
}
scheduler.notifyAllCount.incrementAndGet()
scheduler.log("notifyAll")
abstract class ThreadState:
def locks: Seq[AnyRef]
trait CanContinueIfAcquiresLock extends ThreadState:
def lockToAquire: AnyRef
case object Start extends ThreadState:
def locks: Seq[AnyRef] = Seq.empty
case object End extends ThreadState:
def locks: Seq[AnyRef] = Seq.empty
case class Wait(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState
case class SyncUnique(lockToAquire: AnyRef, locks: Seq[AnyRef])
extends ThreadState
with CanContinueIfAcquiresLock
case class Sync(lockToAquire: AnyRef, locks: Seq[AnyRef])
extends ThreadState
with CanContinueIfAcquiresLock
case class Running(locks: Seq[AnyRef]) extends ThreadState
case class VariableReadWrite(locks: Seq[AnyRef]) extends ThreadState
package concpar22final02.instrumentation
import scala.annotation.tailrec
import concpar22final02.*
import scala.collection.mutable.ArrayBuffer
class SchedulableBarrier(val scheduler: Scheduler, size: Int)
extends Barrier(size)
with MockedMonitor
class SchedulableProblem2(
val scheduler: Scheduler,
imageSize: Int,
threadCount: Int,
numFilters: Int
) extends Problem2(imageSize, threadCount, numFilters):
self =>
override val barrier =
ArrayBuffer.fill(numFilters)(SchedulableBarrier(scheduler, threadCount))
package concpar22final02.instrumentation
import java.util.concurrent.*
import scala.concurrent.duration.*
import scala.collection.mutable.*
import Stats.*
import java.util.concurrent.atomic.AtomicInteger
sealed abstract class Result
case class RetVal(rets: List[Any]) extends Result
case class Except(msg: String, stackTrace: Array[StackTraceElement])
extends Result
case class Timeout(msg: String) extends Result
/** A class that maintains schedule and a set of thread ids. The schedules are
* advanced after an operation of a SchedulableBuffer is performed. Note: the
* real schedule that is executed may deviate from the input schedule due to
* the adjustments that had to be made for locks
*/
class Scheduler(sched: List[Int]):
val maxOps =
500 // a limit on the maximum number of operations the code is allowed to perform
var waitCount: AtomicInteger = new AtomicInteger(0)
var notifyCount: AtomicInteger = new AtomicInteger(0)
var notifyAllCount: AtomicInteger = new AtomicInteger(0)
private var schedule = sched
var numThreads = 0
private val realToFakeThreadId = Map[Long, Int]()
private val opLog =
ListBuffer[String]() // a mutable list (used for efficient concat)
private val threadStates = Map[Int, ThreadState]()
/** Runs a set of operations in parallel as per the schedule. Each operation
* may consist of many primitive operations like reads or writes to shared
* data structure each of which should be executed using the function `exec`.
* @timeout
* in milliseconds
* @return
* true - all threads completed on time, false -some tests timed out.
*/
def runInParallel(timeout: Long, ops: List[() => Any]): Result =
numThreads = ops.length
val threadRes = Array.fill(numThreads) { None: Any }
var exception: Option[(Throwable, Int)] = None
val syncObject = new Object()
var completed = new AtomicInteger(0)
// create threads
val threads = ops.zipWithIndex.map { case (op, i) =>
new Thread(
new Runnable():
def run(): Unit =
val fakeId = i + 1
setThreadId(fakeId)
try
updateThreadState(Start)
val res = op()
updateThreadState(End)
threadRes(i) = res
// notify the main thread if all threads have completed
if completed.incrementAndGet() == ops.length then
syncObject.synchronized { syncObject.notifyAll() }
catch
case e: Throwable
if exception != None => // do nothing here and silently fail
case e: Throwable =>
log(s"throw ${e.toString}")
exception = Some((e, fakeId))
syncObject.synchronized { syncObject.notifyAll() }
// println(s"$fakeId: ${e.toString}")
// Runtime.getRuntime().halt(0) //exit the JVM and all running threads (no other way to kill other threads)
)
}
// start all threads
threads.foreach(_.start())
// wait for all threads to complete, or for an exception to be thrown, or for the time out to expire
var remTime = timeout
syncObject.synchronized {
timed {
if completed.get() != ops.length then syncObject.wait(timeout)
} { time =>
remTime -= time
}
}
if exception.isDefined then
Except(
s"Thread ${exception.get._2} crashed on the following schedule: \n" + opLog.mkString(
"\n"
),
exception.get._1.getStackTrace
)
else if remTime <= 1
then // timeout ? using 1 instead of zero to allow for some errors
Timeout(opLog.mkString("\n"))
else
// every thing executed normally
RetVal(threadRes.toList)
// Updates the state of the current thread
def updateThreadState(state: ThreadState): Unit =
val tid = threadId
synchronized {
threadStates(tid) = state
}
state match
case Sync(lockToAquire, locks) =>
if locks.indexOf(lockToAquire) < 0 then waitForTurn
else
// Re-aqcuiring the same lock
updateThreadState(Running(lockToAquire +: locks))
case Start => waitStart()
case End => removeFromSchedule(tid)
case Running(_) =>
case _ => waitForTurn // Wait, SyncUnique, VariableReadWrite
def waitStart(): Unit =
// while (threadStates.size < numThreads) {
// Thread.sleep(1)
// }
synchronized {
if threadStates.size < numThreads then wait()
else notifyAll()
}
def threadLocks =
synchronized {
threadStates(threadId).locks
}
def threadState =
synchronized {
threadStates(threadId)
}
def mapOtherStates(f: ThreadState => ThreadState) =
val exception = threadId
synchronized {
for k <- threadStates.keys if k != exception do
threadStates(k) = f(threadStates(k))
}
def log(str: String) =
if (realToFakeThreadId contains Thread.currentThread().getId()) then
val space = (" " * ((threadId - 1) * 2))
val s =
space + threadId + ":" + "\n".r.replaceAllIn(str, "\n" + space + " ")
opLog += s
/** Executes a read or write operation to a global data structure as per the
* given schedule
* @param msg
* a message corresponding to the operation that will be logged
*/
def exec[T](primop: => T)(
msg: => String,
postMsg: => Option[T => String] = None
): T =
if !(realToFakeThreadId contains Thread.currentThread().getId()) then primop
else
updateThreadState(VariableReadWrite(threadLocks))
val m = msg
if m != "" then log(m)
if opLog.size > maxOps then
throw new Exception(
s"Total number of reads/writes performed by threads exceed $maxOps. A possible deadlock!"
)
val res = primop
postMsg match
case Some(m) => log(m(res))
case None =>
res
private def setThreadId(fakeId: Int) = synchronized {
realToFakeThreadId(Thread.currentThread.getId) = fakeId
}
def threadId =
try realToFakeThreadId(Thread.currentThread().getId())
catch
case e: NoSuchElementException =>
throw new Exception(
"You are accessing shared variables in the constructor. This is not allowed. The variables are already initialized!"
)
private def isTurn(tid: Int) = synchronized {
(!schedule.isEmpty && schedule.head != tid)
}
def canProceed(): Boolean =
val tid = threadId
canContinue match
case Some((i, state)) if i == tid =>
// println(s"$tid: Runs ! Was in state $state")
canContinue = None
state match
case Sync(lockToAquire, locks) =>
updateThreadState(Running(lockToAquire +: locks))
case SyncUnique(lockToAquire, locks) =>
mapOtherStates {
_ match
case SyncUnique(lockToAquire2, locks2)
if lockToAquire2 == lockToAquire =>
Wait(lockToAquire2, locks2)
case e => e
}
updateThreadState(Running(lockToAquire +: locks))
case VariableReadWrite(locks) => updateThreadState(Running(locks))
true
case Some((i, state)) =>
// println(s"$tid: not my turn but $i !")
false
case None =>
false
var threadPreference =
0 // In the case the schedule is over, which thread should have the preference to execute.
/** returns true if the thread can continue to execute, and false otherwise */
def decide(): Option[(Int, ThreadState)] =
if !threadStates.isEmpty
then // The last thread who enters the decision loop takes the decision.
// println(s"$threadId: I'm taking a decision")
if threadStates.values.forall {
case e: Wait => true
case _ => false
}
then
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
throw new Exception(
s"Deadlock: Thread$s $waiting $are waiting but all others have ended and cannot notify them."
)
else
// Threads can be in Wait, Sync, SyncUnique, and VariableReadWrite mode.
// Let's determine which ones can continue.
val notFree = threadStates.collect { case (id, state) =>
state.locks
}.flatten.toSet
val threadsNotBlocked = threadStates.toSeq.filter {
case (id, v: VariableReadWrite) => true
case (id, v: CanContinueIfAcquiresLock) =>
!notFree(v.lockToAquire) || (v.locks contains v.lockToAquire)
case _ => false
}
if threadsNotBlocked.isEmpty then
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
val whoHasLock = threadStates.toSeq.flatMap { case (id, state) =>
state.locks.map(lock => (lock, id))
}.toMap
val reason = threadStates
.collect {
case (id, state: CanContinueIfAcquiresLock)
if !notFree(state.lockToAquire) =>
s"Thread $id is waiting on lock ${state.lockToAquire} held by thread ${whoHasLock(state.lockToAquire)}"
}
.mkString("\n")
throw new Exception(
s"Deadlock: Thread$s $waiting are interlocked. Indeed:\n$reason"
)
else if threadsNotBlocked.size == 1
then // Do not consume the schedule if only one thread can execute.
Some(threadsNotBlocked(0))
else
val next =
schedule.indexWhere(t =>
threadsNotBlocked.exists { case (id, state) => id == t }
)
if next != -1 then
// println(s"$threadId: schedule is $schedule, next chosen is ${schedule(next)}")
val chosenOne =
schedule(next) // TODO: Make schedule a mutable list.
schedule = schedule.take(next) ++ schedule.drop(next + 1)
Some((chosenOne, threadStates(chosenOne)))
else
threadPreference = (threadPreference + 1) % threadsNotBlocked.size
val chosenOne =
threadsNotBlocked(threadPreference) // Maybe another strategy
Some(chosenOne)
// threadsNotBlocked.indexOf(threadId) >= 0
/*
val tnb = threadsNotBlocked.map(_._1).mkString(",")
val s = if (schedule.isEmpty) "empty" else schedule.mkString(",")
val only = if (schedule.isEmpty) "" else " only"
throw new Exception(s"The schedule is $s but$only threads ${tnb} can continue")*/
else canContinue
/** This will be called before a schedulable operation begins. This should not
* use synchronized
*/
var numThreadsWaiting = new AtomicInteger(0)
// var waitingForDecision = Map[Int, Option[Int]]() // Mapping from thread ids to a number indicating who is going to make the choice.
var canContinue: Option[(Int, ThreadState)] =
None // The result of the decision thread Id of the thread authorized to continue.
private def waitForTurn =
synchronized {
if numThreadsWaiting.incrementAndGet() == threadStates.size then
canContinue = decide()
notifyAll()
// waitingForDecision(threadId) = Some(numThreadsWaiting)
// println(s"$threadId Entering waiting with ticket number $numThreadsWaiting/${waitingForDecision.size}")
while !canProceed() do wait()
}
numThreadsWaiting.decrementAndGet()
/** To be invoked when a thread is about to complete
*/
private def removeFromSchedule(fakeid: Int) = synchronized {
// println(s"$fakeid: I'm taking a decision because I finished")
schedule = schedule.filterNot(_ == fakeid)
threadStates -= fakeid
if numThreadsWaiting.get() == threadStates.size then
canContinue = decide()
notifyAll()
}
def getOperationLog() = opLog
package concpar22final02.instrumentation
import scala.util.Random
import scala.collection.mutable.{Map as MutableMap}
import Stats.*
object TestHelper:
val noOfSchedules = 10000 // set this to 100k during deployment
val readWritesPerThread =
20 // maximum number of read/writes possible in one thread
val contextSwitchBound = 10
val testTimeout = 240 // the total time out for a test in seconds
val schedTimeout =
15 // the total time out for execution of a schedule in secs
// Helpers
/*def testManySchedules(op1: => Any): Unit = testManySchedules(List(() => op1))
def testManySchedules(op1: => Any, op2: => Any): Unit = testManySchedules(List(() => op1, () => op2))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any, op4: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3, () => op4))*/
def testSequential[T](ops: Scheduler => Any)(assertions: T => (
Boolean,
String
)) =
testManySchedules(
1,
(sched: Scheduler) =>
(
List(() => ops(sched)),
(res: List[Any]) => assertions(res.head.asInstanceOf[T])
)
)
/** @numThreads
* number of threads
* @ops
* operations to be executed, one per thread
* @assertion
* as condition that will executed after all threads have completed
* (without exceptions) the arguments are the results of the threads
*/
def testManySchedules(
numThreads: Int,
ops: Scheduler => (
List[() => Any], // Threads
List[Any] => (Boolean, String)
) // Assertion
) =
var timeout = testTimeout * 1000L
val threadIds = (1 to numThreads)
// (1 to scheduleLength).flatMap(_ => threadIds).toList.permutations.take(noOfSchedules).foreach {
val schedules = (new ScheduleGenerator(numThreads)).schedules()
var schedsExplored = 0
schedules.takeWhile(_ =>
schedsExplored <= noOfSchedules && timeout > 0
).foreach {
// case _ if timeout <= 0 => // break
case schedule =>
schedsExplored += 1
val schedr = new Scheduler(schedule)
// println("Exploring Sched: "+schedule)
val (threadOps, assertion) = ops(schedr)
if threadOps.size != numThreads then
throw new IllegalStateException(
s"Number of threads: $numThreads, do not match operations of threads: $threadOps"
)
timed { schedr.runInParallel(schedTimeout * 1000, threadOps) } { t =>
timeout -= t
} match
case Timeout(msg) =>
throw new java.lang.AssertionError(
"assertion failed\n" + "The schedule took too long to complete. A possible deadlock! \n" + msg
)
case Except(msg, stkTrace) =>
val traceStr =
"Thread Stack trace: \n" + stkTrace.map(
" at " + _.toString
).mkString("\n")
throw new java.lang.AssertionError(
"assertion failed\n" + msg + "\n" + traceStr
)
case RetVal(threadRes) =>
// check the assertion
val (success, custom_msg) = assertion(threadRes)
if !success then
val msg =
"The following schedule resulted in wrong results: \n" + custom_msg + "\n" + schedr
.getOperationLog()
.mkString("\n")
throw new java.lang.AssertionError("Assertion failed: " + msg)
}
if timeout <= 0 then
throw new java.lang.AssertionError(
"Test took too long to complete! Cannot check all schedules as your code is too slow!"
)
/** A schedule generator that is based on the context bound
*/
class ScheduleGenerator(numThreads: Int):
val scheduleLength = readWritesPerThread * numThreads
val rands =
(1 to scheduleLength).map(i =>
new Random(0xcafe * i)
) // random numbers for choosing a thread at each position
def schedules(): LazyList[List[Int]] =
var contextSwitches = 0
var contexts =
List[Int]() // a stack of thread ids in the order of context-switches
val remainingOps = MutableMap[Int, Int]()
remainingOps ++= (1 to numThreads).map(i =>
(i, readWritesPerThread)
) // num ops remaining in each thread
val liveThreads = (1 to numThreads).toSeq.toBuffer
/** Updates remainingOps and liveThreads once a thread is chosen for a
* position in the schedule
*/
def updateState(tid: Int): Unit =
val remOps = remainingOps(tid)
if remOps == 0 then liveThreads -= tid
else remainingOps += (tid -> (remOps - 1))
val schedule = rands.foldLeft(List[Int]()) {
case (acc, r) if contextSwitches < contextSwitchBound =>
val tid = liveThreads(r.nextInt(liveThreads.size))
contexts match
case prev :: tail
if prev != tid => // we have a new context switch here
contexts +:= tid
contextSwitches += 1
case prev :: tail =>
case _ => // init case
contexts +:= tid
updateState(tid)
acc :+ tid
case (
acc,
_
) => // here context-bound has been reached so complete the schedule without any more context switches
if !contexts.isEmpty then
contexts = contexts.dropWhile(remainingOps(_) == 0)
val tid = contexts match
case top :: tail => top
case _ =>
liveThreads(
0
) // here, there has to be threads that have not even started
updateState(tid)
acc :+ tid
}
schedule #:: schedules()
package concpar22final02.instrumentation
import scala.concurrent.*
import scala.concurrent.duration.*
import scala.concurrent.ExecutionContext.Implicits.global
object TestUtils:
def failsOrTimesOut[T](action: => T): Boolean =
val asyncAction = Future {
action
}
try Await.result(asyncAction, 2000.millisecond)
catch case _: Throwable => return true
return false
package concpar22final03
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Random
trait EconomicsTest extends Economics:
val ownedCards: collection.mutable.Set[Card] = collection.mutable.Set[Card]()
def owned(c: Card): Boolean = ownedCards(c)
def isMine(c: Card): Boolean = ownedCards(c)
override def valueOf(cardName: String): Int = List(1, cardName.length).max
/** This is a container for an exact amount of money that can be hold, spent,
* or put in the bank
*/
val moneyInMoneyBag = collection.mutable.Map[MoneyBag, Int]()
def moneyIn(m: MoneyBag): Int = moneyInMoneyBag.getOrElse(m, 0)
/** If you sell a card, at some point in the future you will get some money
* (in a bag).
*/
def sellCard(c: Card): Future[MoneyBag] =
Future {
Thread.sleep(sellWaitTime())
synchronized(
if owned(c) then
ownedCards.remove(c)
getMoneyBag(valueOf(c.name))
else
throw Exception(
"This card doesn't belong to you or has already been sold, you can't sell it."
)
)
}
/** You can buy any "Scala: The Programming" card by providing a bag of money
* with the appropriate amount and waiting for the transaction to take place
*/
def buyCard(bag: MoneyBag, name: String): Future[Card] =
Future {
Thread.sleep(buyWaitTime())
synchronized {
if moneyIn(bag) != valueOf(name) then
throw Exception(
"You didn't provide the exact amount of money necessary to buy this card."
)
else moneyInMoneyBag.update(bag, 0)
getCard(name)
}
}
/** This simple bank account hold money for you. You can bring a money bag to
* increase your account, or withdraw a money bag of any size not greater
* than your account's balance.
*/
private var balance_ = initialBalance()
def balance: Int = balance_
def withdraw(amount: Int): Future[MoneyBag] =
Future {
Thread.sleep(withdrawWaitTime())
synchronized(
if balance_ >= amount then
balance_ -= amount
getMoneyBag(amount)
else
throw new Exception(
"You try to withdraw more money than you have on your account"
)
)
}
def deposit(bag: MoneyBag): Future[Unit] =
Future {
Thread.sleep(depositWaitTime())
synchronized {
if moneyInMoneyBag(bag) == 0 then
throw new Exception("You are depositing en empty bag!")
else
balance_ += moneyIn(bag)
moneyInMoneyBag.update(bag, 0)
}
}
def sellWaitTime(): Int
def buyWaitTime(): Int
def withdrawWaitTime(): Int
def depositWaitTime(): Int
def initialBalance(): Int
def getMoneyBag(i: Int) =
val m = MoneyBag()
synchronized(moneyInMoneyBag.update(m, i))
m
def getCard(n: String): Card =
val c = Card(n)
synchronized(ownedCards.update(c, true))
c
package concpar22final03
import scala.concurrent.duration.*
import scala.concurrent.{Await, Future}
import scala.util.{Try, Success, Failure}
import scala.concurrent.ExecutionContext.Implicits.global
class Problem3Suite extends munit.FunSuite:
trait Prob3Test extends Problem3:
override val economics: EconomicsTest
class Test1 extends Prob3Test:
override val economics: EconomicsTest = new EconomicsTest:
override def sellWaitTime() = 10
override def buyWaitTime() = 20
override def depositWaitTime() = 30
override def withdrawWaitTime() = 40
override def initialBalance() = 0
class Test2 extends Prob3Test:
override val economics: EconomicsTest = new EconomicsTest:
override def sellWaitTime() = 100
override def buyWaitTime() = 5
override def depositWaitTime() = 50
override def withdrawWaitTime() = 5
override def initialBalance() = 0
class Test3 extends Prob3Test:
override val economics: EconomicsTest = new EconomicsTest:
val rgen = new scala.util.Random(666)
override def sellWaitTime() = rgen.nextInt(100)
override def buyWaitTime() = rgen.nextInt(100)
override def depositWaitTime() = rgen.nextInt(100)
override def withdrawWaitTime() = rgen.nextInt(100)
override def initialBalance() = 0
class Test4 extends Prob3Test:
var counter = 5
def next(): Int =
counter = counter + 5 % 119
counter
override val economics: EconomicsTest = new EconomicsTest:
override def sellWaitTime() = next()
override def buyWaitTime() = next()
override def depositWaitTime() = next()
override def withdrawWaitTime() = next()
override def initialBalance() = next()
def testCases = List(new Test1, new Test2)
def unevenTestCases = List(new Test3, new Test4)
def tot(cards: List[String]): Int =
cards.map[Int]((n: String) => n.length).sum
def testOk(
t: Prob3Test,
money: Int,
sold: List[String],
wanted: List[String]
): Unit =
import t.*
import economics.*
val f = orderDeck(getMoneyBag(money), sold.map(getCard), wanted)
val r = Await.ready(f, 3.seconds).value.get
assert(r.isSuccess)
r match
case Success(d) =>
assertEquals(d.map(_.name).sorted, wanted.sorted)
assertEquals(d.length, wanted.length)
assertEquals(isMine(d.head), true)
case Failure(e) => ()
def testFailure(
t: Prob3Test,
money: Int,
sold: List[String],
wanted: List[String]
): Unit =
import t.*
import economics.*
val f = orderDeck(getMoneyBag(money), sold.map(getCard), wanted)
val r = Await.ready(f, 3.seconds).value.get
assert(r.isFailure)
r match
case Failure(e: NotEnoughMoneyException) => ()
case _ => fail(
"Should have thrown a NotEnoughMoneyException exception, but did not"
)
// --- Without sold cards ---
test(
"Should work correctly when a single card is asked with enough money (no card sold) (20pts)"
) {
testCases.foreach(t => testOk(t, 7, Nil, List("Tefeiri")))
}
test(
"Should work correctly when a single card is asked with enough money (no card sold, uneven waiting time) (10pts)"
) {
unevenTestCases.foreach(t => testOk(t, 7, Nil, List("Tefeiri")))
}
test(
"Should work correctly when multiple cards are asked with enough money (no card sold) (20pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
testCases.foreach(t => testOk(t, tot(cards), Nil, cards))
}
test(
"Should work correctly when multiple cards are asked with enough money (no card sold, uneven waiting time) (10pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
unevenTestCases.foreach(t => testOk(t, tot(cards), Nil, cards))
}
test(
"Should work correctly when asked duplicates of cards, with enough money (no card sold) (20pts)"
) {
val cards = List("aaaa", "aaaa", "aaaa", "dd", "dd", "dd", "dd")
testCases.foreach(t => testOk(t, tot(cards), Nil, cards))
}
test(
"Should work correctly when asked duplicates of cards, with enough money (no card sold, uneven waiting time) (10pts)"
) {
val cards = List("aaaa", "aaaa", "aaaa", "dd", "dd", "dd", "dd")
unevenTestCases.foreach(t => testOk(t, tot(cards), Nil, cards))
}
// --- With sold cards ---
test(
"Should work correctly when a single card is bought and a single of the same price is sold (20pts)"
) {
testCases.foreach(t => testOk(t, 0, List("Chandra"), List("Tefeiri")))
}
test(
"Should work correctly when a single card is bought and a single of the same price is sold (uneven waiting time) (10pts)"
) {
unevenTestCases.foreach(t => testOk(t, 0, List("Chandra"), List("Tefeiri")))
}
test(
"Should work correctly when multiple cards are asked and multiple of matching values are sold (20pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold = List("1111111", "2", "3333", "44", "55555", "666", "7777")
testCases.foreach(t => testOk(t, 0, sold, cards))
}
test(
"Should work correctly when multiple cards are asked and multiple of matching values are sold (uneven waiting time) (10pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold = List("1111111", "2", "3333", "44", "55555", "666", "7777")
unevenTestCases.foreach(t => testOk(t, 0, sold, cards))
}
test(
"Should work correctly when multiple cards are asked and multiple of the same total value are sold (20pts)"
) {
val cards2 = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold2 = List("111111111", "22", "3", "44", "555555", "666", "777")
assert(tot(sold2) == tot(cards2))
testCases.foreach(t => testOk(t, 0, sold2, cards2))
}
test(
"Should work correctly when multiple cards are asked and multiple of the same total value are sold (uneven waiting time) (10pts)"
) {
val cards2 = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold2 = List("111111111", "22", "3", "44", "555555", "666", "777")
assert(tot(sold2) == tot(cards2))
unevenTestCases.foreach(t => testOk(t, 0, sold2, cards2))
}
test(
"Should work correctly when given money and sold cards are sufficient for the wanted cards (20pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold = List("11111", "2", "33", "44", "5555", "666", "777")
val bagMoney = tot(cards) - tot(sold)
testCases.foreach(t => testOk(t, bagMoney, sold, cards))
}
test(
"Should work correctly when given money and sold cards are sufficient for the wanted cards (uneven waiting time) (10pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold = List("11111", "2", "33", "44", "5555", "666", "777")
val bagMoney = tot(cards) - tot(sold)
unevenTestCases.foreach(t => testOk(t, bagMoney, sold, cards))
}
// --- Failures ---
test(
"Should return a failure when too little money is provided (no card sold) (20pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
testCases.foreach(t => testFailure(t, tot(cards) - 1, Nil, cards))
testCases.foreach(t => testFailure(t, tot(cards) - 50, Nil, cards))
}
test(
"Should return a failure when too little money or sold cards are provided (20pts)"
) {
val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg")
val sold = List("11111", "2", "33", "44", "5555", "666", "777")
val bagMoney = tot(cards) - tot(sold)
testCases.foreach(t => testFailure(t, bagMoney - 2, sold, cards))
}
package concpar22final04
import akka.actor.*
import akka.testkit.*
import akka.pattern.*
import akka.util.Timeout
import concurrent.duration.*
import User.Protocol.*
import User.Responses.*
import SongsStore.Protocol.*
import SongsStore.Responses.*
import scala.util.{Try, Success, Failure}
import com.typesafe.config.ConfigFactory
import java.util.Date
import scala.util.Random
class Problem4Suite extends munit.FunSuite:
//---
Random.setSeed(42178263)
/*+++
Random.setSeed(42)
++*/
test("after receiving GetInfo, should answer with Info (20pts)") {
new MyTestKit:
def tests() =
ada ! GetInfo
expectMsg(Info("1", "Ada"))
}
test(
"after receiving GetHomepageData, should answer with the correct HomepageData when there is no liked songs and no activity items (30pts)"
) {
new MyTestKit:
def tests() =
ada ! GetHomepageData
expectMsg(HomepageData(List(), List()))
}
test(
"after receiving Like(1), should add 1 to the list of liked songs (20pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
expectNoMessage()
ada ! GetHomepageData
expectMsg(HomepageData(List(1), List()))
}
test(
"after receiving Like(1) and then Like(2), the list of liked songs should start with List(2, 1) (20pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
expectNoMessage()
ada ! Like(2)
expectNoMessage()
ada ! GetHomepageData
expectMsg(HomepageData(List(2, 1), List()))
}
test(
"after receiving Like(1) and then Like(1), song 1 should be in the list of liked songs only once (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
expectNoMessage()
ada ! Like(1)
expectNoMessage()
ada ! GetHomepageData
expectMsg(HomepageData(List(1), List()))
}
test(
"after receiving Like(1), Like(2) and then Like(1), the list of liked songs should start with List(2, 1) (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
expectNoMessage()
ada ! Like(2)
expectNoMessage()
ada ! Like(1)
expectNoMessage()
ada ! GetHomepageData
expectMsg(HomepageData(List(2, 1), List()))
}
test(
"after receiving Like(1), Unlike(1) and then Unlike(1), the list of liked songs should not contain song 1 (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
expectNoMessage()
ada ! Like(2)
expectNoMessage()
ada ! Like(1)
expectNoMessage()
ada ! GetHomepageData
expectMsg(HomepageData(List(2, 1), List()))
}
test(
"after receiving Subscribe(aUser) and then Play(5), should send AddActivity(Activity(\"1\", 5)) to aUser (20pts)"
) {
new MyTestKit:
def tests() =
ada ! Subscribe(self)
expectNoMessage()
ada ! Play(5)
expectMsg(AddActivity(Activity("1", "Ada", 5)))
}
test(
"after receiving Subscribe(aUser), Subscribe(bUser) and then Play(5), should send AddActivity(Activity(\"1\", 5)) to aUser (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Subscribe(self)
expectNoMessage()
val donald = new TestProbe(system)
ada ! Subscribe(donald.ref)
expectNoMessage()
ada ! Play(5)
expectMsg(AddActivity(Activity("1", "Ada", 5)))
donald.expectMsg(AddActivity(Activity("1", "Ada", 5)))
}
test(
"after receiving Subscribe(aUser), Subscribe(aUser) and then Play(5), should send AddActivity(Activity(\"1\", 5)) to aUser only once (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Subscribe(self)
expectNoMessage()
ada ! Subscribe(self)
expectNoMessage()
ada ! Play(5)
expectMsg(AddActivity(Activity("1", "Ada", 5)))
expectNoMessage()
}
test(
"after receiving Subscribe(aUser), Unsubscribe(aUser) and then Play(5), should not send AddActivity(Activity(\"1\", 5)) to aUser (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Subscribe(self)
expectNoMessage()
ada ! Play(5)
expectMsg(AddActivity(Activity("1", "Ada", 5)))
ada ! Unsubscribe(self)
expectNoMessage()
ada ! Play(5)
expectNoMessage()
}
test(
"after receiving AddActivity(Activity(\"1\", 5)), Activity(\"1\", 5) should be in the activity feed (10pts)"
) {
new MyTestKit:
def tests() =
ada ! AddActivity(Activity("0", "Self", 5))
expectNoMessage()
ada ! GetHomepageData
expectMsg(HomepageData(List(), List(Activity("0", "Self", 5))))
}
test(
"after receiving AddActivity(Activity(\"1\", 5)) and AddActivity(Activity(\"1\", 6)), Activity(\"1\", 6) should be in the activity feed and Activity(\"1\", 5) should not (10pts)"
) {
new MyTestKit:
def tests() =
ada ! AddActivity(Activity("0", "Self", 5))
expectNoMessage()
ada ! AddActivity(Activity("0", "Self", 6))
expectNoMessage()
ada ! GetHomepageData
expectMsg(HomepageData(List(), List(Activity("0", "Self", 6))))
}
test(
"after receiving GetHomepageText, should answer with a result containing \"Howdy $name!\" where $name is the user's name (10pts)"
) {
new MyTestKit:
def tests() =
val name = Random.alphanumeric.take(5).mkString
val randomUser =
system.actorOf(Props(classOf[User], "5", name, songsStore), "user-5")
randomUser ! GetHomepageText
expectMsgClass(classOf[HomepageText]).result.contains(f"Howdy $name!")
}
test(
"after receiving GetHomepageText, should answer with the correct names of liked songs (1) (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(8)
expectNoMessage()
ada ! Like(3)
expectNoMessage()
ada ! Like(2)
expectNoMessage()
ada ! GetHomepageText
assertEquals(
expectMsgClass(classOf[HomepageText]).result.linesIterator
.drop(2)
.take(4)
.mkString("\n")
.trim,
"""
|Liked Songs:
|* Sunny by Boney M.
|* J'irai où tu iras by Céline Dion & Jean-Jacques Goldman
|* Hold the line by TOTO
""".stripMargin.trim
)
}
test(
"after receiving GetHomepageText, should answer with the correct names of liked songs (2) (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(9)
expectNoMessage()
ada ! Like(7)
expectNoMessage()
ada ! GetHomepageText
assertEquals(
expectMsgClass(classOf[HomepageText]).result.linesIterator
.drop(2)
.take(3)
.mkString("\n")
.trim,
"""
|Liked Songs:
|* Straight Edge by Minor Threat
|* Anarchy in the UK by Sex Pistols
""".stripMargin.trim
)
}
test(
"after receiving GetHomepageText, should answer with the correct activity feed (1) (10pts)"
) {
new MyTestKit:
def tests() =
bob ! Subscribe(ada)
expectNoMessage()
carol ! Subscribe(ada)
expectNoMessage()
donald ! Subscribe(ada)
expectNoMessage()
bob ! Play(3)
expectNoMessage()
carol ! Play(8)
expectNoMessage()
ada ! GetHomepageText
assertEquals(
expectMsgClass(classOf[HomepageText]).result.linesIterator
.drop(4)
.take(10)
.mkString("\n")
.trim,
"""
|Activity Feed:
|* Carol is listening to Hold the line by TOTO
|* Bob is listening to J'irai où tu iras by Céline Dion & Jean-Jacques Goldman
""".stripMargin.trim
)
}
test(
"after receiving GetHomepageText, should answer with the correct activity feed (2) (10pts)"
) {
new MyTestKit:
def tests() =
bob ! Subscribe(ada)
expectNoMessage()
carol ! Subscribe(ada)
expectNoMessage()
donald ! Subscribe(ada)
expectNoMessage()
bob ! Play(9)
expectNoMessage()
carol ! Play(10)
expectNoMessage()
donald ! Play(6)
expectNoMessage()
bob ! Play(7)
expectNoMessage()
ada ! GetHomepageText
assertEquals(
expectMsgClass(classOf[HomepageText]).result.linesIterator
.drop(4)
.take(10)
.mkString("\n")
.trim,
"""
|Activity Feed:
|* Bob is listening to Straight Edge by Minor Threat
|* Donald is listening to Désenchantée by Mylène Farmer
|* Carol is listening to Breakfast in America by Supertramp
""".stripMargin.trim
)
}
test(
"after receiving GetHomepageText, should answer with the correct text (full test) (10pts)"
) {
new MyTestKit:
def tests() =
ada ! Like(1)
expectNoMessage()
ada ! Like(2)
expectNoMessage()
bob ! Subscribe(ada)
expectNoMessage()
carol ! Subscribe(ada)
expectNoMessage()
donald ! Subscribe(ada)
expectNoMessage()
donald ! Play(3)
expectNoMessage()
bob ! Play(4)
expectNoMessage()
carol ! Play(5)
expectNoMessage()
ada ! GetHomepageText
assertEquals(
expectMsgClass(classOf[HomepageText]).result.linesIterator
.mkString("\n")
.trim,
"""
|Howdy Ada!
|
|Liked Songs:
|* Sunny by Boney M.
|* High Hopes by Pink Floyd
|
|Activity Feed:
|* Carol is listening to Strobe by deadmau5
|* Bob is listening to Ce monde est cruel by Vald
|* Donald is listening to J'irai où tu iras by Céline Dion & Jean-Jacques Goldman
""".stripMargin.trim
)
}
abstract class MyTestKit
extends TestKit(ActorSystem("TestSystem"))
with ImplicitSender:
val songsStore = system.actorOf(Props(MockSongsStore()), "songsStore")
def makeAda() =
system.actorOf(Props(classOf[User], "1", "Ada", songsStore), "user-1")
val ada = makeAda()
val bob =
system.actorOf(Props(classOf[User], "2", "Bob", songsStore), "user-2")
val carol =
system.actorOf(Props(classOf[User], "3", "Carol", songsStore), "user-3")
val donald =
system.actorOf(Props(classOf[User], "4", "Donald", songsStore), "user-4")
def tests(): Unit
try tests()
finally shutdown(system)
......@@ -7,10 +7,13 @@ trait MockedMonitor extends Monitor:
override def waitDefault() =
scheduler.log("wait")
scheduler updateThreadState Wait(this, scheduler.threadLocks.tail)
override def synchronizedDefault[T](toExecute: =>T): T =
override def synchronizedDefault[T](toExecute: => T): T =
scheduler.log("synchronized check")
val prevLocks = scheduler.threadLocks
scheduler updateThreadState Sync(this, prevLocks) // If this belongs to prevLocks, should just continue.
scheduler updateThreadState Sync(
this,
prevLocks
) // If this belongs to prevLocks, should just continue.
scheduler.log("synchronized -> enter")
try
toExecute
......@@ -19,17 +22,22 @@ trait MockedMonitor extends Monitor:
scheduler.log("synchronized -> out")
override def notifyDefault() =
scheduler mapOtherStates {
state => state match
case Wait(lockToAquire, locks) if lockToAquire == this => SyncUnique(this, state.locks)
case e => e
state =>
state match
case Wait(lockToAquire, locks) if lockToAquire == this =>
SyncUnique(this, state.locks)
case e => e
}
scheduler.log("notify")
override def notifyAllDefault() =
scheduler mapOtherStates {
state => state match
case Wait(lockToAquire, locks) if lockToAquire == this => Sync(this, state.locks)
case SyncUnique(lockToAquire, locks) if lockToAquire == this => Sync(this, state.locks)
case e => e
state =>
state match
case Wait(lockToAquire, locks) if lockToAquire == this =>
Sync(this, state.locks)
case SyncUnique(lockToAquire, locks) if lockToAquire == this =>
Sync(this, state.locks)
case e => e
}
scheduler.log("notifyAll")
......@@ -37,10 +45,14 @@ abstract class ThreadState:
def locks: Seq[AnyRef]
trait CanContinueIfAcquiresLock extends ThreadState:
def lockToAquire: AnyRef
case object Start extends ThreadState { def locks: Seq[AnyRef] = Seq.empty }
case object End extends ThreadState { def locks: Seq[AnyRef] = Seq.empty }
case object Start extends ThreadState:
def locks: Seq[AnyRef] = Seq.empty
case object End extends ThreadState:
def locks: Seq[AnyRef] = Seq.empty
case class Wait(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState
case class SyncUnique(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState with CanContinueIfAcquiresLock
case class Sync(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState with CanContinueIfAcquiresLock
case class SyncUnique(lockToAquire: AnyRef, locks: Seq[AnyRef])
extends ThreadState with CanContinueIfAcquiresLock
case class Sync(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState
with CanContinueIfAcquiresLock
case class Running(locks: Seq[AnyRef]) extends ThreadState
case class VariableReadWrite(locks: Seq[AnyRef]) extends ThreadState
package instrumentation
import java.util.concurrent._;
import scala.concurrent.duration._
import scala.collection.mutable._
import Stats._
import java.util.concurrent.*;
import scala.concurrent.duration.*
import scala.collection.mutable.*
import Stats.*
import java.util.concurrent.atomic.AtomicInteger
sealed abstract class Result
case class RetVal(rets: List[Any]) extends Result
case class Except(msg: String, stackTrace: Array[StackTraceElement]) extends Result
case class Except(msg: String, stackTrace: Array[StackTraceElement])
extends Result
case class Timeout(msg: String) extends Result
/**
* A class that maintains schedule and a set of thread ids.
* The schedules are advanced after an operation of a SchedulableBuffer is performed.
* Note: the real schedule that is executed may deviate from the input schedule
* due to the adjustments that had to be made for locks
*/
/** A class that maintains schedule and a set of thread ids. The schedules are
* advanced after an operation of a SchedulableBuffer is performed. Note: the
* real schedule that is executed may deviate from the input schedule due to
* the adjustments that had to be made for locks
*/
class Scheduler(sched: List[Int]):
val maxOps = 500 // a limit on the maximum number of operations the code is allowed to perform
val maxOps =
500 // a limit on the maximum number of operations the code is allowed to perform
private var schedule = sched
private var numThreads = 0
private val realToFakeThreadId = Map[Long, Int]()
private val opLog = ListBuffer[String]() // a mutable list (used for efficient concat)
private val opLog =
ListBuffer[String]() // a mutable list (used for efficient concat)
private val threadStates = Map[Int, ThreadState]()
/**
* Runs a set of operations in parallel as per the schedule.
* Each operation may consist of many primitive operations like reads or writes
* to shared data structure each of which should be executed using the function `exec`.
* @timeout in milliseconds
* @return true - all threads completed on time, false -some tests timed out.
*/
/** Runs a set of operations in parallel as per the schedule. Each operation
* may consist of many primitive operations like reads or writes to shared
* data structure each of which should be executed using the function `exec`.
* @timeout
* in milliseconds
* @return
* true - all threads completed on time, false -some tests timed out.
*/
def runInParallel(timeout: Long, ops: List[() => Any]): Result =
numThreads = ops.length
val threadRes = Array.fill(numThreads) { None: Any }
......@@ -43,42 +46,47 @@ class Scheduler(sched: List[Int]):
// create threads
val threads = ops.zipWithIndex.map {
case (op, i) =>
new Thread(new Runnable() {
def run(): Unit = {
new Thread(new Runnable():
def run(): Unit =
val fakeId = i + 1
setThreadId(fakeId)
try {
try
updateThreadState(Start)
val res = op()
updateThreadState(End)
threadRes(i) = res
// notify the main thread if all threads have completed
if completed.incrementAndGet() == ops.length then {
if completed.incrementAndGet() == ops.length then
syncObject.synchronized { syncObject.notifyAll() }
}
} catch {
case e: Throwable if exception != None => // do nothing here and silently fail
catch
case e: Throwable
if exception != None => // do nothing here and silently fail
case e: Throwable =>
log(s"throw ${e.toString}")
exception = Some(Except(s"Thread $fakeId crashed on the following schedule: \n" + opLog.mkString("\n"),
e.getStackTrace))
exception = Some(Except(
s"Thread $fakeId crashed on the following schedule: \n" + opLog.mkString(
"\n"
),
e.getStackTrace
))
syncObject.synchronized { syncObject.notifyAll() }
//println(s"$fakeId: ${e.toString}")
//Runtime.getRuntime().halt(0) //exit the JVM and all running threads (no other way to kill other threads)
}
}
})
// println(s"$fakeId: ${e.toString}")
// Runtime.getRuntime().halt(0) //exit the JVM and all running threads (no other way to kill other threads)
)
}
// start all threads
threads.foreach(_.start())
// wait for all threads to complete, or for an exception to be thrown, or for the time out to expire
var remTime = timeout
syncObject.synchronized {
timed { if completed.get() != ops.length then syncObject.wait(timeout) } { time => remTime -= time }
timed { if completed.get() != ops.length then syncObject.wait(timeout) } {
time => remTime -= time
}
}
if exception.isDefined then
exception.get
else if remTime <= 1 then // timeout ? using 1 instead of zero to allow for some errors
else if remTime <= 1
then // timeout ? using 1 instead of zero to allow for some errors
Timeout(opLog.mkString("\n"))
else
// every thing executed normally
......@@ -92,7 +100,8 @@ class Scheduler(sched: List[Int]):
}
state match
case Sync(lockToAquire, locks) =>
if locks.indexOf(lockToAquire) < 0 then waitForTurn else
if locks.indexOf(lockToAquire) < 0 then waitForTurn
else
// Re-aqcuiring the same lock
updateThreadState(Running(lockToAquire +: locks))
case Start => waitStart()
......@@ -101,9 +110,9 @@ class Scheduler(sched: List[Int]):
case _ => waitForTurn // Wait, SyncUnique, VariableReadWrite
def waitStart(): Unit =
//while (threadStates.size < numThreads) {
//Thread.sleep(1)
//}
// while (threadStates.size < numThreads) {
// Thread.sleep(1)
// }
synchronized {
if threadStates.size < numThreads then
wait()
......@@ -131,27 +140,34 @@ class Scheduler(sched: List[Int]):
def log(str: String) =
if (realToFakeThreadId contains Thread.currentThread().getId()) then
val space = (" " * ((threadId - 1) * 2))
val s = space + threadId + ":" + "\n".r.replaceAllIn(str, "\n" + space + " ")
//println(s)
val s =
space + threadId + ":" + "\n".r.replaceAllIn(str, "\n" + space + " ")
// println(s)
opLog += s
/**
* Executes a read or write operation to a global data structure as per the given schedule
* @param msg a message corresponding to the operation that will be logged
*/
def exec[T](primop: => T)(msg: => String, postMsg: => Option[T => String] = None): T =
if ! (realToFakeThreadId contains Thread.currentThread().getId()) then
/** Executes a read or write operation to a global data structure as per the
* given schedule
* @param msg
* a message corresponding to the operation that will be logged
*/
def exec[T](primop: => T)(
msg: => String,
postMsg: => Option[T => String] = None
): T =
if !(realToFakeThreadId contains Thread.currentThread().getId()) then
primop
else
updateThreadState(VariableReadWrite(threadLocks))
val m = msg
if m != "" then log(m)
if opLog.size > maxOps then
throw new Exception(s"Total number of reads/writes performed by threads exceed $maxOps. A possible deadlock!")
throw new Exception(
s"Total number of reads/writes performed by threads exceed $maxOps. A possible deadlock!"
)
val res = primop
postMsg match
case Some(m) => log(m(res))
case None =>
case None =>
res
private def setThreadId(fakeId: Int) = synchronized {
......@@ -162,8 +178,10 @@ class Scheduler(sched: List[Int]):
try
realToFakeThreadId(Thread.currentThread().getId())
catch
case e: NoSuchElementException =>
throw new Exception("You are accessing shared variables in the constructor. This is not allowed. The variables are already initialized!")
case e: NoSuchElementException =>
throw new Exception(
"You are accessing shared variables in the constructor. This is not allowed. The variables are already initialized!"
)
private def isTurn(tid: Int) = synchronized {
(!schedule.isEmpty && schedule.head != tid)
......@@ -173,69 +191,93 @@ class Scheduler(sched: List[Int]):
val tid = threadId
canContinue match
case Some((i, state)) if i == tid =>
//println(s"$tid: Runs ! Was in state $state")
// println(s"$tid: Runs ! Was in state $state")
canContinue = None
state match
case Sync(lockToAquire, locks) => updateThreadState(Running(lockToAquire +: locks))
case Sync(lockToAquire, locks) =>
updateThreadState(Running(lockToAquire +: locks))
case SyncUnique(lockToAquire, locks) =>
mapOtherStates {
_ match
case SyncUnique(lockToAquire2, locks2) if lockToAquire2 == lockToAquire => Wait(lockToAquire2, locks2)
case SyncUnique(lockToAquire2, locks2)
if lockToAquire2 == lockToAquire =>
Wait(lockToAquire2, locks2)
case e => e
}
updateThreadState(Running(lockToAquire +: locks))
case VariableReadWrite(locks) => updateThreadState(Running(locks))
true
case Some((i, state)) =>
//println(s"$tid: not my turn but $i !")
// println(s"$tid: not my turn but $i !")
false
case None =>
false
var threadPreference = 0 // In the case the schedule is over, which thread should have the preference to execute.
var threadPreference =
0 // In the case the schedule is over, which thread should have the preference to execute.
/** returns true if the thread can continue to execute, and false otherwise */
def decide(): Option[(Int, ThreadState)] =
if !threadStates.isEmpty then // The last thread who enters the decision loop takes the decision.
//println(s"$threadId: I'm taking a decision")
if threadStates.values.forall { case e: Wait => true case _ => false } then
if !threadStates.isEmpty
then // The last thread who enters the decision loop takes the decision.
// println(s"$threadId: I'm taking a decision")
if threadStates.values.forall {
case e: Wait => true
case _ => false
}
then
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
throw new Exception(s"Deadlock: Thread$s $waiting $are waiting but all others have ended and cannot notify them.")
throw new Exception(
s"Deadlock: Thread$s $waiting $are waiting but all others have ended and cannot notify them."
)
else
// Threads can be in Wait, Sync, SyncUnique, and VariableReadWrite mode.
// Let's determine which ones can continue.
val notFree = threadStates.collect { case (id, state) => state.locks }.flatten.toSet
val notFree = threadStates.collect { case (id, state) =>
state.locks
}.flatten.toSet
val threadsNotBlocked = threadStates.toSeq.filter {
case (id, v: VariableReadWrite) => true
case (id, v: CanContinueIfAcquiresLock) => !notFree(v.lockToAquire) || (v.locks contains v.lockToAquire)
case _ => false
case (id, v: VariableReadWrite) => true
case (id, v: CanContinueIfAcquiresLock) =>
!notFree(v.lockToAquire) || (v.locks contains v.lockToAquire)
case _ => false
}
if threadsNotBlocked.isEmpty then
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
val whoHasLock = threadStates.toSeq.flatMap { case (id, state) => state.locks.map(lock => (lock, id)) }.toMap
val whoHasLock = threadStates.toSeq.flatMap { case (id, state) =>
state.locks.map(lock => (lock, id))
}.toMap
val reason = threadStates.collect {
case (id, state: CanContinueIfAcquiresLock) if !notFree(state.lockToAquire) =>
case (id, state: CanContinueIfAcquiresLock)
if !notFree(state.lockToAquire) =>
s"Thread $id is waiting on lock ${state.lockToAquire} held by thread ${whoHasLock(state.lockToAquire)}"
}.mkString("\n")
throw new Exception(s"Deadlock: Thread$s $waiting are interlocked. Indeed:\n$reason")
else if threadsNotBlocked.size == 1 then // Do not consume the schedule if only one thread can execute.
throw new Exception(
s"Deadlock: Thread$s $waiting are interlocked. Indeed:\n$reason"
)
else if threadsNotBlocked.size == 1
then // Do not consume the schedule if only one thread can execute.
Some(threadsNotBlocked(0))
else
val next = schedule.indexWhere(t => threadsNotBlocked.exists { case (id, state) => id == t })
val next = schedule.indexWhere(t =>
threadsNotBlocked.exists { case (id, state) => id == t }
)
if next != -1 then
//println(s"$threadId: schedule is $schedule, next chosen is ${schedule(next)}")
val chosenOne = schedule(next) // TODO: Make schedule a mutable list.
// println(s"$threadId: schedule is $schedule, next chosen is ${schedule(next)}")
val chosenOne =
schedule(next) // TODO: Make schedule a mutable list.
schedule = schedule.take(next) ++ schedule.drop(next + 1)
Some((chosenOne, threadStates(chosenOne)))
else
threadPreference = (threadPreference + 1) % threadsNotBlocked.size
val chosenOne = threadsNotBlocked(threadPreference) // Maybe another strategy
val chosenOne =
threadsNotBlocked(threadPreference) // Maybe another strategy
Some(chosenOne)
//threadsNotBlocked.indexOf(threadId) >= 0
// threadsNotBlocked.indexOf(threadId) >= 0
/*
val tnb = threadsNotBlocked.map(_._1).mkString(",")
val s = if (schedule.isEmpty) "empty" else schedule.mkString(",")
......@@ -243,29 +285,28 @@ class Scheduler(sched: List[Int]):
throw new Exception(s"The schedule is $s but$only threads ${tnb} can continue")*/
else canContinue
/**
* This will be called before a schedulable operation begins.
* This should not use synchronized
*/
/** This will be called before a schedulable operation begins. This should not
* use synchronized
*/
var numThreadsWaiting = new AtomicInteger(0)
//var waitingForDecision = Map[Int, Option[Int]]() // Mapping from thread ids to a number indicating who is going to make the choice.
var canContinue: Option[(Int, ThreadState)] = None // The result of the decision thread Id of the thread authorized to continue.
// var waitingForDecision = Map[Int, Option[Int]]() // Mapping from thread ids to a number indicating who is going to make the choice.
var canContinue: Option[(Int, ThreadState)] =
None // The result of the decision thread Id of the thread authorized to continue.
private def waitForTurn =
synchronized {
if numThreadsWaiting.incrementAndGet() == threadStates.size then
canContinue = decide()
notifyAll()
//waitingForDecision(threadId) = Some(numThreadsWaiting)
//println(s"$threadId Entering waiting with ticket number $numThreadsWaiting/${waitingForDecision.size}")
// waitingForDecision(threadId) = Some(numThreadsWaiting)
// println(s"$threadId Entering waiting with ticket number $numThreadsWaiting/${waitingForDecision.size}")
while !canProceed() do wait()
}
numThreadsWaiting.decrementAndGet()
/**
* To be invoked when a thread is about to complete
*/
/** To be invoked when a thread is about to complete
*/
private def removeFromSchedule(fakeid: Int) = synchronized {
//println(s"$fakeid: I'm taking a decision because I finished")
// println(s"$fakeid: I'm taking a decision because I finished")
schedule = schedule.filterNot(_ == fakeid)
threadStates -= fakeid
if numThreadsWaiting.get() == threadStates.size then
......
package instrumentation
import java.lang.management._
import java.lang.management.*
/**
* A collection of methods that can be used to collect run-time statistics about Leon programs.
* This is mostly used to test the resources properties of Leon programs
*/
/** A collection of methods that can be used to collect run-time statistics */
object Stats:
def timed[T](code: => T)(cont: Long => Unit): T =
var t1 = System.currentTimeMillis()
......