Skip to content
Snippets Groups Projects
Scheduler.scala 11.7 KiB
Newer Older
Matt Bovel's avatar
Matt Bovel committed
package instrumentation

Matt Bovel's avatar
Matt Bovel committed
import java.util.concurrent.*;
import scala.concurrent.duration.*
import scala.collection.mutable.*
import Stats.*
Matt Bovel's avatar
Matt Bovel committed

import java.util.concurrent.atomic.AtomicInteger

/** Outcome of running a set of operations under a [[Scheduler]]. */
sealed abstract class Result

/** All operations finished; `rets` holds one return value per operation, in
  * the order the operations were given.
  */
case class RetVal(rets: List[Any]) extends Result

/** An operation threw; `msg` describes the failure and `stackTrace` is the
  * stack trace of the throwable.
  */
case class Except(msg: String, stackTrace: Array[StackTraceElement]) extends Result

/** The run did not finish in the allotted time; `msg` carries the operation
  * log.
  */
case class Timeout(msg: String) extends Result

Matt Bovel's avatar
Matt Bovel committed
/** Maintains a schedule and a set of thread ids. The schedule is advanced
  * after an operation of a SchedulableBuffer is performed. Note: the schedule
  * that is actually executed may deviate from the input schedule because of
  * adjustments that have to be made for locks.
  */
Matt Bovel's avatar
Matt Bovel committed
class Scheduler(sched: List[Int]):
Matt Bovel's avatar
Matt Bovel committed
  /** Limit on the number of logged operations a run is allowed to perform. */
  val maxOps: Int = 500

  /** Schedule still to be consumed: fake thread ids in the order they should run. */
  private var schedule: List[Int] = sched

  /** Number of threads of the current run (set by `runInParallel`). */
  private var numThreads: Int = 0

  /** Maps JVM thread ids to the fake thread ids used in schedules and logs. */
  private val realToFakeThreadId: Map[Long, Int] = Map.empty[Long, Int]

  /** Operation log; a mutable buffer so that appending is cheap. */
  private val opLog: ListBuffer[String] = ListBuffer.empty[String]

  /** Last recorded state of each registered thread that has not ended yet. */
  private val threadStates: Map[Int, ThreadState] = Map.empty[Int, ThreadState]

