Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • lara/cs206-demos
  • gcharles/cs206-demos
  • gambhir/cs206-demos
3 results
Show changes
Showing
with 1872 additions and 0 deletions
package midterm22
import scala.annotation.nowarn
// Questions 4-7
// See tests in midterm22.Part2Test.
// Run with `sbt testOnly midterm22.Part2Test`.
/*
Answers to the exam questions:
When called with a Vector:
The total amount of work is O(n), as it is dominated by the time needed to
read the array. More precisely W(n) = c + 2*W(n/2) = O(n).
The depth is O(log(n)), because every recursion takes constant time
and we divide the size of the input by 2 every time, i.e. D(n) = c + D(n/2) = O(log(n)).
Note however that in practice it is often still faster to manipulate
start and end indices rather than using take and drop.
When called with a List:
Every recursion takes up to time O(n) rather than constant time.
The total amount of work is O(n) times the number of recursion, because
take and drop takes time O(n) on lists. Precisely, W(n) = n + 2*W(n/2) = O(log(n)*n)
The depth is computed similarly: D(n) = n + D(n/2) = O(n), i.e.
Note: these are theoretical results. In practice, you should always double-check
that kind of conclusions with benchmarks. We did so in
`midterm-code/src/main/scala/bench`. Results are available in `bench-results`.
From these results, we can conclude that
1. Vectors are indeed faster in this case, and
2. parallelization of `contains` yields a 2x speedup.
*/
@nowarn
def contains[A](l: Iterable[A], elem: A): Boolean =
val n = l.size
if n <= 5 then
for i <- l do
if i == elem then
return true
false
else
val (p0, p1) = parallel(
contains(l.take(n / 2), elem),
contains(l.drop(n / 2), elem)
)
p0 || p1
package midterm22
// Question 8
// Run with `sbt runMain midterm22.part3`
@main def part3() =
def thread(b: => Unit) =
val t = new Thread:
override def run() = b
t
val t = thread { println(s"Hello World") }
t.join()
println(s"Hello")
package midterm22
import java.util.concurrent.atomic.AtomicInteger
import instrumentation.*
import instrumentation.Monitor
// Questions 9-15
// See tests in midterm22.Part4Test.
// Run with `sbt testOnly midterm22.Part4Test`.
class Node(
// Globally unique identifier. Different for each instance.
val guid: Int
) extends Monitor
// This function might be called concurrently.
def lockFun(nodes: List[Node], fn: (e: Int) => Unit): Unit =
if nodes.size > 0 then
nodes.head.synchronized {
fn(nodes(0).guid)
lockFun(nodes.tail, fn)
}
package midterm22
import instrumentation.Monitor
// Question 21
// See tests in midterm22.Part6Test.
// Run with `sbt testOnly midterm22.Part6Test`.
class TicketsManager(totalTickets: Int) extends Monitor:
var remainingTickets = totalTickets
// This method might be called concurrently
def getTicket(): Boolean =
if remainingTickets > 0 then
this.synchronized {
remainingTickets -= 1
}
true
else false
package midterm22
import instrumentation.Monitor
// Questions 22-24
// See tests in midterm22.Part7Test.
// Run with `sbt testOnly midterm22.Part7Test`.
class NIC(private val _index: Int, private var _assigned: Boolean)
extends Monitor:
def index = _index
def assigned = _assigned
def assigned_=(v: Boolean) = _assigned = v
class NICManager(n: Int):
// Creates a list with n NICs
val nics = (for i <- 0 until n yield NIC(i, false)).toList
// This method might be called concurrently
def assignNICs(limitRecvNICs: Boolean = false): (Int, Int) =
var recvNIC: Int = 0
var sendNIC: Int = 0
var gotRecvNIC: Boolean = false
var gotSendNIC: Boolean = false
/// Obtaining receiving NIC...
while !gotRecvNIC do
nics(recvNIC).synchronized {
if !nics(recvNIC).assigned then
nics(recvNIC).assigned = true
gotRecvNIC = true
else recvNIC = (recvNIC + 1) % (if limitRecvNICs then n - 1 else n)
}
// Successfully obtained receiving NIC
// Obtaining sending NIC...
while !gotSendNIC do
nics(sendNIC).synchronized {
if !nics(sendNIC).assigned then
nics(sendNIC).assigned = true
gotSendNIC = true
else sendNIC = (sendNIC + 1) % n
}
// Successfully obtained sending NIC
return (recvNIC, sendNIC)
package midterm22
import scala.collection.concurrent.{TrieMap, Map}
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
// Question 25
// See tests in midterm22.Part8Test.
// Run with `sbt testOnly midterm22.Part8Test`.
// Represent a social network where user can follow each other. Each user is
// represented by an id that is an `Int`.
abstract class AbstractInstagram:
// The map storing the "following" relation of our social network.
// `graph(a)` contains the list of user ids that user `a` follows.
val graph: Map[Int, List[Int]] = new TrieMap[Int, List[Int]]()
// The maximum user id allocated until now. This value should be incremented
// by one each time a new user is added.
val maxId = new AtomicInteger(0)
// Allocates a new user and returns its unique id. Internally, this should
// also create an empty list at the corresponding id in `graph`. The
// implementation must be thread-safe.
def add(): Int
// Make `a` follow `b`. The implementation must be thread-safe.
def follow(a: Int, b: Int): Unit
// Makes `a` unfollow `b`. The implementation must be thread-safe.
def unfollow(a: Int, b: Int): Unit
// Removes user with id `a`. This should also remove all references to `a`
// in `graph`. The implementation must be thread-safe.
def remove(a: Int): Unit
class Instagram extends AbstractInstagram:
// This method is worth 6 points.
def add(): Int =
// It is important to increment and read the value in one atomic step. See
// test `testParallelWrongAdd` for an alternative wrong implementation.
val id = maxId.incrementAndGet
// Note: it is also correct to use `graph.putIfAbsent`, but not needed as
// `id` is always new and therefore absent from the map at this point.
graph.update(id, Nil)
id
// This method is worth 8 points.
def remove(a: Int): Unit =
graph.remove(a)
// Iterate through all keys to make sure that nobody follows `a` anymore.
// For each key, we need to unfollow a in a thread-safe manner. Calling
// `unfollow` is the simplest way to so, as it is already guaranteed to be
// thread-safe. See test `testParallelWrongRemove` for an example of wrong
// implementation.
for b <- graph.keys do unfollow(b, a)
// This method is worth 10 points.
def unfollow(a: Int, b: Int) =
// Here, it is important to read the value only once. First calling
// `.contains(a)` and then `graph(a)` does not work, as `a` might be removed
// between the two calls. See `testParallelWrongUnfollow` for an example of
// this wrong implementation.
val prev = graph.get(a)
// Returns silently if `a` does not exist.
if prev.isEmpty then return
// We replace the list of users that `a` follows in an atomic manner. If the
// list of followed users changed concurrently, we start over.
if !graph.replace(a, prev.get, prev.get.filter(_ != b)) then unfollow(a, b)
// This method is worth 12 points.
def follow(a: Int, b: Int) =
val prev = graph.get(a)
// Returns silently if `a` does not exist.
if prev.isEmpty then return
// We replace the list of users that `a` follows in an atomic manner. If the
// list of followed users changed concurrently, we start over.
if !graph.replace(a, prev.get, b :: prev.get) then follow(a, b)
// Difficult: this handles the case where `b` is concurrently removed by
// another thread. To detect this case, we must check if `b` still exists
// after we have followed it, and unfollow it if it is not the case. See
// test `testParallelFollowABRemoveB`. This last step is worth 4 points.
else if !graph.contains(b) then unfollow(a, b)
package midterm22.appendix
// Represents optional values. Instances of Option are either an instance of
// scala.Some or the object None.
sealed abstract class Option[+A]:
// Returns the option's value if the option is an instance of scala.Some, or
// throws an exception if the option is None.
def get: A
// Returns true if the option is an instance of scala.Some, false otherwise.
// This is equivalent to:
// option match
// case Some(v) => true
// case None => false
def isDefined: Boolean
// Returns this scala.Option if it is nonempty, otherwise return the result of
// evaluating alternative.
def orElse[B >: A](alternative: => Option[B]): Option[B]
abstract class Iterable[+A]:
// Selects all elements except first n ones.
def drop(n: Int): Iterable[A]
// The size of this collection.
def size: Int
// Selects the first n elements.
def take(n: Int): Iterable[A]
abstract class List[+A] extends Iterable[A]:
// Adds an element at the beginning of this list.
def ::[B >: A](elem: B): List[B]
// A copy of this sequence with an element appended.
def appended[B >: A](elem: B): List[B]
// Get the element at the specified index.
def apply(n: Int): A
// Selects all elements of this list which satisfy a predicate.
def filter(pred: (A) => Boolean): List[A]
// Selects the first element of this list.
def head: A
// Sorts this sequence according to a comparison function.
def sortWith(lt: (A, A) => Boolean): List[A]
// Selects all elements except the first.
def tail: List[A]
abstract class Array[+A] extends Iterable[A]:
// Get the element at the specified index.
def apply(n: Int): A
abstract class Thread:
// Subclasses should override this method.
def run(): Unit
// Causes this thread to begin execution; the Java Virtual Machine calls the
// run method of this thread.
def start(): Unit
// Waits for this thread to die.
def join(): Unit
// Creates and starts a new task ran concurrently.
def task[T](body: => T): ForkJoinTask[T] = ???
abstract class ForkJoinTask[T]:
// Returns the result of the computation when it is done.
def join(): T
// A concurrent hash-trie or TrieMap is a concurrent thread-safe lock-free
// implementation of a hash array mapped trie.
abstract class TrieMap[K, V]:
// Retrieves the value which is associated with the given key. Throws a
// NoSuchElementException if there is no mapping from the given key to a
// value.
def apply(key: K): V
// Tests whether this map contains a binding for a key.
def contains(key: K): Boolean
// Applies a function f to all elements of this concurrent map. This function
// iterates over a snapshot of the map.
def foreach[U](f: ((K, V)) => U): Unit
// Optionally returns the value associated with a key.
def get(key: K): Option[V]
// Collects all key of this map in an iterable collection. The result is a
// snapshot of the values at a specific point in time.
def keys: Iterator[K]
// Transforms this map by applying a function to every retrieved value. This
// returns a new map.
def mapValues[W](f: V => W): TrieMap[K, W]
// Associates the given key with a given value, unless the key was already
// associated with some other value. This is an atomic operation.
def putIfAbsent(k: K, v: V): Option[V]
// Removes a key from this map, returning the value associated previously with
// that key as an option.
def remove(k: K): Option[V]
// Removes the entry for the specified key if it's currently mapped to the
// specified value. This is an atomic operation.
def remove(k: K, v: V): Boolean
// Replaces the entry for the given key only if it was previously mapped to a
// given value. Returns true if the change is successful, or false otherwise.
// This is an atomic operation.
def replace(k: K, oldvalue: V, newvalue: V): Boolean
// Adds a new key/value pair to this map.
def update(k: K, v: V): Unit
// Collects all values of this map in an iterable collection. The result is a
// snapshot of the values at a specific point in time.
def values: Iterator[V]
// An int value that may be updated atomically.
// The constructor takes the initial value at its only argument. For example,
// this create an `AtomicInteger` with an initial value of `42`:
// val myAtomicInteger = new AtomicInteger(42)
abstract class AtomicInteger:
// Atomically adds the given value to the current value and returns the
// updated value.
def addAndGet(delta: Int): Int
// Atomically sets the value to the given updated value if the current value
// == the expected value. Returns true if the change is successful, or false
// otherwise. This is an atomic operation.
def compareAndSet(oldvalue: Int, newvalue: Int): Boolean
// Gets the current value. This is an atomic operation.
def get(): Int
// Atomically increments by one the current value. This is an atomic operation.
def incrementAndGet(): Int
// ---------------------------------------------
// Needed so that we can compile successfully, but not included for students.
// See Option class doc instead.
abstract class Some[+A](value: A) extends Option[A]
object Some:
def apply[A](value: A): Some[A] = ???
val None: Option[Nothing] = ???
object List:
def apply[A](values: A*): List[A] = ???
val Nil: List[Nothing] = ???
object Array:
def apply[A](values: A*): Array[A] = ???
package midterm22
import java.util.concurrent.ForkJoinTask
import java.util.concurrent.RecursiveTask
import java.util.concurrent.ForkJoinWorkerThread
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.atomic.AtomicInteger
val forkJoinPool = ForkJoinPool()
var parallelismEnabled = true
var tasksCreated: AtomicInteger = AtomicInteger(0)
def schedule[T](body: => T): ForkJoinTask[T] =
val t = new RecursiveTask[T]:
def compute = body
Thread.currentThread match
case wt: ForkJoinWorkerThread => t.fork()
case _ => forkJoinPool.execute(t)
t
def task[T](body: => T): ForkJoinTask[T] =
tasksCreated.incrementAndGet
schedule(body)
def parallel[A, B](op1: => A, op2: => B): (A, B) =
if parallelismEnabled then
val res1 = task { op1 }
val res2 = op2
(res1.join(), res2)
else (op1, op2)
package midterm23
import instrumentation.Monitor
import scala.annotation.tailrec
/** Runs one of the three provided solutions to the barber shop problem.
*
* Run using `sbt "runMain midterm23.barberSolution 2 3"` for example.
*
* @param solutionNumber
* One of the 3 provided solutions to run: 1, 2, or 3.
* @param nCustomers
* The number of customers to simulate.
*/
@main def barberSolution(solutionNumber: Int, nCustomers: Int): Unit =
val barberShop: AbstractBarberShopSolution =
solutionNumber match
case 1 => BarberShopSolution1(nCustomers)
case 2 => BarberShopSolution2(nCustomers)
case 3 => BarberShopSolution3(nCustomers)
barberSolutionMain(barberShop, solutionNumber, nCustomers)
def barberSolutionMain(
barberShop: AbstractBarberShopSolution,
solutionNumber: Int,
nCustomers: Int
): Unit =
val bthread = new Thread:
override def run() =
barberShop.barber()
bthread.start()
val customerThreads =
for i <- 1 to nCustomers yield
val cthread = new Thread:
override def run() =
barberShop.customer(i)
cthread.start()
cthread
bthread.join()
customerThreads.foreach(_.join())
abstract class AbstractBarberShopSolution:
// var notifyBarber: Boolean = false
// var notifyCustomer: Boolean = false
// val lockObj = Object()
// In order to be able to mock the three attributes above in the tests, we
// replace them with three private attributes and public getters and setters.
// Without overrides, this is equivalent to the definitions above.
private var _notifyBarber: Boolean = false
private var _notifyCustomer: Boolean = false
private val _lockObject: Monitor = new Monitor {}
def notifyBarber = _notifyBarber
def notifyBarber_=(v: Boolean) = _notifyBarber = v
def notifyCustomer = _notifyCustomer
def notifyCustomer_=(v: Boolean) = _notifyCustomer = v
def lockObj: Monitor = _lockObject
def hairCut(): Unit =
log("start hair cut")
Thread.sleep(1000)
log("finish hair cut")
def customer(n: Int): Unit
def barber(): Unit
/** Logs a message to the console.
*
* Overridden in the tests to avoid printing gigabytes of output.
*
* @param s
*/
def log(s: String) = println(s)
/** This is the solution we expected for the `customer` method.
*
* However, now that you have learned about `@volatile`, you can see that the
* `customer` method is not thread-safe. Can you spot a problem with this
* solution?
*
* Note that the `log` calls were added to help you understand what is going
* on. They are not part of the expected solution.
*/
trait BarberShopCustomerSolution1 extends AbstractBarberShopSolution:
override def customer(n: Int) =
lockObj.synchronized {
log(f"customer ${n} sits on the chair and waits for the barber")
notifyBarber = true
while !notifyCustomer do {}
notifyCustomer = false
log(f"customer ${n} leaves the chair")
}
/** This is the solution we expected for the `barber` method.
*
* Same question as above: can you spot a problem with this solution?
*
* @param customersNumber
* The number of customers to simulate.
*/
trait BarberShopBarberSolution1(customersNumber: Int)
extends AbstractBarberShopSolution:
override def barber() =
// while true do
// We replace the infinite loop from the instructions with a loop doing a
// limited number of iterations so that we can test the code.
for _ <- 1 to customersNumber do
log("barber waits for a customer")
while !notifyBarber do {}
notifyBarber = false
hairCut()
notifyCustomer = true
log("barber stops working")
class BarberShopSolution1(customersNumber: Int)
extends BarberShopCustomerSolution1,
BarberShopBarberSolution1(customersNumber)
/** The problem with BarberShopSolution1 is that the `notifyBarber` and
* `notifyCustomer` are accessed from multiple threads without any
* synchronization. This means that the compiler is free to reorder the
* instructions in the `customer` and `barber` methods, which can lead to
* unexpected behaviors like infinite loops!
*
* To solve this problem, the solution below redefines the two attributes as
* `@volatile`.
*
* @param customersNumber
* The number of customers to simulate.
*/
class BarberShopSolution2(customersNumber: Int)
extends BarberShopSolution1(customersNumber):
@volatile private var _notifyBarber: Boolean = false
@volatile private var _notifyCustomer: Boolean = false
override def notifyBarber = _notifyBarber
override def notifyBarber_=(v: Boolean) = _notifyBarber = v
override def notifyCustomer = _notifyCustomer
override def notifyCustomer_=(v: Boolean) = _notifyCustomer = v
/** The solution below uses a different approach to solve the problem of
* BarberShopSolution1. Instead of using `@volatile`, it uses the lock object
* to synchronize the accesses to the two attributes. Note how each access to
* the attributes is now wrapped in a `lockObj.synchronized` block.
*
* @param customersNumber
* The number of customers to simulate.
*/
class BarberShopSolution3(customersNumber: Int)
extends AbstractBarberShopSolution:
override def customer(n: Int) =
getHairCut(n)
@tailrec
private def getHairCut(n: Int): Unit =
val sited = lockObj.synchronized {
if notifyBarber then
false
else if notifyCustomer then
false
else
log(f"customer ${n} sits on the chair and waits for the barber")
notifyBarber = true
true
}
if sited then
while !lockObj.synchronized { notifyCustomer } do {}
lockObj.synchronized {
log(f"customer ${n} leaves the chair")
notifyCustomer = false
}
else
getHairCut(n)
override def barber() =
for _ <- 1 to customersNumber do
log("barber waits for a customer")
while !lockObj.synchronized { notifyBarber } do {}
hairCut()
lockObj.synchronized {
notifyCustomer = true
notifyBarber = false
}
log("barber stops working")
package midterm23
import java.util.concurrent.atomic.AtomicInteger
/** This is an example `BarberShopStudentAnswer`, like the one we made for your
* own solution and that you can find on Moodle. The solution below is boringly
* correct as it use the same implementation as the expected solution. See
* BarberShopSolution.scala for explanations and comments.
*
* @param customersNumber
* The number of customers to simulate.
*/
class BarberShopStudentAnswer333(customersNumber: Int)
extends AbstractBarberShopSolution:
/** This attribute is used to count the number of hair cuts. It is needed for
* testing because we do not want to loop forever in the `barber` method!
*/
val hairCutsCounter = AtomicInteger(customersNumber)
override def customer(n: Int) =
lockObj.synchronized {
notifyBarber = true
while !notifyCustomer do {}
notifyCustomer = false
}
override def barber() =
// while true
// We replace the infinite loop from the instructions with a loop doing a
// limited number of iterations so that we can test the code.
while hairCutsCounter.get() != 0 do
while !notifyBarber do {}
notifyBarber = false
hairCut()
hairCutsCounter.decrementAndGet() // You can ignore this line. It has been added for testing.
notifyCustomer = true
/** Version of `BarberShopStudentAnswer333` with the methods `customer`
* overridden to use the expected implementation.
*
* @param n
* The number of customers to simulate.
*/
class BarberShopStudentAnswer333WithCorrectCustomer(n: Int)
extends BarberShopStudentAnswer333(n)
with BarberShopCustomerSolution1
/** Version of `BarberShopStudentAnswer333` with the methods `barber` overridden
* to use the expected implementation.
*
* @param n
* The number of customers to simulate.
*/
class BarberShopStudentAnswer333WithCorrectBarber(n: Int)
extends BarberShopStudentAnswer333(n)
with BarberShopBarberSolution1(n)
// To grade, we ran tests with each of these classes.
import lecture1.parallel
def numContributeTo(input: Array[Int], kernel: Array[Int], output: Array[Int], i: Int): Int =
if i < 0 || i >= output.length then
0
else if (i - kernel.length < 0) then
i+1
else if (i >= input.length) then
output.length - i
else kernel.length
def sequentialConvolve(
input: Array[Int],
kernel: Array[Int],
output: Array[Int],
from: Int,
until: Int
): Unit = {
// Approach A, as described in the clarification announcement for this
// exercise, where we treat `from` and `until` as indices on `output`
// instead of `input` as in the given code.
var iOutput = from
while iOutput < until do
var iKernel = math.max(0, iOutput - input.length + 1)
while iKernel < kernel.length && iKernel <= iOutput do
// `output` is only ever written to between the indices `from` and
// `until`, the range of `iOutput`. The indices for `input` and
// `kernel` are computed accordingly.
output(iOutput) += input(iOutput - iKernel) * kernel(iKernel)
iKernel += 1
iOutput += 1
// ALTERNATE SOLUTION: Approach B, as described in the clarification
// announcement for this exercise, which is unchanged from the given
// code, i.e. we treat `from` and `until` as indices on `input`.
var iInput = from
while iInput < until do
var iKernel = 0
while iKernel < kernel.length do
output(iInput + iKernel) += input(iInput) * kernel(iKernel)
iKernel += 1
iInput += 1
}
def parallelConvolve(
input: Array[Int],
kernel: Array[Int],
output: Array[Int],
from: Int,
until: Int,
threshold: Int
): Unit =
// Approach A, as described in the clarification announcement for this
// exercise, where we treat `from` and `until` as indices on `output`
// instead of `input` as in the given code. This does not require us to
// change anything in this function. Only receives full credit if used
// together with Approach A for `sequentialConvolve`.
if (until - from) <= threshold then
sequentialConvolve(input, kernel, output, from, until)
else
val mid = from + (until - from) / 2
parallel(
parallelConvolve(input, kernel, output, from, mid, threshold),
parallelConvolve(input, kernel, output, mid, until, threshold)
)
// ALTERNATE SOLUTION: Approach B, as described in the clarification
// announcement for this exercise, where we treat `from` and `until` as
// indices on `input` as in the given code. This requires up to leave a
// gap in the parallel calls to be filled in sequentially as described
// below. Only receives full credit if used together with Approach B for
// `sequentialConvolve`.
if (until - from) <= threshold then
sequentialConvolve(input, kernel, output, from, until)
else
val mid = from + (until - from) / 2
val gap = numContributeTo(input, kernel, output, mid)
// Leave a gap large enough that accesses to `output` from the first
// parallel call will not overlap with accesses from the second. This
// gap is of size equal to the number of elements of `input` that will
// contribute to the computation of the result at the lowest index of
// `output` in the second parallel call, namely, at index `mid`.
// However, we are careful not to access indices lower than `from`.
val gapBeforeMid = Math.max(from, mid - gap + 1)
parallel(
parallelConvolve(input, kernel, output, from, gapBeforeMid, threshold),
parallelConvolve(input, kernel, output, mid, until, threshold)
)
// Fill in the gap sequentially.
sequentialConvolve(input, kernel, output, gapBeforeMid, mid)
object Original_WithOutputRace:
def sequentialConvolve(
input: Array[Int],
kernel: Array[Int],
output: Array[Int],
from: Int,
until: Int): Unit =
var iInput = from
while iInput < until do
var iKernel = 0
while iKernel < kernel.length do
output(iInput + iKernel) += input(iInput) * kernel(iKernel)
iKernel += 1
iInput += 1
def parallelConvolve(
input: Array[Int],
kernel: Array[Int],
output: Array[Int],
from: Int,
until: Int,
threshold: Int): Unit =
if (until - from) <= threshold then
sequentialConvolve(input, kernel, output, from, until)
else
val mid = from + (until - from)/2
parallel(
parallelConvolve(input, kernel, output, from, mid, threshold),
parallelConvolve(input, kernel, output, mid, until, threshold))
import lecture1.parallel
import collection.concurrent.TrieMap
import collection.parallel.CollectionConverters.*
// Example run of `groupBy`:
case class Course(semester: String, id: String)
val cs210 = Course("BA3", "CS210")
val cs250 = Course("BA3", "CS250")
val cs206 = Course("BA4", "CS206")
val courses = Vector(cs210, cs250, cs206)
assertEquals(
courses.groupBy(_.semester),
Map(
"BA3" -> Vector(cs210, cs250),
"BA4" -> Vector(cs206)
)
)
/** Asymptotic run time: O(1) */
def appendAtKey[K, V](m: Map[K, Vector[V]], k: K, v: V): Map[K, Vector[V]] =
val prev = m.getOrElse(k, Vector.empty)
m.updated(k, prev :+ v)
// Example runs of `appendAtKey`:
assertEquals(
appendAtKey(Map(), 0, 1),
Map(0 -> Vector(1))
)
assertEquals(
appendAtKey(Map("BA3" -> Vector(cs210)), "BA3", cs250),
Map("BA3" -> List(cs210, cs250))
)
/** Asymptotic run time: O(1) */
def mergeMaps[K, V](a: Map[K, Vector[V]], b: Map[K, Vector[V]]) =
var res = a
for (k, vs) <- b do // O(1) iterations (we assume the number of keys to be O(1))
val prev = res.getOrElse(k, Vector.empty) // O(1) lookup
res = res.updated(k, prev ++ vs) // O(1) update, O(1) concatenation
res
// Example run of `mergeMaps`:
val m1 = Map(1 -> Vector(5, 7))
val m2 = Map(1 -> Vector(6), 2 -> Vector(3))
assertEquals(
mergeMaps(m1, m2),
Map(
1 -> Vector(5, 7, 6),
2 -> Vector(3)
)
)
def parGroupBy1[K, V](vs: Vector[V], key: V => K): Map[K, Vector[V]] =
if vs.size == 1 then
Map(key(vs.head) -> Vector(vs.head))
else
val mid = vs.size / 2
val (m1, m2) = parallel(
parGroupBy1(vs.slice(0, mid), key),
parGroupBy1(vs.slice(mid, vs.size), key)
)
mergeMaps(m1, m2)
assertEquals(
parGroupBy1(courses, c => c.semester),
Map(
"BA3" -> List(cs210, cs250),
"BA4" -> List(cs206)
)
)
assertEquals(
parGroupBy1(courses, c => c.semester),
Map(
"BA3" -> List(cs210, cs250),
"BA4" -> List(cs206)
)
)
def parGroupBy2[K, V](vs: Vector[V], key: V => K): Map[K, Vector[V]] =
if vs.size == 1 then
Map(key(vs.head) -> Vector(vs.head))
else
val (m1, m2) = parallel(
parGroupBy2(Vector(vs.head), key),
parGroupBy2(vs.tail, key)
)
mergeMaps(m1, m2)
assertEquals(
parGroupBy2(courses, c => c.semester),
Map(
"BA3" -> List(cs210, cs250),
"BA4" -> List(cs206)
)
)
assertEquals(
parGroupBy2(courses, c => c.semester),
Map(
"BA3" -> List(cs210, cs250),
"BA4" -> List(cs206)
)
)
def parGroupBy3[K, V](vs: Vector[V], key: V => K): Map[K, Vector[V]] =
val lock = Object()
var res = Map[K, Vector[V]]()
for v <- vs.par do
lock.synchronized { res = appendAtKey(res, key(v), v) }
res
var values3 = Set[Map[String, Vector[Course]]]()
for _ <- 1 to 100000 do
values3 += parGroupBy3(courses, _.semester)
assertEquals(values3.size, 2)
values3
def parGroupBy4[K, V](vs: Vector[V], key: V => K): Map[K, Vector[V]] =
val res = TrieMap[K, Vector[V]]()
for v <- vs.par do
val k = key(v)
val prev = res.getOrElse(k, Vector.empty)
res.update(k, prev :+ v)
res.toMap
var values4 = Set[Map[String, Vector[Course]]]()
for _ <- 1 to 100000 do
values4 += parGroupBy4(courses, _.semester)
assertEquals(values4.size, 4)
values4
def parGroupBySolution[K, V](vs: Vector[V], key: V => K): Map[K, Vector[V]] =
vs.par.aggregate(Map.empty)(
(m, v) => appendAtKey(m, key(v), v),
mergeMaps
)
var valuesSol = Set[Map[String, Vector[Course]]]()
for _ <- 1 to 100000 do
valuesSol += parGroupBySolution(courses, _.semester)
assertEquals(valuesSol.size, 1)
valuesSol
def assertEquals(a: Any, b: Any) = assert(a == b)
package concpar21final01
import scala.concurrent.duration.*
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
class Problem1Suite extends munit.FunSuite:
test(
"Retrieves grades at the end of the exam (everyone pushed something) (10pts)"
) {
class Problem1Done extends Problem1:
override def getGrade(sciper: Int): Future[Option[Grade]] =
Future {
Thread.sleep(100)
Some(Grade(sciper, sciper))
}
override def getScipers(): Future[List[Int]] =
Future {
Thread.sleep(100)
List(1, 2, 3, 4)
}
val expected: List[Grade] =
List(Grade(1, 1.0), Grade(2, 2.0), Grade(3, 3.0), Grade(4, 4.0))
(new Problem1Done).leaderboard().map { grades =>
assertEquals(grades.toSet, expected.toSet)
}
}
test("Retrieves grades mid exam (some students didn't push yet) (10pts)") {
class Problem1Partial extends Problem1:
override def getGrade(sciper: Int): Future[Option[Grade]] =
Future {
Thread.sleep(100)
if sciper % 2 == 0 then None
else Some(Grade(sciper, sciper))
}
override def getScipers(): Future[List[Int]] =
Future {
Thread.sleep(100)
List(1, 2, 3, 4)
}
val expected: List[Grade] =
List(Grade(1, 1.0), Grade(3, 3.0))
(new Problem1Partial).leaderboard().map { grades =>
assertEquals(grades.toSet, expected.toSet)
}
}
test("The output list is sorted by grade (10pts)") {
(new Problem1MockData).leaderboard().map { grades =>
assert(grades.size >= 176)
assert(grades.zipWithIndex.forall { case (g, i) =>
grades.drop(i).forall(x => g.grade >= x.grade)
})
}
}
test("GitLab API calls are done in parallel (2pts)") {
var inParallel: Boolean = false
class Problem1Par extends Problem1MockData:
var in: Boolean = false
override def getGrade(sciper: Int): Future[Option[Grade]] =
Future {
if in then inParallel = true
in = true
val out = super.getGrade(sciper)
in = false
concurrent.Await.result(out, Duration(10, SECONDS))
}
(new Problem1Par).leaderboard().map { grades =>
assert(grades.size >= 176)
assert(inParallel)
}
}
test("The IS-academia API is called exactly once (2pts)") {
var called: Int = 0
class Problem1Once extends Problem1MockData:
override def getScipers(): Future[List[Int]] =
called += 1
super.getScipers()
(new Problem1Once).leaderboard().map { grades =>
assert(grades.size >= 176)
assert(called == 1)
}
}
package concpar21final02
import akka.actor.*
import akka.testkit.*
import scala.collection.mutable
import concurrent.duration.*
import Problem2.*
class Problem2Suite extends munit.FunSuite:
import NotificationService.Protocol.*
import NotificationService.Responses.*
import DiscordChannel.Protocol.*
import DiscordChannel.Responses.*
test("Notification register (1pts)") {
new MyTestKit:
def tests() =
val actor = system.actorOf(Props[NotificationService]())
actor ! Register
expectMsg(Registered(true))
}
test("Notification register and un-register (1pts)") {
new MyTestKit:
def tests() =
val actor = system.actorOf(Props[NotificationService]())
actor ! Register
expectMsg(Registered(true))
actor ! UnRegister
expectMsg(Registered(false))
actor ! UnRegister
expectMsg(Registered(false))
actor ! Register
expectMsg(Registered(true))
actor ! UnRegister
expectMsg(Registered(false))
}
test("Notification notify (1pts)") {
new MyTestKit:
def tests() =
val actor = system.actorOf(Props[NotificationService]())
actor ! Register
expectMsg(Registered(true))
actor ! NotifyAll
expectMsg(Notification)
actor ! NotifyAll
expectMsg(Notification)
actor ! UnRegister
expectMsg(Registered(false))
actor ! NotifyAll
expectNoMessage()
actor ! Register
expectMsg(Registered(true))
actor ! NotifyAll
expectMsg(Notification)
actor ! UnRegister
expectMsg(Registered(false))
actor ! NotifyAll
expectNoMessage()
}
test("NotifyAll from other actor (1pts)") {
new MyTestKit:
def tests() =
val actor = system.actorOf(Props[NotificationService]())
val otherActor = system.actorOf(Props[DummyActor]())
def notifyFormAllFromOtherActor() =
given ActorRef = otherActor
actor ! NotifyAll
expectNoMessage()
actor ! Register
expectMsg(Registered(true))
notifyFormAllFromOtherActor()
expectMsg(Notification)
}
test("Channel init (1pts)") {
new MyTestKit:
def tests() =
val notificationService = system.actorOf(Props[NotificationService]())
val channel = system.actorOf(Props[DiscordChannel]())
channel ! Init(notificationService)
expectMsg(Active)
}
test("Channel post and get post (1pts)") {
new MyTestKit:
def tests() =
val notificationService = system.actorOf(Props[NotificationService]())
val channel = system.actorOf(Props[DiscordChannel]())
channel ! Init(notificationService)
expectMsg(Active)
channel ! Post("hello")
channel ! GetLastPosts(1)
expectMsg(Posts(List("hello")))
channel ! GetLastPosts(10)
expectMsg(Posts(List("hello")))
channel ! GetLastPosts(0)
expectMsg(Posts(Nil))
}
test("Channel multiple posts (1pts)") {
new MyTestKit:
def tests() =
val notificationService = system.actorOf(Props[NotificationService]())
val channel = system.actorOf(Props[DiscordChannel]())
channel ! Init(notificationService)
expectMsg(Active)
channel ! Post("hello")
channel ! Post("world")
channel ! GetLastPosts(2)
channel ! GetLastPosts(1)
channel ! Post("!")
channel ! GetLastPosts(3)
expectMsg(Posts(List("world", "hello")))
expectMsg(Posts(List("world")))
expectMsg(Posts(List("!", "world", "hello")))
}
test("Channel posts and notify (1pts)") {
new MyTestKit:
def tests() =
val notificationService = system.actorOf(Props[NotificationService]())
val channel = system.actorOf(Props[DiscordChannel]())
channel ! Init(notificationService)
expectMsg(Active)
notificationService ! Register
expectMsg(Registered(true))
channel ! Post("hello")
channel ! Post("world")
expectMsg(Notification)
expectMsg(Notification)
}
test("Channel init twice (1pts)") {
new MyTestKit:
def tests() =
val notificationService = system.actorOf(Props[NotificationService]())
val channel = system.actorOf(Props[DiscordChannel]())
channel ! Init(notificationService)
expectMsg(Active)
channel ! Init(notificationService)
expectMsg(AlreadyActive)
channel ! Init(notificationService)
expectMsg(AlreadyActive)
}
test("Channel not active (1pts)") {
new MyTestKit:
def tests() =
val channel1 = system.actorOf(Props[DiscordChannel]())
channel1 ! Post("hello")
expectMsg(NotActive)
val channel2 = system.actorOf(Props[DiscordChannel]())
channel2 ! GetLastPosts(0)
expectMsg(NotActive)
}
abstract class MyTestKit
extends TestKit(ActorSystem("TestSystem"))
with ImplicitSender:
def tests(): Unit
try tests()
finally shutdown(system)
class DummyActor extends Actor:
def receive: Receive = { case _ =>
()
}
package concpar21final03
import scala.annotation.tailrec
import scala.concurrent.*
import scala.concurrent.duration.*
import scala.collection.mutable.HashMap
import scala.util.Random
import instrumentation.*
import instrumentation.TestHelper.*
import instrumentation.TestUtils.*
class Problem3Suite extends munit.FunSuite:
test("Part 1: ThreadMap (3pts)") {
testManySchedules(
4,
sched =>
val tmap = new SchedulableThreadMap[Int](sched)
def writeThread(): Unit =
tmap.setCurrentThreadValue(0)
tmap.setCurrentThreadValue(-1)
val readBack = tmap.currentThreadValue
assertEquals(readBack, Some(-1))
def writeAndDeleteThread(): Unit =
tmap.setCurrentThreadValue(42)
tmap.deleteCurrentThreadValue()
@tailrec
def waitThread(): Unit =
tmap.waitForall(_ < 0)
val all = tmap.allValues
if all != List(-1) then waitThread()
val threads = List(
() => writeThread(),
() => writeAndDeleteThread(),
() => waitThread(),
() => waitThread()
)
(threads, _ => (true, ""))
)
}
test("Part 2: RCU (5pts)") {
testManySchedules(
3,
sched =>
val rcu = new SchedulableRCU(sched)
case class State(
value: Int,
isDeleted: AtomicLong = SchedulableAtomicLong(0, sched, "isDeleted")
)
val sharedState =
SchedulableAtomicReference(State(0), sched, "sharedState")
def readThread(): Unit =
rcu.startRead()
val state = sharedState.get
val stateWasDeleted = state.isDeleted.get != 0
assert(
!stateWasDeleted,
"RCU shared state deleted in the middle of a read."
)
rcu.stopRead()
def writeThread(): Unit =
val oldState = sharedState.get
sharedState.set(State(oldState.value + 1))
rcu.waitForOldReads()
oldState.isDeleted.set(1)
val threads = List(
() => readThread(),
() => readThread(),
() => writeThread()
)
(threads, _ => (true, ""))
)
}
test("Part 3: UpdateServer (2pts)") {
testManySchedules(
3,
sched =>
val fs = SchedulableInMemoryFileSystem(sched)
val server = new SchedulableUpdateServer(sched, fs)
def writeThread(): Unit =
server.newUpdate("update1.bin", "Update 1")
server.newUpdate("update2.bin", "Update 2")
assertEquals(fs.fsMap.toSet, Set("update2.bin" -> "Update 2"))
def fetchThread(): Unit =
val res = server.fetchUpdate()
assert(
List(None, Some("Update 1"), Some("Update 2")).contains(res),
s"fetchUpdate returned unexpected value $res"
)
val threads = List(
() => writeThread(),
() => fetchThread(),
() => fetchThread()
)
(threads, _ => (true, ""))
)
}
import scala.concurrent.duration.*
override val munitTimeout = 200.seconds
end Problem3Suite
package concpar21final03.instrumentation
class AtomicReference[T](initial: T):
private val atomic =
new java.util.concurrent.atomic.AtomicReference[T](initial)
def get: T = atomic.get()
def set(value: T): Unit = atomic.set(value)
package concpar21final03.instrumentation
trait MockedMonitor extends Monitor:
def scheduler: Scheduler
// Can be overriden.
override def waitDefault() =
scheduler.log("wait")
scheduler updateThreadState Wait(this, scheduler.threadLocks.tail)
override def synchronizedDefault[T](toExecute: => T): T =
scheduler.log("synchronized check")
val prevLocks = scheduler.threadLocks
scheduler updateThreadState Sync(
this,
prevLocks
) // If this belongs to prevLocks, should just continue.
scheduler.log("synchronized -> enter")
try toExecute
finally
scheduler updateThreadState Running(prevLocks)
scheduler.log("synchronized -> out")
override def notifyDefault() =
scheduler mapOtherStates { state =>
state match
case Wait(lockToAquire, locks) if lockToAquire == this =>
SyncUnique(this, state.locks)
case e => e
}
scheduler.log("notify")
override def notifyAllDefault() =
scheduler mapOtherStates { state =>
state match
case Wait(lockToAquire, locks) if lockToAquire == this =>
Sync(this, state.locks)
case SyncUnique(lockToAquire, locks) if lockToAquire == this =>
Sync(this, state.locks)
case e => e
}
scheduler.log("notifyAll")
trait LockFreeMonitor extends Monitor:
override def waitDefault() =
throw new Exception("Please use lock-free structures and do not use wait()")
override def synchronizedDefault[T](toExecute: => T): T =
throw new Exception(
"Please use lock-free structures and do not use synchronized()"
)
override def notifyDefault() =
throw new Exception(
"Please use lock-free structures and do not use notify()"
)
override def notifyAllDefault() =
throw new Exception(
"Please use lock-free structures and do not use notifyAll()"
)
abstract class ThreadState:
def locks: Seq[AnyRef]
trait CanContinueIfAcquiresLock extends ThreadState:
def lockToAquire: AnyRef
case object Start extends ThreadState:
def locks: Seq[AnyRef] = Seq.empty
case object End extends ThreadState:
def locks: Seq[AnyRef] = Seq.empty
case class Wait(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState
case class SyncUnique(lockToAquire: AnyRef, locks: Seq[AnyRef])
extends ThreadState
with CanContinueIfAcquiresLock
case class Sync(lockToAquire: AnyRef, locks: Seq[AnyRef])
extends ThreadState
with CanContinueIfAcquiresLock
case class Running(locks: Seq[AnyRef]) extends ThreadState
case class VariableReadWrite(locks: Seq[AnyRef]) extends ThreadState
package concpar21final03.instrumentation
import java.util.concurrent.*;
import scala.concurrent.duration.*
import scala.collection.mutable.*
import Stats.*
import java.util.concurrent.atomic.AtomicInteger
sealed abstract class Result
case class RetVal(rets: List[Any]) extends Result
case class Except(msg: String, stackTrace: Array[StackTraceElement])
extends Result
case class Timeout(msg: String) extends Result
/** A class that maintains schedule and a set of thread ids. The schedules are
* advanced after an operation of a SchedulableBuffer is performed. Note: the
* real schedule that is executed may deviate from the input schedule due to
* the adjustments that had to be made for locks
*/
class Scheduler(sched: List[Int]):
val maxOps =
500 // a limit on the maximum number of operations the code is allowed to perform
private var schedule = sched
private var numThreads = 0
private val realToFakeThreadId = Map[Long, Int]()
private val opLog =
ListBuffer[String]() // a mutable list (used for efficient concat)
private val threadStates = Map[Int, ThreadState]()
/** Runs a set of operations in parallel as per the schedule. Each operation
* may consist of many primitive operations like reads or writes to shared
* data structure each of which should be executed using the function `exec`.
* @timeout
* in milliseconds
* @return
* true - all threads completed on time, false -some tests timed out.
*/
def runInParallel(timeout: Long, ops: List[() => Any]): Result =
numThreads = ops.length
val threadRes = Array.fill(numThreads) { None: Any }
var exception: Option[Except] = None
val syncObject = new Object()
var completed = new AtomicInteger(0)
// create threads
val threads = ops.zipWithIndex.map { case (op, i) =>
new Thread(
new Runnable():
def run(): Unit =
val fakeId = i + 1
setThreadId(fakeId)
try
updateThreadState(Start)
val res = op()
updateThreadState(End)
threadRes(i) = res
// notify the master thread if all threads have completed
if completed.incrementAndGet() == ops.length then
syncObject.synchronized { syncObject.notifyAll() }
catch
case e: Throwable
if exception != None => // do nothing here and silently fail
case e: Throwable =>
log(s"throw ${e.toString}")
exception = Some(
Except(
s"Thread $fakeId crashed on the following schedule: \n" + opLog
.mkString("\n"),
e.getStackTrace
)
)
syncObject.synchronized { syncObject.notifyAll() }
// println(s"$fakeId: ${e.toString}")
// Runtime.getRuntime().halt(0) //exit the JVM and all running threads (no other way to kill other threads)
)
}
// start all threads
threads.foreach(_.start())
// wait for all threads to complete, or for an exception to be thrown, or for the time out to expire
var remTime = timeout
syncObject.synchronized {
timed { if completed.get() != ops.length then syncObject.wait(timeout) } {
time => remTime -= time
}
}
if exception.isDefined then exception.get
else if remTime <= 1
then // timeout ? using 1 instead of zero to allow for some errors
Timeout(opLog.mkString("\n"))
else
// every thing executed normally
RetVal(threadRes.toList)
// Updates the state of the current thread
def updateThreadState(state: ThreadState): Unit =
val tid = threadId
synchronized {
threadStates(tid) = state
}
state match
case Sync(lockToAquire, locks) =>
if locks.indexOf(lockToAquire) < 0 then waitForTurn
else
// Re-aqcuiring the same lock
updateThreadState(Running(lockToAquire +: locks))
case Start => waitStart()
case End => removeFromSchedule(tid)
case Running(_) =>
case _ => waitForTurn // Wait, SyncUnique, VariableReadWrite
def waitStart(): Unit =
// while (threadStates.size < numThreads) {
// Thread.sleep(1)
// }
synchronized {
if threadStates.size < numThreads then wait()
else notifyAll()
}
def threadLocks =
synchronized {
threadStates(threadId).locks
}
def threadState =
synchronized {
threadStates(threadId)
}
def mapOtherStates(f: ThreadState => ThreadState) =
val exception = threadId
synchronized {
for k <- threadStates.keys if k != exception do
threadStates(k) = f(threadStates(k))
}
def log(str: String) =
if (realToFakeThreadId contains Thread.currentThread().getId()) then
val space = (" " * ((threadId - 1) * 2))
val s =
space + threadId + ":" + "\n".r.replaceAllIn(str, "\n" + space + " ")
opLog += s
/** Executes a read or write operation to a global data structure as per the
* given schedule
* @param msg
* a message corresponding to the operation that will be logged
*/
def exec[T](
primop: => T
)(msg: => String, postMsg: => Option[T => String] = None): T =
if !(realToFakeThreadId contains Thread.currentThread().getId()) then primop
else
updateThreadState(VariableReadWrite(threadLocks))
val m = msg
if m != "" then log(m)
if opLog.size > maxOps then
throw new Exception(
s"Total number of reads/writes performed by threads exceed $maxOps. A possible deadlock!"
)
val res = primop
postMsg match
case Some(m) => log(m(res))
case None =>
res
private def setThreadId(fakeId: Int) = synchronized {
realToFakeThreadId(Thread.currentThread.getId) = fakeId
}
def threadId =
try realToFakeThreadId(Thread.currentThread().getId())
catch
case e: NoSuchElementException =>
throw new Exception(
"You are accessing shared variables in the constructor. This is not allowed. The variables are already initialized!"
)
private def isTurn(tid: Int) = synchronized {
(!schedule.isEmpty && schedule.head != tid)
}
def canProceed(): Boolean =
val tid = threadId
canContinue match
case Some((i, state)) if i == tid =>
// println(s"$tid: Runs ! Was in state $state")
canContinue = None
state match
case Sync(lockToAquire, locks) =>
updateThreadState(Running(lockToAquire +: locks))
case SyncUnique(lockToAquire, locks) =>
mapOtherStates {
_ match
case SyncUnique(lockToAquire2, locks2)
if lockToAquire2 == lockToAquire =>
Wait(lockToAquire2, locks2)
case e => e
}
updateThreadState(Running(lockToAquire +: locks))
case VariableReadWrite(locks) => updateThreadState(Running(locks))
true
case Some((i, state)) =>
// println(s"$tid: not my turn but $i !")
false
case None =>
false
var threadPreference =
0 // In the case the schedule is over, which thread should have the preference to execute.
/** returns true if the thread can continue to execute, and false otherwise */
def decide(): Option[(Int, ThreadState)] =
if !threadStates.isEmpty
then // The last thread who enters the decision loop takes the decision.
// println(s"$threadId: I'm taking a decision")
if threadStates.values.forall {
case e: Wait => true
case _ => false
}
then
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
throw new Exception(
s"Deadlock: Thread$s $waiting $are waiting but all others have ended and cannot notify them."
)
else
// Threads can be in Wait, Sync, SyncUnique, and VariableReadWrite mode.
// Let's determine which ones can continue.
val notFree =
threadStates.collect { case (id, state) => state.locks }.flatten.toSet
val threadsNotBlocked = threadStates.toSeq.filter {
case (id, v: VariableReadWrite) => true
case (id, v: CanContinueIfAcquiresLock) =>
!notFree(v.lockToAquire) || (v.locks contains v.lockToAquire)
case _ => false
}
if threadsNotBlocked.isEmpty then
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if threadStates.size > 1 then "s" else ""
val are = if threadStates.size > 1 then "are" else "is"
val whoHasLock = threadStates.toSeq.flatMap { case (id, state) =>
state.locks.map(lock => (lock, id))
}.toMap
val reason = threadStates
.collect {
case (id, state: CanContinueIfAcquiresLock)
if !notFree(state.lockToAquire) =>
s"Thread $id is waiting on lock ${state.lockToAquire} held by thread ${whoHasLock(state.lockToAquire)}"
}
.mkString("\n")
throw new Exception(
s"Deadlock: Thread$s $waiting are interlocked. Indeed:\n$reason"
)
else if threadsNotBlocked.size == 1
then // Do not consume the schedule if only one thread can execute.
Some(threadsNotBlocked(0))
else
val next = schedule.indexWhere(t =>
threadsNotBlocked.exists { case (id, state) => id == t }
)
if next != -1 then
// println(s"$threadId: schedule is $schedule, next chosen is ${schedule(next)}")
val chosenOne = schedule(
next
) // TODO: Make schedule a mutable list.
schedule = schedule.take(next) ++ schedule.drop(next + 1)
Some((chosenOne, threadStates(chosenOne)))
else
threadPreference = (threadPreference + 1) % threadsNotBlocked.size
val chosenOne = threadsNotBlocked(
threadPreference
) // Maybe another strategy
Some(chosenOne)
// threadsNotBlocked.indexOf(threadId) >= 0
/*
val tnb = threadsNotBlocked.map(_._1).mkString(",")
val s = if (schedule.isEmpty) "empty" else schedule.mkString(",")
val only = if (schedule.isEmpty) "" else " only"
throw new Exception(s"The schedule is $s but$only threads ${tnb} can continue")*/
else canContinue
/** This will be called before a schedulable operation begins. This should not
* use synchronized
*/
var numThreadsWaiting = new AtomicInteger(0)
// var waitingForDecision = Map[Int, Option[Int]]() // Mapping from thread ids to a number indicating who is going to make the choice.
var canContinue: Option[(Int, ThreadState)] =
None // The result of the decision thread Id of the thread authorized to continue.
private def waitForTurn =
synchronized {
if numThreadsWaiting.incrementAndGet() == threadStates.size then
canContinue = decide()
notifyAll()
// waitingForDecision(threadId) = Some(numThreadsWaiting)
// println(s"$threadId Entering waiting with ticket number $numThreadsWaiting/${waitingForDecision.size}")
while !canProceed() do wait()
}
numThreadsWaiting.decrementAndGet()
/** To be invoked when a thread is about to complete
*/
private def removeFromSchedule(fakeid: Int) = synchronized {
// println(s"$fakeid: I'm taking a decision because I finished")
schedule = schedule.filterNot(_ == fakeid)
threadStates -= fakeid
if numThreadsWaiting.get() == threadStates.size then
canContinue = decide()
notifyAll()
}
def getOperationLog() = opLog
/* Copyright 2009-2015 EPFL, Lausanne */
package concpar21final03.instrumentation
import java.lang.management.*
/** A collection of methods that can be used to collect run-time statistics */
object Stats:
def timed[T](code: => T)(cont: Long => Unit): T =
var t1 = System.currentTimeMillis()
val r = code
cont((System.currentTimeMillis() - t1))
r
def withTime[T](code: => T): (T, Long) =
var t1 = System.currentTimeMillis()
val r = code
(r, (System.currentTimeMillis() - t1))
package concpar21final03.instrumentation
import scala.util.Random
import scala.collection.mutable.{Map as MutableMap}
import Stats.*
object TestHelper:
val noOfSchedules = 10000 // set this to 100k during deployment
val readWritesPerThread =
20 // maximum number of read/writes possible in one thread
val contextSwitchBound = 10
val testTimeout = 150 // the total time out for a test in seconds
val schedTimeout =
15 // the total time out for execution of a schedule in secs
// Helpers
/*def testManySchedules(op1: => Any): Unit = testManySchedules(List(() => op1))
def testManySchedules(op1: => Any, op2: => Any): Unit = testManySchedules(List(() => op1, () => op2))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any, op4: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3, () => op4))*/
def testSequential[T](
ops: Scheduler => Any
)(assertions: T => (Boolean, String)) =
testManySchedules(
1,
(sched: Scheduler) =>
(
List(() => ops(sched)),
(res: List[Any]) => assertions(res.head.asInstanceOf[T])
)
)
/** @numThreads
* number of threads
* @ops
* operations to be executed, one per thread
* @assertion
* as condition that will executed after all threads have completed
* (without exceptions) the arguments are the results of the threads
*/
def testManySchedules(
numThreads: Int,
ops: Scheduler => (
List[() => Any], // Threads
List[Any] => (Boolean, String)
) // Assertion
) =
var timeout = testTimeout * 1000L
val threadIds = (1 to numThreads)
// (1 to scheduleLength).flatMap(_ => threadIds).toList.permutations.take(noOfSchedules).foreach {
val schedules = (new ScheduleGenerator(numThreads)).schedules()
var schedsExplored = 0
schedules
.takeWhile(_ => schedsExplored <= noOfSchedules && timeout > 0)
.foreach {
// case _ if timeout <= 0 => // break
case schedule =>
schedsExplored += 1
val schedr = new Scheduler(schedule)
// println("Exploring Sched: "+schedule)
val (threadOps, assertion) = ops(schedr)
if threadOps.size != numThreads then
throw new IllegalStateException(
s"Number of threads: $numThreads, do not match operations of threads: $threadOps"
)
timed { schedr.runInParallel(schedTimeout * 1000, threadOps) } { t =>
timeout -= t
} match
case Timeout(msg) =>
throw new java.lang.AssertionError(
"assertion failed\n" + "The schedule took too long to complete. A possible deadlock! \n" + msg
)
case Except(msg, stkTrace) =>
val traceStr = "Thread Stack trace: \n" + stkTrace
.map(" at " + _.toString)
.mkString("\n")
throw new java.lang.AssertionError(
"assertion failed\n" + msg + "\n" + traceStr
)
case RetVal(threadRes) =>
// check the assertion
val (success, custom_msg) = assertion(threadRes)
if !success then
val msg =
"The following schedule resulted in wrong results: \n" + custom_msg + "\n" + schedr
.getOperationLog()
.mkString("\n")
throw new java.lang.AssertionError("Assertion failed: " + msg)
}
if timeout <= 0 then
throw new java.lang.AssertionError(
"Test took too long to complete! Cannot check all schedules as your code is too slow!"
)
/** A schedule generator that is based on the context bound
*/
class ScheduleGenerator(numThreads: Int):
val scheduleLength = readWritesPerThread * numThreads
val rands =
(1 to scheduleLength).map(i =>
new Random(0xcafe * i)
) // random numbers for choosing a thread at each position
def schedules(): LazyList[List[Int]] =
var contextSwitches = 0
var contexts =
List[Int]() // a stack of thread ids in the order of context-switches
val remainingOps = MutableMap[Int, Int]()
remainingOps ++= (1 to numThreads).map(i =>
(i, readWritesPerThread)
) // num ops remaining in each thread
val liveThreads = (1 to numThreads).toSeq.toBuffer
/** Updates remainingOps and liveThreads once a thread is chosen for a
* position in the schedule
*/
def updateState(tid: Int): Unit =
val remOps = remainingOps(tid)
if remOps == 0 then liveThreads -= tid
else remainingOps += (tid -> (remOps - 1))
val schedule = rands.foldLeft(List[Int]()) {
case (acc, r) if contextSwitches < contextSwitchBound =>
val tid = liveThreads(r.nextInt(liveThreads.size))
contexts match
case prev :: tail
if prev != tid => // we have a new context switch here
contexts +:= tid
contextSwitches += 1
case prev :: tail =>
case _ => // init case
contexts +:= tid
updateState(tid)
acc :+ tid
case (
acc,
_
) => // here context-bound has been reached so complete the schedule without any more context switches
if !contexts.isEmpty then
contexts = contexts.dropWhile(remainingOps(_) == 0)
val tid = contexts match
case top :: tail => top
case _ =>
liveThreads(
0
) // here, there has to be threads that have not even started
updateState(tid)
acc :+ tid
}
schedule #:: schedules()