Skip to content

Commit

Permalink
Merge pull request #24 from natsukagami/more-efficient-future-stuff
Browse files Browse the repository at this point in the history
Renaming `result`/`value` to general `awaitResult`/`await`, along other improvements
  • Loading branch information
natsukagami authored Dec 10, 2023
2 parents 86159db + e7a94a8 commit 8f32c1b
Show file tree
Hide file tree
Showing 19 changed files with 266 additions and 363 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,14 @@ which takes two futures and if they both complete successfully returns their res
extension [T](f1: Future[T])

def zip[U](f2: Future[U])(using Async): Future[(T, U)] = Future:
Async.await(Async.either(f1, f2)) match
Async.either(f1, f2).awaitResult match
case Left(Success(x1)) => (x1, f2.value)
case Right(Success(x2)) => (f1.value, x2)
case Left(Failure(ex)) => throw ex
case Right(Failure(ex)) => throw ex

def alt(f2: Future[T])(using Async): Future[T] = Future:
Async.await(Async.either(f1, f2)) match
Async.either(f1, f2).awaitResult match
case Left(Success(x1)) => x1
case Right(Success(x2)) => x2
case Left(_: Failure[?]) => f2.value
Expand Down
10 changes: 5 additions & 5 deletions jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import scala.concurrent.ExecutionContext
Async.blocking:
val server = Future:
PIOHelper.withSocketUDP(8134): serverSocket =>
val got: DatagramPacket = serverSocket.receive().result.get
val got: DatagramPacket = serverSocket.receive().awaitResult.get
val messageReceived = String(got.getData.slice(0, got.getLength), "UTF-8")
val responseMessage = (messageReceived.toInt + 1).toString.getBytes
serverSocket.send(ByteBuffer.wrap(responseMessage), got.getAddress.toString.substring(1), got.getPort)
Expand All @@ -25,10 +25,10 @@ import scala.concurrent.ExecutionContext
Future:
PIOHelper.withSocketUDP(): clientSocket =>
val data: Array[Byte] = value.toString.getBytes
clientSocket.send(ByteBuffer.wrap(data), "localhost", 8134).result.get
val responseDatagram = clientSocket.receive().result.get
clientSocket.send(ByteBuffer.wrap(data), "localhost", 8134).awaitResult.get
val responseDatagram = clientSocket.receive().awaitResult.get
val messageReceived = String(responseDatagram.getData.slice(0, responseDatagram.getLength), "UTF-8").toInt
println("Sent " + value.toString + " and got " + messageReceived.toString + " in return.")

Async.await(client(100))
Async.await(server)
client(100).await
server.await
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import scala.concurrent.ExecutionContext
given ExecutionContext = ExecutionContext.global
Async.blocking:
PIOHelper.withFile("/home/julian/Desktop/x.txt", StandardOpenOption.READ, StandardOpenOption.WRITE): f =>
Async.await(f.writeString("Hello world! (1)"))
println(Async.await(f.readString(1024)).get)
Async.await(f.writeString("Hello world! (2)"))
println(Async.await(f.readString(1024)).get)
f.writeString("Hello world! (1)").await
println(f.readString(1024).await)
f.writeString("Hello world! (2)").await
println(f.readString(1024).await)
4 changes: 2 additions & 2 deletions jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import scala.concurrent.ExecutionContext
Async.blocking:
PIOHelper.withFile("/home/julian/Desktop/x.txt", StandardOpenOption.READ): f =>
val b = ByteBuffer.allocate(1024)
val retCode = f.read(b).result.get
val retCode = f.read(b).awaitResult.get
assert(retCode >= 0)
val s = StandardCharsets.UTF_8.decode(b.slice(0, retCode)).toString()
println("Read size with read(): " + retCode.toString())
println("Data: " + s)

println("Read with readString():")
println(Async.await(f.readString(1000)).get)
println(f.readString(1000).awaitResult)
24 changes: 12 additions & 12 deletions jvm/src/main/scala/measurements/measureTimes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def measureIterations[T](action: () => T): Int =
Async.blocking:
val f = Future:
var z = 1
f.result
f.awaitResult

println("Thread joins per second: " + (threadJoins / 60))
println("Future joins per second: " + (futureJoins / 60))
Expand All @@ -64,26 +64,26 @@ def measureIterations[T](action: () => T): Int =