Matt Bovel's avatar
Matt Bovel committed
  /** Runs a set of operations in parallel as per the schedule. Each operation
    * may consist of many primitive operations like reads or writes to shared
    * data structure each of which should be executed using the function `exec`.
    *
    * One thread is started per operation; the thread running `ops(i)` gets the
    * fake id `i + 1`. The calling thread then blocks until every operation has
    * finished, one of them has thrown, or `timeout` has elapsed. A non-positive
    * `timeout` does not wait at all.
    *
    * @param timeout
    *   maximum time to wait, in milliseconds
    * @param ops
    *   the operations to run, one per thread
    * @return
    *   [[Except]] if an operation threw (only the first recorded failure is
    *   reported), otherwise [[RetVal]] with the results in the order of `ops`
    *   if every operation completed, otherwise [[Timeout]] with the operation
    *   log.
    */
  def runInParallel(timeout: Long, ops: List[() => Any]): Result =
    numThreads = ops.length
    val threadRes = Array.fill(numThreads) { None: Any }
    // Written by the worker threads and read by the caller: an atomic
    // reference makes the failure visible across threads and lets exactly one
    // failure win.
    val failure =
      new java.util.concurrent.atomic.AtomicReference[Option[Except]](None)
    val syncObject = new Object()
    val completed = new AtomicInteger(0)
    // create threads
    val threads = ops.zipWithIndex.map {
      case (op, i) =>
        new Thread(new Runnable():
          def run(): Unit =
            val fakeId = i + 1
            setThreadId(fakeId)
            try
              updateThreadState(Start)
              val res = op()
              updateThreadState(End)
              threadRes(i) = res
              // notify the main thread if all threads have completed
              if completed.incrementAndGet() == ops.length then
                syncObject.synchronized { syncObject.notifyAll() }
            catch
              // Throwable on purpose: any failure of an operation, including
              // errors, must be reported back to the caller.
              case e: Throwable =>
                // Once a failure has been recorded, later ones are dropped.
                if failure.get().isEmpty then
                  log(s"throw ${e.toString}")
                  val report = Except(
                    s"Thread $fakeId crashed on the following schedule: \n" + opLog.mkString(
                      "\n"
                    ),
                    e.getStackTrace
                  )
                  failure.compareAndSet(None, Some(report))
                  syncObject.synchronized { syncObject.notifyAll() }
        )
    }
    // start all threads
    threads.foreach(_.start())
    // wait for all threads to complete, or for an exception to be thrown, or for the time out to expire
    val startedAt = System.currentTimeMillis()
    def finished: Boolean = completed.get() == ops.length
    syncObject.synchronized {
      // The condition is re-checked in a loop while holding the monitor, so a
      // notification sent before we got here is not lost and a spurious wakeup
      // does not end the wait early.
      var left = timeout
      while !finished && failure.get().isEmpty && left > 0 do
        syncObject.wait(left)
        left = timeout - (System.currentTimeMillis() - startedAt)
    }
    failure.get() match
      case Some(crash) => crash
      case None =>
        if finished then
          // every thing executed normally
          RetVal(threadRes.toList)
        else
          Timeout(opLog.mkString("\n"))

  /** Records `state` as the current state of the calling thread, then performs
    * the waiting or bookkeeping step associated with that state.
    */
  def updateThreadState(state: ThreadState): Unit =
    val me = threadId
    synchronized { threadStates.update(me, state) }
    state match
      case Sync(wanted, held) =>
        if held.contains(wanted) then
          // Re-acquiring a lock that is already held: go straight to Running
          updateThreadState(Running(wanted +: held))
        else waitForTurn
      case Start      => waitStart()
      case End        => removeFromSchedule(me)
      case Running(_) => ()
      case _          => waitForTurn // Wait, SyncUnique, VariableReadWrite

  /** Start barrier. If `threadStates` already holds at least `numThreads`
    * entries, wakes every thread waiting on this scheduler's monitor;
    * otherwise blocks the caller on that monitor until it is notified.
    */
  def waitStart(): Unit =
    synchronized {
      val allRegistered = threadStates.size >= numThreads
      // NOTE(review): a single check, not a loop. Entries are removed from
      // threadStates when a thread ends (see removeFromSchedule), so
      // re-checking the size after waking up could block forever - confirm
      // before turning this into a `while`.
      if allRegistered then notifyAll() else wait()
    }

  /** The locks recorded in the current state of the calling thread. */
  def threadLocks =
    this.synchronized {
      val current = threadStates(threadId)
      current.locks
    }

  /** The state currently recorded for the calling thread. */
  def threadState =
    this.synchronized {
      val me = threadId
      threadStates(me)
    }

  /** Replaces the recorded state of every thread other than the caller with
    * the result of applying `f` to it.
    */
  def mapOtherStates(f: ThreadState => ThreadState) =
    val self = threadId
    synchronized {
      threadStates.keys.foreach { tid =>
        if tid != self then threadStates(tid) = f(threadStates(tid))
      }
    }

  /** Appends `str` to the operation log, prefixed with the caller's fake
    * thread id and indented by two spaces per thread id above 1; continuation
    * lines of `str` are indented two further spaces. Does nothing when the
    * calling thread is not registered with this scheduler.
    */
  def log(str: String) =
    val registered = realToFakeThreadId.contains(Thread.currentThread().getId())
    if registered then
      val tid = threadId
      val indent = " " * ((tid - 1) * 2)
      val continuation = "\n" + indent + "  "
      // println(entry)
      val entry = indent + tid + ":" + str.replace("\n", continuation)
      opLog += entry

