From 17f5bc8faa6f9efd9899ba1067e414bccdef8645 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Fri, 8 Dec 2023 19:52:02 +0100 Subject: [PATCH 01/11] Rename result => awaitResult, value => await, make them Source methods --- README.md | 4 +- .../examples/clientAndServerUDP.scala | 10 +- .../examples/readAndWriteFile.scala | 8 +- .../PosixLikeIO/examples/readWholeFile.scala | 4 +- .../scala/measurements/measureTimes.scala | 24 ++--- jvm/src/test/scala/CancellationBehavior.scala | 2 +- .../main/scala/async/ForkJoinSupport.scala | 2 +- shared/src/main/scala/async/Async.scala | 15 +-- .../main/scala/async/CompletionGroup.scala | 2 +- shared/src/main/scala/async/channels.scala | 5 +- shared/src/main/scala/async/futures.scala | 59 +++++------- .../src/test/scala/CancellationBehavior.scala | 12 +-- shared/src/test/scala/ChannelBehavior.scala | 42 ++++----- shared/src/test/scala/FutureBehavior.scala | 92 +++++++++---------- shared/src/test/scala/ListenerBehavior.scala | 14 +-- shared/src/test/scala/SourceBehavior.scala | 34 +++---- .../src/test/scala/TaskScheduleBehavior.scala | 10 +- shared/src/test/scala/Timer.scala | 6 +- 18 files changed, 165 insertions(+), 180 deletions(-) diff --git a/README.md b/README.md index c3efc5f8..8a8278bd 100644 --- a/README.md +++ b/README.md @@ -187,14 +187,14 @@ which takes two futures and if they both complete successfully returns their res extension [T](f1: Future[T]) def zip[U](f2: Future[U])(using Async): Future[(T, U)] = Future: - Async.await(Async.either(f1, f2)) match + Async.either(f1, f2).awaitResult match case Left(Success(x1)) => (x1, f2.value) case Right(Success(x2)) => (f1.value, x2) case Left(Failure(ex)) => throw ex case Right(Failure(ex)) => throw ex def alt(f2: Future[T])(using Async): Future[T] = Future: - Async.await(Async.either(f1, f2)) match + Async.either(f1, f2).awaitResult match case Left(Success(x1)) => x1 case Right(Success(x2)) => x2 case Left(_: Failure[?]) => f2.value diff --git a/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala b/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala index 83bae882..5d98f674 100644 --- a/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala +++ b/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala @@ -15,7 +15,7 @@ import scala.concurrent.ExecutionContext Async.blocking: val server = Future: PIOHelper.withSocketUDP(8134): serverSocket => - val got: DatagramPacket = serverSocket.receive().result.get + val got: DatagramPacket = serverSocket.receive().awaitResult.get val messageReceived = String(got.getData.slice(0, got.getLength), "UTF-8") val responseMessage = (messageReceived.toInt + 1).toString.getBytes serverSocket.send(ByteBuffer.wrap(responseMessage), got.getAddress.toString.substring(1), got.getPort) @@ -25,10 +25,10 @@ import scala.concurrent.ExecutionContext Future: PIOHelper.withSocketUDP(): clientSocket => val data: Array[Byte] = value.toString.getBytes - clientSocket.send(ByteBuffer.wrap(data), "localhost", 8134).result.get - val responseDatagram = clientSocket.receive().result.get + clientSocket.send(ByteBuffer.wrap(data), "localhost", 8134).awaitResult.get + val responseDatagram = clientSocket.receive().awaitResult.get val messageReceived = String(responseDatagram.getData.slice(0, responseDatagram.getLength), "UTF-8").toInt println("Sent " + value.toString + " and got " + messageReceived.toString + " in return.") - Async.await(client(100)) - Async.await(server) + client(100).await + server.await diff --git a/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala b/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala index 6be2a21a..f63b0e50 100644 --- a/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala +++ b/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala @@ -13,7 +13,7 @@ import scala.concurrent.ExecutionContext given ExecutionContext = ExecutionContext.global Async.blocking: PIOHelper.withFile("/home/julian/Desktop/x.txt", StandardOpenOption.READ, StandardOpenOption.WRITE): f => - Async.await(f.writeString("Hello world! (1)")) - println(Async.await(f.readString(1024)).get) - Async.await(f.writeString("Hello world! (2)")) - println(Async.await(f.readString(1024)).get) + f.writeString("Hello world! (1)").await + println(f.readString(1024).await) + f.writeString("Hello world! (2)").await + println(f.readString(1024).await) diff --git a/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala b/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala index d8163c6d..3924114f 100644 --- a/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala +++ b/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala @@ -14,11 +14,11 @@ import scala.concurrent.ExecutionContext Async.blocking: PIOHelper.withFile("/home/julian/Desktop/x.txt", StandardOpenOption.READ): f => val b = ByteBuffer.allocate(1024) - val retCode = f.read(b).result.get + val retCode = f.read(b).awaitResult.get assert(retCode >= 0) val s = StandardCharsets.UTF_8.decode(b.slice(0, retCode)).toString() println("Read size with read(): " + retCode.toString()) println("Data: " + s) println("Read with readString():") - println(Async.await(f.readString(1000)).get) + println(f.readString(1000).awaitResult) diff --git a/jvm/src/main/scala/measurements/measureTimes.scala b/jvm/src/main/scala/measurements/measureTimes.scala index ff4e4d45..4a5beb42 100644 --- a/jvm/src/main/scala/measurements/measureTimes.scala +++ b/jvm/src/main/scala/measurements/measureTimes.scala @@ -46,7 +46,7 @@ def measureIterations[T](action: () => T): Int = Async.blocking: val f = Future: var z = 1 - f.result + f.awaitResult println("Thread joins per second: " + (threadJoins / 60)) println("Future joins per second: " + (futureJoins / 60)) @@ -64,26 +64,26 @@ def measureIterations[T](action: () => T): Int = val c1: Double = measureIterations: () => Async.blocking: - Async.await(Async.race(Future { Thread.sleep(10) }, Future { Thread.sleep(100) }, Future { Thread.sleep(50) })) - Async.await(Async.race(Future { Thread.sleep(50) }, Future { Thread.sleep(10) }, Future { Thread.sleep(100) })) - Async.await(Async.race(Future { Thread.sleep(100) }, Future { Thread.sleep(50) }, Future { Thread.sleep(10) })) + Async.race(Future { Thread.sleep(10) }, Future { Thread.sleep(100) }, Future { Thread.sleep(50) }).await + Async.race(Future { Thread.sleep(50) }, Future { Thread.sleep(10) }, Future { Thread.sleep(100) }).await + Async.race(Future { Thread.sleep(100) }, Future { Thread.sleep(50) }, Future { Thread.sleep(10) }).await val c2: Double = measureIterations: () => Async.blocking: val f11 = Future { Thread.sleep(10) } val f12 = Future { Thread.sleep(50) } val f13 = Future { Thread.sleep(100) } - f11.result + f11.awaitResult val f21 = Future { Thread.sleep(100) } val f22 = Future { Thread.sleep(10) } val f23 = Future { Thread.sleep(50) } - f22.result + f22.awaitResult val f31 = Future { Thread.sleep(50) } val f32 = Future { Thread.sleep(100) } val f33 = Future { Thread.sleep(10) } - f33.result + f33.awaitResult val c1_seconds_wasted_for_waits = c1 * 0.01 val c1_per_second_adjusted = c1 / 3 / (60 - c1_seconds_wasted_for_waits) @@ -105,9 +105,9 @@ def measureIterations[T](action: () => T): Int = val c1: Double = measureIterations: () => Async.blocking: - Async.await(Async.race(Future { Thread.sleep(10) }, Future { Thread.sleep(100) }, Future { Thread.sleep(50) })) - Async.await(Async.race(Future { Thread.sleep(50) }, Future { Thread.sleep(10) }, Future { Thread.sleep(100) })) - Async.await(Async.race(Future { Thread.sleep(100) }, Future { Thread.sleep(50) }, Future { Thread.sleep(10) })) + Async.race(Future { Thread.sleep(10) }, Future { Thread.sleep(100) }, Future { Thread.sleep(50) }).await + Async.race(Future { Thread.sleep(50) }, Future { Thread.sleep(10) }, Future { Thread.sleep(100) }).await + Async.race(Future { Thread.sleep(100) }, Future { Thread.sleep(50) }, Future { Thread.sleep(10) }).await val c2: Double = measureIterations: () => @volatile var i1 = true @@ -433,7 +433,7 @@ def measureRunTimes[T](action: () => T): TimeMeasurementResult = dataAlmostJson.append(measure("PosixLikeIO", timesInner = if size < 100 then 100 else 10): () => Async.blocking: PIOHelper.withFile("/tmp/FIO/x.txt", StandardOpenOption.CREATE, StandardOpenOption.WRITE): f => - f.writeString(bigString.substring(0, size)).result + f.writeString(bigString.substring(0, size)).awaitResult ) println("done 1") @@ -466,7 +466,7 @@ def measureRunTimes[T](action: () => T): TimeMeasurementResult = dataAlmostJson.append(measure("PosixLikeIO", timesInner = if size < 100 then 100 else 10): () => Async.blocking: PIOHelper.withFile("/tmp/FIO/x.txt", StandardOpenOption.READ): f => - f.readString(size).result + f.readString(size).awaitResult ) println("done 1") diff --git a/jvm/src/test/scala/CancellationBehavior.scala b/jvm/src/test/scala/CancellationBehavior.scala index 8f06a113..b0ec2663 100644 --- a/jvm/src/test/scala/CancellationBehavior.scala +++ b/jvm/src/test/scala/CancellationBehavior.scala @@ -17,4 +17,4 @@ class JVMCancellationBehavior extends munit.FunSuite: val f = Future: Thread.sleep(5000) 1 - f.result + f.awaitResult diff --git a/native/src/main/scala/async/ForkJoinSupport.scala b/native/src/main/scala/async/ForkJoinSupport.scala index f43b5261..75cda15f 100644 --- a/native/src/main/scala/async/ForkJoinSupport.scala +++ b/native/src/main/scala/async/ForkJoinSupport.scala @@ -88,7 +88,7 @@ class SuspendExecutorWithSleep(exec: ExecutionContext) val cancellable = schedule(millis.millis, () => resolver.resolve(())) resolver.onCancel(cancellable.cancel) .link() - .value + .await } class ForkJoinSupport extends SuspendExecutorWithSleep(new ForkJoinPool()) diff --git a/shared/src/main/scala/async/Async.scala b/shared/src/main/scala/async/Async.scala index 81cbf07d..87fb45b6 100644 --- a/shared/src/main/scala/async/Async.scala +++ b/shared/src/main/scala/async/Async.scala @@ -57,9 +57,6 @@ object Async: /** The currently executing Async context */ inline def current(using async: Async): Async = async - /** Await source result in currently executing Async context */ - inline def await[T](src: Source[T])(using async: Async): T = async.await(src) - def group[T](body: Async ?=> T)(using async: Async): T = withNewCompletionGroup(CompletionGroup(async.group.handleCompletion).link())(body) @@ -114,10 +111,16 @@ object Async: resultOpt /** Utility method for direct waiting with `Async`. */ - def await(using Async) = Async.await(this) - + final def awaitResult(using ac: Async) = ac.await(this) end Source + extension [T](src: Source[scala.util.Try[T]]) + /** Waits for an item to arrive from the source, then automatically unwraps it. */ + inline def await(using Async) = src.awaitResult.get + extension [E, T](src: Source[Either[E, T]]) + /** Waits for an item to arrive from the source, then automatically unwraps it. */ + inline def await(using Async) = src.awaitResult.right.get + /** An original source has a standard definition of `onComplete` in terms of `poll` and `addListener`. Implementations * should be the resource owner to handle listener queue and completion using an object monitor on the instance. */ @@ -263,7 +266,7 @@ object Async: * [[select]] is run in the same async context as the calling context of [[select]]. */ def select[T](cases: SelectCase[T]*)(using Async) = - val (input, which) = raceWithOrigin(cases.map(_._1)*).await + val (input, which) = raceWithOrigin(cases.map(_._1)*).awaitResult val (_, handler) = cases.find(_._1 == which).get handler.asInstanceOf[input.type => T](input) diff --git a/shared/src/main/scala/async/CompletionGroup.scala b/shared/src/main/scala/async/CompletionGroup.scala index 390bf49c..760bf53f 100644 --- a/shared/src/main/scala/async/CompletionGroup.scala +++ b/shared/src/main/scala/async/CompletionGroup.scala @@ -25,7 +25,7 @@ class CompletionGroup(val handleCompletion: Cancellable => Async ?=> Unit = _ => private[async] def waitCompletion()(using Async): Unit = synchronized: if members.nonEmpty && cancelWait.isEmpty then cancelWait = Some(Promise()) - cancelWait.foreach(cWait => Async.await(cWait.future)) + cancelWait.foreach(cWait => cWait.future.await) signalCompletion() /** Add given member to the members set. If the group has already been cancelled, cancels that member immediately. */ diff --git a/shared/src/main/scala/async/channels.scala b/shared/src/main/scala/async/channels.scala index b074794b..f708bb43 100644 --- a/shared/src/main/scala/async/channels.scala +++ b/shared/src/main/scala/async/channels.scala @@ -3,7 +3,6 @@ import scala.collection.mutable import mutable.{ArrayBuffer, ListBuffer} import scala.util.{Failure, Success, Try} -import Async.await import scala.util.control.Breaks.{break, breakable} import gears.async.Async.Source @@ -26,7 +25,7 @@ trait SendableChannel[-T]: /** Send [[x]] over the channel, blocking (asynchronously with [[Async]]) until the item has been sent or, if the * channel is buffered, queued. Throws [[ChannelClosedException]] if the channel was closed. */ - def send(x: T)(using Async): Unit = Async.await(sendSource(x)) match + def send(x: T)(using Async): Unit = sendSource(x).awaitResult match case Right(_) => () case Left(_) => throw ChannelClosedException() end SendableChannel @@ -45,7 +44,7 @@ trait ReadableChannel[+T]: /** Read an item from the channel, blocking (asynchronously with [[Async]]) until the item has been received. Returns * `Failure(ChannelClosedException)` if the channel was closed. */ - def read()(using Async): Res[T] = await(readSource) + def read()(using Async): Res[T] = readSource.awaitResult end ReadableChannel /** A generic channel that can be sent to, received from and closed. */ diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index 165ae332..b6494739 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -17,19 +17,7 @@ import scala.util.control.NonFatal /** A cancellable future that can suspend waiting for other asynchronous sources */ -trait Future[+T] extends Async.OriginalSource[Try[T]], Cancellable: - - /** Wait for this future to be completed and return its result */ - def result(using async: Async): Try[T] - - /** Wait for this future to be completed, return its value in case of success, or rethrow exception in case of - * failure. - */ - def value(using async: Async): T = result.get - - /** Eventually stop computation of this future and fail with a `Cancellation` exception. - */ - def cancel(): Unit +trait Future[+T] extends Async.OriginalSource[Try[T]], Cancellable object Future: @@ -64,12 +52,7 @@ object Future: def cancel(): Unit = cancelRequest = true - - // Future method implementations - - def result(using async: Async): Try[T] = - val r = async.await(this) - if cancelRequest then Failure(new CancellationException()) else r + complete(Failure(new CancellationException)) /** Complete future with result. If future was cancelled in the meantime, return a CancellationException failure * instead. Note: @uncheckedVariance is safe here since `complete` is called from only two places: @@ -219,18 +202,18 @@ object Future: * fail with the failure that was returned first. */ def zip[U](f2: Future[U])(using Async): Future[(T, U)] = Future: - Async.await(Async.either(f1, f2)) match - case Left(Success(x1)) => (x1, f2.value) - case Right(Success(x2)) => (f1.value, x2) + Async.either(f1, f2).awaitResult match + case Left(Success(x1)) => (x1, f2.await) + case Right(Success(x2)) => (f1.await, x2) case Left(Failure(ex)) => throw ex case Right(Failure(ex)) => throw ex /** Parallel composition of tuples of futures. Future.Success(EmptyTuple) might be treated as Nil. */ def *:[U <: Tuple](f2: Future[U])(using Async): Future[T *: U] = Future: - Async.await(Async.either(f1, f2)) match - case Left(Success(x1)) => x1 *: f2.value - case Right(Success(x2)) => f1.value *: x2 + Async.either(f1, f2).awaitResult match + case Left(Success(x1)) => x1 *: f2.await + case Right(Success(x2)) => f1.await *: x2 case Left(Failure(ex)) => throw ex case Right(Failure(ex)) => throw ex @@ -238,25 +221,25 @@ object Future: * success that was returned first. Otherwise, fail with the failure that was returned last. */ def alt(f2: Future[T])(using Async): Future[T] = Future: - Async.await(Async.either(f1, f2)) match + Async.either(f1, f2).awaitResult match case Left(Success(x1)) => x1 case Right(Success(x2)) => x2 - case Left(_: Failure[?]) => f2.value - case Right(_: Failure[?]) => f1.value + case Left(_: Failure[?]) => f2.await + case Right(_: Failure[?]) => f1.await /** Like `alt` but the slower future is cancelled. If either task succeeds, succeed with the success that was * returned first and the other is cancelled. Otherwise, fail with the failure that was returned last. */ def altWithCancel(f2: Future[T])(using Async): Future[T] = Future: - Async.await(Async.either(f1, f2)) match + Async.either(f1, f2).awaitResult match case Left(Success(x1)) => f2.cancel() x1 case Right(Success(x2)) => f1.cancel() x2 - case Left(_: Failure[?]) => f2.value - case Right(_: Failure[?]) => f1.value + case Left(_: Failure[?]) => f2.await + case Right(_: Failure[?]) => f1.await end extension @@ -302,15 +285,15 @@ object Future: /** `.await` for all futures in the sequence, returns the results in a sequence, or throws if any futures fail. */ def awaitAll(using Async) = val collector = Collector(fs*) - for _ <- fs do collector.results.read().right.get.value - fs.map(_.value) + for _ <- fs do collector.results.read().right.get.await + fs.map(_.await) /** Like [[awaitAll]], but cancels all futures as soon as one of them fails. */ def awaitAllOrCancel(using Async) = val collector = Collector(fs*) try - for _ <- fs do collector.results.read().right.get.value - fs.map(_.value) + for _ <- fs do collector.results.read().right.get.await + fs.map(_.await) catch case NonFatal(e) => fs.foreach(_.cancel()) @@ -327,7 +310,7 @@ object Future: val collector = Collector(fs*) @scala.annotation.tailrec def loop(attempt: Int): T = - collector.results.read().right.get.result match + collector.results.read().right.get.awaitResult match case Failure(exception) => if attempt == fs.length then /* everything failed */ throw exception else loop(attempt + 1) case Success(value) => @@ -462,13 +445,13 @@ end Task private def altAndAltCImplementation[T](shouldCancel: Boolean, futures: Future[T]*)(using Async): Future[T] = Future[T]: val fs: Seq[Future[(Try[T], Int)]] = futures.zipWithIndex.map({ (f, i) => Future: - try (Success(f.value), i) + try (Success(f.await), i) catch case e => (Failure(e), i) }) @tailrec def helper(failed: Int, fs: Seq[(Future[(Try[T], Int)], Int)]): Try[T] = - Async.await(Async.race(fs.map(_._1)*)) match + Async.race(fs.map(_._1)*).awaitResult match case Success((Success(x), i)) => if (shouldCancel) { for ((f, j) <- futures.zipWithIndex) { diff --git a/shared/src/test/scala/CancellationBehavior.scala b/shared/src/test/scala/CancellationBehavior.scala index 56cd5406..f882a8f4 100644 --- a/shared/src/test/scala/CancellationBehavior.scala +++ b/shared/src/test/scala/CancellationBehavior.scala @@ -77,7 +77,7 @@ class CancellationBehavior extends munit.FunSuite: val promise = Future.Promise[Unit]() Async.group: startFuture(info, promise.complete(Success(()))) - Async.await(promise.future) + promise.future.await info.assertCancelled() test("nested link group"): @@ -89,13 +89,13 @@ class CancellationBehavior extends munit.FunSuite: info1, { Async.group: startFuture(info2, promise2.complete(Success(()))) - Async.await(promise2.future) + promise2.future.await info2.assertCancelled() - Future.now(Success(())).value // check cancellation + Future.now(Success(())).await // check cancellation promise1.complete(Success(())) } ) - Async.await(promise1.future) + promise1.future.await info1.assertCancelled() info2.assertCancelled() @@ -123,6 +123,6 @@ class CancellationBehavior extends munit.FunSuite: Async.group: Async.current.group.cancel() // cancel now val f = startFuture(info, promise.complete(Success(()))) - Async.await(promise.future) - Async.await(f) + promise.future.awaitResult + f.awaitResult info.assertCancelled() diff --git a/shared/src/test/scala/ChannelBehavior.scala b/shared/src/test/scala/ChannelBehavior.scala index f9e18557..59f445a6 100644 --- a/shared/src/test/scala/ChannelBehavior.scala +++ b/shared/src/test/scala/ChannelBehavior.scala @@ -42,8 +42,8 @@ class ChannelBehavior extends munit.FunSuite { val f2 = Future: c.read() - f1.result - f2.result + f1.awaitResult + f2.awaitResult } test("sending is nonblocking in empty BufferedChannel") { @@ -59,8 +59,8 @@ class ChannelBehavior extends munit.FunSuite { val f2 = Future: c.read() - f1.result - f2.result + f1.awaitResult + f2.awaitResult } test("sending is blocking in full BufferedChannel") { @@ -80,8 +80,8 @@ class ChannelBehavior extends munit.FunSuite { val f2 = Future: c.read() - f1.result - f2.result + f1.awaitResult + f2.awaitResult } test("read blocks until value is available in SyncChannel") { @@ -100,9 +100,9 @@ class ChannelBehavior extends munit.FunSuite { c.read() assertEquals(touched, true) - f1.result - f11.result - f2.result + f1.awaitResult + f11.awaitResult + f2.awaitResult } test("read blocks until value is available in BufferedChannel") { @@ -121,9 +121,9 @@ class ChannelBehavior extends munit.FunSuite { c.read() assertEquals(touched, true) - f1.result - f11.result - f2.result + f1.awaitResult + f11.awaitResult + f2.awaitResult } test("values arrive in order") { @@ -199,7 +199,7 @@ class ChannelBehavior extends munit.FunSuite { for (i <- 1 to 10000) sum += c.read().right.get - f2.result + f2.awaitResult assertEquals(sum, 50005000L) } } @@ -236,15 +236,15 @@ class ChannelBehavior extends munit.FunSuite { gotCount.incrementAndGet() } - f21.result - f22.result + f21.awaitResult + f22.awaitResult while (gotCount.get() < 30000) { c.read() gotCount.incrementAndGet() } - f11.result - f12.result - f13.result + f11.awaitResult + f12.awaitResult + f13.awaitResult } } @@ -260,7 +260,7 @@ class ChannelBehavior extends munit.FunSuite { )* ) var sum = 0 - for i <- 0 until 1000 do sum += Async.await(race) + for i <- 0 until 1000 do sum += race.awaitResult assertEquals(sum, (0 until 1000).sum) } @@ -286,7 +286,7 @@ class ChannelBehavior extends munit.FunSuite { (for i <- 0 until 1000 yield ch.sendSource(i))* ) Future { - while Async.await(race).isRight do { + while race.awaitResult.isRight do { timesSent += 1 } } @@ -318,7 +318,7 @@ class ChannelBehavior extends munit.FunSuite { a.close() b.close() - assert(Async.race(a.readSource, b.readSource).await.isLeft) + assert(Async.race(a.readSource, b.readSource).awaitResult.isLeft) } test("ChannelMultiplexer multiplexes - all subscribers read the same stream") { diff --git a/shared/src/test/scala/FutureBehavior.scala b/shared/src/test/scala/FutureBehavior.scala index d9136a01..a2f49e15 100644 --- a/shared/src/test/scala/FutureBehavior.scala +++ b/shared/src/test/scala/FutureBehavior.scala @@ -27,11 +27,11 @@ class FutureBehavior extends munit.FunSuite { } val res = c .alt(Future { - val res = a.value + b.value + val res = a.await + b.await res }) .alt(c) - .value + .await res val y = Future: val a = Future { @@ -40,7 +40,7 @@ class FutureBehavior extends munit.FunSuite { val b = Future { 11 } - val res = a.zip(b).value + val res = a.zip(b).await res val z = Future: val a = Future { @@ -49,11 +49,11 @@ class FutureBehavior extends munit.FunSuite { val b = Future { true } - val res = a.alt(b).value + val res = a.alt(b).await res val _: Future[Int | Boolean] = z - assertEquals(x.value, 33) - assertEquals(y.value, (22, 11)) + assertEquals(x.await, 33) + assertEquals(y.await, (22, 11)) } test("Constant returns") { @@ -61,14 +61,14 @@ class FutureBehavior extends munit.FunSuite { for (i <- -5 to 5) val f1 = Future { i } val f2 = Future.now(Success(i)) - assertEquals(f1.value, i) - assertEquals(f1.value, f2.value) + assertEquals(f1.await, i) + assertEquals(f1.await, f2.await) } test("Future.now returning") { Async.blocking: val f = Future.now(Success(116)) - assertEquals(f.value, 116) + assertEquals(f.await, 116) } test("Constant future with timeout") { @@ -77,7 +77,7 @@ class FutureBehavior extends munit.FunSuite { sleep(50) 55 } - assertEquals(f.value, 55) + assertEquals(f.await, 55) } test("alt") { @@ -87,10 +87,10 @@ class FutureBehavior extends munit.FunSuite { val fail1 = Future.now(Failure(error)) val succeed = Future.now(Success(13)) - assert(Set(10, 20).contains(Future { 10 }.alt(Future { 20 }).value)) - assertEquals(fail.alt(succeed).value, 13) - assertEquals(succeed.alt(fail).value, 13) - assertEquals(fail.alt(fail1).result, Failure(error)) + assert(Set(10, 20).contains(Future { 10 }.alt(Future { 20 }).await)) + assertEquals(fail.alt(succeed).await, 13) + assertEquals(succeed.alt(fail).await, 13) + assertEquals(fail.alt(fail1).awaitResult, Failure(error)) } test("altC of 2 futures") { @@ -101,7 +101,7 @@ class FutureBehavior extends munit.FunSuite { touched += 1 }.alt(Future { 10 - }).result + }).awaitResult sleep(300) assertEquals(touched, 1) Async.blocking: @@ -109,7 +109,7 @@ class FutureBehavior extends munit.FunSuite { Future { sleep(200) touched += 1 - }.altWithCancel(Future { 10 }).result + }.altWithCancel(Future { 10 }).awaitResult sleep(300) assertEquals(touched, 0) } @@ -129,13 +129,13 @@ class FutureBehavior extends munit.FunSuite { Future { 5 } - ).result + ).awaitResult sleep(200) assertEquals(touched.get(), 2) } Async.blocking: var touched = 0 - altC(Future { sleep(100); touched += 1 }, Future { sleep(100); touched += 1 }, Future { 5 }).result + altC(Future { sleep(100); touched += 1 }, Future { sleep(100); touched += 1 }, Future { 5 }).awaitResult sleep(200) assertEquals(touched, 0) } @@ -147,23 +147,23 @@ class FutureBehavior extends munit.FunSuite { val fail1 = Future.now(Failure(error)) val succeed = Future.now(Success(13)) - assertEquals(Future { 10 }.zip(Future { 20 }).value, (10, 20)) - assertEquals(fail.zip(succeed).result, Failure(error)) - assertEquals(succeed.zip(fail).result, Failure(error)) - assertEquals(fail.zip(fail1).result, Failure(error)) + assertEquals(Future { 10 }.zip(Future { 20 }).await, (10, 20)) + assertEquals(fail.zip(succeed).awaitResult, Failure(error)) + assertEquals(succeed.zip(fail).awaitResult, Failure(error)) + assertEquals(fail.zip(fail1).awaitResult, Failure(error)) } test("result wraps exceptions") { Async.blocking: for (i <- -5 to 5) val error = new AssertionError(i) - assertEquals(Future { throw error }.result, Failure(error)) + assertEquals(Future { throw error }.awaitResult, Failure(error)) } test("result wraps values") { Async.blocking: for (i <- -5 to 5) - assertEquals(Future { i }.result, Success(i)) + assertEquals(Future { i }.awaitResult, Success(i)) } test("value propagates exceptions exceptions through futures") { @@ -173,9 +173,9 @@ class FutureBehavior extends munit.FunSuite { val f1 = Future { throw e } - (f1.value, f1.value) + (f1.await, f1.await) } - try f.value + try f.await catch case e1: AssertionError => assertEquals(e, e1) case z => @@ -191,9 +191,9 @@ class FutureBehavior extends munit.FunSuite { Future { sleep(Random.between(0, 15L)) 10 - }.value - }.value - }.value + }.await + }.await + }.await assertEquals(z, 10) } @@ -204,7 +204,7 @@ class FutureBehavior extends munit.FunSuite { 10 } f.cancel() - f.result match + f.awaitResult match case _: Failure[CancellationException] => () case _ => assert(false) } @@ -219,7 +219,7 @@ class FutureBehavior extends munit.FunSuite { }.unlink() 10 } - assertEquals(f.value, 10) + assertEquals(f.await, 10) assertEquals(zombieModifiedThis, false) Thread.sleep(300) assertEquals(zombieModifiedThis, true) @@ -239,7 +239,7 @@ class FutureBehavior extends munit.FunSuite { Future { 30 } - ).value + ).await ) ) } @@ -249,13 +249,13 @@ class FutureBehavior extends munit.FunSuite { val z1 = Future { sleep(500); 10 } *: Future { sleep(10); 222 } *: Future { sleep(150); 333 } *: Future { EmptyTuple } - assertEquals(z1.value, (10, 222, 333)) + assertEquals(z1.await, (10, 222, 333)) } test("zip on tuples with last zip") { Async.blocking: val z1 = Future { 10 } *: Future { 222 }.zip(Future { 333 }) - assertEquals(z1.value, (10, 222, 333)) + assertEquals(z1.await, (10, 222, 333)) } test("zip(3) first error") { @@ -274,7 +274,7 @@ class FutureBehavior extends munit.FunSuite { } *: Future { sleep(Random.between(50, 100)); throw e3 - } *: Future.now(Success(EmptyTuple))).result, + } *: Future.now(Success(EmptyTuple))).awaitResult, Failure(e3) ) } @@ -291,7 +291,7 @@ class FutureBehavior extends munit.FunSuite { futures.foreach(_.cancel()) val exceptionSet = mutable.Set[Throwable]() for (f <- futures) { - f.result match { + f.awaitResult match { case Failure(e) => exceptionSet.add(e) case _ => assert(false) } @@ -307,7 +307,7 @@ class FutureBehavior extends munit.FunSuite { j = i } val f2 = Future.now(Success(i)) - assertEquals(j, f2.value) + assertEquals(j, f2.await) assertEquals(j, i) } @@ -322,9 +322,9 @@ class FutureBehavior extends munit.FunSuite { } sleep(50) f.cancel() - f.result + f.awaitResult assertEquals(touched, true) - f.result match + f.awaitResult match case Failure(ex) if ex.isInstanceOf[CancellationException] => () case _ => assert(false) } @@ -335,7 +335,7 @@ class FutureBehavior extends munit.FunSuite { p.complete(Success(10)) val f = p.future f.cancel() - f.result match + f.awaitResult match case Failure(ex) if ex.isInstanceOf[CancellationException] => () case _ => assert(false) } @@ -353,7 +353,7 @@ class FutureBehavior extends munit.FunSuite { } sleep(50) f1.cancel() - f1.result + f1.awaitResult assertEquals(touched1, true) assertEquals(touched2, false) } @@ -372,7 +372,7 @@ class FutureBehavior extends munit.FunSuite { Future { sleep(Random.between(30, 50)); 10000 * i + 333 } - ).result, + ).awaitResult, Success(10000 * i + 333) ) } @@ -398,7 +398,7 @@ class FutureBehavior extends munit.FunSuite { sleep(Random.between(0, 250)); throw e3 } - ).result, + ).awaitResult, Failure(e2) ) } @@ -410,7 +410,7 @@ class FutureBehavior extends munit.FunSuite { val collector = Future.Collector(futs*) var sum = 0 - for i <- range do sum += collector.results.read().right.get.value + for i <- range do sum += collector.results.read().right.get.await assertEquals(sum, range.sum) } @@ -427,8 +427,8 @@ class FutureBehavior extends munit.FunSuite { collector += r var sum = 0 - for i <- range do sum += collector.results.read().right.get.value - for i <- range do sum += collector.results.read().right.get.value + for i <- range do sum += collector.results.read().right.get.await + for i <- range do sum += collector.results.read().right.get.await assertEquals(sum, 2 * range.sum) } diff --git a/shared/src/test/scala/ListenerBehavior.scala b/shared/src/test/scala/ListenerBehavior.scala index d0037011..2cf14618 100644 --- a/shared/src/test/scala/ListenerBehavior.scala +++ b/shared/src/test/scala/ListenerBehavior.scala @@ -24,10 +24,10 @@ class ListenerBehavior extends munit.FunSuite: val prom1 = Promise[Unit]() val prom2 = Promise[Unit]() Async.blocking: - val raced = race(Future { prom1.future.value; 10 }, Future { prom2.future.value; 20 }) + val raced = race(Future { prom1.future.await; 10 }, Future { prom2.future.await; 20 }) assert(!raced.poll(Listener.acceptingListener((x, _) => fail(s"race uncomplete $x")))) prom1.complete(Success(())) - assertEquals(Async.await(raced).get, 10) + assertEquals(raced.await, 10) test("lock two listeners"): val listener1 = Listener.acceptingListener[Int]((x, _) => assertEquals(x, 1)) @@ -117,7 +117,7 @@ class ListenerBehavior extends munit.FunSuite: source1.completeWith(1) assert(source1.listener.isEmpty) assert(source2.listener.isEmpty) - f.value + f.await test("race successful with wait"): val source1 = TSource() @@ -132,8 +132,8 @@ class ListenerBehavior extends munit.FunSuite: listener.waitWaiter() listener.continue() val f2 = Future(l2.completeNow(1, source2)) - assert(f1.value || f2.value) - assert(!f1.value || !f2.value) + assert(f1.await || f2.await) + assert(!f1.await || !f2.await) assert(source1.listener.isEmpty) assert(source2.listener.isEmpty) @@ -172,7 +172,7 @@ class ListenerBehavior extends munit.FunSuite: other.waitWaiter() assert(source2.listener.get.completeNow(1, source2)) other.continue() - assertEquals(f1.value, s1listener) + assertEquals(f1.await, s1listener) test("lockBoth ordering"): val ordering = Buffer[Long]() @@ -330,7 +330,7 @@ private class NumberedTestListener private (sleep: AtomicBoolean, fail: Boolean, if sleep.getAndSet(false) then Async.blocking: waiter = Some(Promise()) - waiter.get.future.value + waiter.get.future.await waiter.foreach: promise => promise.complete(Success(())) waiter = None diff --git a/shared/src/test/scala/SourceBehavior.scala b/shared/src/test/scala/SourceBehavior.scala index 67dde865..a361fb25 100644 --- a/shared/src/test/scala/SourceBehavior.scala +++ b/shared/src/test/scala/SourceBehavior.scala @@ -45,7 +45,7 @@ class SourceBehavior extends munit.FunSuite { 10 } f.onComplete(Listener.acceptingListener { (_, _) => itRan = true }) - Async.await(f) + f.await Thread.sleep(100) // onComplete of await and manual may be scheduled assertEquals(itRan, true) } @@ -60,9 +60,9 @@ class SourceBehavior extends munit.FunSuite { Future { sleep(20) 10 - }.value - }.value - }.value + }.await + }.await + }.await val timeAfter = System.currentTimeMillis() assert(timeAfter - timeBefore >= 50 + 70 + 20) } @@ -73,7 +73,7 @@ class SourceBehavior extends munit.FunSuite { val f = Future { Future { sleep(300) } 1 - }.value + }.await val timeAfter = System.currentTimeMillis() assert(timeAfter - timeBefore < 290) } @@ -85,7 +85,7 @@ class SourceBehavior extends munit.FunSuite { 1 } assertEquals(f.poll(), None) - Async.await(f) + f.await assertEquals(f.poll(), Some(Success(1))) } @@ -101,7 +101,7 @@ class SourceBehavior extends munit.FunSuite { f.onComplete(Listener.acceptingListener { (_, _) => bRan = true }) assertEquals(aRan, false) assertEquals(bRan, false) - Async.await(f) + f.await Thread.sleep(100) // onComplete of await and manual may be scheduled assertEquals(aRan, true) assertEquals(bRan, true) @@ -121,7 +121,7 @@ class SourceBehavior extends munit.FunSuite { assertEquals(aRan, false) assertEquals(bRan, false) f.dropListener(l) - Async.await(f) + f.await Thread.sleep(100) // onComplete of await and manual may be scheduled assertEquals(aRan, false) assertEquals(bRan, true) @@ -130,9 +130,9 @@ class SourceBehavior extends munit.FunSuite { test("map") { Async.blocking: val f: Future[Int] = Future { 10 } - assertEquals(Async.await(f.map({ case Success(i) => i + 1 })), 11) + assertEquals(f.map({ case Success(i) => i + 1 }).awaitResult, 11) val g: Future[Int] = Future.now(Failure(AssertionError(1123))) - assertEquals(Async.await(g.map({ case Failure(_) => 17 })), 17) + assertEquals(g.map({ case Failure(_) => 17 }).awaitResult, 17) } test("all listeners in chain fire") { @@ -148,9 +148,9 @@ class SourceBehavior extends munit.FunSuite { g.onComplete(Listener.acceptingListener { (_, _) => bRan.complete(Success(())) }) assertEquals(aRan.future.poll(), None) assertEquals(bRan.future.poll(), None) - Async.await(f) + f.await Thread.sleep(100) // onComplete of await and manual may be scheduled - aRan.future.zip(bRan.future).alt(Future(sleep(600))).value + aRan.future.zip(bRan.future).alt(Future(sleep(600))).await } test("either") { @@ -158,7 +158,7 @@ class SourceBehavior extends munit.FunSuite { Async.blocking: val f1 = Future { sleep(300); touched = true; 10 } val f2 = Future { sleep(50); 40 } - val g = Async.await(either(f1, f2)) + val g = either(f1, f2).awaitResult assertEquals(g, Right(Success(40))) sleep(350) assertEquals(touched, true) @@ -167,19 +167,19 @@ class SourceBehavior extends munit.FunSuite { test("source values") { Async.blocking: val src = Async.Source.values(1, 2) - assertEquals(src.await, 1) - assertEquals(src.await, 2) + assertEquals(src.awaitResult, 1) + assertEquals(src.awaitResult, 2) Async.blocking: val src = Async.Source.values(1) - assertEquals(src.await, 1) + assertEquals(src.awaitResult, 1) assertEquals( Async .race( src, // this should block forever, so never resolve! Future { sleep(200); 0 } ) - .await, + .awaitResult, Success(0) ) } diff --git a/shared/src/test/scala/TaskScheduleBehavior.scala b/shared/src/test/scala/TaskScheduleBehavior.scala index 37aad460..5fb93cb1 100644 --- a/shared/src/test/scala/TaskScheduleBehavior.scala +++ b/shared/src/test/scala/TaskScheduleBehavior.scala @@ -16,7 +16,7 @@ class TaskScheduleBehavior extends munit.FunSuite { val f = Task { i += 1 }.schedule(TaskSchedule.Every(100, 3)).run - f.result + f.awaitResult assertEquals(i, 3) val end = System.currentTimeMillis() assert(end - start >= 200) @@ -30,7 +30,7 @@ class TaskScheduleBehavior extends munit.FunSuite { val f = Task { i += 1 }.schedule(TaskSchedule.ExponentialBackoff(50, 2, 5)).run - f.result + f.awaitResult assertEquals(i, 5) val end = System.currentTimeMillis() assert(end - start >= 50 + 100 + 200 + 400) @@ -44,7 +44,7 @@ class TaskScheduleBehavior extends munit.FunSuite { val f = Task { i += 1 }.schedule(TaskSchedule.FibonacciBackoff(10, 6)).run - f.result + f.awaitResult assertEquals(i, 6) val end = System.currentTimeMillis() assert(end - start >= 0 + 10 + 10 + 20 + 30 + 50) @@ -61,7 +61,7 @@ class TaskScheduleBehavior extends munit.FunSuite { Failure(AssertionError()) } else Success(i) } - val ret = t.schedule(TaskSchedule.RepeatUntilSuccess(150)).run.result + val ret = t.schedule(TaskSchedule.RepeatUntilSuccess(150)).run.awaitResult assertEquals(ret.get.get, 4) val end = System.currentTimeMillis() assert(end - start >= 4 * 150) @@ -79,7 +79,7 @@ class TaskScheduleBehavior extends munit.FunSuite { Success(i) } else Failure(ex) } - val ret = t.schedule(TaskSchedule.RepeatUntilFailure(150)).run.result + val ret = t.schedule(TaskSchedule.RepeatUntilFailure(150)).run.awaitResult assertEquals(ret.get, Failure(ex)) val end = System.currentTimeMillis() assert(end - start >= 4 * 150) diff --git a/shared/src/test/scala/Timer.scala b/shared/src/test/scala/Timer.scala index f7c43eea..7934ef0f 100644 --- a/shared/src/test/scala/Timer.scala +++ b/shared/src/test/scala/Timer.scala @@ -20,13 +20,13 @@ class TimerTest extends munit.FunSuite { Async.blocking: val timer = Timer(1.second) Future { timer.run() } - assert(Async.await(timer.src) == timer.TimerEvent.Tick) + assert(timer.src.awaitResult == timer.TimerEvent.Tick) } def timeoutCancellableFuture[T](d: Duration, f: Future[T])(using Async, AsyncOperations): Future[T] = val t = Future { sleep(d.toMillis) } Future: - val g = Async.await(Async.either(t, f)) + val g = Async.either(t, f).awaitResult g match case Left(_) => f.cancel() @@ -44,7 +44,7 @@ class TimerTest extends munit.FunSuite { sleep(1000) touched = true ) - Async.await(t) + t.await assert(!touched) sleep(2000) assert(!touched) From 7e3b0f7c8d235d9953cd308b195408ce10ac4583 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Fri, 8 Dec 2023 23:48:05 +0100 Subject: [PATCH 02/11] use more efficient alt/altCancel --- shared/src/main/scala/async/futures.scala | 106 +++++++----------- shared/src/test/scala/ChannelBehavior.scala | 4 +- shared/src/test/scala/FutureBehavior.scala | 92 +-------------- .../src/test/scala/TaskScheduleBehavior.scala | 2 +- 4 files changed, 42 insertions(+), 162 deletions(-) diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index b6494739..1f5e88c3 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -52,7 +52,6 @@ object Future: def cancel(): Unit = cancelRequest = true - complete(Failure(new CancellationException)) /** Complete future with result. If future was cancelled in the meantime, return a CancellationException failure * instead. Note: @uncheckedVariance is safe here since `complete` is called from only two places: @@ -201,45 +200,56 @@ object Future: /** Parallel composition of two futures. If both futures succeed, succeed with their values in a pair. Otherwise, * fail with the failure that was returned first. */ - def zip[U](f2: Future[U])(using Async): Future[(T, U)] = Future: - Async.either(f1, f2).awaitResult match - case Left(Success(x1)) => (x1, f2.await) - case Right(Success(x2)) => (f1.await, x2) - case Left(Failure(ex)) => throw ex - case Right(Failure(ex)) => throw ex + def zip[U](f2: Future[U]): Future[(T, U)] = + Future.withResolver: r => + Async + .either(f1, f2) + .onComplete(Listener { (v, _) => + v match + case Left(Success(x1)) => + f2.onComplete(Listener { (x2, _) => r.complete(x2.map((x1, _))) }) + case Right(Success(x2)) => + f1.onComplete(Listener { (x1, _) => r.complete(x1.map((_, x2))) }) + case Left(Failure(ex)) => r.reject(ex) + case Right(Failure(ex)) => r.reject(ex) + }) /** Parallel composition of tuples of futures. Future.Success(EmptyTuple) might be treated as Nil. */ - def *:[U <: Tuple](f2: Future[U])(using Async): Future[T *: U] = Future: - Async.either(f1, f2).awaitResult match - case Left(Success(x1)) => x1 *: f2.await - case Right(Success(x2)) => f1.await *: x2 - case Left(Failure(ex)) => throw ex - case Right(Failure(ex)) => throw ex + def *:[U <: Tuple](f2: Future[U]): Future[T *: U] = Future.withResolver: r => + Async + .either(f1, f2) + .onComplete(Listener { (v, _) => + v match + case Left(Success(x1)) => + f2.onComplete(Listener { (x2, _) => r.complete(x2.map(x1 *: _)) }) + case Right(Success(x2)) => + f1.onComplete(Listener { (x1, _) => r.complete(x1.map(_ *: x2)) }) + case Left(Failure(ex)) => r.reject(ex) + case Right(Failure(ex)) => r.reject(ex) + }) /** Alternative parallel composition of this task with `other` task. If either task succeeds, succeed with the * success that was returned first. Otherwise, fail with the failure that was returned last. */ - def alt(f2: Future[T])(using Async): Future[T] = Future: - Async.either(f1, f2).awaitResult match - case Left(Success(x1)) => x1 - case Right(Success(x2)) => x2 - case Left(_: Failure[?]) => f2.await - case Right(_: Failure[?]) => f1.await + def alt(f2: Future[T]): Future[T] = altImpl(false)(f2) /** Like `alt` but the slower future is cancelled. If either task succeeds, succeed with the success that was * returned first and the other is cancelled. Otherwise, fail with the failure that was returned last. */ - def altWithCancel(f2: Future[T])(using Async): Future[T] = Future: - Async.either(f1, f2).awaitResult match - case Left(Success(x1)) => - f2.cancel() - x1 - case Right(Success(x2)) => - f1.cancel() - x2 - case Left(_: Failure[?]) => f2.await - case Right(_: Failure[?]) => f1.await + def altWithCancel(f2: Future[T]): Future[T] = altImpl(true)(f2) + + inline def altImpl(inline withCancel: Boolean)(f2: Future[T]): Future[T] = Future.withResolver: r => + Async + .raceWithOrigin(f1, f2) + .onComplete(Listener { case ((v, which), _) => + v match + case Success(value) => + inline if withCancel then (if which == f1 then f2 else f1).cancel() + r.resolve(value) + case Failure(_) => + (if which == f1 then f2 else f1).onComplete(Listener((v, _) => r.complete(v))) + }) end extension @@ -442,44 +452,6 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T): end Task -private def altAndAltCImplementation[T](shouldCancel: Boolean, futures: Future[T]*)(using Async): Future[T] = Future[T]: - val fs: Seq[Future[(Try[T], Int)]] = futures.zipWithIndex.map({ (f, i) => - Future: - try (Success(f.await), i) - catch case e => (Failure(e), i) - }) - - @tailrec - def helper(failed: Int, fs: Seq[(Future[(Try[T], Int)], Int)]): Try[T] = - Async.race(fs.map(_._1)*).awaitResult match - case Success((Success(x), i)) => - if (shouldCancel) { - for ((f, j) <- futures.zipWithIndex) { - if (j != i) f.cancel() - } - } - Success(x) - case Success((Failure(e), i)) => - if (failed + 1 == futures.length) - Failure(e) - else - helper(failed + 1, fs.filter({ case (_, j) => j != i })) - case _ => assert(false) - - helper(0, fs.zipWithIndex).get - -/** `alt` defined for multiple futures, not only two. If either task succeeds, succeed with the success that was - * returned first. Otherwise, fail with the failure that was returned last. - */ -def alt[T](futures: Future[T]*)(using Async): Future[T] = - altAndAltCImplementation(false, futures*) - -/** `altC` defined for multiple futures, not only two. If either task succeeds, succeed with the success that was - * returned first and cancel all other tasks. Otherwise, fail with the failure that was returned last. - */ -def altC[T](futures: Future[T]*)(using Async): Future[T] = - altAndAltCImplementation(true, futures*) - def uninterruptible[T](body: Async ?=> T)(using ac: Async): T = val tracker = Cancellable.Tracking().link() diff --git a/shared/src/test/scala/ChannelBehavior.scala b/shared/src/test/scala/ChannelBehavior.scala index 59f445a6..f4e13b84 100644 --- a/shared/src/test/scala/ChannelBehavior.scala +++ b/shared/src/test/scala/ChannelBehavior.scala @@ -7,9 +7,7 @@ import gears.async.{ SyncChannel, Task, TaskSchedule, - UnboundedChannel, - alt, - altC + UnboundedChannel } import gears.async.default.given import gears.async.AsyncOperations.* diff --git a/shared/src/test/scala/FutureBehavior.scala b/shared/src/test/scala/FutureBehavior.scala index a2f49e15..f32ae266 100644 --- a/shared/src/test/scala/FutureBehavior.scala +++ b/shared/src/test/scala/FutureBehavior.scala @@ -1,4 +1,4 @@ -import gears.async.{Async, Future, Task, TaskSchedule, alt, altC, uninterruptible} +import gears.async.{Async, Future, Task, TaskSchedule, uninterruptible} import gears.async.default.given import gears.async.Future.{*:, Promise, zip} import gears.async.AsyncOperations.* @@ -114,32 +114,6 @@ class FutureBehavior extends munit.FunSuite { assertEquals(touched, 0) } - test("altC of multiple futures") { - Async.blocking { - var touched = java.util.concurrent.atomic.AtomicInteger(0) - alt( - Future { - sleep(100) - touched.incrementAndGet() - }, - Future { - sleep(100) - touched.incrementAndGet() - }, - Future { - 5 - } - ).awaitResult - sleep(200) - assertEquals(touched.get(), 2) - } - Async.blocking: - var touched = 0 - altC(Future { sleep(100); touched += 1 }, Future { sleep(100); touched += 1 }, Future { 5 }).awaitResult - sleep(200) - assertEquals(touched, 0) - } - test("zip") { Async.blocking: val error = new AssertionError() @@ -225,25 +199,6 @@ class FutureBehavior extends munit.FunSuite { assertEquals(zombieModifiedThis, true) } - test("n-ary alt") { - Async.blocking: - assert( - Set(10, 20, 30).contains( - alt( - Future { - 10 - }, - Future { - 20 - }, - Future { - 30 - } - ).await - ) - ) - } - test("zip on tuples with EmptyTuple") { Async.blocking: val z1 = Future { sleep(500); 10 } *: Future { sleep(10); 222 } *: Future { sleep(150); 333 } *: Future { @@ -358,51 +313,6 @@ class FutureBehavior extends munit.FunSuite { assertEquals(touched2, false) } - test("n-ary alt first success") { - Async.blocking: - for (i <- 1 to 20) - assertEquals( - alt( - Future { - sleep(Random.between(200, 300)); 10000 * i + 111 - }, - Future { - sleep(Random.between(200, 300)); 10000 * i + 222 - }, - Future { - sleep(Random.between(30, 50)); 10000 * i + 333 - } - ).awaitResult, - Success(10000 * i + 333) - ) - } - - test("n-ary alt last failure") { - Async.blocking: - for (_ <- 1 to 20) - val e1 = AssertionError(111) - val e2 = AssertionError(211) - val e3 = AssertionError(311) - - assertEquals( - alt( - Future { - sleep(Random.between(0, 250)); - throw e1 - }, - Future { - sleep(Random.between(500, 1000)); - throw e2 - }, - Future { - sleep(Random.between(0, 250)); - throw e3 - } - ).awaitResult, - Failure(e2) - ) - } - test("future collector") { Async.blocking: val range = (0 to 10) diff --git a/shared/src/test/scala/TaskScheduleBehavior.scala b/shared/src/test/scala/TaskScheduleBehavior.scala index 5fb93cb1..e7c2b71d 100644 --- a/shared/src/test/scala/TaskScheduleBehavior.scala +++ b/shared/src/test/scala/TaskScheduleBehavior.scala @@ -1,4 +1,4 @@ -import gears.async.{Async, Future, Task, TaskSchedule, alt} +import gears.async.{Async, Future, Task, TaskSchedule} import gears.async.default.given import Future.{*:, zip} From 81b67f7a246eff79792af30acc6407f8128df857 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Sat, 9 Dec 2023 00:08:27 +0100 Subject: [PATCH 03/11] Slight improvement of the tests --- shared/src/test/scala/FutureBehavior.scala | 11 ++++++++++- .../test/scala/{Timer.scala => TimerBehavior.scala} | 12 ++++++------ 2 files changed, 16 insertions(+), 7 deletions(-) rename shared/src/test/scala/{Timer.scala => TimerBehavior.scala} (77%) diff --git a/shared/src/test/scala/FutureBehavior.scala b/shared/src/test/scala/FutureBehavior.scala index f32ae266..d800f62c 100644 --- a/shared/src/test/scala/FutureBehavior.scala +++ b/shared/src/test/scala/FutureBehavior.scala @@ -287,14 +287,23 @@ class FutureBehavior extends munit.FunSuite { test("Promise can be cancelled") { Async.blocking: val p = Promise[Int]() - p.complete(Success(10)) val f = p.future f.cancel() + p.complete(Success(10)) f.awaitResult match case Failure(ex) if ex.isInstanceOf[CancellationException] => () case _ => assert(false) } + test("Promise can't be cancelled after completion") { + Async.blocking: + val p = Promise[Int]() + p.complete(Success(10)) + val f = p.future + f.cancel() + assertEquals(f.await, 10) + } + test("Nesting of cancellations") { Async.blocking: var touched1 = false diff --git a/shared/src/test/scala/Timer.scala b/shared/src/test/scala/TimerBehavior.scala similarity index 77% rename from shared/src/test/scala/Timer.scala rename to shared/src/test/scala/TimerBehavior.scala index 7934ef0f..d124833e 100644 --- a/shared/src/test/scala/Timer.scala +++ b/shared/src/test/scala/TimerBehavior.scala @@ -5,7 +5,7 @@ import scala.util.{Success, Failure} import java.util.concurrent.TimeoutException import java.util.concurrent.CancellationException -class TimerTest extends munit.FunSuite { +class TimerBehavior extends munit.FunSuite { import gears.async.default.given test("sleeping does sleep") { @@ -16,14 +16,14 @@ class TimerTest extends munit.FunSuite { assert(now2 - now1 > 150, now2 - now1) } - test("TimerSleep1Second") { + test("timer does sleep") { Async.blocking: val timer = Timer(1.second) Future { timer.run() } assert(timer.src.awaitResult == timer.TimerEvent.Tick) } - def timeoutCancellableFuture[T](d: Duration, f: Future[T])(using Async, AsyncOperations): Future[T] = + def `cancel future after timeout`[T](d: Duration, f: Future[T])(using Async, AsyncOperations): Future[T] = val t = Future { sleep(d.toMillis) } Future: val g = Async.either(t, f).awaitResult @@ -35,16 +35,16 @@ class TimerTest extends munit.FunSuite { t.cancel() v.get - test("testTimeoutFuture") { + test("racing with a sleeping future") { var touched = false Async.blocking: - val t = timeoutCancellableFuture( + val t = `cancel future after timeout`( 250.millis, Future: sleep(1000) touched = true ) - t.await + assert(t.awaitResult.isFailure) assert(!touched) sleep(2000) assert(!touched) From 05f8e8b97b6cd86a3beb2772fda80fca7a5e40fd Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Sat, 9 Dec 2023 00:49:06 +0100 Subject: [PATCH 04/11] Guarantee that future cancellation is done once --- shared/src/main/scala/async/futures.scala | 33 +++++++++++++--------- shared/src/test/scala/FutureBehavior.scala | 20 +++++++++++++ 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index 1f5e88c3..2c69989a 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -6,13 +6,14 @@ import AsyncOperations.sleep import scala.collection.mutable import mutable.ListBuffer +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.CancellationException import scala.compiletime.uninitialized import scala.util.{Failure, Success, Try} import scala.annotation.unchecked.uncheckedVariance -import java.util.concurrent.CancellationException import scala.annotation.tailrec import scala.util -import java.util.concurrent.atomic.AtomicLong import scala.util.control.NonFatal /** A cancellable future that can suspend waiting for other asynchronous sources @@ -29,7 +30,7 @@ object Future: private class CoreFuture[+T] extends Future[T]: @volatile protected var hasCompleted: Boolean = false - protected var cancelRequest = false + protected var cancelRequest = AtomicBoolean(false) private var result: Try[T] = uninitialized // guaranteed to be set if hasCompleted = true private val waiting: mutable.Set[Listener[Try[T]]] = mutable.Set() @@ -51,7 +52,11 @@ object Future: // Cancellable method implementations def cancel(): Unit = - cancelRequest = true + setCancelled() + + /** Sets the cancellation state and returns `true` if the future has not been completed and cancelled before. */ + protected def setCancelled(): Boolean = + !hasCompleted && cancelRequest.compareAndSet(false, true) /** Complete future with result. If future was cancelled in the meantime, return a CancellationException failure * instead. Note: @uncheckedVariance is safe here since `complete` is called from only two places: @@ -80,7 +85,7 @@ object Future: private var innerGroup: CompletionGroup = CompletionGroup() private def checkCancellation(): Unit = - if cancelRequest then throw new CancellationException() + if cancelRequest.get() then throw new CancellationException() private class FutureAsync(val group: CompletionGroup)(using label: ac.support.Label[Unit]) extends Async(using ac.support, ac.scheduler): @@ -123,9 +128,7 @@ object Future: override def withGroup(group: CompletionGroup) = FutureAsync(group) - override def cancel(): Unit = - super.cancel() - this.innerGroup.cancel() + override def cancel(): Unit = if setCancelled() then this.innerGroup.cancel() link() ac.support.scheduleBoundary: @@ -180,10 +183,10 @@ object Future: override def complete(result: Try[T]): Unit = super.complete(result) override def cancel(): Unit = - super.cancel() - cancelHandle() - reject(CancellationException()) - this.unlink() + if setCancelled() then + cancelHandle() + reject(CancellationException()) + this.unlink() } body(future) future @@ -256,7 +259,10 @@ object Future: /** A promise defines a future that is be completed via the promise's `complete` method. */ class Promise[T]: - private val myFuture = CoreFuture[T]() + private val myFuture = new CoreFuture[T]: + fut => + override def cancel(): Unit = + if setCancelled() then fut.complete(Failure(new CancellationException())) /** The future defined by this promise */ val future: Future[T] = myFuture @@ -265,7 +271,6 @@ object Future: * `CancellationException` failure instead. */ def complete(result: Try[T]): Unit = myFuture.complete(result) - end Promise /** Collects a list of futures into a channel of futures, arriving as they finish. */ diff --git a/shared/src/test/scala/FutureBehavior.scala b/shared/src/test/scala/FutureBehavior.scala index d800f62c..ce8d8ee6 100644 --- a/shared/src/test/scala/FutureBehavior.scala +++ b/shared/src/test/scala/FutureBehavior.scala @@ -9,6 +9,7 @@ import scala.concurrent.ExecutionContext import scala.util.{Failure, Success, Try} import scala.util.Random import scala.collection.mutable.Set +import java.util.concurrent.atomic.AtomicInteger class FutureBehavior extends munit.FunSuite { given ExecutionContext = ExecutionContext.global @@ -304,6 +305,25 @@ class FutureBehavior extends munit.FunSuite { assertEquals(f.await, 10) } + test("Future.withResolver cancel handler is run once") { + val num = AtomicInteger(0) + val fut = Future.withResolver { _.onCancel { () => num.incrementAndGet() } } + Async.blocking: + (1 to 20) + .map(_ => Future { fut.cancel() }) + .awaitAll + assertEquals(num.get(), 1) + } + + test("Future.withResolver cancel handler is not run after being completed") { + val num = AtomicInteger(0) + val fut = Future.withResolver[Int]: r => + r.onCancel { () => num.incrementAndGet() } + r.resolve(1) + fut.cancel() + assertEquals(num.get(), 0) + } + test("Nesting of cancellations") { Async.blocking: var touched1 = false From 4be93c2af8dc6cee87a4651c50854ba2c1055c7d Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Sun, 10 Dec 2023 19:10:41 +0100 Subject: [PATCH 05/11] Always try to unlink when cancelling a withResolver promise --- shared/src/main/scala/async/futures.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index 2c69989a..a241b6db 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -186,7 +186,7 @@ object Future: if setCancelled() then cancelHandle() reject(CancellationException()) - this.unlink() + this.unlink() } body(future) future From ccba434334b5794cd105b5bf924d7f5283fa103f Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Sun, 10 Dec 2023 19:11:19 +0100 Subject: [PATCH 06/11] Remove redundant polling check in addListener --- shared/src/main/scala/async/futures.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index a241b6db..9ace1593 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -43,8 +43,7 @@ object Future: else false def addListener(k: Listener[Try[T]]): Unit = synchronized: - if hasCompleted then k.completeNow(result, this) - else waiting += k + waiting += k def dropListener(k: Listener[Try[T]]): Unit = synchronized: waiting -= k From a07afc2310245bf115070bed390451562eede258 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Sun, 10 Dec 2023 19:50:10 +0100 Subject: [PATCH 07/11] Automatically unlink futures when they are completed --- shared/src/main/scala/async/futures.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index 9ace1593..4c510424 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -53,6 +53,13 @@ object Future: def cancel(): Unit = setCancelled() + override def link(group: CompletionGroup): this.type = + // though hasCompleted is accessible without "synchronized", + // we want it not to be run while the future was trying to complete. + synchronized: + if group == CompletionGroup.Unlinked || !hasCompleted then super.link(group) + else this + /** Sets the cancellation state and returns `true` if the future has not been completed and cancelled before. */ protected def setCancelled(): Boolean = !hasCompleted && cancelRequest.compareAndSet(false, true) @@ -72,6 +79,7 @@ object Future: hasCompleted = true val ws = waiting.toList waiting.clear() + unlink() ws for listener <- toNotify do listener.completeNow(result, this) @@ -138,7 +146,6 @@ object Future: }).recoverWith { case _: InterruptedException | _: CancellationException => Failure(new CancellationException()) }))(using FutureAsync(CompletionGroup.Unlinked)) - signalCompletion()(using ac) end RunnableFuture @@ -185,7 +192,6 @@ object Future: if setCancelled() then cancelHandle() reject(CancellationException()) - this.unlink() } body(future) future From d3dc6d3865dcb2c11b72ed3386f4195a38d8c4da Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Sun, 10 Dec 2023 19:51:19 +0100 Subject: [PATCH 08/11] Address review comments --- shared/src/main/scala/async/futures.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index 4c510424..ca70829d 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -7,7 +7,6 @@ import scala.collection.mutable import mutable.ListBuffer import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.CancellationException import scala.compiletime.uninitialized import scala.util.{Failure, Success, Try} @@ -61,7 +60,7 @@ object Future: else this /** Sets the cancellation state and returns `true` if the future has not been completed and cancelled before. */ - protected def setCancelled(): Boolean = + protected final def setCancelled(): Boolean = !hasCompleted && cancelRequest.compareAndSet(false, true) /** Complete future with result. If future was cancelled in the meantime, return a CancellationException failure From 21aa64e4ba87ff76e4f0cec4c4928dbaf41ec10d Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Sun, 10 Dec 2023 20:03:17 +0100 Subject: [PATCH 09/11] Future should cancel its group when the main body is completed --- shared/src/main/scala/async/futures.scala | 7 ++++--- shared/src/test/scala/FutureBehavior.scala | 13 +++++++++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index ca70829d..6b60614c 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -56,7 +56,7 @@ object Future: // though hasCompleted is accessible without "synchronized", // we want it not to be run while the future was trying to complete. synchronized: - if group == CompletionGroup.Unlinked || !hasCompleted then super.link(group) + if !hasCompleted || group == CompletionGroup.Unlinked then super.link(group) else this /** Sets the cancellation state and returns `true` if the future has not been completed and cancelled before. */ @@ -138,13 +138,14 @@ object Future: link() ac.support.scheduleBoundary: - Async.withNewCompletionGroup(innerGroup)(complete(Try({ + val result = Async.withNewCompletionGroup(innerGroup)(Try({ val r = body checkCancellation() r }).recoverWith { case _: InterruptedException | _: CancellationException => Failure(new CancellationException()) - }))(using FutureAsync(CompletionGroup.Unlinked)) + })(using FutureAsync(CompletionGroup.Unlinked)) + complete(result) end RunnableFuture diff --git a/shared/src/test/scala/FutureBehavior.scala b/shared/src/test/scala/FutureBehavior.scala index ce8d8ee6..2ef0bcc7 100644 --- a/shared/src/test/scala/FutureBehavior.scala +++ b/shared/src/test/scala/FutureBehavior.scala @@ -184,6 +184,19 @@ class FutureBehavior extends munit.FunSuite { case _ => assert(false) } + test("future should cancel its group when the main body is completed") { + Async.blocking: + var touched = false + val fut = Future: + Future: + sleep(2000) + touched = true + 10 + assertEquals(fut.await, 10) + sleep(2000) + assertEquals(touched, false) + } + test("zombie threads exist and run to completion after the Async.blocking barrier") { var zombieModifiedThis = false Async.blocking: From 59c518f2ba735263e531b73c47ef22bc53a31b84 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Sun, 10 Dec 2023 20:05:09 +0100 Subject: [PATCH 10/11] Make sure test runs without racing on same sleep time --- shared/src/test/scala/FutureBehavior.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/shared/src/test/scala/FutureBehavior.scala b/shared/src/test/scala/FutureBehavior.scala index 2ef0bcc7..0b0813d2 100644 --- a/shared/src/test/scala/FutureBehavior.scala +++ b/shared/src/test/scala/FutureBehavior.scala @@ -189,11 +189,12 @@ class FutureBehavior extends munit.FunSuite { var touched = false val fut = Future: Future: - sleep(2000) + sleep(1000) touched = true + sleep(500) 10 assertEquals(fut.await, 10) - sleep(2000) + sleep(1000) assertEquals(touched, false) } From e7a94a8790f0606cd1b0030c87af0b3b5af20965 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20M=C3=BCller?= Date: Sun, 10 Dec 2023 20:06:37 +0100 Subject: [PATCH 11/11] Drop handleCompletion from CompletionGroup --- shared/src/main/scala/async/Async.scala | 9 +-------- shared/src/main/scala/async/Cancellable.scala | 6 ------ shared/src/main/scala/async/CompletionGroup.scala | 6 ++---- 3 files changed, 3 insertions(+), 18 deletions(-) diff --git a/shared/src/main/scala/async/Async.scala b/shared/src/main/scala/async/Async.scala index 87fb45b6..a3c0c720 100644 --- a/shared/src/main/scala/async/Async.scala +++ b/shared/src/main/scala/async/Async.scala @@ -58,14 +58,7 @@ object Async: inline def current(using async: Async): Async = async def group[T](body: Async ?=> T)(using async: Async): T = - withNewCompletionGroup(CompletionGroup(async.group.handleCompletion).link())(body) - - def withCompletionHandler[T](handler: Cancellable => Async ?=> Unit)(body: Async ?=> T)(using async: Async): T = - val combined = (c: Cancellable) => - (async: Async) ?=> - handler(c) - async.group.handleCompletion(c) - withNewCompletionGroup(CompletionGroup(combined).link())(body) + withNewCompletionGroup(CompletionGroup().link())(body) /** Runs a body within another completion group. When the body returns, the group is cancelled and its completion * awaited with the `Unlinked` group. diff --git a/shared/src/main/scala/async/Cancellable.scala b/shared/src/main/scala/async/Cancellable.scala index 218385e8..59b3bf16 100644 --- a/shared/src/main/scala/async/Cancellable.scala +++ b/shared/src/main/scala/async/Cancellable.scala @@ -25,12 +25,6 @@ trait Cancellable: def unlink(): this.type = link(CompletionGroup.Unlinked) - /** Signal completion of this cancellable to its group. */ - def signalCompletion()(using Async): this.type = - this.group.handleCompletion(this) - this.unlink() - this - end Cancellable object Cancellable: diff --git a/shared/src/main/scala/async/CompletionGroup.scala b/shared/src/main/scala/async/CompletionGroup.scala index 760bf53f..3ed56767 100644 --- a/shared/src/main/scala/async/CompletionGroup.scala +++ b/shared/src/main/scala/async/CompletionGroup.scala @@ -5,10 +5,8 @@ import scala.util.Success /** A group of cancellable objects that are completed together. Cancelling the group means cancelling all its * uncompleted members. - * @param handleCompletion - * a function that gets applied to every member when it is completed or cancelled */ -class CompletionGroup(val handleCompletion: Cancellable => Async ?=> Unit = _ => {}) extends Cancellable.Tracking: +class CompletionGroup extends Cancellable.Tracking: private val members: mutable.Set[Cancellable] = mutable.Set() private var canceled: Boolean = false private var cancelWait: Option[Promise[Unit]] = None @@ -26,7 +24,7 @@ class CompletionGroup(val handleCompletion: Cancellable => Async ?=> Unit = _ => synchronized: if members.nonEmpty && cancelWait.isEmpty then cancelWait = Some(Promise()) cancelWait.foreach(cWait => cWait.future.await) - signalCompletion() + unlink() /** Add given member to the members set. If the group has already been cancelled, cancels that member immediately. */ def add(member: Cancellable): Unit =