Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • lara/cs206-demos
  • gcharles/cs206-demos
  • gambhir/cs206-demos
3 results
Show changes
Showing
with 547 additions and 46 deletions
// Lecture 1 : 14/15
package lecture1;
class ExampleThread extends Thread {
int sleepiness;
public void run() {
int i = 0;
while (i < 8) {
System.out.println("Thread" + getName() + " has counter " + i);
i++;
try {sleep(sleepiness);}
catch (InterruptedException e) {
System.out.println("Thread was interrupted.");
}
}
}
public static void main(String[] args) {
ExampleThread t1 = new ExampleThread();
t1.sleepiness = 2;
ExampleThread t2 = new ExampleThread();
t2.sleepiness = 3;
System.out.println("Little threads did not start yet!");
t1.start();
t2.start();
System.out.println("Parent thread running!");
try {
sleep(29);
} catch (InterruptedException e) {
System.out.println("Parent thread was interrupted.");
}
System.out.println("Parent thread running!");
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
System.out.println("A thread was interruped!");
}
System.out.println("Done executing thread.");
}
}
// PDF: https://moodle.epfl.ch/pluginfile.php/3175938/mod_folder/content/0/week01-5-First-Class-Tasks.pdf
// Video: https://mediaspace.epfl.ch/playlist/dedicated/31866/0_c46icdbx/0_v7c766xv
// Slides: 7
package lecture1
import lecture1.{task, Task}
def parallel[A, B](taskA: => A, taskB: => B): (A, B) =
val tB: Task[B] = task {
taskB
}
val tA: A = taskA
(tA, tB.join)
\ No newline at end of file
(tA, tB.join)
package lecture1
def task[A](c: => A) : Task[A] = ???
trait Task[A] :
def join: A
\ No newline at end of file
package lecture13
import concurrent.{ExecutionContext, Future, Promise}
import concurrent.duration.DurationInt
import akka.actor.{Actor, ActorContext, ActorRef, ActorSystem, Props}
import akka.event.LoggingReceive
import akka.pattern.pipe
import akka.testkit.{ImplicitSender, TestKit}
import akka.util.Timeout
/** This demonstrates a simplified implementation of the ask pattern.
*
* @param promise
* the promise to be completed when a message is received
*/
class AskMiniActor(promise: Promise[Any]) extends Actor:
def receive = LoggingReceive {
case msg => promise.success(msg)
}
extension (receiver: ActorRef)
def ?(msg: Any)(using
// In this simplified implementation, we don't use the timeout.
timeout: Timeout,
// This only used for logging purposes (and to get the actor system in the
// real implementation), but is not used otherwise in the implementation.
sender: ActorRef,
// In the real implementation, the actor system is retrieved differently,
// but here we pass it as an additional implicit parameter for simplicity.
context: ActorContext
): Future[Any] =
context.system.log.debug(
s"Create mini actor to ask $msg from $sender to $receiver"
)
// Create a `Promise` that will be completed when a message is received.
val promise = Promise[Any]()
// Create a mini actor that will complete the `Promise` when it receives a
// message.
val miniActor = context.system.actorOf(Props(AskMiniActor(promise)))
// Send the message to the mini actor, *using the mini actor as the sender*.
// This part is important as it is the mini actor that needs to receive the
// response.
receiver.tell(msg, miniActor)
// Using the `Promise` from the Scala standard library, the corresponding
// `Future` can be retrieved with the method `future`.
promise.future
object Person:
enum Protocol:
case GetName
case GetAge
enum Response:
case Name(name: String)
case Age(age: Int)
case UnknownMessage
class Person(name: String, age: Int) extends Actor:
import Person.Protocol.*
import Person.Response.*
def receive = LoggingReceive {
case GetName => sender() ! Name(name)
case GetAge => sender() ! Age(age)
case _ => sender() ! UnknownMessage
}
object PersonsDatabase:
enum Protocol:
case CreatePerson(name: String, age: Int)
case GetPersonNames(ids: List[Int])
enum Response:
case PersonCreated(id: Int)
case PersonNames(names: List[String])
class PersonsDatabase extends Actor:
import Person.Protocol.*
import Person.Response.*
import PersonsDatabase.Protocol.*
import PersonsDatabase.Response.*
var persons: Map[Int, ActorRef] = Map.empty
var maxId = 0
given ExecutionContext = context.system.dispatcher
given Timeout = Timeout(200.millis)
def receive = LoggingReceive {
case CreatePerson(name, age) =>
val personRef = context.actorOf(Props(Person(name, age)))
persons = persons + (maxId -> personRef)
sender() ! PersonCreated(maxId)
maxId += 1
case GetPersonNames(ids) =>
// We ask every person for their name using the ask pattern. This
// gives us a list of `Future`s that will each be completed with a
// `Name` message.
val rawResponsesFutures: List[Future[Any]] =
ids.map(id => (persons(id) ? GetName))
// We first map each `Future[Any]` to a `Future[Name]` using the
// `mapTo` method. Then, we map each `Future[Name]` to a
// `Future[String]` using the `map` method.
val namesFutures: List[Future[String]] =
rawResponsesFutures.map(_.mapTo[Name].map(_.name))
// We use the `Future.sequence` method to convert a
// `List[Future[String]]` to a `Future[List[String]]`. The resulting
// future will be completed once all the futures in the input list
// are completed.
val futureOfNames: Future[List[String]] = Future.sequence(namesFutures)
// Finally, map the `Future[List[String]]` to a `Future[PersonNames]` and
// pipe it to the sender of the `GetPersonNames` message.
futureOfNames.map(PersonNames.apply).pipeTo(sender())
}
@main def askPatternDemo() =
new TestKit(ActorSystem("DebugSystem")) with ImplicitSender:
import Person.Protocol.*
import Person.Response.*
import PersonsDatabase.Protocol.*
import PersonsDatabase.Response.*
try
val personsDb = system.actorOf(Props(PersonsDatabase()))
personsDb ! CreatePerson("Anita", 20)
expectMsg(PersonCreated(0))
personsDb ! CreatePerson("Berta", 30)
expectMsg(PersonCreated(1))
personsDb ! CreatePerson("Cecilia", 40)
expectMsg(PersonCreated(2))
personsDb ! GetPersonNames(List(0, 1, 2))
expectMsg(PersonNames(List("Anita", "Berta", "Cecilia")))
finally shutdown(system)
package lecture13
import akka.actor.{Actor, ActorContext, ActorRef, ActorSystem, Props}
import akka.event.LoggingReceive
import akka.testkit.{ImplicitSender, TestKit}
class CounterRight extends Actor:
def counter(n: Int): Receive = {
case "increment" => context.become(counter(n + 1), discardOld = false)
// ^^^^^^^^^^^^^^^^^^
// This is needed.
case "get" => sender() ! n
case "decrement" => if n != 0 then context.unbecome()
}
def receive = counter(0)
class CounterWrong extends Actor:
def counter(n: Int): Receive = {
case "increment" => context.become(counter(n + 1))
case "get" => sender() ! n
case "decrement" => if n != 0 then context.unbecome()
}
def receive = counter(0)
@main def becomeDemo() =
new TestKit(ActorSystem("DebugSystem")) with ImplicitSender:
try
val counter = system.actorOf(Props(CounterRight()), "counter")
counter ! "increment"
counter ! "get"
expectMsg(1)
counter ! "increment"
counter ! "get"
expectMsg(2)
counter ! "decrement"
counter ! "get"
expectMsg(1)
// This is wrong if we use CounterWrong because it doesn't set the
// discardOld parameter to false. Therefore, it will not remember the
// previous state and reset the behavior to the initial one, which is
// `counter(0)`.
counter ! "decrement"
counter ! "get"
expectMsg(0)
finally shutdown(system)
// PDF: https://moodle.epfl.ch/pluginfile.php/3175938/mod_folder/content/0/week03-4-Scala-Parallel-Collections.pdf
// Video: https://mediaspace.epfl.ch/playlist/dedicated/31866/0_icv10qux/0_h4jt9i7k
// Slides: 27
package lecture3
import scala.collection.parallel.CollectionConverters.IterableIsParallelizable
import scala.collection.parallel.immutable.ParSet
// Note: we don't use `GenSet` because it is deprecated since Scala 2.13.
def parIntersectionWrong(a: ParSet[Int], b: ParSet[Int]) =
val result = collection.mutable.Set[Int]()
for x <- a do
if b.contains(x) then
result += x
result
@main def intersectionWrong =
val a = (0 until 10000).toSet
val b = (0 until 10000 by 4).toSet
val seqRes = a.intersect(b)
val parRes = parIntersectionWrong(a.par.toSet, b.par.toSet)
assert(seqRes.size == 2500)
assert(parRes.size != 2500)
// PDF: https://moodle.epfl.ch/pluginfile.php/3175938/mod_folder/content/0/week03-4-Scala-Parallel-Collections.pdf
// Video: https://mediaspace.epfl.ch/playlist/dedicated/31866/0_icv10qux/0_h4jt9i7k
// Slides: 28
package lecture3
import scala.collection.parallel.CollectionConverters.IterableIsParallelizable
import scala.collection.parallel.immutable.ParSet
import java.util.concurrent.ConcurrentSkipListSet
// Note: we don't use `GenSet` because it is deprecated since Scala 2.13.
def parIntersectionCorrect(a: ParSet[Int], b: ParSet[Int]) =
val result = new ConcurrentSkipListSet[Int]()
for x <- a do
if b.contains(x) then
result.add(x)
result
@main def intersectionCorrect =
val a = (0 until 10000).toSet
val b = (0 until 10000 by 4).toSet
val seqRes = a.intersect(b)
val parRes = parIntersectionCorrect(a.par.toSet, b.par.toSet)
assert(seqRes.size == 2500)
assert(parRes.size == 2500)
// PDF: https://moodle.epfl.ch/pluginfile.php/3175938/mod_folder/content/0/week03-4-Scala-Parallel-Collections.pdf
// Video: https://mediaspace.epfl.ch/playlist/dedicated/31866/0_icv10qux/0_h4jt9i7k
// Slides: 29
package lecture3
import scala.collection.parallel.CollectionConverters.IterableIsParallelizable
import scala.collection.parallel.immutable.ParSet
import java.util.concurrent.ConcurrentSkipListSet
// Note: we don't use `GenSet` because it is deprecated since Scala 2.13.
def intersectionParNoSideEffect(a: ParSet[Int], b: ParSet[Int]) =
if a.size < b.size then a.filter(b(_))
else b.filter(a(_))
@main def intersectionNoSideEffect =
val a = (0 until 10000).toSet
val b = (0 until 10000 by 4).toSet
val seqRes = a.intersect(b)
val parRes = intersectionParNoSideEffect(a.par.toSet, b.par.toSet)
assert(seqRes.size == 2500)
assert(parRes.size == 2500)
// PDF: https://moodle.epfl.ch/pluginfile.php/3175938/mod_folder/content/0/week03-4-Scala-Parallel-Collections.pdf
// Video: https://mediaspace.epfl.ch/playlist/dedicated/31866/0_icv10qux/0_h4jt9i7k
// Slides: 31
package lecture3
import scala.collection.*
import scala.collection.parallel.CollectionConverters.MapIsParallelizable
@main def parallelGraphContraction =
val graph = mutable.Map[Int, Int]() ++= (0 until 100000).map(i => (i, i + 1))
graph(graph.size - 1) = 0
for (k, v) <- graph.par do
graph(k) = graph(v)
val violation = graph.find {
case (i, v) => v != (i + 2) % graph.size
}
println(s"violation: $violation")
// PDF: https://moodle.epfl.ch/pluginfile.php/3175938/mod_folder/content/0/week03-4-Scala-Parallel-Collections.pdf
// Video: https://mediaspace.epfl.ch/playlist/dedicated/31866/0_icv10qux/0_h4jt9i7k
// Slides: 31
package lecture3
import scala.collection.*
import scala.collection.parallel.CollectionConverters.MapIsParallelizable
@main def parallelGraphContractionCorrect =
val graph =
concurrent.TrieMap[Int, Int]() ++= (0 until 100000).map(i => (i, i + 1))
graph(graph.size - 1) = 0
var previous = graph.snapshot()
for (k, v) <- graph.par do graph(k) = previous(v)
val violation = graph.find {
case (i, v) => v != (i + 2) % graph.size
}
println(s"violation: $violation")
package lecture6
import java.util.concurrent.ForkJoinPool
// Run using `running lecture6.ExecutorsCreate`.
@main def ExecutorsCreate =
val executor = new ForkJoinPool
executor.execute(new Runnable:
override def run() = log("This task is run asynchronously.")
)
Thread.sleep(500)
package lecture6
import scala.concurrent.ExecutionContext
@main def ExecutionContextCreate =
val ectx = ExecutionContext.global
ectx.execute(new Runnable:
override def run() = log("This task is run asynchronously.")
)
Thread.sleep(500)
package lecture6
import scala.concurrent.ExecutionContext
def execute(body: => Unit) =
ExecutionContext.global.execute(new Runnable:
override def run() = body
)
@main def SimpleExecute =
execute { log("This task is run asynchronously.") }
Thread.sleep(500)
package lecture6
import scala.concurrent.ExecutionContext
import lecture1.threadStart
class OnePlaceBuffer[Elem]:
var elem: Elem = _
var full = false
def put(e: Elem): Unit = this.synchronized {
while full do wait()
elem = e
full = true
log(f"Put $e in the buffer")
notifyAll()
}
def get(): Elem = this.synchronized {
while !full do wait()
full = false
notifyAll()
log(f"Got $elem from the buffer")
elem
}
@main def OnePlaceBufferDemo =
val buffer = OnePlaceBuffer[Int]()
val getThreads = for i <- 0 until 10 yield threadStart { buffer.get() }
val putThreads = for i <- 0 until 10 yield threadStart { buffer.put(i) }
putThreads.foreach(_.join())
getThreads.foreach(_.join())
package lecture6
class Fork(var index: Int)
def philosopherTurn(i: Int, l: Fork, r: Fork): Unit =
Thread.sleep(10)
var (left, right) =
if i % 2 == 0 then (l, r)
else (r, l)
// if l.index < r.index then (l, r) else (r, l)
left.synchronized {
log(f"Philosopher $i picks up left fork ${left.index}")
right.synchronized {
log(f"Philosopher $i picks up right fork ${right.index} - eating")
Thread.sleep(100)
log(f"Philosopher $i puts down right fork ${right.index}")
}
log(f"Philosopher $i puts down left fork ${left.index}")
}
@main def run() =
val n = 5
val k: Int = 3
val forks = new Array[Fork](n)
val philosophers = new Array[Thread](n)
for p <- 0 to n - 1 do
forks(p) = Fork(p)
for p <- 0 to n - 1 do
philosophers(p) = new Thread:
override def run() =
for _ <- 0 until k do
philosopherTurn(p, forks(p % n), forks((p + 1) % n))
philosophers(p).start
for p <- 0 to n - 1 do
philosophers(p).join()
package lecture6
val log = println(_)
package midterm22
import scala.collection.mutable.Set
@main def mock1() =
val values = Set[Int]()
for _ <- 1 to 100000 do
var sum = 0
val t1 = task { sum += 1 }
val t2 = task { sum += 1 }
t1.join()
t2.join()
values += sum
println(values)
package midterm22
import instrumentation.Monitor
class Account(private var amount: Int = 0) extends Monitor:
def transfer(target: Account, n: Int) =
this.synchronized {
target.synchronized {
this.amount -= n
target.amount += n
}
}
@main def mock2() =
val a = new Account(50)
val b = new Account(70)
val t1 = task { a.transfer(b, 10) }
val t2 = task { b.transfer(a, 10) }
t1.join()
t2.join()
package midterm22
import scala.collection.parallel.Task
import scala.collection.parallel.CollectionConverters.*
// Questions 1-3
// See tests in midterm22.Part1Test.
// Run with `sbt "testOnly midterm22.Part1Test2"`.
def parallel3[A, B, C](op1: => A, op2: => B, op3: => C): (A, B, C) =
val res1 = task { op1 }
val res2 = task { op2 }
val res3 = op3
(res1.join(), res2.join(), res3)
def find(arr: Array[Int], value: Int, threshold: Int): Option[Int] =
def findHelper(start: Int, end: Int): Option[Int] =
if end - start <= threshold then
var i = start
while i < end do
if arr(i) == value then return Some(value)
i += 1
None
else
val inc = (end - start) / 3
val (res1, res2, res3) = parallel3(
findHelper(start, start + inc),
findHelper(start + inc, start + 2 * inc),
findHelper(start + 2 * inc, end)
)
res1.orElse(res2).orElse(res3)
findHelper(0, arr.size)
def findAggregated(arr: Array[Int], value: Int): Option[Int] =
val no: Option[Int] = None
val yes: Option[Int] = Some(value)
def f = (x1: Option[Int], x2: Int) => if x2 == value then Some(x2) else x1
def g = (x1: Option[Int], x2: Option[Int]) => if x1 != None then x1 else x2
arr.par.aggregate(no)(f, g)
@main def part1() =
println(find(Array(1, 2, 3), 2, 1))
// See tests in Part1Test
package midterm22
import scala.annotation.nowarn
// Questions 4-7
// See tests in midterm22.Part2Test.
// Run with `sbt testOnly midterm22.Part2Test`.
/*
Answers to the exam questions:
When called with a Vector:
The total amount of work is O(n), as it is dominated by the time needed to
read the array. More precisely W(n) = c + 2*W(n/2) = O(n).
The depth is O(log(n)), because every recursion takes constant time
and we divide the size of the input by 2 every time, i.e. D(n) = c + D(n/2) = O(log(n)).
Note however that in practice it is often still faster to manipulate
start and end indices rather than using take and drop.
When called with a List:
Every recursion takes up to time O(n) rather than constant time.
The total amount of work is O(n) times the number of recursion, because
take and drop takes time O(n) on lists. Precisely, W(n) = n + 2*W(n/2) = O(log(n)*n)
The depth is computed similarly: D(n) = n + D(n/2) = O(n), i.e.
Note: these are theoretical results. In practice, you should always double-check
that kind of conclusions with benchmarks. We did so in
`midterm-code/src/main/scala/bench`. Results are available in `bench-results`.
From these results, we can conclude that
1. Vectors are indeed faster in this case, and
2. parallelization of `contains` yields a 2x speedup.
*/
@nowarn
def contains[A](l: Iterable[A], elem: A): Boolean =
val n = l.size
if n <= 5 then
for i <- l do
if i == elem then
return true
false
else
val (p0, p1) = parallel(
contains(l.take(n / 2), elem),
contains(l.drop(n / 2), elem)
)
p0 || p1