Matt Bovel's avatar
Matt Bovel committed
  /** Executes a read or write operation to a global data structure as per the
    * given schedule. When the calling thread is not registered with this
    * scheduler, `primop` is simply evaluated.
    *
    * @param primop
    *   the primitive operation to run
    * @param msg
    *   a message corresponding to the operation that will be logged before it
    *   runs; nothing is logged when it is empty
    * @param postMsg
    *   optional function turning the result into a message logged after the
    *   operation
    * @return
    *   the result of `primop`
    */
  def exec[T](primop: => T)(
      msg: => String,
      postMsg: => Option[T => String] = None
  ): T =
    val scheduled = realToFakeThreadId.contains(Thread.currentThread().getId())
    if scheduled then
      updateThreadState(VariableReadWrite(threadLocks))
      val before = msg
      if before != "" then log(before)
      // The size of the log is used as the count of operations performed
      if opLog.size > maxOps then
        throw new Exception(
          s"Total number of reads/writes performed by threads exceed $maxOps. A possible deadlock!"
        )
      val result = primop
      postMsg.foreach(describe => log(describe(result)))
      result
    else primop

  /** Registers the calling thread under the fake thread id `fakeId`. */
  private def setThreadId(fakeId: Int) = synchronized {
    val realId = Thread.currentThread.getId
    realToFakeThreadId.update(realId, fakeId)
  }

  /** The fake thread id of the calling thread. Throws an `Exception` when the
    * calling thread has not been registered with this scheduler.
    */
  def threadId =
    realToFakeThreadId.getOrElse(
      Thread.currentThread().getId(),
      throw new Exception(
        "You are accessing shared variables in the constructor. This is not allowed. The variables are already initialized!"
      )
    )
Matt Bovel's avatar
Matt Bovel committed

  /** True when the schedule is non-empty and its head is a thread other than
    * `tid`.
    *
    * NOTE(review): despite the name, this holds when it is NOT `tid`'s turn.
    * No caller is visible in this file - confirm the intent before using it.
    */
  private def isTurn(tid: Int) = synchronized {
    schedule.headOption.exists(_ != tid)
  }

  /** Checks whether the pending decision in `canContinue` designates the
    * calling thread. If so, the decision is consumed, the thread's state is
    * moved to `Running` (with the lock it was granted, if any) and `true` is
    * returned; otherwise nothing changes and `false` is returned.
    */
  def canProceed(): Boolean =
    val me = threadId
    canContinue match
      case Some((chosen, granted)) if chosen == me =>
        // println(s"$me: Runs ! Was in state $granted")
        canContinue = None
        granted match
          case Sync(wanted, held) =>
            updateThreadState(Running(wanted +: held))
          case SyncUnique(wanted, held) =>
            // Every other thread that was in SyncUnique on the same lock is
            // moved to Wait on it.
            mapOtherStates {
              case SyncUnique(otherWanted, otherHeld) if otherWanted == wanted =>
                Wait(otherWanted, otherHeld)
              case other => other
            }
            updateThreadState(Running(wanted +: held))
          case VariableReadWrite(held) =>
            updateThreadState(Running(held))
        true
      case _ =>
        // Either another thread was chosen or no decision is pending
        false

Matt Bovel's avatar
Matt Bovel committed
  /** In the case the schedule is over, which thread should have the preference
    * to execute: an index into the threads that are currently not blocked.
    */
  var threadPreference: Int = 0