val c1: Double = measureIterations: () =>
Async.blocking:
Async.await(Async.race(Future { Thread.sleep(10) }, Future { Thread.sleep(100) }, Future { Thread.sleep(50) }))
Async.await(Async.race(Future { Thread.sleep(50) }, Future { Thread.sleep(10) }, Future { Thread.sleep(100) }))
Async.await(Async.race(Future { Thread.sleep(100) }, Future { Thread.sleep(50) }, Future { Thread.sleep(10) }))
Async.race(Future { Thread.sleep(10) }, Future { Thread.sleep(100) }, Future { Thread.sleep(50) }).await
Async.race(Future { Thread.sleep(50) }, Future { Thread.sleep(10) }, Future { Thread.sleep(100) }).await
Async.race(Future { Thread.sleep(100) }, Future { Thread.sleep(50) }, Future { Thread.sleep(10) }).await

val c2: Double = measureIterations: () =>
Async.blocking:
val f11 = Future { Thread.sleep(10) }
val f12 = Future { Thread.sleep(50) }
val f13 = Future { Thread.sleep(100) }
f11.result
f11.awaitResult

val f21 = Future { Thread.sleep(100) }
val f22 = Future { Thread.sleep(10) }
val f23 = Future { Thread.sleep(50) }
f22.result
f22.awaitResult

val f31 = Future { Thread.sleep(50) }
val f32 = Future { Thread.sleep(100) }
val f33 = Future { Thread.sleep(10) }
f33.result
f33.awaitResult

val c1_seconds_wasted_for_waits = c1 * 0.01
val c1_per_second_adjusted = c1 / 3 / (60 - c1_seconds_wasted_for_waits)
Expand All @@ -105,9 +105,9 @@ def measureIterations[T](action: () => T): Int =

val c1: Double = measureIterations: () =>
Async.blocking:
Async.await(Async.race(Future { Thread.sleep(10) }, Future { Thread.sleep(100) }, Future { Thread.sleep(50) }))
Async.await(Async.race(Future { Thread.sleep(50) }, Future { Thread.sleep(10) }, Future { Thread.sleep(100) }))
Async.await(Async.race(Future { Thread.sleep(100) }, Future { Thread.sleep(50) }, Future { Thread.sleep(10) }))
Async.race(Future { Thread.sleep(10) }, Future { Thread.sleep(100) }, Future { Thread.sleep(50) }).await
Async.race(Future { Thread.sleep(50) }, Future { Thread.sleep(10) }, Future { Thread.sleep(100) }).await
Async.race(Future { Thread.sleep(100) }, Future { Thread.sleep(50) }, Future { Thread.sleep(10) }).await

val c2: Double = measureIterations: () =>
@volatile var i1 = true
Expand Down Expand Up @@ -433,7 +433,7 @@ def measureRunTimes[T](action: () => T): TimeMeasurementResult =
dataAlmostJson.append(measure("PosixLikeIO", timesInner = if size < 100 then 100 else 10): () =>
Async.blocking:
PIOHelper.withFile("/tmp/FIO/x.txt", StandardOpenOption.CREATE, StandardOpenOption.WRITE): f =>
f.writeString(bigString.substring(0, size)).result
f.writeString(bigString.substring(0, size)).awaitResult
)
println("done 1")

Expand Down Expand Up @@ -466,7 +466,7 @@ def measureRunTimes[T](action: () => T): TimeMeasurementResult =
dataAlmostJson.append(measure("PosixLikeIO", timesInner = if size < 100 then 100 else 10): () =>
Async.blocking:
PIOHelper.withFile("/tmp/FIO/x.txt", StandardOpenOption.READ): f =>
f.readString(size).result
f.readString(size).awaitResult
)
println("done 1")

Expand Down
2 changes: 1 addition & 1 deletion jvm/src/test/scala/CancellationBehavior.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ class JVMCancellationBehavior extends munit.FunSuite:
val f = Future:
Thread.sleep(5000)
1
f.result
f.awaitResult
2 changes: 1 addition & 1 deletion native/src/main/scala/async/ForkJoinSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class SuspendExecutorWithSleep(exec: ExecutionContext)
val cancellable = schedule(millis.millis, () => resolver.resolve(()))
resolver.onCancel(cancellable.cancel)
.link()
.value
.await
}

class ForkJoinSupport extends SuspendExecutorWithSleep(new ForkJoinPool())
24 changes: 10 additions & 14 deletions shared/src/main/scala/async/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,8 @@ object Async:
/** The currently executing Async context */
inline def current(using async: Async): Async = async

/** Await source result in currently executing Async context */
inline def await[T](src: Source[T])(using async: Async): T = async.await(src)

def group[T](body: Async ?=> T)(using async: Async): T =
withNewCompletionGroup(CompletionGroup(async.group.handleCompletion).link())(body)

