diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 8a1f7841bb0ab1d983e8ec880694736e25148fbf..17c7b9cbf505b8a5eb26eb095e584bfeec5ef678 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,4 +1,4 @@ -image: "sbtscala/scala-sbt:graalvm-ce-22.3.0-b2-java17_1.8.2_3.2.2" +image: "sbtscala/scala-sbt:graalvm-ce-22.3.0-b2-java17_1.8.3_3.2.2" variables: SBT_VERSION: "1.7.1" diff --git a/build.sbt b/build.sbt index f93a9289a63b922e2f82b2c0e10223cea6f86457..d784b6e0d914346162629790e5bde2bb9cb113f4 100644 --- a/build.sbt +++ b/build.sbt @@ -1,14 +1,28 @@ name := "cs206-demos" version := "0.1.2" -scalaVersion := "3.2.0" +scalaVersion := "3.2.2" + +val akkaVersion = "2.8.2" +val logbackVersion = "1.2.11" + libraryDependencies ++= Seq( ("com.storm-enroute" %% "scalameter-core" % "0.21").cross(CrossVersion.for3Use2_13), "org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.4", + "org.scalameta" %% "munit" % "1.0.0-M7" % Test, "junit" % "junit" % "4.13" % Test, "com.github.sbt" % "junit-interface" % "0.13.3" % Test, + "com.typesafe.akka" %% "akka-actor" % akkaVersion, + "com.typesafe.akka" %% "akka-testkit" % akkaVersion, + // SLF4J backend + // See https://doc.akka.io/docs/akka/current/typed/logging.html#slf4j-backend + "ch.qos.logback" % "logback-classic" % logbackVersion ) scalacOptions ++= Seq("-unchecked", "-deprecation") +// Enable debug logging for akka +//javaOptions ++= Seq("-Dakka.loglevel=Debug", "-Dakka.actor.debug.receive=on") + +Test / fork := true Test / parallelExecution := false Test / testOptions += Tests.Argument(TestFrameworks.JUnit) diff --git a/project/build.properties b/project/build.properties index 22af2628c41386225b9b373f5d933a0e69d19e33..72413de151e13ea03d5518fd0b9d8c3116db6ea4 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.7.1 +sbt.version=1.8.3 diff --git a/src/main/scala/concpar21final02/Problem2.scala b/src/main/scala/concpar21final02/Problem2.scala new file mode 100644 index 0000000000000000000000000000000000000000..82529b14e45531ac09cd383875967df92b8ba125 --- /dev/null +++ b/src/main/scala/concpar21final02/Problem2.scala @@ -0,0 +1,142 @@ +package concpar21final02 + +import akka.actor.* +import scala.collection.mutable +import akka.testkit.* + +object Problem2: + + ////////////////////////////// + // NOTIFICATION SERVICE // + ////////////////////////////// + + object NotificationService: + enum Protocol: + /** Notify all registered actors */ + case NotifyAll + + /** Register the actor that sent the `Register` request */ + case Register // + /** Un-register the actor that sent the `Register` request */ + case UnRegister + + enum Responses: + /** Message sent to an actor when it is notified */ + case Notification + + /** Response sent to an actor after a `Register` or `UnRegister` */ + case Registered(registered: Boolean) + + class NotificationService extends Actor: + import NotificationService.Protocol.* + import NotificationService.Responses.* + + private val registeredUsers = mutable.Set.empty[ActorRef] + + def receive: Receive = { + + case Register => + registeredUsers += sender() + sender() ! Registered(true) + case UnRegister => + registeredUsers -= sender() + sender() ! Registered(false) + case NotifyAll => + for user <- registeredUsers do user ! Notification + } + + ///////////////////////// + // DISCORD CHANNEL // + ///////////////////////// + + object DiscordChannel: + + enum Protocol: + + /** Post a message in the channel */ + case Post(msg: String) + + /** Ask for the list of most recent posts starting from the most recent + * one. The list must have at most `limit` posts. + */ + case GetLastPosts(limit: Int) + + /** Activates the service channel using the provided notification service. + */ + case Init(notificationService: ActorRef) + + enum Responses: + + /** Response to `GetLastPosts` if active */ + case Posts(msgs: List[String]) + + /** Response after `Init` if non-active */ + case Active + + /** Response `Post` and `GetLastPosts` if non-active */ + case NotActive + + /** Response after `Init` if active */ + case AlreadyActive + + class DiscordChannel extends Actor: + import DiscordChannel.Protocol.* + import DiscordChannel.Responses.* + import NotificationService.Protocol.* + + private var messages: List[String] = Nil + + def receive: Receive = nonActive + + def nonActive: Receive = { + + case Init(service) => + context.become(active(service)) + sender() ! Active + case Post(_) | GetLastPosts(_) => + sender() ! NotActive + } + + def active(notificationService: ActorRef): Receive = { + + case Post(msg) => + messages = msg :: messages + notificationService ! NotifyAll + case GetLastPosts(limit) => + sender() ! Posts(messages.take(limit)) + case Init(_) => + sender() ! AlreadyActive + } + +///////////////////////// +// DEBUG // +///////////////////////// + +/** Infrastructure to help debugging. In sbt use `run` to execute this code. The + * TestKit is an actor that can send messages and check the messages it + * receives (or not). + */ +@main def debug() = new TestKit(ActorSystem("DebugSystem")) with ImplicitSender: + import Problem2.* + import DiscordChannel.Protocol.* + import DiscordChannel.Responses.* + import NotificationService.Protocol.* + import NotificationService.Responses.* + import concurrent.duration.* + + try + val notificationService = system.actorOf(Props[NotificationService]()) + val channel = system.actorOf(Props[DiscordChannel]()) + + notificationService ! NotifyAll + expectNoMessage( + 200.millis + ) // expects no message is received in the next 200 milliseconds + + notificationService ! Register + expectMsg( + 200.millis, + Registered(true) + ) // expects to receive `Registered(true)` in the next 200 milliseconds + + finally shutdown(system) diff --git a/src/main/scala/concpar21final03/FileSystem.scala b/src/main/scala/concpar21final03/FileSystem.scala new file mode 100644 index 0000000000000000000000000000000000000000..356e2a6e42f7fec83467b82b40fab6fa83bb9dcd --- /dev/null +++ b/src/main/scala/concpar21final03/FileSystem.scala @@ -0,0 +1,41 @@ +package concpar21final03 + +import instrumentation.* + +import scala.collection.mutable +import scala.collection.concurrent.TrieMap + +type FileName = String + +/** An API for manipulating files. */ +trait FileSystem: + /** Create a new file named `file` with the passed `content`. */ + def createFile(file: FileName, content: String): Unit + + /** If `file` exists, return its content, otherwise crashes. */ + def readFile(file: FileName): String + + /** If `file` exists, delete it, otherwise crash. */ + def deleteFile(file: FileName): Unit +end FileSystem + +/** An in-memory file system for testing purposes implemented using a Map. + * + * Every method in this class is thread-safe. + */ +class InMemoryFileSystem extends FileSystem: + val fsMap: mutable.Map[FileName, String] = TrieMap() + + def createFile(file: FileName, content: String): Unit = + assert(!fsMap.contains(file), s"$file already exists") + fsMap(file) = content + + def readFile(file: FileName): String = + fsMap.get(file) match + case Some(content) => content + case None => assert(false, s"Attempt to read non-existing $file") + + def deleteFile(file: FileName): Unit = + fsMap.remove(file) + +end InMemoryFileSystem diff --git a/src/main/scala/concpar21final03/RCU.scala b/src/main/scala/concpar21final03/RCU.scala new file mode 100644 index 0000000000000000000000000000000000000000..8bfe685e22528da4c61b8b711ecd25951b5401b5 --- /dev/null +++ b/src/main/scala/concpar21final03/RCU.scala @@ -0,0 +1,36 @@ +package concpar21final03 + +import instrumentation.* + +/** A synchronization mechanism allowing multiple reads to proceed concurrently + * with an update to the state. + */ +class RCU extends Monitor: + protected val latestVersion: AtomicLong = AtomicLong(0) + protected val readersVersion: ThreadMap[Long] = ThreadMap() + + /** This method must be called before accessing shared data for reading. */ + def startRead(): Unit = + assert( + !readersVersion.currentThreadHasValue, + "startRead() cannot be called multiple times without an intervening stopRead()" + ) + readersVersion.setCurrentThreadValue(latestVersion.get) + + /** Once a thread which has previously called `startRead` has finished reading + * shared data, it must call this method. + */ + def stopRead(): Unit = + assert( + readersVersion.currentThreadHasValue, + "stopRead() cannot be called without a preceding startRead()" + ) + readersVersion.deleteCurrentThreadValue() + + /** Wait until all reads started before this method was called have finished, + * then return. + */ + def waitForOldReads(): Unit = + + val newVersion = latestVersion.incrementAndGet() + readersVersion.waitForall(_ >= newVersion) diff --git a/src/main/scala/concpar21final03/ThreadMap.scala b/src/main/scala/concpar21final03/ThreadMap.scala new file mode 100644 index 0000000000000000000000000000000000000000..d728247cfc00bc5f3e60e8575ed69508b26a40cb --- /dev/null +++ b/src/main/scala/concpar21final03/ThreadMap.scala @@ -0,0 +1,51 @@ +package concpar21final03 + +import instrumentation.* + +import scala.collection.mutable + +/** A map which associates every thread to at most one value of type A. + * + * Every method in this class is thread-safe. + */ +class ThreadMap[A] extends Monitor: + protected val theMap: mutable.Map[Thread, A] = mutable.Map() + + /** Return the value in the map entry for the current thread if it exists, + * otherwise None. + */ + def currentThreadValue: Option[A] = synchronized { + theMap.get(Thread.currentThread) + } + + /** Is there a map entry for the current thread? */ + def currentThreadHasValue: Boolean = + synchronized { + theMap.contains(Thread.currentThread) + } + + /** Set the map entry of the current thread to `value` and notify any thread + * waiting on `waitForall`. + */ + def setCurrentThreadValue(value: A): Unit = + synchronized { + theMap(Thread.currentThread) = value + notifyAll() + } + + /** Delete the map entry associated with this thread (if it exists) and notify + * all threads waiting in `waitForall`. + */ + def deleteCurrentThreadValue(): Unit = + synchronized { + theMap.remove(Thread.currentThread) + notifyAll() + } + + /** Wait until `predicate` returns true for all map entries, then return. */ + def waitForall(predicate: A => Boolean): Unit = + synchronized { + while !theMap.forall((_, value) => predicate(value)) do wait() + } + +end ThreadMap diff --git a/src/main/scala/concpar21final03/UpdateServer.scala b/src/main/scala/concpar21final03/UpdateServer.scala new file mode 100644 index 0000000000000000000000000000000000000000..9a6c59fedd2f112ad1dc6fa98784b951c7baaee4 --- /dev/null +++ b/src/main/scala/concpar21final03/UpdateServer.scala @@ -0,0 +1,46 @@ +package concpar21final03 + +import instrumentation.* + +class UpdateServer(fs: FileSystem) extends Monitor: + val rcu = new RCU + + /** The name of the file containing the latest update. + * + * This is `@volatile` to guarantee that `fetchUpdate` always sees the latest + * filename. + */ + @volatile private var updateFile: Option[FileName] = None + + /** Return the content of the latest update if one is available, otherwise + * None. + * + * This method is thread-safe. + */ + def fetchUpdate(): Option[String] = + // TODO: use `rcu` + + rcu.startRead() + val value = updateFile.map(fs.readFile) + rcu.stopRead() + value + + /** Define a new update, more precisely this will: + * - Create a new update file called `newName` with content `newContent` + * - Ensure that any future call to `fetchUpdate` returns the new update + * content. + * - Delete the old update file. + * + * This method is _NOT_ thread-safe, it cannot be safely called from multiple + * threads at once. + */ + def newUpdate(newName: FileName, newContent: String): Unit = + // TODO: use `rcu` + val oldFile = updateFile + fs.createFile(newName, newContent) + updateFile = Some(newName) + + rcu.waitForOldReads() + oldFile.foreach(fs.deleteFile) + +end UpdateServer diff --git a/src/main/scala/concpar21final03/instrumentation/AtomicLong.scala b/src/main/scala/concpar21final03/instrumentation/AtomicLong.scala new file mode 100644 index 0000000000000000000000000000000000000000..83389517581463c2a6a3895ef9a82611ef723bbe --- /dev/null +++ b/src/main/scala/concpar21final03/instrumentation/AtomicLong.scala @@ -0,0 +1,34 @@ +package concpar21final03.instrumentation + +/** A long value that may be updated atomically. */ +class AtomicLong(initial: Long): + + private val atomic = new java.util.concurrent.atomic.AtomicLong(initial) + + /** Get the current value. */ + def get: Long = atomic.get() + + /** Set to the given `value`. */ + def set(value: Long): Unit = atomic.set(value) + + /** Atomically increment by one the current value and return the _original_ + * value. + */ + def getAndIncrement(): Long = + atomic.getAndIncrement() + + /** Atomically increment by one the current value and return the _updated_ + * value. + */ + def incrementAndGet(): Long = + atomic.incrementAndGet() + + /** Atomically set the value to `newValue` if the current value == `expected`. + * + * Return true if successful, otherwise return false to indicate that the + * actual value was not equal to the expected value. + */ + def compareAndSet(expected: Long, newValue: Long): Boolean = + atomic.compareAndSet(expected, newValue) + +end AtomicLong diff --git a/src/main/scala/concpar21final03/instrumentation/Monitor.scala b/src/main/scala/concpar21final03/instrumentation/Monitor.scala new file mode 100644 index 0000000000000000000000000000000000000000..ac6f6d2b25276a007b75d042ad8a81be7b620c51 --- /dev/null +++ b/src/main/scala/concpar21final03/instrumentation/Monitor.scala @@ -0,0 +1,22 @@ +package concpar21final03.instrumentation + +class Dummy + +trait Monitor: + implicit val dummy: Dummy = new Dummy + + def wait()(implicit i: Dummy) = waitDefault() + + def synchronized[T](e: => T)(implicit i: Dummy) = synchronizedDefault(e) + + def notify()(implicit i: Dummy) = notifyDefault() + + def notifyAll()(implicit i: Dummy) = notifyAllDefault() + + private val lock = new AnyRef + + // Can be overridden. + def waitDefault(): Unit = lock.wait() + def synchronizedDefault[T](toExecute: => T): T = lock.synchronized(toExecute) + def notifyDefault(): Unit = lock.notify() + def notifyAllDefault(): Unit = lock.notifyAll() diff --git a/src/main/scala/concpar22final01/Problem1.scala b/src/main/scala/concpar22final01/Problem1.scala new file mode 100644 index 0000000000000000000000000000000000000000..09406a1e928a10ab931e956bcfe8499dbc30384e --- /dev/null +++ b/src/main/scala/concpar22final01/Problem1.scala @@ -0,0 +1,64 @@ +package concpar22final01 + +trait Problem1 extends Lib: + + class DLLCombinerImplementation extends DLLCombiner: + + // Copies every other Integer element of data array, starting from the first (index 0), up to the middle + def task1(data: Array[Int]) = task { + + var current = first + var i = 0 + while current != null && i < size / 2 do + data(i) = current.value + i += 2 + current = current.getNext2 + } + + // Copies every other Integer element of data array, starting from the second, up to the middle + def task2(data: Array[Int]) = task { + + var current = second + var i = 1 + while current != null && i < size / 2 do + data(i) = current.value + i += 2 + current = current.getNext2 + } + + // Copies every other Integer element of data array, starting from the second to last, up to the middle + def task3(data: Array[Int]) = task { + + var current = secondToLast + var i = size - 2 + while current != null && i >= size / 2 do + data(i) = current.value + i -= 2 + current = current.getPrevious2 + } + + // Copies every other Integer element of data array, starting from the last, up to the middle + // This is executed on the current thread. + def task4(data: Array[Int]) = + + var current = last + var i = size - 1 + while current != null && i >= size / 2 do + data(i) = current.value + i -= 2 + current = current.getPrevious2 + + def result(): Array[Int] = + val data = new Array[Int](size) + + val t1 = task1(data) + val t2 = task2(data) + val t3 = task3(data) + + task4(data) + + t1.join() + t2.join() + t3.join() + + data diff --git a/src/main/scala/concpar22final01/lib.scala b/src/main/scala/concpar22final01/lib.scala new file mode 100644 index 0000000000000000000000000000000000000000..64839702a2cb4da9839fa288aae3c442e4c65ecb --- /dev/null +++ b/src/main/scala/concpar22final01/lib.scala @@ -0,0 +1,82 @@ +package concpar22final01 + +import java.util.concurrent.* +import scala.util.DynamicVariable + +trait Lib: + class Node(val value: Int): + protected var next: Node = null // null for last node. + protected var next2: Node = null // null for last node. + protected var previous: Node = null // null for first node. + protected var previous2: Node = null // null for first node. + + def getNext: Node = next // do NOT use in the result method + def getNext2: Node = next2 + def getPrevious: Node = previous // do NOT use in the result method + def getPrevious2: Node = previous2 + + def setNext(n: Node): Unit = next = n + def setNext2(n: Node): Unit = next2 = n + def setPrevious(n: Node): Unit = previous = n + def setPrevious2(n: Node): Unit = previous2 = n + + // Simplified Combiner interface + // Implements methods += and combine + // Abstract methods should be implemented in subclasses + abstract class DLLCombiner: + var first: Node = null // null for empty lists. + var last: Node = null // null for empty lists. + + var second: Node = null // null for empty lists. + var secondToLast: Node = null // null for empty lists. + + var size: Int = 0 + + // Adds an Integer to this array combiner. + def +=(elem: Int): Unit = + val node = new Node(elem) + if size == 0 then + first = node + last = node + size = 1 + else + last.setNext(node) + node.setPrevious(last) + node.setPrevious2(last.getPrevious) + if size > 1 then last.getPrevious.setNext2(node) + else second = node + secondToLast = last + last = node + size += 1 + + // Combines this array combiner and another given combiner in constant O(1) complexity. + def combine(that: DLLCombiner): DLLCombiner = + if this.size == 0 then that + else if that.size == 0 then this + else + this.last.setNext(that.first) + this.last.setNext2(that.first.getNext) + if this.last.getPrevious != null then + this.last.getPrevious.setNext2(that.first) // important + + that.first.setPrevious(this.last) + that.first.setPrevious2(this.last.getPrevious) + if that.first.getNext != null then + that.first.getNext.setPrevious2(this.last) // important + + if this.size == 1 then second = that.first + + this.size = this.size + that.size + this.last = that.last + this.secondToLast = that.secondToLast + + this + + def task1(data: Array[Int]): ForkJoinTask[Unit] + def task2(data: Array[Int]): ForkJoinTask[Unit] + def task3(data: Array[Int]): ForkJoinTask[Unit] + def task4(data: Array[Int]): Unit + + def result(): Array[Int] + + def task[T](body: => T): ForkJoinTask[T] diff --git a/src/main/scala/concpar22final02/AbstractBarrier.scala b/src/main/scala/concpar22final02/AbstractBarrier.scala new file mode 100644 index 0000000000000000000000000000000000000000..decdbc9b912db765987efeb8cb78b554156829d5 --- /dev/null +++ b/src/main/scala/concpar22final02/AbstractBarrier.scala @@ -0,0 +1,11 @@ +package concpar22final02 + +import instrumentation.Monitor + +abstract class AbstractBarrier(val numThreads: Int) extends Monitor: + + var count = numThreads + + def awaitZero(): Unit + + def countDown(): Unit diff --git a/src/main/scala/concpar22final02/Barrier.scala b/src/main/scala/concpar22final02/Barrier.scala new file mode 100644 index 0000000000000000000000000000000000000000..b2544261f369f7f7fb02482f635a09dffa882854 --- /dev/null +++ b/src/main/scala/concpar22final02/Barrier.scala @@ -0,0 +1,14 @@ +package concpar22final02 + +class Barrier(numThreads: Int) extends AbstractBarrier(numThreads): + + def awaitZero(): Unit = + synchronized { + while count > 0 do wait() + } + + def countDown(): Unit = + synchronized { + count -= 1 + if count <= 0 then notifyAll() + } diff --git a/src/main/scala/concpar22final02/ImageLib.scala b/src/main/scala/concpar22final02/ImageLib.scala new file mode 100644 index 0000000000000000000000000000000000000000..049e2efa3d404518397aed55cb7fe33116b6bbf9 --- /dev/null +++ b/src/main/scala/concpar22final02/ImageLib.scala @@ -0,0 +1,57 @@ +package concpar22final02 + +import scala.collection.mutable.ArrayBuffer + +class ImageLib(size: Int): + + val buffer1: ArrayBuffer[ArrayBuffer[Int]] = ArrayBuffer.fill(size, size)(1) + val buffer2: ArrayBuffer[ArrayBuffer[Int]] = ArrayBuffer.fill(size, size)(0) + + enum Filter(val kernel: Array[Array[Int]]): + case Outline extends Filter(Array( + Array(-1, -1, -1), + Array(-1, 8, -1), + Array(-1, -1, -1) + )) + case Sharpen extends Filter(Array( + Array(0, -1, 0), + Array(-1, 5, -1), + Array(0, -1, 0) + )) + case Emboss + extends Filter(Array(Array(-2, -1, 0), Array(-1, 1, 1), Array(0, 1, 2))) + case Identity + extends Filter(Array(Array(0, 0, 0), Array(0, 1, 0), Array(0, 0, 0))) + + def init(input: ArrayBuffer[ArrayBuffer[Int]]) = + for i <- 0 to size - 1 do + for j <- 0 to size - 1 do + buffer1(i)(j) = input(i)(j) + + def computeConvolution( + kernel: Array[Array[Int]], + input: ArrayBuffer[ArrayBuffer[Int]], + row: Int, + column: Int + ): Int = + + val displacement = Array(-1, 0, 1) + var output = 0 + + for i <- 0 to 2 do + for j <- 0 to 2 do + val newI = row + displacement(i) + val newJ = column + displacement(j) + if newI < 0 || newI >= size || newJ < 0 || newJ >= size then output += 0 + else output += (kernel(i)(j) * input(newI)(newJ)) + + output + + def applyFilter( + kernel: Array[Array[Int]], + input: ArrayBuffer[ArrayBuffer[Int]], + output: ArrayBuffer[ArrayBuffer[Int]], + row: Int + ): Unit = + for i <- 0 to input(row).size - 1 do + output(row)(i) = computeConvolution(kernel, input, row, i) diff --git a/src/main/scala/concpar22final02/Problem2.scala b/src/main/scala/concpar22final02/Problem2.scala new file mode 100644 index 0000000000000000000000000000000000000000..5041e8632950bec60c7e5882b8a1c6e1e53b3562 --- /dev/null +++ b/src/main/scala/concpar22final02/Problem2.scala @@ -0,0 +1,38 @@ +package concpar22final02 + +import java.util.concurrent.atomic.AtomicInteger +import scala.collection.mutable.ArrayBuffer + +class Problem2(imageSize: Int, numThreads: Int, numFilters: Int): + + val barrier: ArrayBuffer[Barrier] = + ArrayBuffer.fill(numFilters)(Barrier(numThreads)) + + val imageLib: ImageLib = ImageLib(imageSize) + + def imagePipeline( + filters: Array[imageLib.Filter], + rows: Array[Int] + ): ArrayBuffer[ArrayBuffer[Int]] = + for i <- 0 to filters.size - 1 do + for j <- 0 to rows.size - 1 do + if i % 2 == 0 then + imageLib.applyFilter( + filters(i).kernel, + imageLib.buffer1, + imageLib.buffer2, + rows(j) + ) + else + imageLib.applyFilter( + filters(i).kernel, + imageLib.buffer2, + imageLib.buffer1, + rows(j) + ) + + barrier(i).countDown() + barrier(i).awaitZero() + + if filters.size % 2 == 0 then imageLib.buffer1 + else imageLib.buffer2 diff --git a/src/main/scala/concpar22final02/instrumentation/Monitor.scala b/src/main/scala/concpar22final02/instrumentation/Monitor.scala new file mode 100644 index 0000000000000000000000000000000000000000..a829ec7ad708b05fa6d7f6b82550e9ce6b234757 --- /dev/null +++ b/src/main/scala/concpar22final02/instrumentation/Monitor.scala @@ -0,0 +1,38 @@ +package concpar22final02.instrumentation + +class Dummy + +trait Monitor: + implicit val dummy: Dummy = new Dummy + + def wait()(implicit i: Dummy) = waitDefault() + + def synchronized[T](e: => T)(implicit i: Dummy) = synchronizedDefault(e) + + def notify()(implicit i: Dummy) = notifyDefault() + + def notifyAll()(implicit i: Dummy) = notifyAllDefault() + + private val lock = new AnyRef + + // Can be overriden. + def waitDefault(): Unit = lock.wait() + def synchronizedDefault[T](toExecute: => T): T = lock.synchronized(toExecute) + def notifyDefault(): Unit = lock.notify() + def notifyAllDefault(): Unit = lock.notifyAll() + +trait LockFreeMonitor extends Monitor: + override def waitDefault() = + throw new Exception("Please use lock-free structures and do not use wait()") + override def synchronizedDefault[T](toExecute: => T): T = + throw new Exception( + "Please use lock-free structures and do not use synchronized()" + ) + override def notifyDefault() = + throw new Exception( + "Please use lock-free structures and do not use notify()" + ) + override def notifyAllDefault() = + throw new Exception( + "Please use lock-free structures and do not use notifyAll()" + ) diff --git a/src/main/scala/concpar22final02/instrumentation/Stats.scala b/src/main/scala/concpar22final02/instrumentation/Stats.scala new file mode 100644 index 0000000000000000000000000000000000000000..bd2dcc801bec50562d068c775b188bab042d98b8 --- /dev/null +++ b/src/main/scala/concpar22final02/instrumentation/Stats.scala @@ -0,0 +1,17 @@ +/* Copyright 2009-2015 EPFL, Lausanne */ +package concpar22final02.instrumentation + +import java.lang.management.* + +/** A collection of methods that can be used to collect run-time statistics */ +object Stats: + def timed[T](code: => T)(cont: Long => Unit): T = + var t1 = System.currentTimeMillis() + val r = code + cont((System.currentTimeMillis() - t1)) + r + + def withTime[T](code: => T): (T, Long) = + var t1 = System.currentTimeMillis() + val r = code + (r, (System.currentTimeMillis() - t1)) diff --git a/src/main/scala/concpar22final03/Economics.scala b/src/main/scala/concpar22final03/Economics.scala new file mode 100644 index 0000000000000000000000000000000000000000..ec01f6d6c1c41705368f1d06a22c2b81dec3ce60 --- /dev/null +++ b/src/main/scala/concpar22final03/Economics.scala @@ -0,0 +1,48 @@ +package concpar22final03 + +import scala.concurrent.Future + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future +import scala.util.Random + +trait Economics: + + /** A trading card from the game Scala: The Programming. We can own a card, + * but once don't anymore. + */ + final class Card(val name: String) + def isMine(c: Card): Boolean + + /** This function uses the best available database to return the sell value of + * a card on the market. + */ + def valueOf(cardName: String): Int = List(1, cardName.length).max + + /** This method represents an exact amount of money that can be hold, spent, + * or put in the bank + */ + final class MoneyBag() + def moneyIn(m: MoneyBag): Int + + /** If you sell a card, at some point in the future you will get some money + * (in a bag). + */ + def sellCard(c: Card): Future[MoneyBag] + + /** You can buy any "Scala: The Programming" card by providing a bag of money + * with the appropriate amount and waiting for the transaction to take place. + * You will own the returned card. + */ + def buyCard(money: MoneyBag, name: String): Future[Card] + + /** This simple bank account holds money for you. You can bring a money bag to + * increase your account's balance, or withdraw a money bag of any size not + * greater than your account's balance. + */ + def balance: Int + def withdraw(amount: Int): Future[MoneyBag] + def deposit(bag: MoneyBag): Future[Unit] + + class NotEnoughMoneyException + extends Exception("Not enough money provided to buy those cards") diff --git a/src/main/scala/concpar22final03/Problem3.scala b/src/main/scala/concpar22final03/Problem3.scala new file mode 100644 index 0000000000000000000000000000000000000000..e68988c244a3aa6dfef76758e11b8d90486cb004 --- /dev/null +++ b/src/main/scala/concpar22final03/Problem3.scala @@ -0,0 +1,56 @@ +package concpar22final03 + +import scala.concurrent.Future +import concurrent.ExecutionContext.Implicits.global + +trait Problem3: + val economics: Economics + import economics.* + + /** The objective is to propose a service of deck building. People come to you + * with some money and some cards they want to sell, and you need to return + * them a complete deck of the cards they want. + */ + def orderDeck( + bag: MoneyBag, + cardsToSell: List[Card], + wantedDeck: List[String] + ): Future[List[Card]] = + Future { + val totalGivenMoney = + cardsToSell.foldLeft(moneyIn(bag))((sum, c) => sum + valueOf(c.name)) + val totalNeededMoney = + wantedDeck.foldLeft(0)((sum, n) => sum + valueOf(n)) + if totalGivenMoney < totalNeededMoney then + throw new NotEnoughMoneyException() + val soldCards: Future[Unit] = + if moneyIn(bag) != 0 then + sellListOfCards(cardsToSell).zip(deposit(bag)).map(_ => ()) + else sellListOfCards(cardsToSell).map(_ => ()) + soldCards.flatMap { _ => buyListOfCards(wantedDeck) } + }.flatten + + /** This helper function will sell the provided list of cards and put the + * money on your personal bank account. It returns a Future of Unit, which + * indicates when all sales are completed. + */ + def sellListOfCards(cardsToSell: List[Card]): Future[Unit] = + val moneyFromSales: List[Future[Unit]] = cardsToSell.map { c => + sellCard(c).flatMap(m => deposit(m).map { _ => }) + } + Future + .sequence(moneyFromSales) + .map(_ => + () + ) // Future.sequence transforms a List[Future[A]] into a Future[List[A]] + + /** This helper function, given a list of wanted card names and assuming there + * is enough money in the bank account, will buy (in the future) those cards, + * and return them. + */ + def buyListOfCards(wantedDeck: List[String]): Future[List[Card]] = + + val boughtCards: List[Future[Card]] = wantedDeck.map { name => + withdraw(valueOf(name)).flatMap(mb => buyCard(mb, name)) + } + Future.sequence(boughtCards) diff --git a/src/main/scala/concpar22final04/Problem4.scala b/src/main/scala/concpar22final04/Problem4.scala new file mode 100644 index 0000000000000000000000000000000000000000..c2b1dade8a20b15b7ce034623b7a9be36f5b545c --- /dev/null +++ b/src/main/scala/concpar22final04/Problem4.scala @@ -0,0 +1,225 @@ +package concpar22final04 + +import akka.actor.* +import akka.testkit.* +import java.util.Date +import akka.event.LoggingReceive +import akka.pattern.* +import akka.util.Timeout +import concurrent.duration.* +import scala.concurrent.Future +import scala.concurrent.ExecutionContext + +given timeout: Timeout = Timeout(200.millis) + +/** Data associated with a song: a unique `id`, a `title` and an `artist`. + */ +case class Song(id: Int, title: String, artist: String) + +/** An activity in a user's activity feed, representing that `userRef` is + * listening to `songId`. + */ +case class Activity(userId: String, userName: String, songId: Int) + +/** Companion object of the `User` class. + */ +object User: + /** Messages that can be sent to User actors. + */ + enum Protocol: + /** Asks for a user name and id. Should be answered by a Response.Info. + */ + case GetInfo + + /** Asks home page data. Should be answered by a Response.HomepageData. + */ + case GetHomepageData + + /** Like song with id `songId`. + */ + case Like(songId: Int) + + /** Unlike song with id `songId`. + */ + case Unlike(songId: Int) + + /** Adds `subscriber` to the list of subscribers. + */ + case Subscribe(subscriber: ActorRef) + + /** Remove `subscriber` from the list of subscribers. + */ + case Unsubscribe(subscriber: ActorRef) + + /** Adds the activity `activity` to the activity feed. This message will be + * sent by the users this user has subscribed to. + */ + case AddActivity(activity: Activity) + + /** Sent when a user starts playing a song with id `songId`. The recipient + * should notify all its subscribers to update their activity feeds by + * sending them `AddActivity(Activity(...))` messages. No answer is + * expected. This message is sent by external actors. + */ + case Play(songId: Int) + + /** Asks for home page text. Should be answered by a Response.HomepageText. + */ + case GetHomepageText + + /** Responses that can be sent back from User actors. + */ + enum Responses: + /** Answer to a Protocol.GetInfo message + */ + case Info(id: String, name: String) + + /** Answer to a Protocol.GetHomepageData message + */ + case HomepageData(songIds: List[Int], activities: List[Activity]) + + /** Answer to a Protocol.GetHomepageText message + */ + case HomepageText(result: String) + +/** The `User` actor, responsible to handle `User.Protocol` messages. + */ +class User(id: String, name: String, songsStore: ActorRef) extends Actor: + import User.* + import User.Protocol.* + import User.Responses.* + import SongsStore.Protocol.* + import SongsStore.Responses.* + + given ExecutionContext = context.system.dispatcher + + /** Liked songs, by reverse date of liking time (the last liked song must be + * the first must be the first element of the list). Elements of this list + * must be unique: a song can only be liked once. Liking a song twice should + * not change the order. + */ + var likedSongs: List[Int] = List() + + /** Users who have subscribed to this users. + */ + var subscribers: Set[ActorRef] = Set() + + /** Activity feed, by reverse date of activity time (the last added activity + * must be the first element of the list). Items in this list should be + * unique by `userRef`. If a new activity with a `userRef` already in the + * list is added, the former should be removed, so that we always see the + * latest activity for each user we have subscribed to. + */ + var activityFeed: List[Activity] = List() + + /** This actor's behavior. */ + + override def receive: Receive = LoggingReceive { + case GetInfo => + sender() ! Info(id, name) + case GetHomepageData => + sender() ! HomepageData(likedSongs, activityFeed) + case Like(songId) if !likedSongs.contains(songId) => + likedSongs = songId :: likedSongs + case Unlike(songId) => + likedSongs = likedSongs.filter(_ != songId) + case Subscribe(ref: ActorRef) => + subscribers = subscribers + ref + case Unsubscribe(ref: ActorRef) => + subscribers = subscribers - ref + case AddActivity(activity: Activity) => + activityFeed = + activity :: activityFeed.filter(_.userId != activity.userId) + case Play(songId) => + subscribers.foreach(_ ! AddActivity(Activity(id, name, songId))) + case GetHomepageText => + val likedSongsFuture: Future[Songs] = + (songsStore ? GetSongs(likedSongs)).mapTo[Songs] + val activitySongsFuture: Future[Songs] = + (songsStore ? GetSongs(activityFeed.map(_.songId))).mapTo[Songs] + val response: Future[HomepageText] = + for + likedSongs <- likedSongsFuture; + activitySongs <- activitySongsFuture + yield HomepageText( + f""" + |Howdy ${name}! + | + |Liked Songs: + |${likedSongs.songs + .map(song => f"* ${song.title} by ${song.artist}") + .mkString("\n")} + | + |Activity Feed: + |${activityFeed + .zip(activitySongs.songs) + .map((activity, song) => + f"* ${activity.userName} is listening to ${song.title} by ${song.artist}" + ) + .mkString("\n")}""".stripMargin.trim + ) + response.pipeTo(sender()) + } + +/** Objects containing the messages a songs store should handle. + */ +object SongsStore: + /** Ask information about a list of songs by their ids. + */ + enum Protocol: + case GetSongs(songIds: List[Int]) + + /** List of `Song` corresponding to the list of IDs given to `GetSongs`. + */ + enum Responses: + case Songs(songs: List[Song]) + +/** A mock implementation of a songs store. + */ +class MockSongsStore extends Actor: + import SongsStore.Protocol.* + import SongsStore.Responses.* + import SongsStore.* + + val songsDB = Map( + 1 -> Song(1, "High Hopes", "Pink Floyd"), + 2 -> Song(2, "Sunny", "Boney M."), + 3 -> Song(3, "J'irai où tu iras", "Céline Dion & Jean-Jacques Goldman"), + 4 -> Song(4, "Ce monde est cruel", "Vald"), + 5 -> Song(5, "Strobe", "deadmau5"), + 6 -> Song(6, "Désenchantée", "Mylène Farmer"), + 7 -> Song(7, "Straight Edge", "Minor Threat"), + 8 -> Song(8, "Hold the line", "TOTO"), + 9 -> Song(9, "Anarchy in the UK", "Sex Pistols"), + 10 -> Song(10, "Breakfast in America", "Supertramp") + ) + + override def receive: Receive = LoggingReceive { case GetSongs(songsIds) => + sender() ! Songs(songsIds.map(songsDB)) + } + +///////////////////////// +// DEBUG // +///////////////////////// + +/** Infrastructure to help debugging. In sbt use `run` to execute this code. The + * TestKit is an actor that can send messages and check the messages it + * receives (or not). + */ +@main def debug() = new TestKit(ActorSystem("DebugSystem")) with ImplicitSender: + import User.* + import User.Protocol.* + import User.Responses.* + import SongsStore.Protocol.* + import SongsStore.Responses.* + + try + val songsStore = system.actorOf(Props(MockSongsStore()), "songsStore") + val anita = system.actorOf(Props(User("100", "Anita", songsStore))) + + anita ! Like(6) + expectNoMessage() // expects no message is received + + anita ! GetHomepageData + expectMsg(HomepageData(List(6), List())) + finally shutdown(system) diff --git a/src/main/scala/lecture6/05-DiningPhilosophers.scala b/src/main/scala/lecture6/05-DiningPhilosophers.scala index c83fc4d15eca47302e540d98e4ae3854c5272938..8bd4b3069fed02d2975f14ec27918bfa009d7938 100644 --- a/src/main/scala/lecture6/05-DiningPhilosophers.scala +++ b/src/main/scala/lecture6/05-DiningPhilosophers.scala @@ -6,7 +6,8 @@ def philosopherTurn(i: Int, l: Fork, r: Fork): Unit = Thread.sleep(10) var (left, right) = - if i % 2 == 0 then (l, r) else (r, l) + if i % 2 == 0 then (l, r) + else (r, l) // if l.index < r.index then (l, r) else (r, l) left.synchronized { diff --git a/src/test/scala/concpar21final02/Problem2Suite.scala b/src/test/scala/concpar21final02/Problem2Suite.scala new file mode 100644 index 0000000000000000000000000000000000000000..b0aa9f2c9b48a00849561f4187be5d32f07c1810 --- /dev/null +++ b/src/test/scala/concpar21final02/Problem2Suite.scala @@ -0,0 +1,176 @@ +package concpar21final02 + +import akka.actor.* +import akka.testkit.* +import scala.collection.mutable +import concurrent.duration.* + +import Problem2.* + +class Problem2Suite extends munit.FunSuite: + import NotificationService.Protocol.* + import NotificationService.Responses.* + import DiscordChannel.Protocol.* + import DiscordChannel.Responses.* + + test("Notification register (1pts)") { + new MyTestKit: + def tests() = + val actor = system.actorOf(Props[NotificationService]()) + actor ! Register + expectMsg(Registered(true)) + } + + test("Notification register and un-register (1pts)") { + new MyTestKit: + def tests() = + val actor = system.actorOf(Props[NotificationService]()) + actor ! Register + expectMsg(Registered(true)) + actor ! UnRegister + expectMsg(Registered(false)) + actor ! UnRegister + expectMsg(Registered(false)) + actor ! Register + expectMsg(Registered(true)) + actor ! UnRegister + expectMsg(Registered(false)) + } + + test("Notification notify (1pts)") { + new MyTestKit: + def tests() = + val actor = system.actorOf(Props[NotificationService]()) + actor ! Register + expectMsg(Registered(true)) + actor ! NotifyAll + expectMsg(Notification) + actor ! NotifyAll + expectMsg(Notification) + actor ! UnRegister + expectMsg(Registered(false)) + actor ! NotifyAll + expectNoMessage() + actor ! Register + expectMsg(Registered(true)) + actor ! NotifyAll + expectMsg(Notification) + actor ! UnRegister + expectMsg(Registered(false)) + actor ! NotifyAll + expectNoMessage() + } + + test("NotifyAll from other actor (1pts)") { + new MyTestKit: + def tests() = + val actor = system.actorOf(Props[NotificationService]()) + val otherActor = system.actorOf(Props[DummyActor]()) + + def notifyFormAllFromOtherActor() = + given ActorRef = otherActor + actor ! NotifyAll + + expectNoMessage() + + actor ! Register + expectMsg(Registered(true)) + + notifyFormAllFromOtherActor() + expectMsg(Notification) + } + + test("Channel init (1pts)") { + new MyTestKit: + def tests() = + val notificationService = system.actorOf(Props[NotificationService]()) + val channel = system.actorOf(Props[DiscordChannel]()) + channel ! Init(notificationService) + expectMsg(Active) + } + + test("Channel post and get post (1pts)") { + new MyTestKit: + def tests() = + val notificationService = system.actorOf(Props[NotificationService]()) + val channel = system.actorOf(Props[DiscordChannel]()) + channel ! Init(notificationService) + expectMsg(Active) + channel ! Post("hello") + channel ! GetLastPosts(1) + expectMsg(Posts(List("hello"))) + channel ! GetLastPosts(10) + expectMsg(Posts(List("hello"))) + channel ! GetLastPosts(0) + expectMsg(Posts(Nil)) + } + + test("Channel multiple posts (1pts)") { + new MyTestKit: + def tests() = + val notificationService = system.actorOf(Props[NotificationService]()) + val channel = system.actorOf(Props[DiscordChannel]()) + channel ! Init(notificationService) + expectMsg(Active) + channel ! Post("hello") + channel ! Post("world") + channel ! GetLastPosts(2) + channel ! GetLastPosts(1) + channel ! Post("!") + channel ! GetLastPosts(3) + expectMsg(Posts(List("world", "hello"))) + expectMsg(Posts(List("world"))) + expectMsg(Posts(List("!", "world", "hello"))) + } + + test("Channel posts and notify (1pts)") { + new MyTestKit: + def tests() = + val notificationService = system.actorOf(Props[NotificationService]()) + val channel = system.actorOf(Props[DiscordChannel]()) + channel ! Init(notificationService) + expectMsg(Active) + notificationService ! Register + expectMsg(Registered(true)) + channel ! Post("hello") + channel ! Post("world") + expectMsg(Notification) + expectMsg(Notification) + } + + test("Channel init twice (1pts)") { + new MyTestKit: + def tests() = + val notificationService = system.actorOf(Props[NotificationService]()) + val channel = system.actorOf(Props[DiscordChannel]()) + channel ! Init(notificationService) + expectMsg(Active) + channel ! Init(notificationService) + expectMsg(AlreadyActive) + channel ! Init(notificationService) + expectMsg(AlreadyActive) + } + + test("Channel not active (1pts)") { + new MyTestKit: + def tests() = + val channel1 = system.actorOf(Props[DiscordChannel]()) + channel1 ! Post("hello") + expectMsg(NotActive) + + val channel2 = system.actorOf(Props[DiscordChannel]()) + channel2 ! GetLastPosts(0) + expectMsg(NotActive) + } + + abstract class MyTestKit + extends TestKit(ActorSystem("TestSystem")) + with ImplicitSender: + def tests(): Unit + try tests() + finally shutdown(system) + +class DummyActor extends Actor: + def receive: Receive = { case _ => + () + } diff --git a/src/test/scala/concpar21final03/Problem3Suite.scala b/src/test/scala/concpar21final03/Problem3Suite.scala new file mode 100644 index 0000000000000000000000000000000000000000..f4b0b8ff6d8255dafcd3244bd974956adea8a467 --- /dev/null +++ b/src/test/scala/concpar21final03/Problem3Suite.scala @@ -0,0 +1,118 @@ +package concpar21final03 + +import scala.annotation.tailrec +import scala.concurrent.* +import scala.concurrent.duration.* +import scala.collection.mutable.HashMap +import scala.util.Random +import instrumentation.* +import instrumentation.TestHelper.* +import instrumentation.TestUtils.* + +class Problem3Suite extends munit.FunSuite: + + test("Part 1: ThreadMap (3pts)") { + testManySchedules( + 4, + sched => + val tmap = new SchedulableThreadMap[Int](sched) + + def writeThread(): Unit = + tmap.setCurrentThreadValue(0) + tmap.setCurrentThreadValue(-1) + val readBack = tmap.currentThreadValue + assertEquals(readBack, Some(-1)) + + def writeAndDeleteThread(): Unit = + tmap.setCurrentThreadValue(42) + tmap.deleteCurrentThreadValue() + + @tailrec + def waitThread(): Unit = + tmap.waitForall(_ < 0) + val all = tmap.allValues + if all != List(-1) then waitThread() + + val threads = List( + () => writeThread(), + () => writeAndDeleteThread(), + () => waitThread(), + () => waitThread() + ) + + (threads, _ => (true, "")) + ) + } + + test("Part 2: RCU (5pts)") { + testManySchedules( + 3, + sched => + val rcu = new SchedulableRCU(sched) + + case class State( + value: Int, + isDeleted: AtomicLong = SchedulableAtomicLong(0, sched, "isDeleted") + ) + + val sharedState = + SchedulableAtomicReference(State(0), sched, "sharedState") + + def readThread(): Unit = + rcu.startRead() + val state = sharedState.get + val stateWasDeleted = state.isDeleted.get != 0 + assert( + !stateWasDeleted, + "RCU shared state deleted in the middle of a read." + ) + rcu.stopRead() + + def writeThread(): Unit = + val oldState = sharedState.get + sharedState.set(State(oldState.value + 1)) + rcu.waitForOldReads() + oldState.isDeleted.set(1) + + val threads = List( + () => readThread(), + () => readThread(), + () => writeThread() + ) + + (threads, _ => (true, "")) + ) + } + + test("Part 3: UpdateServer (2pts)") { + testManySchedules( + 3, + sched => + val fs = SchedulableInMemoryFileSystem(sched) + val server = new SchedulableUpdateServer(sched, fs) + + def writeThread(): Unit = + server.newUpdate("update1.bin", "Update 1") + server.newUpdate("update2.bin", "Update 2") + assertEquals(fs.fsMap.toSet, Set("update2.bin" -> "Update 2")) + + def fetchThread(): Unit = + val res = server.fetchUpdate() + assert( + List(None, Some("Update 1"), Some("Update 2")).contains(res), + s"fetchUpdate returned unexpected value $res" + ) + + val threads = List( + () => writeThread(), + () => fetchThread(), + () => fetchThread() + ) + + (threads, _ => (true, "")) + ) + } + + import scala.concurrent.duration.* + override val munitTimeout = 200.seconds +end Problem3Suite diff --git a/src/test/scala/concpar21final03/instrumentation/AtomicReference.scala b/src/test/scala/concpar21final03/instrumentation/AtomicReference.scala new file mode 100644 index 0000000000000000000000000000000000000000..784e02a4f5f197e7a8e1e518e81b267299ae5adc --- /dev/null +++ b/src/test/scala/concpar21final03/instrumentation/AtomicReference.scala @@ -0,0 +1,10 @@ +package concpar21final03.instrumentation + +class AtomicReference[T](initial: T): + + private val atomic = + new java.util.concurrent.atomic.AtomicReference[T](initial) + + def get: T = atomic.get() + + def set(value: T): Unit = atomic.set(value) diff --git a/src/test/scala/concpar21final03/instrumentation/MockedMonitor.scala b/src/test/scala/concpar21final03/instrumentation/MockedMonitor.scala new file mode 100644 index 0000000000000000000000000000000000000000..af27165bc0e22a3f8e9e5940f8ec045b42951ac2 --- /dev/null +++ b/src/test/scala/concpar21final03/instrumentation/MockedMonitor.scala @@ -0,0 +1,73 @@ +package concpar21final03.instrumentation + +trait MockedMonitor extends Monitor: + def scheduler: Scheduler + + // Can be overriden. + override def waitDefault() = + scheduler.log("wait") + scheduler updateThreadState Wait(this, scheduler.threadLocks.tail) + override def synchronizedDefault[T](toExecute: => T): T = + scheduler.log("synchronized check") + val prevLocks = scheduler.threadLocks + scheduler updateThreadState Sync( + this, + prevLocks + ) // If this belongs to prevLocks, should just continue. + scheduler.log("synchronized -> enter") + try toExecute + finally + scheduler updateThreadState Running(prevLocks) + scheduler.log("synchronized -> out") + override def notifyDefault() = + scheduler mapOtherStates { state => + state match + case Wait(lockToAquire, locks) if lockToAquire == this => + SyncUnique(this, state.locks) + case e => e + } + scheduler.log("notify") + override def notifyAllDefault() = + scheduler mapOtherStates { state => + state match + case Wait(lockToAquire, locks) if lockToAquire == this => + Sync(this, state.locks) + case SyncUnique(lockToAquire, locks) if lockToAquire == this => + Sync(this, state.locks) + case e => e + } + scheduler.log("notifyAll") + +trait LockFreeMonitor extends Monitor: + override def waitDefault() = + throw new Exception("Please use lock-free structures and do not use wait()") + override def synchronizedDefault[T](toExecute: => T): T = + throw new Exception( + "Please use lock-free structures and do not use synchronized()" + ) + override def notifyDefault() = + throw new Exception( + "Please use lock-free structures and do not use notify()" + ) + override def notifyAllDefault() = + throw new Exception( + "Please use lock-free structures and do not use notifyAll()" + ) + +abstract class ThreadState: + def locks: Seq[AnyRef] +trait CanContinueIfAcquiresLock extends ThreadState: + def lockToAquire: AnyRef +case object Start extends ThreadState: + def locks: Seq[AnyRef] = Seq.empty +case object End extends ThreadState: + def locks: Seq[AnyRef] = Seq.empty +case class Wait(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState +case class SyncUnique(lockToAquire: AnyRef, locks: Seq[AnyRef]) + extends ThreadState + with CanContinueIfAcquiresLock +case class Sync(lockToAquire: AnyRef, locks: Seq[AnyRef]) + extends ThreadState + with CanContinueIfAcquiresLock +case class Running(locks: Seq[AnyRef]) extends ThreadState +case class VariableReadWrite(locks: Seq[AnyRef]) extends ThreadState diff --git a/src/test/scala/concpar21final03/instrumentation/Scheduler.scala b/src/test/scala/concpar21final03/instrumentation/Scheduler.scala new file mode 100644 index 0000000000000000000000000000000000000000..9872b4f7793bb08d31efbecfd4ad68de2d367ece --- /dev/null +++ b/src/test/scala/concpar21final03/instrumentation/Scheduler.scala @@ -0,0 +1,314 @@ +package concpar21final03.instrumentation + +import java.util.concurrent.*; +import scala.concurrent.duration.* +import scala.collection.mutable.* +import Stats.* + +import java.util.concurrent.atomic.AtomicInteger + +sealed abstract class Result +case class RetVal(rets: List[Any]) extends Result +case class Except(msg: String, stackTrace: Array[StackTraceElement]) + extends Result +case class Timeout(msg: String) extends Result + +/** A class that maintains schedule and a set of thread ids. The schedules are + * advanced after an operation of a SchedulableBuffer is performed. Note: the + * real schedule that is executed may deviate from the input schedule due to + * the adjustments that had to be made for locks + */ +class Scheduler(sched: List[Int]): + val maxOps = + 500 // a limit on the maximum number of operations the code is allowed to perform + + private var schedule = sched + private var numThreads = 0 + private val realToFakeThreadId = Map[Long, Int]() + private val opLog = + ListBuffer[String]() // a mutable list (used for efficient concat) + private val threadStates = Map[Int, ThreadState]() + + /** Runs a set of operations in parallel as per the schedule. Each operation + * may consist of many primitive operations like reads or writes to shared + * data structure each of which should be executed using the function `exec`. + * @timeout + * in milliseconds + * @return + * true - all threads completed on time, false -some tests timed out. + */ + def runInParallel(timeout: Long, ops: List[() => Any]): Result = + numThreads = ops.length + val threadRes = Array.fill(numThreads) { None: Any } + var exception: Option[Except] = None + val syncObject = new Object() + var completed = new AtomicInteger(0) + // create threads + val threads = ops.zipWithIndex.map { case (op, i) => + new Thread( + new Runnable(): + def run(): Unit = + val fakeId = i + 1 + setThreadId(fakeId) + try + updateThreadState(Start) + val res = op() + updateThreadState(End) + threadRes(i) = res + // notify the master thread if all threads have completed + if completed.incrementAndGet() == ops.length then + syncObject.synchronized { syncObject.notifyAll() } + catch + case e: Throwable + if exception != None => // do nothing here and silently fail + case e: Throwable => + log(s"throw ${e.toString}") + exception = Some( + Except( + s"Thread $fakeId crashed on the following schedule: \n" + opLog + .mkString("\n"), + e.getStackTrace + ) + ) + syncObject.synchronized { syncObject.notifyAll() } + // println(s"$fakeId: ${e.toString}") + // Runtime.getRuntime().halt(0) //exit the JVM and all running threads (no other way to kill other threads) + ) + } + // start all threads + threads.foreach(_.start()) + // wait for all threads to complete, or for an exception to be thrown, or for the time out to expire + var remTime = timeout + syncObject.synchronized { + timed { if completed.get() != ops.length then syncObject.wait(timeout) } { + time => remTime -= time + } + } + if exception.isDefined then exception.get + else if remTime <= 1 + then // timeout ? using 1 instead of zero to allow for some errors + Timeout(opLog.mkString("\n")) + else + // every thing executed normally + RetVal(threadRes.toList) + + // Updates the state of the current thread + def updateThreadState(state: ThreadState): Unit = + val tid = threadId + synchronized { + threadStates(tid) = state + } + state match + case Sync(lockToAquire, locks) => + if locks.indexOf(lockToAquire) < 0 then waitForTurn + else + // Re-aqcuiring the same lock + updateThreadState(Running(lockToAquire +: locks)) + case Start => waitStart() + case End => removeFromSchedule(tid) + case Running(_) => + case _ => waitForTurn // Wait, SyncUnique, VariableReadWrite + + def waitStart(): Unit = + // while (threadStates.size < numThreads) { + // Thread.sleep(1) + // } + synchronized { + if threadStates.size < numThreads then wait() + else notifyAll() + } + + def threadLocks = + synchronized { + threadStates(threadId).locks + } + + def threadState = + synchronized { + threadStates(threadId) + } + + def mapOtherStates(f: ThreadState => ThreadState) = + val exception = threadId + synchronized { + for k <- threadStates.keys if k != exception do + threadStates(k) = f(threadStates(k)) + } + + def log(str: String) = + if (realToFakeThreadId contains Thread.currentThread().getId()) then + val space = (" " * ((threadId - 1) * 2)) + val s = + space + threadId + ":" + "\n".r.replaceAllIn(str, "\n" + space + " ") + opLog += s + + /** Executes a read or write operation to a global data structure as per the + * given schedule + * @param msg + * a message corresponding to the operation that will be logged + */ + def exec[T]( + primop: => T + )(msg: => String, postMsg: => Option[T => String] = None): T = + if !(realToFakeThreadId contains Thread.currentThread().getId()) then primop + else + updateThreadState(VariableReadWrite(threadLocks)) + val m = msg + if m != "" then log(m) + if opLog.size > maxOps then + throw new Exception( + s"Total number of reads/writes performed by threads exceed $maxOps. A possible deadlock!" + ) + val res = primop + postMsg match + case Some(m) => log(m(res)) + case None => + res + + private def setThreadId(fakeId: Int) = synchronized { + realToFakeThreadId(Thread.currentThread.getId) = fakeId + } + + def threadId = + try realToFakeThreadId(Thread.currentThread().getId()) + catch + case e: NoSuchElementException => + throw new Exception( + "You are accessing shared variables in the constructor. This is not allowed. The variables are already initialized!" + ) + + private def isTurn(tid: Int) = synchronized { + (!schedule.isEmpty && schedule.head != tid) + } + + def canProceed(): Boolean = + val tid = threadId + canContinue match + case Some((i, state)) if i == tid => + // println(s"$tid: Runs ! Was in state $state") + canContinue = None + state match + case Sync(lockToAquire, locks) => + updateThreadState(Running(lockToAquire +: locks)) + case SyncUnique(lockToAquire, locks) => + mapOtherStates { + _ match + case SyncUnique(lockToAquire2, locks2) + if lockToAquire2 == lockToAquire => + Wait(lockToAquire2, locks2) + case e => e + } + updateThreadState(Running(lockToAquire +: locks)) + case VariableReadWrite(locks) => updateThreadState(Running(locks)) + true + case Some((i, state)) => + // println(s"$tid: not my turn but $i !") + false + case None => + false + + var threadPreference = + 0 // In the case the schedule is over, which thread should have the preference to execute. + + /** returns true if the thread can continue to execute, and false otherwise */ + def decide(): Option[(Int, ThreadState)] = + if !threadStates.isEmpty + then // The last thread who enters the decision loop takes the decision. + // println(s"$threadId: I'm taking a decision") + if threadStates.values.forall { + case e: Wait => true + case _ => false + } + then + val waiting = threadStates.keys.map(_.toString).mkString(", ") + val s = if threadStates.size > 1 then "s" else "" + val are = if threadStates.size > 1 then "are" else "is" + throw new Exception( + s"Deadlock: Thread$s $waiting $are waiting but all others have ended and cannot notify them." + ) + else + // Threads can be in Wait, Sync, SyncUnique, and VariableReadWrite mode. + // Let's determine which ones can continue. + val notFree = + threadStates.collect { case (id, state) => state.locks }.flatten.toSet + val threadsNotBlocked = threadStates.toSeq.filter { + case (id, v: VariableReadWrite) => true + case (id, v: CanContinueIfAcquiresLock) => + !notFree(v.lockToAquire) || (v.locks contains v.lockToAquire) + case _ => false + } + if threadsNotBlocked.isEmpty then + val waiting = threadStates.keys.map(_.toString).mkString(", ") + val s = if threadStates.size > 1 then "s" else "" + val are = if threadStates.size > 1 then "are" else "is" + val whoHasLock = threadStates.toSeq.flatMap { case (id, state) => + state.locks.map(lock => (lock, id)) + }.toMap + val reason = threadStates + .collect { + case (id, state: CanContinueIfAcquiresLock) + if !notFree(state.lockToAquire) => + s"Thread $id is waiting on lock ${state.lockToAquire} held by thread ${whoHasLock(state.lockToAquire)}" + } + .mkString("\n") + throw new Exception( + s"Deadlock: Thread$s $waiting are interlocked. Indeed:\n$reason" + ) + else if threadsNotBlocked.size == 1 + then // Do not consume the schedule if only one thread can execute. + Some(threadsNotBlocked(0)) + else + val next = schedule.indexWhere(t => + threadsNotBlocked.exists { case (id, state) => id == t } + ) + if next != -1 then + // println(s"$threadId: schedule is $schedule, next chosen is ${schedule(next)}") + val chosenOne = schedule( + next + ) // TODO: Make schedule a mutable list. + schedule = schedule.take(next) ++ schedule.drop(next + 1) + Some((chosenOne, threadStates(chosenOne))) + else + threadPreference = (threadPreference + 1) % threadsNotBlocked.size + val chosenOne = threadsNotBlocked( + threadPreference + ) // Maybe another strategy + Some(chosenOne) + // threadsNotBlocked.indexOf(threadId) >= 0 + /* + val tnb = threadsNotBlocked.map(_._1).mkString(",") + val s = if (schedule.isEmpty) "empty" else schedule.mkString(",") + val only = if (schedule.isEmpty) "" else " only" + throw new Exception(s"The schedule is $s but$only threads ${tnb} can continue")*/ + else canContinue + + /** This will be called before a schedulable operation begins. This should not + * use synchronized + */ + var numThreadsWaiting = new AtomicInteger(0) + // var waitingForDecision = Map[Int, Option[Int]]() // Mapping from thread ids to a number indicating who is going to make the choice. + var canContinue: Option[(Int, ThreadState)] = + None // The result of the decision thread Id of the thread authorized to continue. + private def waitForTurn = + synchronized { + if numThreadsWaiting.incrementAndGet() == threadStates.size then + canContinue = decide() + notifyAll() + // waitingForDecision(threadId) = Some(numThreadsWaiting) + // println(s"$threadId Entering waiting with ticket number $numThreadsWaiting/${waitingForDecision.size}") + while !canProceed() do wait() + } + numThreadsWaiting.decrementAndGet() + + /** To be invoked when a thread is about to complete + */ + private def removeFromSchedule(fakeid: Int) = synchronized { + // println(s"$fakeid: I'm taking a decision because I finished") + schedule = schedule.filterNot(_ == fakeid) + threadStates -= fakeid + if numThreadsWaiting.get() == threadStates.size then + canContinue = decide() + notifyAll() + } + + def getOperationLog() = opLog diff --git a/src/test/scala/concpar21final03/instrumentation/Stats.scala b/src/test/scala/concpar21final03/instrumentation/Stats.scala new file mode 100644 index 0000000000000000000000000000000000000000..4ca8fd67fa41a7d22bb3eb0790666908667267aa --- /dev/null +++ b/src/test/scala/concpar21final03/instrumentation/Stats.scala @@ -0,0 +1,17 @@ +/* Copyright 2009-2015 EPFL, Lausanne */ +package concpar21final03.instrumentation + +import java.lang.management.* + +/** A collection of methods that can be used to collect run-time statistics */ +object Stats: + def timed[T](code: => T)(cont: Long => Unit): T = + var t1 = System.currentTimeMillis() + val r = code + cont((System.currentTimeMillis() - t1)) + r + + def withTime[T](code: => T): (T, Long) = + var t1 = System.currentTimeMillis() + val r = code + (r, (System.currentTimeMillis() - t1)) diff --git a/src/test/scala/concpar21final03/instrumentation/TestHelper.scala b/src/test/scala/concpar21final03/instrumentation/TestHelper.scala new file mode 100644 index 0000000000000000000000000000000000000000..987f4450fc267cc79c6c6cd5561d3450a0280645 --- /dev/null +++ b/src/test/scala/concpar21final03/instrumentation/TestHelper.scala @@ -0,0 +1,150 @@ +package concpar21final03.instrumentation + +import scala.util.Random +import scala.collection.mutable.{Map as MutableMap} + +import Stats.* + +object TestHelper: + val noOfSchedules = 10000 // set this to 100k during deployment + val readWritesPerThread = + 20 // maximum number of read/writes possible in one thread + val contextSwitchBound = 10 + val testTimeout = 150 // the total time out for a test in seconds + val schedTimeout = + 15 // the total time out for execution of a schedule in secs + + // Helpers + /*def testManySchedules(op1: => Any): Unit = testManySchedules(List(() => op1)) + def testManySchedules(op1: => Any, op2: => Any): Unit = testManySchedules(List(() => op1, () => op2)) + def testManySchedules(op1: => Any, op2: => Any, op3: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3)) + def testManySchedules(op1: => Any, op2: => Any, op3: => Any, op4: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3, () => op4))*/ + + def testSequential[T]( + ops: Scheduler => Any + )(assertions: T => (Boolean, String)) = + testManySchedules( + 1, + (sched: Scheduler) => + ( + List(() => ops(sched)), + (res: List[Any]) => assertions(res.head.asInstanceOf[T]) + ) + ) + + /** @numThreads + * number of threads + * @ops + * operations to be executed, one per thread + * @assertion + * as condition that will executed after all threads have completed + * (without exceptions) the arguments are the results of the threads + */ + def testManySchedules( + numThreads: Int, + ops: Scheduler => ( + List[() => Any], // Threads + List[Any] => (Boolean, String) + ) // Assertion + ) = + var timeout = testTimeout * 1000L + val threadIds = (1 to numThreads) + // (1 to scheduleLength).flatMap(_ => threadIds).toList.permutations.take(noOfSchedules).foreach { + val schedules = (new ScheduleGenerator(numThreads)).schedules() + var schedsExplored = 0 + schedules + .takeWhile(_ => schedsExplored <= noOfSchedules && timeout > 0) + .foreach { + // case _ if timeout <= 0 => // break + case schedule => + schedsExplored += 1 + val schedr = new Scheduler(schedule) + // println("Exploring Sched: "+schedule) + val (threadOps, assertion) = ops(schedr) + if threadOps.size != numThreads then + throw new IllegalStateException( + s"Number of threads: $numThreads, do not match operations of threads: $threadOps" + ) + timed { schedr.runInParallel(schedTimeout * 1000, threadOps) } { t => + timeout -= t + } match + case Timeout(msg) => + throw new java.lang.AssertionError( + "assertion failed\n" + "The schedule took too long to complete. A possible deadlock! \n" + msg + ) + case Except(msg, stkTrace) => + val traceStr = "Thread Stack trace: \n" + stkTrace + .map(" at " + _.toString) + .mkString("\n") + throw new java.lang.AssertionError( + "assertion failed\n" + msg + "\n" + traceStr + ) + case RetVal(threadRes) => + // check the assertion + val (success, custom_msg) = assertion(threadRes) + if !success then + val msg = + "The following schedule resulted in wrong results: \n" + custom_msg + "\n" + schedr + .getOperationLog() + .mkString("\n") + throw new java.lang.AssertionError("Assertion failed: " + msg) + } + if timeout <= 0 then + throw new java.lang.AssertionError( + "Test took too long to complete! Cannot check all schedules as your code is too slow!" + ) + + /** A schedule generator that is based on the context bound + */ + class ScheduleGenerator(numThreads: Int): + val scheduleLength = readWritesPerThread * numThreads + val rands = + (1 to scheduleLength).map(i => + new Random(0xcafe * i) + ) // random numbers for choosing a thread at each position + def schedules(): LazyList[List[Int]] = + var contextSwitches = 0 + var contexts = + List[Int]() // a stack of thread ids in the order of context-switches + val remainingOps = MutableMap[Int, Int]() + remainingOps ++= (1 to numThreads).map(i => + (i, readWritesPerThread) + ) // num ops remaining in each thread + val liveThreads = (1 to numThreads).toSeq.toBuffer + + /** Updates remainingOps and liveThreads once a thread is chosen for a + * position in the schedule + */ + def updateState(tid: Int): Unit = + val remOps = remainingOps(tid) + if remOps == 0 then liveThreads -= tid + else remainingOps += (tid -> (remOps - 1)) + val schedule = rands.foldLeft(List[Int]()) { + case (acc, r) if contextSwitches < contextSwitchBound => + val tid = liveThreads(r.nextInt(liveThreads.size)) + contexts match + case prev :: tail + if prev != tid => // we have a new context switch here + contexts +:= tid + contextSwitches += 1 + case prev :: tail => + case _ => // init case + contexts +:= tid + updateState(tid) + acc :+ tid + case ( + acc, + _ + ) => // here context-bound has been reached so complete the schedule without any more context switches + if !contexts.isEmpty then + contexts = contexts.dropWhile(remainingOps(_) == 0) + val tid = contexts match + case top :: tail => top + case _ => + liveThreads( + 0 + ) // here, there has to be threads that have not even started + updateState(tid) + acc :+ tid + } + schedule #:: schedules() diff --git a/src/test/scala/concpar21final03/instrumentation/TestUtils.scala b/src/test/scala/concpar21final03/instrumentation/TestUtils.scala new file mode 100644 index 0000000000000000000000000000000000000000..0c9534b81cd3123661c6767df378422fb8a0c959 --- /dev/null +++ b/src/test/scala/concpar21final03/instrumentation/TestUtils.scala @@ -0,0 +1,14 @@ +package concpar21final03.instrumentation + +import scala.concurrent.* +import scala.concurrent.duration.* +import scala.concurrent.ExecutionContext.Implicits.global + +object TestUtils: + def failsOrTimesOut[T](action: => T): Boolean = + val asyncAction = Future { + action + } + try Await.result(asyncAction, 2000.millisecond) + catch case _: Throwable => return true + return false diff --git a/src/test/scala/concpar21final03/overrides.scala b/src/test/scala/concpar21final03/overrides.scala new file mode 100644 index 0000000000000000000000000000000000000000..dd9c8b9a985d04ca38fe169e573a9aee0db86010 --- /dev/null +++ b/src/test/scala/concpar21final03/overrides.scala @@ -0,0 +1,100 @@ +package concpar21final03 + +import instrumentation.* + +class SchedulableThreadMap[A](val scheduler: Scheduler) + extends ThreadMap[A] + with MockedMonitor: + + override def currentThreadHasValue: Boolean = scheduler.exec { + super.currentThreadHasValue + }("", Some(res => s"currentThreadHasValue is $res")) + + override def currentThreadValue: Option[A] = scheduler.exec { + super.currentThreadValue + }("", Some(res => s"currentThreadValue is $res")) + + override def setCurrentThreadValue(value: A): Unit = scheduler.exec { + super.setCurrentThreadValue(value) + }(s"setCurrentThreadValue($value)") + + override def deleteCurrentThreadValue(): Unit = scheduler.exec { + super.deleteCurrentThreadValue() + }("deleteCurrentThreadValue()") + + override def waitForall(predicate: A => Boolean): Unit = scheduler.exec { + super.waitForall(predicate) + }("waitForall") + + def allValues: List[A] = synchronized { + theMap.values.toList + } + +end SchedulableThreadMap + +class SchedulableRCU(scheduler: Scheduler) extends RCU with LockFreeMonitor: + override protected val latestVersion = + SchedulableAtomicLong(0, scheduler, "latestVersion") + override protected val readersVersion: ThreadMap[Long] = SchedulableThreadMap( + scheduler + ) + +class SchedulableInMemoryFileSystem(scheduler: Scheduler) + extends InMemoryFileSystem: + override def createFile(file: FileName, content: String): Unit = + scheduler.exec { + super.createFile(file, content) + }(s"createFile($file)") + override def readFile(file: FileName): String = scheduler.exec { + super.readFile(file) + }(s"readFile($file)") + override def deleteFile(file: FileName): Unit = scheduler.exec { + super.deleteFile(file) + }(s"deleteFile($file)") + +class SchedulableUpdateServer(scheduler: Scheduler, fs: InMemoryFileSystem) + extends UpdateServer(fs) + with LockFreeMonitor: + override val rcu = SchedulableRCU(scheduler) + +class SchedulableAtomicLong(initial: Long, scheduler: Scheduler, name: String) + extends AtomicLong(initial): + + override def get: Long = scheduler.exec { + super.get + }(s"", Some(res => s"$name: get $res")) + + override def set(value: Long): Unit = scheduler.exec { + super.set(value) + }(s"$name: set $value", None) + + override def incrementAndGet(): Long = scheduler.exec { + super.incrementAndGet() + }(s"", Some(res => s"$name: incrementAndGet $res")) + + override def getAndIncrement(): Long = scheduler.exec { + super.getAndIncrement() + }(s"", Some(res => s"$name: getandIncrement $res")) + + override def compareAndSet(expected: Long, newValue: Long): Boolean = + scheduler.exec { + super.compareAndSet(expected, newValue) + }( + s"$name: compareAndSet(expected = $expected, newValue = $newValue)", + Some(res => s"$name: Did it set? $res") + ) + +end SchedulableAtomicLong + +class SchedulableAtomicReference[T]( + initial: T, + scheduler: Scheduler, + name: String +) extends AtomicReference(initial): + override def get: T = scheduler.exec { + super.get + }(s"", Some(res => s"$name: get $res")) + + override def set(value: T): Unit = scheduler.exec { + super.set(value) + }(s"$name: set $value", None) diff --git a/src/test/scala/concpar22final01/Problem1Suite.scala b/src/test/scala/concpar22final01/Problem1Suite.scala new file mode 100644 index 0000000000000000000000000000000000000000..3bc2ce8a205863c1ea1bc3d230ca899adaa6159f --- /dev/null +++ b/src/test/scala/concpar22final01/Problem1Suite.scala @@ -0,0 +1,537 @@ +package concpar22final01 + +import java.util.concurrent.* +import scala.util.DynamicVariable + +class Problem1Suite extends AbstractProblem1Suite: + + test("[Public] fetch simple result without combining (2pts)") { + val combiner1 = new DLLCombinerTest + combiner1 += 7 + combiner1 += 2 + combiner1 += 3 + combiner1 += 8 + combiner1 += 1 + combiner1 += 2 + combiner1 += 3 + combiner1 += 8 + + val result = combiner1.result() + val array = Array(7, 2, 3, 8, 1, 2, 3, 8) + + assert(Range(0, array.size).forall(i => array(i) == result(i))) + } + + test("[Public] fetch result without combining (2pts)") { + val combiner1 = new DLLCombinerTest + combiner1 += 7 + combiner1 += 2 + combiner1 += 3 + combiner1 += 8 + combiner1 += 1 + + val result = combiner1.result() + val array = Array(7, 2, 3, 8, 1) + + assert(Range(0, array.size).forall(i => array(i) == result(i))) + } + + test("[Public] fetch result after simple combining (2pts)") { + val combiner1 = new DLLCombinerTest + combiner1 += 7 + combiner1 += 2 + + val combiner2 = new DLLCombinerTest + combiner2 += 3 + combiner2 += 8 + + val combiner3 = new DLLCombinerTest + combiner3 += 1 + combiner3 += 9 + + val combiner4 = new DLLCombinerTest + combiner4 += 3 + combiner4 += 2 + + val result = combiner1.combine(combiner2).combine(combiner3).combine( + combiner4 + ).result() + val array = Array(7, 2, 3, 8, 1, 9, 3, 2) + + assert(Range(0, array.size).forall(i => array(i) == result(i))) + } + + test("[Public] fetch result - small combiner (2pts)") { + val combiner1 = new DLLCombinerTest + combiner1 += 4 + combiner1 += 2 + combiner1 += 6 + + val result = combiner1.result() + val array = Array(4, 2, 6) + + assert(Range(0, array.size).forall(i => array(i) == result(i))) + } + + // (25+) 15 / 250 points for correct implementation, don't check parallelism + test("[Correctness] fetch result - simple combiners (2pts)") { + assertCorrectnessSimple() + } + + test("[Correctness] fetch result - small combiners (3pts)") { + assertCorrectnessBasic() + } + + test("[Correctness] fetch result - small combiners after combining (5pts)") { + assertCorrectnessCombined() + } + + test("[Correctness] fetch result - large combiners (5pts)") { + assertCorrectnessLarge() + } + + def assertCorrectnessSimple() = + simpleCombiners.foreach(elem => assert(compare(elem._1, elem._2))) + + def assertCorrectnessBasic() = + basicCombiners.foreach(elem => assert(compare(elem._1, elem._2))) + + def assertCorrectnessCombined() = + combinedCombiners.foreach(elem => assert(compare(elem._1, elem._2))) + + def assertCorrectnessLarge() = + largeCombiners.foreach(elem => assert(compare(elem._1, elem._2))) + + // (25+15+) 25 / 250 points for correct parallel implementation, don't check if it's exactly 1/4 of the array per task + private var count = 0 + private val expected = 3 + + override def task[T](body: => T): ForkJoinTask[T] = + count += 1 + scheduler.value.schedule(body) + + test("[TaskCount] number of newly created tasks should be 3 (5pts)") { + assertTaskCountSimple() + } + + test( + "[TaskCount] fetch result and check parallel - simple combiners (5pts)" + ) { + assertTaskCountSimple() + assertCorrectnessSimple() + } + + test("[TaskCount] fetch result and check parallel - small combiners (5pts)") { + assertTaskCountSimple() + assertCorrectnessBasic() + } + + test( + "[TaskCount] fetch result and check parallel - small combiners after combining (5pts)" + ) { + assertTaskCountSimple() + assertCorrectnessCombined() + } + + test("[TaskCount] fetch result and check parallel - large combiners (5pts)") { + assertTaskCountSimple() + assertCorrectnessLarge() + } + + def assertTaskCountSimple(): Unit = + simpleCombiners.foreach(elem => assertTaskCount(elem._1, elem._2)) + + def assertTaskCount(combiner: DLLCombinerTest, array: Array[Int]): Unit = + try + count = 0 + build(combiner, array) + combiner.result() + assertEquals( + count, + expected, { + s"ERROR: Expected $expected instead of $count calls to `task(...)`" + } + ) + finally count = 0 + + // (25+15+25+) 50 / 250 points for correct implementation that uses only next2 and previous2, and not next and previous + test( + "[Skip2] fetch parallel result and check skip2 - simple combiners (10pts)" + ) { + assertTaskCountSimple() + assertSkipSimple() + assertCorrectnessSimple() + } + + test("[Skip2] fetch result and check skip2 - simple combiners (10pts)") { + assertSkipSimple() + assertCorrectnessSimple() + } + + test("[Skip2] fetch result and check skip2 - small combiners (10pts)") { + assertSkipSimple() + assertCorrectnessBasic() + } + + test( + "[Skip2] fetch result and check skip2 - small combiners after combining (10pts)" + ) { + assertSkipSimple() + assertCorrectnessCombined() + } + + test("[Skip2] fetch result and check skip2 - large combiners (10pts)") { + assertSkipSimple() + assertCorrectnessLarge() + } + + def assertSkipSimple(): Unit = + simpleCombiners.foreach(elem => assertSkip(elem._1, elem._2)) + + def assertSkip(combiner: DLLCombinerTest, array: Array[Int]): Unit = + build(combiner, array) + combiner.result() + assertEquals( + combiner.nonSkipped, + false, { + s"ERROR: Calls to 'next' and 'previous' are not allowed! You should only use 'next2` and 'previous2' in your solution." + } + ) + + // (25+15+25+50+) 75 / 250 points for correct parallel implementation, exactly 1/4 of the array per task + test("[TaskFairness] each task should compute 1/4 of the result (15pts)") { + assertTaskFairness(simpleCombiners.unzip._1) + } + + test( + "[TaskFairness] each task should correctly compute 1/4 of the result - simple combiners (15pts)" + ) { + assertTaskFairness(simpleCombiners.unzip._1) + assertCorrectnessSimple() + } + + test( + "[TaskFairness] each task should correctly compute 1/4 of the result - small combiners (15pts)" + ) { + assertTaskFairness(basicCombiners.unzip._1) + assertCorrectnessBasic() + } + + test( + "[TaskFairness] each task should correctly compute 1/4 of the result - small combiners after combining (15pts)" + ) { + assertTaskFairness(combinedCombiners.unzip._1) + assertCorrectnessCombined() + } + + test( + "[TaskFairness] each task should correctly compute 1/4 of the result - large combiners (15pts)" + ) { + assertTaskFairness(largeCombiners.unzip._1) + assertCorrectnessLarge() + } + + def assertTaskFairness(combiners: List[DLLCombiner]): Unit = + def assertNewTaskFairness( + combiner: DLLCombiner, + task: ForkJoinTask[Unit], + data: Array[Int] + ) = + var count = 0 + var expected = combiner.size / 4 + task.join + count = data.count(elem => elem != 0) + assert((count - expected).abs <= 1) + + def assertMainTaskFairness( + combiner: DLLCombiner, + task: Unit, + data: Array[Int] + ) = + var count = 0 + var expected = combiner.size / 4 + count = data.count(elem => elem != 0) + assert((count - expected).abs <= 1) + + combiners.foreach { elem => + var data = Array.fill(elem.size)(0) + assertNewTaskFairness(elem, elem.task1(data), data) + + data = Array.fill(elem.size)(0) + assertNewTaskFairness(elem, elem.task2(data), data) + + data = Array.fill(elem.size)(0) + assertNewTaskFairness(elem, elem.task3(data), data) + + data = Array.fill(elem.size)(0) + assertMainTaskFairness(elem, elem.task4(data), data) + } + + // (25+15+25+50+75+) 60 / 250 points for correct parallel implementation, exactly 1/4 of the array per task, exactly the specified quarter + + test( + "[TaskPrecision] each task should compute specified 1/4 of the result - simple combiners (10pts)" + ) { + assertTaskPrecision(simpleCombiners) + } + + test( + "[TaskPrecision] task1 should compute specified 1/4 of the result - simple combiners (5pts)" + ) { + assertTaskPrecision1(simpleCombiners) + } + + test( + "[TaskPrecision] task2 should compute specified 1/4 of the result - simple combiners (5pts)" + ) { + assertTaskPrecision2(simpleCombiners) + } + + test( + "[TaskPrecision] task3 should compute specified 1/4 of the result - simple combiners (5pts)" + ) { + assertTaskPrecision3(simpleCombiners) + } + + test( + "[TaskPrecision] task4 should compute specified 1/4 of the result - simple combiners (5pts)" + ) { + assertTaskPrecision4(simpleCombiners) + } + + test( + "[TaskPrecision] each task should compute specified 1/4 of the result - other combiners (30pts)" + ) { + assertTaskPrecision(basicCombiners) + assertTaskPrecision(combinedCombiners) + assertTaskPrecision(largeCombiners) + } + + def assertTaskPrecision(combiners: List[(DLLCombiner, Array[Int])]): Unit = + assertTaskPrecision1(combiners) + assertTaskPrecision2(combiners) + assertTaskPrecision3(combiners) + assertTaskPrecision4(combiners) + + def assertTaskPrecision1(combiners: List[(DLLCombiner, Array[Int])]): Unit = + combiners.foreach { elem => + var data = Array.fill(elem._1.size)(0) + var ref = Array.fill(elem._1.size)(0) + val task1 = elem._1.task1(data) + task1.join + Range(0, elem._1.size).foreach(i => + (if i < elem._1.size / 2 - 1 && i % 2 == 0 then ref(i) = elem._2(i)) + ) + assert(Range(0, elem._1.size / 2 - 1).forall(i => data(i) == ref(i))) + } + + def assertTaskPrecision2(combiners: List[(DLLCombiner, Array[Int])]): Unit = + combiners.foreach { elem => + var data = Array.fill(elem._1.size)(0) + var ref = Array.fill(elem._1.size)(0) + val task2 = elem._1.task2(data) + task2.join + Range(0, elem._1.size).foreach(i => + (if i < elem._1.size / 2 - 1 && i % 2 == 1 then ref(i) = elem._2(i)) + ) + assert(Range(0, elem._1.size / 2 - 1).forall(i => data(i) == ref(i))) + } + + def assertTaskPrecision3(combiners: List[(DLLCombiner, Array[Int])]): Unit = + combiners.foreach { elem => + var data = Array.fill(elem._1.size)(0) + var ref = Array.fill(elem._1.size)(0) + val task3 = elem._1.task3(data) + task3.join + Range(0, elem._1.size).foreach(i => + (if i > elem._1.size / 2 + 1 && i % 2 == elem._1.size % 2 then + ref(i) = elem._2(i)) + ) + assert(Range(elem._1.size / 2 + 2, elem._1.size).forall(i => + data(i) == ref(i) + )) + } + + def assertTaskPrecision4(combiners: List[(DLLCombiner, Array[Int])]): Unit = + combiners.foreach { elem => + var data = Array.fill(elem._1.size)(0) + var ref = Array.fill(elem._1.size)(0) + val task4 = elem._1.task4(data) + Range(0, elem._1.size).foreach(i => + (if i > elem._1.size / 2 + 1 && i % 2 != elem._1.size % 2 then + ref(i) = elem._2(i)) + ) + assert(Range(elem._1.size / 2 + 2, elem._1.size).forall(i => + data(i) == ref(i) + )) + } + +trait AbstractProblem1Suite extends munit.FunSuite with LibImpl: + + def simpleCombiners = buildSimpleCombiners() + def basicCombiners = buildBasicCombiners() + def combinedCombiners = buildCombinedCombiners() + def largeCombiners = buildLargeCombiners() + + def buildSimpleCombiners() = + val simpleCombiners = List( + ( + new DLLCombinerTest, + Array(4, 2, 6, 1, 5, 4, 3, 5, 6, 3, 4, 5, 6, 3, 4, 5) + ), + ( + new DLLCombinerTest, + Array(7, 2, 2, 9, 3, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 2) + ), + (new DLLCombinerTest, Array.fill(16)(5)) + ) + simpleCombiners.foreach(elem => build(elem._1, elem._2)) + simpleCombiners + + def buildBasicCombiners() = + val basicCombiners = List( + (new DLLCombinerTest, Array(4, 2, 6)), + (new DLLCombinerTest, Array(4, 1, 6)), + ( + new DLLCombinerTest, + Array(7, 2, 2, 9, 3, 2, 11, 12, 5, 14, 15, 1, 17, 23) + ), + ( + new DLLCombinerTest, + Array(7, 2, 9, 9, 3, 2, 11, 12, 13, 14, 15, 16, 17, 22) + ), + (new DLLCombinerTest, Array.fill(16)(7)), + (new DLLCombinerTest, Array.fill(16)(4)), + (new DLLCombinerTest, Array.fill(5)(3)), + (new DLLCombinerTest, Array.fill(5)(7)), + (new DLLCombinerTest, Array.fill(5)(4)) + ) + basicCombiners.foreach(elem => build(elem._1, elem._2)) + basicCombiners + + def buildCombinedCombiners() = + var combinedCombiners = List[(DLLCombiner, Array[Int])]() + + Range(1, 10).foreach { n => + val array = basicCombiners.filter(elem => elem._1.size == n).foldLeft( + Array[Int]() + ) { + (acc, i) => acc ++ i._2 + } + val empty: DLLCombiner = new DLLCombinerTest + val combiner = basicCombiners.filter(elem => elem._1.size == n).map( + _._1 + ).foldLeft(empty) { + (acc, c) => acc.combine(c) + } + + combinedCombiners = combinedCombiners :+ (combiner, array) + } + combinedCombiners + + def buildLargeCombiners() = + val largeCombiners = List( + (new DLLCombinerTest, Array.fill(1321)(4) ++ Array.fill(1322)(7)), + (new DLLCombinerTest, Array.fill(1341)(2) ++ Array.fill(1122)(5)), + ( + new DLLCombinerTest, + Array.fill(1321)(4) ++ Array.fill(1322)(7) ++ Array.fill(321)( + 4 + ) ++ Array.fill(322)(7) + ), + (new DLLCombinerTest, Array.fill(992321)(4) ++ Array.fill(99322)(7)), + (new DLLCombinerTest, Array.fill(953211)(4) ++ Array.fill(999322)(1)) + ) + largeCombiners.foreach(elem => build(elem._1, elem._2)) + largeCombiners + + def build(combiner: DLLCombinerTest, array: Array[Int]): DLLCombinerTest = + array.foreach(elem => combiner += elem) + combiner + + def compare(combiner: DLLCombiner, array: Array[Int]): Boolean = + val result = combiner.result() + Range(0, array.size).forall(i => array(i) == result(i)) + + def buildAndCompare(combiner: DLLCombinerTest, array: Array[Int]): Boolean = + array.foreach(elem => combiner += elem) + val result = combiner.result() + Range(0, array.size).forall(i => array(i) == result(i)) + +trait LibImpl extends Problem1: + + val forkJoinPool = new ForkJoinPool + + abstract class TaskScheduler: + def schedule[T](body: => T): ForkJoinTask[T] + + class DefaultTaskScheduler extends TaskScheduler: + def schedule[T](body: => T): ForkJoinTask[T] = + val t = new RecursiveTask[T]: + def compute = body + Thread.currentThread match + case wt: ForkJoinWorkerThread => + t.fork() + case _ => + forkJoinPool.execute(t) + t + + val scheduler = new DynamicVariable[TaskScheduler](new DefaultTaskScheduler) + + def task[T](body: => T): ForkJoinTask[T] = scheduler.value.schedule(body) + + class NodeTest(val v: Int, val myCombiner: DLLCombinerTest) extends Node(v): + override def getNext: Node = + myCombiner.nonSkipped = true + next + override def getNext2: Node = next2 + override def getPrevious: Node = + myCombiner.nonSkipped = true + previous + override def getPrevious2: Node = previous2 + override def setNext(n: Node): Unit = next = n + override def setNext2(n: Node): Unit = next2 = n + override def setPrevious(n: Node): Unit = previous = n + override def setPrevious2(n: Node): Unit = previous2 = n + + class DLLCombinerTest extends DLLCombinerImplementation: + var nonSkipped = false + override def result(): Array[Int] = + nonSkipped = false + super.result() + override def +=(elem: Int): Unit = + val node = new NodeTest(elem, this) + if size == 0 then + first = node + last = node + size = 1 + else + last.setNext(node) + node.setPrevious(last) + node.setPrevious2(last.getPrevious) + if size > 1 then last.getPrevious.setNext2(node) + else second = node + secondToLast = last + last = node + size += 1 + override def combine(that: DLLCombiner): DLLCombiner = + if this.size == 0 then that + else if that.size == 0 then this + else + this.last.setNext(that.first) + this.last.setNext2(that.first.getNext) + if this.last.getPrevious != null then + this.last.getPrevious.setNext2(that.first) // important + + that.first.setPrevious(this.last) + that.first.setPrevious2(this.last.getPrevious) + if that.first.getNext != null then + that.first.getNext.setPrevious2(this.last) // important + + if this.size == 1 then second = that.first + + this.size = this.size + that.size + this.last = that.last + this.secondToLast = that.secondToLast + + this diff --git a/src/test/scala/concpar22final02/Problem2Suite.scala b/src/test/scala/concpar22final02/Problem2Suite.scala new file mode 100644 index 0000000000000000000000000000000000000000..59248e41e05ba07d9ed18e5edc820a55e613ee6b --- /dev/null +++ b/src/test/scala/concpar22final02/Problem2Suite.scala @@ -0,0 +1,429 @@ +package concpar22final02 + +import scala.concurrent.* +import scala.concurrent.duration.* +import scala.collection.mutable.HashMap +import scala.util.Random +import instrumentation.SchedulableProblem2 + +import instrumentation.TestHelper.* +import instrumentation.TestUtils.* +import scala.collection.mutable.ArrayBuffer + +class Problem2Suite extends munit.FunSuite: + + val imageSize = 5 + val nThreads = 3 + + def rowsForThread(threadNumber: Int): Array[Int] = + val start: Int = (imageSize * threadNumber) / nThreads + val end: Int = (imageSize * (threadNumber + 1)) / nThreads + (start until end).toArray + + test("Should work when barrier is called by a single thread (10pts)") { + testManySchedules( + 1, + sched => + val temp = new Problem2(imageSize, 1, 1) + ( + List(() => temp.barrier(0).countDown()), + results => + if sched.notifyCount == 0 && sched.notifyAllCount == 0 then + val notifyCount = sched.notifyCount + val notifyAllCount = sched.notifyAllCount + (false, s"No notify call $notifyCount $notifyAllCount") + else if temp.barrier(0).count != 0 then + val count = temp.barrier(0).count + (false, s"Barrier count not equal to zero: $count") + else (true, "") + ) + ) + } + + test("Should work when a single thread processes a single filter (10pts)") { + val temp = new Problem2(imageSize, 1, 1) + val buf: ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer() + for i: Int <- 0 until imageSize do buf += ArrayBuffer.fill(5)(i) + temp.imageLib.init(buf) + temp.imagePipeline( + Array(temp.imageLib.Filter.Outline), + Array(0, 1, 2, 3, 4) + ) + assertEquals( + temp.imageLib.buffer1, + ArrayBuffer( + ArrayBuffer(0, 0, 0, 0, 0), + ArrayBuffer(1, 1, 1, 1, 1), + ArrayBuffer(2, 2, 2, 2, 2), + ArrayBuffer(3, 3, 3, 3, 3), + ArrayBuffer(4, 4, 4, 4, 4) + ) + ) + assertEquals( + temp.imageLib.buffer2, + ArrayBuffer( + ArrayBuffer(-2, -3, -3, -3, -2), + ArrayBuffer(3, 0, 0, 0, 3), + ArrayBuffer(6, 0, 0, 0, 6), + ArrayBuffer(9, 0, 0, 0, 9), + ArrayBuffer(22, 15, 15, 15, 22) + ) + ) + } + + test("Should work when a single thread processes a 2 same filters (15pts)") { + val temp = new Problem2(imageSize, 1, 2) + val buf: ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer() + for i: Int <- 0 until imageSize do buf += ArrayBuffer.fill(5)(i) + temp.imageLib.init(buf) + temp.imagePipeline( + Array(temp.imageLib.Filter.Identity, temp.imageLib.Filter.Identity), + Array(0, 1, 2, 3, 4) + ) + assertEquals( + temp.imageLib.buffer1, + ArrayBuffer( + ArrayBuffer(0, 0, 0, 0, 0), + ArrayBuffer(1, 1, 1, 1, 1), + ArrayBuffer(2, 2, 2, 2, 2), + ArrayBuffer(3, 3, 3, 3, 3), + ArrayBuffer(4, 4, 4, 4, 4) + ) + ) + assertEquals( + temp.imageLib.buffer2, + ArrayBuffer( + ArrayBuffer(0, 0, 0, 0, 0), + ArrayBuffer(1, 1, 1, 1, 1), + ArrayBuffer(2, 2, 2, 2, 2), + ArrayBuffer(3, 3, 3, 3, 3), + ArrayBuffer(4, 4, 4, 4, 4) + ) + ) + } + + test( + "Should work when a single thread processes a 2 different filters (15pts)" + ) { + val temp = new Problem2(imageSize, 1, 2) + val buf: ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer() + for i: Int <- 0 until imageSize do buf += ArrayBuffer.fill(5)(i) + temp.imageLib.init(buf) + temp.imagePipeline( + Array(temp.imageLib.Filter.Identity, temp.imageLib.Filter.Outline), + Array(0, 1, 2, 3, 4) + ) + assertEquals( + temp.imageLib.buffer1, + ArrayBuffer( + ArrayBuffer(-2, -3, -3, -3, -2), + ArrayBuffer(3, 0, 0, 0, 3), + ArrayBuffer(6, 0, 0, 0, 6), + ArrayBuffer(9, 0, 0, 0, 9), + ArrayBuffer(22, 15, 15, 15, 22) + ) + ) + assertEquals( + temp.imageLib.buffer2, + ArrayBuffer( + ArrayBuffer(0, 0, 0, 0, 0), + ArrayBuffer(1, 1, 1, 1, 1), + ArrayBuffer(2, 2, 2, 2, 2), + ArrayBuffer(3, 3, 3, 3, 3), + ArrayBuffer(4, 4, 4, 4, 4) + ) + ) + } + + test("Should work when barrier is called by two threads (25pts)") { + testManySchedules( + 2, + sched => + val temp = new Problem2(imageSize, 2, 1) + ( + List( + () => + temp.barrier(0).countDown() + temp.barrier(0).awaitZero() + , + () => + temp.barrier(0).countDown() + temp.barrier(0).awaitZero() + ), + results => + if sched.notifyCount == 0 && sched.notifyAllCount == 0 then + (false, s"No notify call") + else if sched.waitCount == 0 then (false, s"No wait call") + else if temp.barrier(0).count != 0 then + val count = temp.barrier(0).count + (false, s"Barrier count not equal to zero: $count") + else (true, "") + ) + ) + } + + test("Should work when barrier is called by multiple threads (25pts)") { + testManySchedules( + nThreads, + sched => + val temp = new Problem2(imageSize, nThreads, 1) + ( + (for i <- 0 until nThreads yield () => + temp.barrier(0).countDown() + temp.barrier(0).awaitZero() + ).toList, + results => + if sched.notifyCount == 0 && sched.notifyAllCount == 0 then + (false, s"No notify call") + else if sched.waitCount == 0 then (false, s"No wait call") + else if temp.barrier(0).count != 0 then + val count = temp.barrier(0).count + (false, s"Barrier count not equal to zero: $count") + else (true, "") + ) + ) + } + + test( + "Should work when a single thread processes a multiple same filters (25pts)" + ) { + val temp = new Problem2(imageSize, 1, 3) + val buf: ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer() + for i: Int <- 0 until imageSize do buf += ArrayBuffer.fill(5)(i) + temp.imageLib.init(buf) + temp.imagePipeline( + Array( + temp.imageLib.Filter.Outline, + temp.imageLib.Filter.Outline, + temp.imageLib.Filter.Outline + ), + Array(0, 1, 2, 3, 4) + ) + assertEquals( + temp.imageLib.buffer2, + ArrayBuffer( + ArrayBuffer(-128, -173, -107, -173, -128), + ArrayBuffer(205, -2, 172, -2, 205), + ArrayBuffer(322, -128, 208, -128, 322), + ArrayBuffer(55, -854, -428, -854, 55), + ArrayBuffer(1180, 433, 751, 433, 1180) + ) + ) + assertEquals( + temp.imageLib.buffer1, + ArrayBuffer( + ArrayBuffer(-16, -22, -18, -22, -16), + ArrayBuffer(23, -1, 9, -1, 23), + ArrayBuffer(36, -18, 0, -18, 36), + ArrayBuffer(29, -67, -45, -67, 29), + ArrayBuffer(152, 74, 90, 74, 152) + ) + ) + } + + test("Should work when a single thread processes multiple filters (25pts)") { + val temp = new Problem2(imageSize, 1, 3) + val buf: ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer() + for i: Int <- 0 until imageSize do buf += ArrayBuffer.fill(5)(i) + temp.imageLib.init(buf) + temp.imagePipeline( + Array( + temp.imageLib.Filter.Identity, + temp.imageLib.Filter.Outline, + temp.imageLib.Filter.Sharpen + ), + Array(0, 1, 2, 3, 4) + ) + assertEquals( + temp.imageLib.buffer1, + ArrayBuffer( + ArrayBuffer(-2, -3, -3, -3, -2), + ArrayBuffer(3, 0, 0, 0, 3), + ArrayBuffer(6, 0, 0, 0, 6), + ArrayBuffer(9, 0, 0, 0, 9), + ArrayBuffer(22, 15, 15, 15, 22) + ) + ) + assertEquals( + temp.imageLib.buffer2, + ArrayBuffer( + ArrayBuffer(-10, -10, -9, -10, -10), + ArrayBuffer(11, 0, 3, 0, 11), + ArrayBuffer(18, -6, 0, -6, 18), + ArrayBuffer(17, -24, -15, -24, 17), + ArrayBuffer(86, 38, 45, 38, 86) + ) + ) + } + + test("Should work when multiple thread processes a single filter (25pts)") { + testManySchedules( + nThreads, + sched => + val temp = new SchedulableProblem2(sched, imageSize, nThreads, 1) + ( + (for i <- 0 until nThreads + yield () => + temp.imagePipeline( + Array(temp.imageLib.Filter.Outline), + rowsForThread(i) + )).toList, + results => + val expected_buffer1 = ArrayBuffer( + ArrayBuffer(1, 1, 1, 1, 1), + ArrayBuffer(1, 1, 1, 1, 1), + ArrayBuffer(1, 1, 1, 1, 1), + ArrayBuffer(1, 1, 1, 1, 1), + ArrayBuffer(1, 1, 1, 1, 1) + ) + val expected_buffer2 = ArrayBuffer( + ArrayBuffer(5, 3, 3, 3, 5), + ArrayBuffer(3, 0, 0, 0, 3), + ArrayBuffer(3, 0, 0, 0, 3), + ArrayBuffer(3, 0, 0, 0, 3), + ArrayBuffer(5, 3, 3, 3, 5) + ) + val res_buffer1 = temp.imageLib.buffer1 + val res_buffer2 = temp.imageLib.buffer2 + if res_buffer1 != expected_buffer1 then + (false, s"Buffer1 expected: $expected_buffer1 , got $res_buffer1") + else if res_buffer2 != expected_buffer2 then + (false, s"Buffer2 expected: $expected_buffer2 , got $res_buffer2") + else (true, "") + ) + ) + } + + test("Should work when multiple thread processes two filters (25pts)") { + testManySchedules( + nThreads, + sched => + val temp = new SchedulableProblem2(sched, imageSize, nThreads, 2) + ( + (for i <- 0 until nThreads + yield () => + temp.imagePipeline( + Array(temp.imageLib.Filter.Outline, temp.imageLib.Filter.Sharpen), + rowsForThread(i) + )).toList, + results => + val expected_buffer1 = ArrayBuffer( + ArrayBuffer(19, 7, 9, 7, 19), + ArrayBuffer(7, -6, -3, -6, 7), + ArrayBuffer(9, -3, 0, -3, 9), + ArrayBuffer(7, -6, -3, -6, 7), + ArrayBuffer(19, 7, 9, 7, 19) + ) + val expected_buffer2 = ArrayBuffer( + ArrayBuffer(5, 3, 3, 3, 5), + ArrayBuffer(3, 0, 0, 0, 3), + ArrayBuffer(3, 0, 0, 0, 3), + ArrayBuffer(3, 0, 0, 0, 3), + ArrayBuffer(5, 3, 3, 3, 5) + ) + val res_buffer1 = temp.imageLib.buffer1 + val res_buffer2 = temp.imageLib.buffer2 + if res_buffer1 != expected_buffer1 then + (false, s"Buffer1 expected: $expected_buffer1 , got $res_buffer1") + else if res_buffer2 != expected_buffer2 then + (false, s"Buffer2 expected: $expected_buffer2 , got $res_buffer2") + else (true, "") + ) + ) + } + + test( + "Should work when multiple thread processes multiple same filters (25pts)" + ) { + testManySchedules( + nThreads, + sched => + val temp = new SchedulableProblem2(sched, imageSize, nThreads, 4) + val buf: ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer() + for i: Int <- 0 until imageSize do buf += ArrayBuffer.fill(5)(i) + temp.imageLib.init(buf) + ( + (for i <- 0 until nThreads + yield () => + temp.imagePipeline( + Array( + temp.imageLib.Filter.Identity, + temp.imageLib.Filter.Identity, + temp.imageLib.Filter.Identity, + temp.imageLib.Filter.Identity + ), + rowsForThread(i) + )).toList, + results => + val expected_buffer1 = ArrayBuffer( + ArrayBuffer(0, 0, 0, 0, 0), + ArrayBuffer(1, 1, 1, 1, 1), + ArrayBuffer(2, 2, 2, 2, 2), + ArrayBuffer(3, 3, 3, 3, 3), + ArrayBuffer(4, 4, 4, 4, 4) + ) + val expected_buffer2 = ArrayBuffer( + ArrayBuffer(0, 0, 0, 0, 0), + ArrayBuffer(1, 1, 1, 1, 1), + ArrayBuffer(2, 2, 2, 2, 2), + ArrayBuffer(3, 3, 3, 3, 3), + ArrayBuffer(4, 4, 4, 4, 4) + ) + val res_buffer1 = temp.imageLib.buffer1 + val res_buffer2 = temp.imageLib.buffer2 + if res_buffer1 != expected_buffer1 then + (false, s"Buffer1 expected: $expected_buffer1 , got $res_buffer1") + else if res_buffer2 != expected_buffer2 then + (false, s"Buffer2 expected: $expected_buffer2 , got $res_buffer2") + else (true, "") + ) + ) + } + + test( + "Should work when multiple thread processes multiple different filters (25pts)" + ) { + testManySchedules( + nThreads, + sched => + val temp = new SchedulableProblem2(sched, imageSize, nThreads, 4) + val buf: ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer() + for i: Int <- 0 until imageSize do buf += ArrayBuffer.fill(5)(i) + temp.imageLib.init(buf) + ( + (for i <- 0 until nThreads + yield () => + temp.imagePipeline( + Array( + temp.imageLib.Filter.Outline, + temp.imageLib.Filter.Sharpen, + temp.imageLib.Filter.Identity, + temp.imageLib.Filter.Sharpen + ), + rowsForThread(i) + )).toList, + results => + val expected_buffer1 = ArrayBuffer( + ArrayBuffer(-51, -31, -28, -31, -51), + ArrayBuffer(47, 2, 24, 2, 47), + ArrayBuffer(68, -24, 24, -24, 68), + ArrayBuffer(5, -154, -72, -154, 5), + ArrayBuffer(375, 83, 164, 83, 375) + ) + val expected_buffer2 = ArrayBuffer( + ArrayBuffer(-10, -10, -9, -10, -10), + ArrayBuffer(11, 0, 3, 0, 11), + ArrayBuffer(18, -6, 0, -6, 18), + ArrayBuffer(17, -24, -15, -24, 17), + ArrayBuffer(86, 38, 45, 38, 86) + ) + val res_buffer1 = temp.imageLib.buffer1 + val res_buffer2 = temp.imageLib.buffer2 + if res_buffer1 != expected_buffer1 then + (false, s"Buffer1 expected: $expected_buffer1 , got $res_buffer1") + else if res_buffer2 != expected_buffer2 then + (false, s"Buffer2 expected: $expected_buffer2 , got $res_buffer2") + else (true, "") + ) + ) + } diff --git a/src/test/scala/concpar22final02/instrumentation/MockedMonitor.scala b/src/test/scala/concpar22final02/instrumentation/MockedMonitor.scala new file mode 100644 index 0000000000000000000000000000000000000000..c42f408dca974a200be152281c096f9fdee78d0e --- /dev/null +++ b/src/test/scala/concpar22final02/instrumentation/MockedMonitor.scala @@ -0,0 +1,60 @@ +package concpar22final02.instrumentation + +trait MockedMonitor extends Monitor: + def scheduler: Scheduler + + // Can be overriden. + override def waitDefault() = + scheduler.log("wait") + scheduler.waitCount.incrementAndGet() + scheduler updateThreadState Wait(this, scheduler.threadLocks.tail) + override def synchronizedDefault[T](toExecute: => T): T = + scheduler.log("synchronized check") + val prevLocks = scheduler.threadLocks + scheduler updateThreadState Sync( + this, + prevLocks + ) // If this belongs to prevLocks, should just continue. + scheduler.log("synchronized -> enter") + try toExecute + finally + scheduler updateThreadState Running(prevLocks) + scheduler.log("synchronized -> out") + override def notifyDefault() = + scheduler mapOtherStates { state => + state match + case Wait(lockToAquire, locks) if lockToAquire == this => + SyncUnique(this, state.locks) + case e => e + } + scheduler.notifyCount.incrementAndGet() + scheduler.log("notify") + override def notifyAllDefault() = + scheduler mapOtherStates { state => + state match + case Wait(lockToAquire, locks) if lockToAquire == this => + Sync(this, state.locks) + case SyncUnique(lockToAquire, locks) if lockToAquire == this => + Sync(this, state.locks) + case e => e + } + scheduler.notifyAllCount.incrementAndGet() + scheduler.log("notifyAll") + +abstract class ThreadState: + def locks: Seq[AnyRef] +trait CanContinueIfAcquiresLock extends ThreadState: + def lockToAquire: AnyRef +case object Start extends ThreadState: + def locks: Seq[AnyRef] = Seq.empty +case object End extends ThreadState: + def locks: Seq[AnyRef] = Seq.empty +case class Wait(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState +case class SyncUnique(lockToAquire: AnyRef, locks: Seq[AnyRef]) + extends ThreadState + with CanContinueIfAcquiresLock +case class Sync(lockToAquire: AnyRef, locks: Seq[AnyRef]) + extends ThreadState + with CanContinueIfAcquiresLock +case class Running(locks: Seq[AnyRef]) extends ThreadState +case class VariableReadWrite(locks: Seq[AnyRef]) extends ThreadState diff --git a/src/test/scala/concpar22final02/instrumentation/SchedulableBarrier.scala b/src/test/scala/concpar22final02/instrumentation/SchedulableBarrier.scala new file mode 100644 index 0000000000000000000000000000000000000000..a14587b96699a005681ed720427fb29fae554b64 --- /dev/null +++ b/src/test/scala/concpar22final02/instrumentation/SchedulableBarrier.scala @@ -0,0 +1,20 @@ +package concpar22final02.instrumentation + +import scala.annotation.tailrec +import concpar22final02.* +import scala.collection.mutable.ArrayBuffer + +class SchedulableBarrier(val scheduler: Scheduler, size: Int) + extends Barrier(size) + with MockedMonitor + +class SchedulableProblem2( + val scheduler: Scheduler, + imageSize: Int, + threadCount: Int, + numFilters: Int +) extends Problem2(imageSize, threadCount, numFilters): + self => + + override val barrier = + ArrayBuffer.fill(numFilters)(SchedulableBarrier(scheduler, threadCount)) diff --git a/src/test/scala/concpar22final02/instrumentation/Scheduler.scala b/src/test/scala/concpar22final02/instrumentation/Scheduler.scala new file mode 100644 index 0000000000000000000000000000000000000000..0dee02a93f0bc33a06bd29c01d99aac5777e86fc --- /dev/null +++ b/src/test/scala/concpar22final02/instrumentation/Scheduler.scala @@ -0,0 +1,321 @@ +package concpar22final02.instrumentation + +import java.util.concurrent.* +import scala.concurrent.duration.* +import scala.collection.mutable.* +import Stats.* + +import java.util.concurrent.atomic.AtomicInteger + +sealed abstract class Result +case class RetVal(rets: List[Any]) extends Result +case class Except(msg: String, stackTrace: Array[StackTraceElement]) + extends Result +case class Timeout(msg: String) extends Result + +/** A class that maintains schedule and a set of thread ids. The schedules are + * advanced after an operation of a SchedulableBuffer is performed. Note: the + * real schedule that is executed may deviate from the input schedule due to + * the adjustments that had to be made for locks + */ +class Scheduler(sched: List[Int]): + val maxOps = + 500 // a limit on the maximum number of operations the code is allowed to perform + + var waitCount: AtomicInteger = new AtomicInteger(0) + var notifyCount: AtomicInteger = new AtomicInteger(0) + var notifyAllCount: AtomicInteger = new AtomicInteger(0) + + private var schedule = sched + var numThreads = 0 + private val realToFakeThreadId = Map[Long, Int]() + private val opLog = + ListBuffer[String]() // a mutable list (used for efficient concat) + private val threadStates = Map[Int, ThreadState]() + + /** Runs a set of operations in parallel as per the schedule. Each operation + * may consist of many primitive operations like reads or writes to shared + * data structure each of which should be executed using the function `exec`. + * @timeout + * in milliseconds + * @return + * true - all threads completed on time, false -some tests timed out. + */ + def runInParallel(timeout: Long, ops: List[() => Any]): Result = + numThreads = ops.length + val threadRes = Array.fill(numThreads) { None: Any } + var exception: Option[(Throwable, Int)] = None + val syncObject = new Object() + var completed = new AtomicInteger(0) + // create threads + val threads = ops.zipWithIndex.map { case (op, i) => + new Thread( + new Runnable(): + def run(): Unit = + val fakeId = i + 1 + setThreadId(fakeId) + try + updateThreadState(Start) + val res = op() + updateThreadState(End) + threadRes(i) = res + // notify the main thread if all threads have completed + if completed.incrementAndGet() == ops.length then + syncObject.synchronized { syncObject.notifyAll() } + catch + case e: Throwable + if exception != None => // do nothing here and silently fail + case e: Throwable => + log(s"throw ${e.toString}") + exception = Some((e, fakeId)) + syncObject.synchronized { syncObject.notifyAll() } + // println(s"$fakeId: ${e.toString}") + // Runtime.getRuntime().halt(0) //exit the JVM and all running threads (no other way to kill other threads) + ) + } + // start all threads + threads.foreach(_.start()) + // wait for all threads to complete, or for an exception to be thrown, or for the time out to expire + var remTime = timeout + syncObject.synchronized { + timed { + if completed.get() != ops.length then syncObject.wait(timeout) + } { time => + remTime -= time + } + } + if exception.isDefined then + Except( + s"Thread ${exception.get._2} crashed on the following schedule: \n" + opLog.mkString( + "\n" + ), + exception.get._1.getStackTrace + ) + else if remTime <= 1 + then // timeout ? using 1 instead of zero to allow for some errors + Timeout(opLog.mkString("\n")) + else + // every thing executed normally + RetVal(threadRes.toList) + + // Updates the state of the current thread + def updateThreadState(state: ThreadState): Unit = + val tid = threadId + synchronized { + threadStates(tid) = state + } + state match + case Sync(lockToAquire, locks) => + if locks.indexOf(lockToAquire) < 0 then waitForTurn + else + // Re-aqcuiring the same lock + updateThreadState(Running(lockToAquire +: locks)) + case Start => waitStart() + case End => removeFromSchedule(tid) + case Running(_) => + case _ => waitForTurn // Wait, SyncUnique, VariableReadWrite + + def waitStart(): Unit = + // while (threadStates.size < numThreads) { + // Thread.sleep(1) + // } + synchronized { + if threadStates.size < numThreads then wait() + else notifyAll() + } + + def threadLocks = + synchronized { + threadStates(threadId).locks + } + + def threadState = + synchronized { + threadStates(threadId) + } + + def mapOtherStates(f: ThreadState => ThreadState) = + val exception = threadId + synchronized { + for k <- threadStates.keys if k != exception do + threadStates(k) = f(threadStates(k)) + } + + def log(str: String) = + if (realToFakeThreadId contains Thread.currentThread().getId()) then + val space = (" " * ((threadId - 1) * 2)) + val s = + space + threadId + ":" + "\n".r.replaceAllIn(str, "\n" + space + " ") + opLog += s + + /** Executes a read or write operation to a global data structure as per the + * given schedule + * @param msg + * a message corresponding to the operation that will be logged + */ + def exec[T](primop: => T)( + msg: => String, + postMsg: => Option[T => String] = None + ): T = + if !(realToFakeThreadId contains Thread.currentThread().getId()) then primop + else + updateThreadState(VariableReadWrite(threadLocks)) + val m = msg + if m != "" then log(m) + if opLog.size > maxOps then + throw new Exception( + s"Total number of reads/writes performed by threads exceed $maxOps. A possible deadlock!" + ) + val res = primop + postMsg match + case Some(m) => log(m(res)) + case None => + res + + private def setThreadId(fakeId: Int) = synchronized { + realToFakeThreadId(Thread.currentThread.getId) = fakeId + } + + def threadId = + try realToFakeThreadId(Thread.currentThread().getId()) + catch + case e: NoSuchElementException => + throw new Exception( + "You are accessing shared variables in the constructor. This is not allowed. The variables are already initialized!" + ) + + private def isTurn(tid: Int) = synchronized { + (!schedule.isEmpty && schedule.head != tid) + } + + def canProceed(): Boolean = + val tid = threadId + canContinue match + case Some((i, state)) if i == tid => + // println(s"$tid: Runs ! Was in state $state") + canContinue = None + state match + case Sync(lockToAquire, locks) => + updateThreadState(Running(lockToAquire +: locks)) + case SyncUnique(lockToAquire, locks) => + mapOtherStates { + _ match + case SyncUnique(lockToAquire2, locks2) + if lockToAquire2 == lockToAquire => + Wait(lockToAquire2, locks2) + case e => e + } + updateThreadState(Running(lockToAquire +: locks)) + case VariableReadWrite(locks) => updateThreadState(Running(locks)) + true + case Some((i, state)) => + // println(s"$tid: not my turn but $i !") + false + case None => + false + + var threadPreference = + 0 // In the case the schedule is over, which thread should have the preference to execute. + + /** returns true if the thread can continue to execute, and false otherwise */ + def decide(): Option[(Int, ThreadState)] = + if !threadStates.isEmpty + then // The last thread who enters the decision loop takes the decision. + // println(s"$threadId: I'm taking a decision") + if threadStates.values.forall { + case e: Wait => true + case _ => false + } + then + val waiting = threadStates.keys.map(_.toString).mkString(", ") + val s = if threadStates.size > 1 then "s" else "" + val are = if threadStates.size > 1 then "are" else "is" + throw new Exception( + s"Deadlock: Thread$s $waiting $are waiting but all others have ended and cannot notify them." + ) + else + // Threads can be in Wait, Sync, SyncUnique, and VariableReadWrite mode. + // Let's determine which ones can continue. + val notFree = threadStates.collect { case (id, state) => + state.locks + }.flatten.toSet + val threadsNotBlocked = threadStates.toSeq.filter { + case (id, v: VariableReadWrite) => true + case (id, v: CanContinueIfAcquiresLock) => + !notFree(v.lockToAquire) || (v.locks contains v.lockToAquire) + case _ => false + } + if threadsNotBlocked.isEmpty then + val waiting = threadStates.keys.map(_.toString).mkString(", ") + val s = if threadStates.size > 1 then "s" else "" + val are = if threadStates.size > 1 then "are" else "is" + val whoHasLock = threadStates.toSeq.flatMap { case (id, state) => + state.locks.map(lock => (lock, id)) + }.toMap + val reason = threadStates + .collect { + case (id, state: CanContinueIfAcquiresLock) + if !notFree(state.lockToAquire) => + s"Thread $id is waiting on lock ${state.lockToAquire} held by thread ${whoHasLock(state.lockToAquire)}" + } + .mkString("\n") + throw new Exception( + s"Deadlock: Thread$s $waiting are interlocked. Indeed:\n$reason" + ) + else if threadsNotBlocked.size == 1 + then // Do not consume the schedule if only one thread can execute. + Some(threadsNotBlocked(0)) + else + val next = + schedule.indexWhere(t => + threadsNotBlocked.exists { case (id, state) => id == t } + ) + if next != -1 then + // println(s"$threadId: schedule is $schedule, next chosen is ${schedule(next)}") + val chosenOne = + schedule(next) // TODO: Make schedule a mutable list. + schedule = schedule.take(next) ++ schedule.drop(next + 1) + Some((chosenOne, threadStates(chosenOne))) + else + threadPreference = (threadPreference + 1) % threadsNotBlocked.size + val chosenOne = + threadsNotBlocked(threadPreference) // Maybe another strategy + Some(chosenOne) + // threadsNotBlocked.indexOf(threadId) >= 0 + /* + val tnb = threadsNotBlocked.map(_._1).mkString(",") + val s = if (schedule.isEmpty) "empty" else schedule.mkString(",") + val only = if (schedule.isEmpty) "" else " only" + throw new Exception(s"The schedule is $s but$only threads ${tnb} can continue")*/ + else canContinue + + /** This will be called before a schedulable operation begins. This should not + * use synchronized + */ + var numThreadsWaiting = new AtomicInteger(0) + // var waitingForDecision = Map[Int, Option[Int]]() // Mapping from thread ids to a number indicating who is going to make the choice. + var canContinue: Option[(Int, ThreadState)] = + None // The result of the decision thread Id of the thread authorized to continue. + private def waitForTurn = + synchronized { + if numThreadsWaiting.incrementAndGet() == threadStates.size then + canContinue = decide() + notifyAll() + // waitingForDecision(threadId) = Some(numThreadsWaiting) + // println(s"$threadId Entering waiting with ticket number $numThreadsWaiting/${waitingForDecision.size}") + while !canProceed() do wait() + } + numThreadsWaiting.decrementAndGet() + + /** To be invoked when a thread is about to complete + */ + private def removeFromSchedule(fakeid: Int) = synchronized { + // println(s"$fakeid: I'm taking a decision because I finished") + schedule = schedule.filterNot(_ == fakeid) + threadStates -= fakeid + if numThreadsWaiting.get() == threadStates.size then + canContinue = decide() + notifyAll() + } + + def getOperationLog() = opLog diff --git a/src/test/scala/concpar22final02/instrumentation/TestHelper.scala b/src/test/scala/concpar22final02/instrumentation/TestHelper.scala new file mode 100644 index 0000000000000000000000000000000000000000..2610f1e2e7c425aa8aa37b40d0091ce7266d45a8 --- /dev/null +++ b/src/test/scala/concpar22final02/instrumentation/TestHelper.scala @@ -0,0 +1,152 @@ +package concpar22final02.instrumentation + +import scala.util.Random +import scala.collection.mutable.{Map as MutableMap} + +import Stats.* + +object TestHelper: + val noOfSchedules = 10000 // set this to 100k during deployment + val readWritesPerThread = + 20 // maximum number of read/writes possible in one thread + val contextSwitchBound = 10 + val testTimeout = 240 // the total time out for a test in seconds + val schedTimeout = + 15 // the total time out for execution of a schedule in secs + + // Helpers + /*def testManySchedules(op1: => Any): Unit = testManySchedules(List(() => op1)) + def testManySchedules(op1: => Any, op2: => Any): Unit = testManySchedules(List(() => op1, () => op2)) + def testManySchedules(op1: => Any, op2: => Any, op3: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3)) + def testManySchedules(op1: => Any, op2: => Any, op3: => Any, op4: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3, () => op4))*/ + + def testSequential[T](ops: Scheduler => Any)(assertions: T => ( + Boolean, + String + )) = + testManySchedules( + 1, + (sched: Scheduler) => + ( + List(() => ops(sched)), + (res: List[Any]) => assertions(res.head.asInstanceOf[T]) + ) + ) + + /** @numThreads + * number of threads + * @ops + * operations to be executed, one per thread + * @assertion + * as condition that will executed after all threads have completed + * (without exceptions) the arguments are the results of the threads + */ + def testManySchedules( + numThreads: Int, + ops: Scheduler => ( + List[() => Any], // Threads + List[Any] => (Boolean, String) + ) // Assertion + ) = + var timeout = testTimeout * 1000L + val threadIds = (1 to numThreads) + // (1 to scheduleLength).flatMap(_ => threadIds).toList.permutations.take(noOfSchedules).foreach { + val schedules = (new ScheduleGenerator(numThreads)).schedules() + var schedsExplored = 0 + schedules.takeWhile(_ => + schedsExplored <= noOfSchedules && timeout > 0 + ).foreach { + // case _ if timeout <= 0 => // break + case schedule => + schedsExplored += 1 + val schedr = new Scheduler(schedule) + // println("Exploring Sched: "+schedule) + val (threadOps, assertion) = ops(schedr) + if threadOps.size != numThreads then + throw new IllegalStateException( + s"Number of threads: $numThreads, do not match operations of threads: $threadOps" + ) + timed { schedr.runInParallel(schedTimeout * 1000, threadOps) } { t => + timeout -= t + } match + case Timeout(msg) => + throw new java.lang.AssertionError( + "assertion failed\n" + "The schedule took too long to complete. A possible deadlock! \n" + msg + ) + case Except(msg, stkTrace) => + val traceStr = + "Thread Stack trace: \n" + stkTrace.map( + " at " + _.toString + ).mkString("\n") + throw new java.lang.AssertionError( + "assertion failed\n" + msg + "\n" + traceStr + ) + case RetVal(threadRes) => + // check the assertion + val (success, custom_msg) = assertion(threadRes) + if !success then + val msg = + "The following schedule resulted in wrong results: \n" + custom_msg + "\n" + schedr + .getOperationLog() + .mkString("\n") + throw new java.lang.AssertionError("Assertion failed: " + msg) + } + if timeout <= 0 then + throw new java.lang.AssertionError( + "Test took too long to complete! Cannot check all schedules as your code is too slow!" + ) + + /** A schedule generator that is based on the context bound + */ + class ScheduleGenerator(numThreads: Int): + val scheduleLength = readWritesPerThread * numThreads + val rands = + (1 to scheduleLength).map(i => + new Random(0xcafe * i) + ) // random numbers for choosing a thread at each position + def schedules(): LazyList[List[Int]] = + var contextSwitches = 0 + var contexts = + List[Int]() // a stack of thread ids in the order of context-switches + val remainingOps = MutableMap[Int, Int]() + remainingOps ++= (1 to numThreads).map(i => + (i, readWritesPerThread) + ) // num ops remaining in each thread + val liveThreads = (1 to numThreads).toSeq.toBuffer + + /** Updates remainingOps and liveThreads once a thread is chosen for a + * position in the schedule + */ + def updateState(tid: Int): Unit = + val remOps = remainingOps(tid) + if remOps == 0 then liveThreads -= tid + else remainingOps += (tid -> (remOps - 1)) + val schedule = rands.foldLeft(List[Int]()) { + case (acc, r) if contextSwitches < contextSwitchBound => + val tid = liveThreads(r.nextInt(liveThreads.size)) + contexts match + case prev :: tail + if prev != tid => // we have a new context switch here + contexts +:= tid + contextSwitches += 1 + case prev :: tail => + case _ => // init case + contexts +:= tid + updateState(tid) + acc :+ tid + case ( + acc, + _ + ) => // here context-bound has been reached so complete the schedule without any more context switches + if !contexts.isEmpty then + contexts = contexts.dropWhile(remainingOps(_) == 0) + val tid = contexts match + case top :: tail => top + case _ => + liveThreads( + 0 + ) // here, there has to be threads that have not even started + updateState(tid) + acc :+ tid + } + schedule #:: schedules() diff --git a/src/test/scala/concpar22final02/instrumentation/TestUtils.scala b/src/test/scala/concpar22final02/instrumentation/TestUtils.scala new file mode 100644 index 0000000000000000000000000000000000000000..5c76ec9ba4020737013b5302b7e3af0e61cc8898 --- /dev/null +++ b/src/test/scala/concpar22final02/instrumentation/TestUtils.scala @@ -0,0 +1,14 @@ +package concpar22final02.instrumentation + +import scala.concurrent.* +import scala.concurrent.duration.* +import scala.concurrent.ExecutionContext.Implicits.global + +object TestUtils: + def failsOrTimesOut[T](action: => T): Boolean = + val asyncAction = Future { + action + } + try Await.result(asyncAction, 2000.millisecond) + catch case _: Throwable => return true + return false diff --git a/src/test/scala/concpar22final03/EconomicsTest.scala b/src/test/scala/concpar22final03/EconomicsTest.scala new file mode 100644 index 0000000000000000000000000000000000000000..ae765a3414de0605d5f3fc33658e045023d8d8d4 --- /dev/null +++ b/src/test/scala/concpar22final03/EconomicsTest.scala @@ -0,0 +1,102 @@ +package concpar22final03 + +import scala.concurrent.Future + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future +import scala.util.Random + +trait EconomicsTest extends Economics: + val ownedCards: collection.mutable.Set[Card] = collection.mutable.Set[Card]() + def owned(c: Card): Boolean = ownedCards(c) + def isMine(c: Card): Boolean = ownedCards(c) + + override def valueOf(cardName: String): Int = List(1, cardName.length).max + + /** This is a container for an exact amount of money that can be hold, spent, + * or put in the bank + */ + val moneyInMoneyBag = collection.mutable.Map[MoneyBag, Int]() + def moneyIn(m: MoneyBag): Int = moneyInMoneyBag.getOrElse(m, 0) + + /** If you sell a card, at some point in the future you will get some money + * (in a bag). + */ + def sellCard(c: Card): Future[MoneyBag] = + Future { + Thread.sleep(sellWaitTime()) + synchronized( + if owned(c) then + ownedCards.remove(c) + getMoneyBag(valueOf(c.name)) + else + throw Exception( + "This card doesn't belong to you or has already been sold, you can't sell it." + ) + ) + } + + /** You can buy any "Scala: The Programming" card by providing a bag of money + * with the appropriate amount and waiting for the transaction to take place + */ + def buyCard(bag: MoneyBag, name: String): Future[Card] = + Future { + Thread.sleep(buyWaitTime()) + synchronized { + if moneyIn(bag) != valueOf(name) then + throw Exception( + "You didn't provide the exact amount of money necessary to buy this card." + ) + else moneyInMoneyBag.update(bag, 0) + getCard(name) + } + + } + + /** This simple bank account hold money for you. You can bring a money bag to + * increase your account, or withdraw a money bag of any size not greater + * than your account's balance. + */ + private var balance_ = initialBalance() + def balance: Int = balance_ + def withdraw(amount: Int): Future[MoneyBag] = + Future { + Thread.sleep(withdrawWaitTime()) + synchronized( + if balance_ >= amount then + balance_ -= amount + getMoneyBag(amount) + else + throw new Exception( + "You try to withdraw more money than you have on your account" + ) + ) + } + + def deposit(bag: MoneyBag): Future[Unit] = + Future { + Thread.sleep(depositWaitTime()) + synchronized { + if moneyInMoneyBag(bag) == 0 then + throw new Exception("You are depositing en empty bag!") + else + balance_ += moneyIn(bag) + moneyInMoneyBag.update(bag, 0) + } + } + + def sellWaitTime(): Int + def buyWaitTime(): Int + def withdrawWaitTime(): Int + def depositWaitTime(): Int + def initialBalance(): Int + + def getMoneyBag(i: Int) = + val m = MoneyBag() + synchronized(moneyInMoneyBag.update(m, i)) + m + + def getCard(n: String): Card = + val c = Card(n) + synchronized(ownedCards.update(c, true)) + c diff --git a/src/test/scala/concpar22final03/Problem3Suite.scala b/src/test/scala/concpar22final03/Problem3Suite.scala new file mode 100644 index 0000000000000000000000000000000000000000..ac500f92da81591a14b433202aef605546aa4bf2 --- /dev/null +++ b/src/test/scala/concpar22final03/Problem3Suite.scala @@ -0,0 +1,203 @@ +package concpar22final03 + +import scala.concurrent.duration.* +import scala.concurrent.{Await, Future} +import scala.util.{Try, Success, Failure} +import scala.concurrent.ExecutionContext.Implicits.global + +class Problem3Suite extends munit.FunSuite: + trait Prob3Test extends Problem3: + override val economics: EconomicsTest + class Test1 extends Prob3Test: + override val economics: EconomicsTest = new EconomicsTest: + override def sellWaitTime() = 10 + override def buyWaitTime() = 20 + override def depositWaitTime() = 30 + override def withdrawWaitTime() = 40 + override def initialBalance() = 0 + class Test2 extends Prob3Test: + override val economics: EconomicsTest = new EconomicsTest: + override def sellWaitTime() = 100 + override def buyWaitTime() = 5 + override def depositWaitTime() = 50 + override def withdrawWaitTime() = 5 + override def initialBalance() = 0 + + class Test3 extends Prob3Test: + override val economics: EconomicsTest = new EconomicsTest: + val rgen = new scala.util.Random(666) + override def sellWaitTime() = rgen.nextInt(100) + override def buyWaitTime() = rgen.nextInt(100) + override def depositWaitTime() = rgen.nextInt(100) + override def withdrawWaitTime() = rgen.nextInt(100) + override def initialBalance() = 0 + + class Test4 extends Prob3Test: + var counter = 5 + def next(): Int = + counter = counter + 5 % 119 + counter + override val economics: EconomicsTest = new EconomicsTest: + override def sellWaitTime() = next() + override def buyWaitTime() = next() + override def depositWaitTime() = next() + override def withdrawWaitTime() = next() + override def initialBalance() = next() + + def testCases = List(new Test1, new Test2) + def unevenTestCases = List(new Test3, new Test4) + + def tot(cards: List[String]): Int = + cards.map[Int]((n: String) => n.length).sum + + def testOk( + t: Prob3Test, + money: Int, + sold: List[String], + wanted: List[String] + ): Unit = + import t.* + import economics.* + val f = orderDeck(getMoneyBag(money), sold.map(getCard), wanted) + val r = Await.ready(f, 3.seconds).value.get + assert(r.isSuccess) + r match + case Success(d) => + assertEquals(d.map(_.name).sorted, wanted.sorted) + assertEquals(d.length, wanted.length) + assertEquals(isMine(d.head), true) + case Failure(e) => () + + def testFailure( + t: Prob3Test, + money: Int, + sold: List[String], + wanted: List[String] + ): Unit = + import t.* + import economics.* + val f = orderDeck(getMoneyBag(money), sold.map(getCard), wanted) + val r = Await.ready(f, 3.seconds).value.get + assert(r.isFailure) + r match + case Failure(e: NotEnoughMoneyException) => () + case _ => fail( + "Should have thrown a NotEnoughMoneyException exception, but did not" + ) + + // --- Without sold cards --- + + test( + "Should work correctly when a single card is asked with enough money (no card sold) (20pts)" + ) { + testCases.foreach(t => testOk(t, 7, Nil, List("Tefeiri"))) + } + test( + "Should work correctly when a single card is asked with enough money (no card sold, uneven waiting time) (10pts)" + ) { + unevenTestCases.foreach(t => testOk(t, 7, Nil, List("Tefeiri"))) + } + test( + "Should work correctly when multiple cards are asked with enough money (no card sold) (20pts)" + ) { + val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg") + testCases.foreach(t => testOk(t, tot(cards), Nil, cards)) + } + test( + "Should work correctly when multiple cards are asked with enough money (no card sold, uneven waiting time) (10pts)" + ) { + val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg") + unevenTestCases.foreach(t => testOk(t, tot(cards), Nil, cards)) + } + test( + "Should work correctly when asked duplicates of cards, with enough money (no card sold) (20pts)" + ) { + val cards = List("aaaa", "aaaa", "aaaa", "dd", "dd", "dd", "dd") + testCases.foreach(t => testOk(t, tot(cards), Nil, cards)) + } + test( + "Should work correctly when asked duplicates of cards, with enough money (no card sold, uneven waiting time) (10pts)" + ) { + val cards = List("aaaa", "aaaa", "aaaa", "dd", "dd", "dd", "dd") + unevenTestCases.foreach(t => testOk(t, tot(cards), Nil, cards)) + } + + // --- With sold cards --- + + test( + "Should work correctly when a single card is bought and a single of the same price is sold (20pts)" + ) { + testCases.foreach(t => testOk(t, 0, List("Chandra"), List("Tefeiri"))) + } + test( + "Should work correctly when a single card is bought and a single of the same price is sold (uneven waiting time) (10pts)" + ) { + unevenTestCases.foreach(t => testOk(t, 0, List("Chandra"), List("Tefeiri"))) + } + + test( + "Should work correctly when multiple cards are asked and multiple of matching values are sold (20pts)" + ) { + val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg") + val sold = List("1111111", "2", "3333", "44", "55555", "666", "7777") + testCases.foreach(t => testOk(t, 0, sold, cards)) + } + test( + "Should work correctly when multiple cards are asked and multiple of matching values are sold (uneven waiting time) (10pts)" + ) { + val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg") + val sold = List("1111111", "2", "3333", "44", "55555", "666", "7777") + unevenTestCases.foreach(t => testOk(t, 0, sold, cards)) + } + test( + "Should work correctly when multiple cards are asked and multiple of the same total value are sold (20pts)" + ) { + val cards2 = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg") + val sold2 = List("111111111", "22", "3", "44", "555555", "666", "777") + assert(tot(sold2) == tot(cards2)) + testCases.foreach(t => testOk(t, 0, sold2, cards2)) + } + test( + "Should work correctly when multiple cards are asked and multiple of the same total value are sold (uneven waiting time) (10pts)" + ) { + val cards2 = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg") + val sold2 = List("111111111", "22", "3", "44", "555555", "666", "777") + assert(tot(sold2) == tot(cards2)) + unevenTestCases.foreach(t => testOk(t, 0, sold2, cards2)) + } + + test( + "Should work correctly when given money and sold cards are sufficient for the wanted cards (20pts)" + ) { + val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg") + val sold = List("11111", "2", "33", "44", "5555", "666", "777") + val bagMoney = tot(cards) - tot(sold) + testCases.foreach(t => testOk(t, bagMoney, sold, cards)) + } + test( + "Should work correctly when given money and sold cards are sufficient for the wanted cards (uneven waiting time) (10pts)" + ) { + val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg") + val sold = List("11111", "2", "33", "44", "5555", "666", "777") + val bagMoney = tot(cards) - tot(sold) + unevenTestCases.foreach(t => testOk(t, bagMoney, sold, cards)) + } + + // --- Failures --- + + test( + "Should return a failure when too little money is provided (no card sold) (20pts)" + ) { + val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg") + testCases.foreach(t => testFailure(t, tot(cards) - 1, Nil, cards)) + testCases.foreach(t => testFailure(t, tot(cards) - 50, Nil, cards)) + } + + test( + "Should return a failure when too little money or sold cards are provided (20pts)" + ) { + val cards = List("aaaa", "bbb", "ccccc", "dd", "eeee", "f", "ggggggg") + val sold = List("11111", "2", "33", "44", "5555", "666", "777") + val bagMoney = tot(cards) - tot(sold) + testCases.foreach(t => testFailure(t, bagMoney - 2, sold, cards)) + } diff --git a/src/test/scala/concpar22final04/Problem4Suite.scala b/src/test/scala/concpar22final04/Problem4Suite.scala new file mode 100644 index 0000000000000000000000000000000000000000..9e2b1c2cbf3edaf3ca1db726c96be79bc1ed2b5b --- /dev/null +++ b/src/test/scala/concpar22final04/Problem4Suite.scala @@ -0,0 +1,370 @@ +package concpar22final04 + +import akka.actor.* +import akka.testkit.* +import akka.pattern.* +import akka.util.Timeout +import concurrent.duration.* +import User.Protocol.* +import User.Responses.* +import SongsStore.Protocol.* +import SongsStore.Responses.* +import scala.util.{Try, Success, Failure} +import com.typesafe.config.ConfigFactory +import java.util.Date +import scala.util.Random + +class Problem4Suite extends munit.FunSuite: +//--- + Random.setSeed(42178263) + /*+++ + Random.setSeed(42) +++*/ + + test("after receiving GetInfo, should answer with Info (20pts)") { + new MyTestKit: + def tests() = + ada ! GetInfo + expectMsg(Info("1", "Ada")) + } + + test( + "after receiving GetHomepageData, should answer with the correct HomepageData when there is no liked songs and no activity items (30pts)" + ) { + new MyTestKit: + def tests() = + ada ! GetHomepageData + expectMsg(HomepageData(List(), List())) + } + + test( + "after receiving Like(1), should add 1 to the list of liked songs (20pts)" + ) { + new MyTestKit: + def tests() = + ada ! Like(1) + expectNoMessage() + ada ! GetHomepageData + expectMsg(HomepageData(List(1), List())) + } + + test( + "after receiving Like(1) and then Like(2), the list of liked songs should start with List(2, 1) (20pts)" + ) { + new MyTestKit: + def tests() = + ada ! Like(1) + expectNoMessage() + ada ! Like(2) + expectNoMessage() + ada ! GetHomepageData + expectMsg(HomepageData(List(2, 1), List())) + } + + test( + "after receiving Like(1) and then Like(1), song 1 should be in the list of liked songs only once (10pts)" + ) { + new MyTestKit: + def tests() = + ada ! Like(1) + expectNoMessage() + ada ! Like(1) + expectNoMessage() + ada ! GetHomepageData + expectMsg(HomepageData(List(1), List())) + } + + test( + "after receiving Like(1), Like(2) and then Like(1), the list of liked songs should start with List(2, 1) (10pts)" + ) { + new MyTestKit: + def tests() = + ada ! Like(1) + expectNoMessage() + ada ! Like(2) + expectNoMessage() + ada ! Like(1) + expectNoMessage() + ada ! GetHomepageData + expectMsg(HomepageData(List(2, 1), List())) + } + + test( + "after receiving Like(1), Unlike(1) and then Unlike(1), the list of liked songs should not contain song 1 (10pts)" + ) { + new MyTestKit: + def tests() = + ada ! Like(1) + expectNoMessage() + ada ! Like(2) + expectNoMessage() + ada ! Like(1) + expectNoMessage() + ada ! GetHomepageData + expectMsg(HomepageData(List(2, 1), List())) + } + + test( + "after receiving Subscribe(aUser) and then Play(5), should send AddActivity(Activity(\"1\", 5)) to aUser (20pts)" + ) { + new MyTestKit: + def tests() = + ada ! Subscribe(self) + expectNoMessage() + ada ! Play(5) + expectMsg(AddActivity(Activity("1", "Ada", 5))) + } + + test( + "after receiving Subscribe(aUser), Subscribe(bUser) and then Play(5), should send AddActivity(Activity(\"1\", 5)) to aUser (10pts)" + ) { + new MyTestKit: + def tests() = + ada ! Subscribe(self) + expectNoMessage() + val donald = new TestProbe(system) + ada ! Subscribe(donald.ref) + expectNoMessage() + ada ! Play(5) + expectMsg(AddActivity(Activity("1", "Ada", 5))) + donald.expectMsg(AddActivity(Activity("1", "Ada", 5))) + } + + test( + "after receiving Subscribe(aUser), Subscribe(aUser) and then Play(5), should send AddActivity(Activity(\"1\", 5)) to aUser only once (10pts)" + ) { + new MyTestKit: + def tests() = + ada ! Subscribe(self) + expectNoMessage() + ada ! Subscribe(self) + expectNoMessage() + ada ! Play(5) + expectMsg(AddActivity(Activity("1", "Ada", 5))) + expectNoMessage() + } + + test( + "after receiving Subscribe(aUser), Unsubscribe(aUser) and then Play(5), should not send AddActivity(Activity(\"1\", 5)) to aUser (10pts)" + ) { + new MyTestKit: + def tests() = + ada ! Subscribe(self) + expectNoMessage() + ada ! Play(5) + expectMsg(AddActivity(Activity("1", "Ada", 5))) + ada ! Unsubscribe(self) + expectNoMessage() + ada ! Play(5) + expectNoMessage() + } + + test( + "after receiving AddActivity(Activity(\"1\", 5)), Activity(\"1\", 5) should be in the activity feed (10pts)" + ) { + new MyTestKit: + def tests() = + ada ! AddActivity(Activity("0", "Self", 5)) + expectNoMessage() + ada ! GetHomepageData + expectMsg(HomepageData(List(), List(Activity("0", "Self", 5)))) + } + + test( + "after receiving AddActivity(Activity(\"1\", 5)) and AddActivity(Activity(\"1\", 6)), Activity(\"1\", 6) should be in the activity feed and Activity(\"1\", 5) should not (10pts)" + ) { + new MyTestKit: + def tests() = + ada ! AddActivity(Activity("0", "Self", 5)) + expectNoMessage() + ada ! AddActivity(Activity("0", "Self", 6)) + expectNoMessage() + ada ! GetHomepageData + expectMsg(HomepageData(List(), List(Activity("0", "Self", 6)))) + } + + test( + "after receiving GetHomepageText, should answer with a result containing \"Howdy $name!\" where $name is the user's name (10pts)" + ) { + new MyTestKit: + def tests() = + val name = Random.alphanumeric.take(5).mkString + val randomUser = + system.actorOf(Props(classOf[User], "5", name, songsStore), "user-5") + randomUser ! GetHomepageText + expectMsgClass(classOf[HomepageText]).result.contains(f"Howdy $name!") + } + + test( + "after receiving GetHomepageText, should answer with the correct names of liked songs (1) (10pts)" + ) { + new MyTestKit: + def tests() = + ada ! Like(8) + expectNoMessage() + ada ! Like(3) + expectNoMessage() + ada ! Like(2) + expectNoMessage() + ada ! GetHomepageText + assertEquals( + expectMsgClass(classOf[HomepageText]).result.linesIterator + .drop(2) + .take(4) + .mkString("\n") + .trim, + """ + |Liked Songs: + |* Sunny by Boney M. + |* J'irai où tu iras by Céline Dion & Jean-Jacques Goldman + |* Hold the line by TOTO + """.stripMargin.trim + ) + } + + test( + "after receiving GetHomepageText, should answer with the correct names of liked songs (2) (10pts)" + ) { + new MyTestKit: + def tests() = + ada ! Like(9) + expectNoMessage() + ada ! Like(7) + expectNoMessage() + ada ! GetHomepageText + assertEquals( + expectMsgClass(classOf[HomepageText]).result.linesIterator + .drop(2) + .take(3) + .mkString("\n") + .trim, + """ + |Liked Songs: + |* Straight Edge by Minor Threat + |* Anarchy in the UK by Sex Pistols + """.stripMargin.trim + ) + } + + test( + "after receiving GetHomepageText, should answer with the correct activity feed (1) (10pts)" + ) { + new MyTestKit: + def tests() = + bob ! Subscribe(ada) + expectNoMessage() + carol ! Subscribe(ada) + expectNoMessage() + donald ! Subscribe(ada) + expectNoMessage() + bob ! Play(3) + expectNoMessage() + carol ! Play(8) + expectNoMessage() + ada ! GetHomepageText + assertEquals( + expectMsgClass(classOf[HomepageText]).result.linesIterator + .drop(4) + .take(10) + .mkString("\n") + .trim, + """ + |Activity Feed: + |* Carol is listening to Hold the line by TOTO + |* Bob is listening to J'irai où tu iras by Céline Dion & Jean-Jacques Goldman + """.stripMargin.trim + ) + } + + test( + "after receiving GetHomepageText, should answer with the correct activity feed (2) (10pts)" + ) { + new MyTestKit: + def tests() = + bob ! Subscribe(ada) + expectNoMessage() + carol ! Subscribe(ada) + expectNoMessage() + donald ! Subscribe(ada) + expectNoMessage() + bob ! Play(9) + expectNoMessage() + carol ! Play(10) + expectNoMessage() + donald ! Play(6) + expectNoMessage() + bob ! Play(7) + expectNoMessage() + ada ! GetHomepageText + assertEquals( + expectMsgClass(classOf[HomepageText]).result.linesIterator + .drop(4) + .take(10) + .mkString("\n") + .trim, + """ + |Activity Feed: + |* Bob is listening to Straight Edge by Minor Threat + |* Donald is listening to Désenchantée by Mylène Farmer + |* Carol is listening to Breakfast in America by Supertramp + """.stripMargin.trim + ) + } + + test( + "after receiving GetHomepageText, should answer with the correct text (full test) (10pts)" + ) { + new MyTestKit: + def tests() = + ada ! Like(1) + expectNoMessage() + ada ! Like(2) + expectNoMessage() + bob ! Subscribe(ada) + expectNoMessage() + carol ! Subscribe(ada) + expectNoMessage() + donald ! Subscribe(ada) + expectNoMessage() + donald ! Play(3) + expectNoMessage() + bob ! Play(4) + expectNoMessage() + carol ! Play(5) + expectNoMessage() + ada ! GetHomepageText + assertEquals( + expectMsgClass(classOf[HomepageText]).result.linesIterator + .mkString("\n") + .trim, + """ + |Howdy Ada! + | + |Liked Songs: + |* Sunny by Boney M. + |* High Hopes by Pink Floyd + | + |Activity Feed: + |* Carol is listening to Strobe by deadmau5 + |* Bob is listening to Ce monde est cruel by Vald + |* Donald is listening to J'irai où tu iras by Céline Dion & Jean-Jacques Goldman + """.stripMargin.trim + ) + } + + abstract class MyTestKit + extends TestKit(ActorSystem("TestSystem")) + with ImplicitSender: + val songsStore = system.actorOf(Props(MockSongsStore()), "songsStore") + def makeAda() = + system.actorOf(Props(classOf[User], "1", "Ada", songsStore), "user-1") + val ada = makeAda() + val bob = + system.actorOf(Props(classOf[User], "2", "Bob", songsStore), "user-2") + val carol = + system.actorOf(Props(classOf[User], "3", "Carol", songsStore), "user-3") + val donald = + system.actorOf(Props(classOf[User], "4", "Donald", songsStore), "user-4") + def tests(): Unit + try tests() + finally shutdown(system) diff --git a/src/test/scala/instrumentation/Stats.scala b/src/test/scala/instrumentation/Stats.scala index 967bd0a83e2a7e2f1b9a4262c68a6cb1785658ee..81dca0de4e93422823aebfeea37cd9bb31177aad 100644 --- a/src/test/scala/instrumentation/Stats.scala +++ b/src/test/scala/instrumentation/Stats.scala @@ -2,10 +2,7 @@ package instrumentation import java.lang.management.* -/** A collection of methods that can be used to collect run-time statistics - * about Leon programs. This is mostly used to test the resources properties of - * Leon programs - */ +/** A collection of methods that can be used to collect run-time statistics */ object Stats: def timed[T](code: => T)(cont: Long => Unit): T = var t1 = System.currentTimeMillis()