Matt Bovel's avatar
Matt Bovel committed

  /** Chooses which thread is allowed to execute next.
    *
    * The last thread who enters the decision loop takes the decision. Within
    * this file it is only called while holding the scheduler's monitor.
    *
    * Selection rules, in order:
    *   - if exactly one thread is not blocked, it is chosen and the schedule is
    *     left untouched;
    *   - otherwise the first entry of the schedule naming a non-blocked thread
    *     is chosen and removed from the schedule;
    *   - otherwise (no such entry) the non-blocked threads are served in
    *     rotation through `threadPreference`.
    *
    * @return
    *   the chosen thread id together with its state, or the current value of
    *   `canContinue` when no thread is registered
    * @throws Exception
    *   when a deadlock is detected: every remaining thread is in `Wait`, or no
    *   remaining thread can make progress
    */
  def decide(): Option[(Int, ThreadState)] =
    if threadStates.isEmpty then canContinue
    else
      // println(s"$threadId: I'm taking a decision")
      // Wording shared by the two deadlock messages below
      val waiting = threadStates.keys.map(_.toString).mkString(", ")
      val s = if threadStates.size > 1 then "s" else ""
      val are = if threadStates.size > 1 then "are" else "is"
      val allWaiting = threadStates.values.forall {
        case _: Wait => true
        case _       => false
      }
      if allWaiting then
        throw new Exception(
          s"Deadlock: Thread$s $waiting $are waiting but all others have ended and cannot notify them."
        )
      else
        // Threads can be in Wait, Sync, SyncUnique, and VariableReadWrite mode.
        // Let's determine which ones can continue.
        val notFree = threadStates.values.flatMap(_.locks).toSet
        val threadsNotBlocked = threadStates.toSeq.filter {
          case (id, v: VariableReadWrite) => true
          case (id, v: CanContinueIfAcquiresLock) =>
            !notFree(v.lockToAquire) || (v.locks contains v.lockToAquire)
          case _ => false
        }
        if threadsNotBlocked.isEmpty then
          val whoHasLock = threadStates.toSeq.flatMap { case (id, state) =>
            state.locks.map(lock => (lock, id))
          }.toMap
          // A thread is stuck when the lock it wants is currently held, so the
          // explanation lists exactly those threads (and the holder is then
          // guaranteed to be present in whoHasLock).
          val reason = threadStates.collect {
            case (id, state: CanContinueIfAcquiresLock)
                if notFree(state.lockToAquire) =>
              s"Thread $id is waiting on lock ${state.lockToAquire} held by thread ${whoHasLock(state.lockToAquire)}"
          }.mkString("\n")
          throw new Exception(
            s"Deadlock: Thread$s $waiting $are interlocked. Indeed:\n$reason"
          )
        else if threadsNotBlocked.size == 1 then
          // Do not consume the schedule if only one thread can execute.
          Some(threadsNotBlocked(0))
        else
          val next = schedule.indexWhere(t =>
            threadsNotBlocked.exists { case (id, state) => id == t }
          )
          if next != -1 then
            // println(s"$threadId: schedule is $schedule, next chosen is ${schedule(next)}")
            val chosenOne =
              schedule(next) // TODO: Make schedule a mutable list.
            schedule = schedule.take(next) ++ schedule.drop(next + 1)
            Some((chosenOne, threadStates(chosenOne)))
          else
            // No schedule entry names a runnable thread: rotate among the
            // runnable ones. Maybe another strategy
            threadPreference = (threadPreference + 1) % threadsNotBlocked.size
            Some(threadsNotBlocked(threadPreference))

Matt Bovel's avatar
Matt Bovel committed
  /** Number of threads currently inside `waitForTurn`: incremented when a
    * thread enters it and decremented when the thread leaves.
    *
    * NOTE(review): the comment previously attached here ("This will be called
    * before a schedulable operation begins. This should not use synchronized")
    * seems to describe `waitForTurn` rather than this counter - confirm.
    */
  var numThreadsWaiting: AtomicInteger = new AtomicInteger(0)

  /** The result of the last decision: the id of the thread authorized to
    * continue, together with the state it was in when it was chosen. `None`
    * when no decision is pending.
    */
  var canContinue: Option[(Int, ThreadState)] = Option.empty
Matt Bovel's avatar
Matt Bovel committed
  /** Blocks the calling thread until a decision designates it. The thread is
    * counted as waiting on entry; if that makes the count equal to the number
    * of registered threads, the caller takes the decision itself and wakes
    * everybody up. The count is decremented once the thread has been allowed
    * to proceed.
    */
  private def waitForTurn =
    this.synchronized {
      val waitingNow = numThreadsWaiting.incrementAndGet()
      if waitingNow == threadStates.size then
        canContinue = decide()
        notifyAll()
      // println(s"$threadId Entering waiting with ticket number $waitingNow")
      while !canProceed() do wait()
    }
    // Performed after the monitor has been released, as in the original code
    numThreadsWaiting.decrementAndGet()

Matt Bovel's avatar
Matt Bovel committed
  /** To be invoked when a thread is about to complete. Drops every occurrence
    * of `fakeid` from the schedule and forgets the thread's state; if all the
    * remaining threads are then waiting, takes a new decision and wakes them
    * up.
    */
  private def removeFromSchedule(fakeid: Int) = synchronized {
    // println(s"$fakeid: I'm taking a decision because I finished")
    schedule = schedule.filter(_ != fakeid)
    threadStates.remove(fakeid)
    val everyoneLeftIsWaiting = numThreadsWaiting.get() == threadStates.size
    if everyoneLeftIsWaiting then
      canContinue = decide()
      notifyAll()
  }

  /** The operation log itself (the live mutable buffer, not a copy). */
  def getOperationLog(): ListBuffer[String] = opLog