def withCompletionHandler[T](handler: Cancellable => Async ?=> Unit)(body: Async ?=> T)(using async: Async): T =
val combined = (c: Cancellable) =>
(async: Async) ?=>
handler(c)
async.group.handleCompletion(c)
withNewCompletionGroup(CompletionGroup(combined).link())(body)
withNewCompletionGroup(CompletionGroup().link())(body)

/** Runs a body within another completion group. When the body returns, the group is cancelled and its completion
* awaited with the `Unlinked` group.
Expand Down Expand Up @@ -114,10 +104,16 @@ object Async:
resultOpt

/** Utility method for direct waiting with `Async`. */
def await(using Async) = Async.await(this)

final def awaitResult(using ac: Async) = ac.await(this)
end Source

extension [T](src: Source[scala.util.Try[T]])
/** Waits for an item to arrive from the source, then automatically unwraps it. */
inline def await(using Async) = src.awaitResult.get
extension [E, T](src: Source[Either[E, T]])
/** Waits for an item to arrive from the source, then automatically unwraps it. */
inline def await(using Async) = src.awaitResult.right.get

/** An original source has a standard definition of `onComplete` in terms of `poll` and `addListener`. Implementations
* should be the resource owner to handle listener queue and completion using an object monitor on the instance.
*/
Expand Down Expand Up @@ -263,7 +259,7 @@ object Async:
* [[select]] is run in the same async context as the calling context of [[select]].
*/
def select[T](cases: SelectCase[T]*)(using Async) =
val (input, which) = raceWithOrigin(cases.map(_._1)*).await
val (input, which) = raceWithOrigin(cases.map(_._1)*).awaitResult
val (_, handler) = cases.find(_._1 == which).get
handler.asInstanceOf[input.type => T](input)

Expand Down
6 changes: 0 additions & 6 deletions shared/src/main/scala/async/Cancellable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@ trait Cancellable:
def unlink(): this.type =
link(CompletionGroup.Unlinked)

/** Signal completion of this cancellable to its group. */
def signalCompletion()(using Async): this.type =
this.group.handleCompletion(this)
this.unlink()
this

end Cancellable

object Cancellable:
Expand Down
8 changes: 3 additions & 5 deletions shared/src/main/scala/async/CompletionGroup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import scala.util.Success

/** A group of cancellable objects that are completed together. Cancelling the group means cancelling all its
* uncompleted members.
* @param handleCompletion
* a function that gets applied to every member when it is completed or cancelled
*/
class CompletionGroup(val handleCompletion: Cancellable => Async ?=> Unit = _ => {}) extends Cancellable.Tracking:
class CompletionGroup extends Cancellable.Tracking:
private val members: mutable.Set[Cancellable] = mutable.Set()
private var canceled: Boolean = false
private var cancelWait: Option[Promise[Unit]] = None
Expand All @@ -25,8 +23,8 @@ class CompletionGroup(val handleCompletion: Cancellable => Async ?=> Unit = _ =>
private[async] def waitCompletion()(using Async): Unit =
synchronized:
if members.nonEmpty && cancelWait.isEmpty then cancelWait = Some(Promise())
cancelWait.foreach(cWait => Async.await(cWait.future))
signalCompletion()
cancelWait.foreach(cWait => cWait.future.await)
unlink()

/** Add given member to the members set. If the group has already been cancelled, cancels that member immediately. */
def add(member: Cancellable): Unit =
Expand Down
5 changes: 2 additions & 3 deletions shared/src/main/scala/async/channels.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import scala.collection.mutable
import mutable.{ArrayBuffer, ListBuffer}

import scala.util.{Failure, Success, Try}
import Async.await

import scala.util.control.Breaks.{break, breakable}
import gears.async.Async.Source
Expand All @@ -26,7 +25,7 @@ trait SendableChannel[-T]:
/** Send [[x]] over the channel, blocking (asynchronously with [[Async]]) until the item has been sent or, if the
* channel is buffered, queued. Throws [[ChannelClosedException]] if the channel was closed.
*/
def send(x: T)(using Async): Unit = Async.await(sendSource(x)) match
def send(x: T)(using Async): Unit = sendSource(x).awaitResult match
case Right(_) => ()
case Left(_) => throw ChannelClosedException()
end SendableChannel
Expand All @@ -45,7 +44,7 @@ trait ReadableChannel[+T]:
/** Read an item from the channel, blocking (asynchronously with [[Async]]) until the item has been received. Returns
* `Failure(ChannelClosedException)` if the channel was closed.
*/
def read()(using Async): Res[T] = await(readSource)
def read()(using Async): Res[T] = readSource.awaitResult
end ReadableChannel

/** A generic channel that can be sent to, received from and closed. */
Expand Down
Loading

0 comments on commit 8f32c1b

Please sign in to comment.