diff --git a/README.md b/README.md index 1dfaa600..1a007506 100644 --- a/README.md +++ b/README.md @@ -23,13 +23,13 @@ the project! To test Ox, use the following dependency, using either [sbt](https://www.scala-sbt.org): ```scala -"com.softwaremill.ox" %% "core" % "0.4.0" +"com.softwaremill.ox" %% "core" % "0.5.0" ``` Or [scala-cli](https://scala-cli.virtuslab.org): ```scala -//> using dep "com.softwaremill.ox::core:0.4.0" +//> using dep "com.softwaremill.ox::core:0.5.0" ``` Documentation is available at [https://ox.softwaremill.com](https://ox.softwaremill.com), ScalaDocs can be browsed at [https://javadoc.io](https://www.javadoc.io/doc/com.softwaremill.ox). diff --git a/generated-doc/out/channels/actors.md b/generated-doc/out/actors.md similarity index 96% rename from generated-doc/out/channels/actors.md rename to generated-doc/out/actors.md index 19c6aa91..022fbf5d 100644 --- a/generated-doc/out/channels/actors.md +++ b/generated-doc/out/actors.md @@ -4,8 +4,8 @@ Actors in Ox enable invoking methods on an object serially, keeping the behavior invocation. That is, even though invocations may happen from multiple threads, they are guaranteed to happen one after the other, not concurrently. -Actor invocations are fully type-safe, with minimal overhead. They use [channels](index.md) and -[scopes](../structured-concurrency/fork-join.md) behind the scenes. +Actor invocations are fully type-safe, with minimal overhead. They use [channels](streaming/channels.md) and +[scopes](structured-concurrency/fork-join.md) behind the scenes. One of the use-cases is integrating with external APIs, which are represented by an object containing mutable state. Such integrations must be protected and cannot be accessed by multiple threads concurrently. @@ -37,12 +37,11 @@ class Stateful: counter += delta counter -supervised { +supervised: val ref = Actor.create(new Stateful) ref.ask(_.increment(5)) // blocks until the invocation completes ref.ask(_.increment(4)) // returns 9 -} ``` If a non-fatal exception is thrown by the invocation, it's propagated to the caller, and the actor continues processing @@ -75,7 +74,7 @@ class Stateful: def work(howHard: Int): Unit = throw new RuntimeException("boom!") def close(): Unit = println("Closing") -supervised { +supervised: val ref = Actor.create(new Stateful, Some(_.close())) // fire-and-forget, exception causes the scope to close @@ -83,5 +82,4 @@ supervised { // preventing the scope from closing never -} ``` diff --git a/generated-doc/out/basics/direct-style.md b/generated-doc/out/basics/direct-style.md index f01876c1..e32a4b9a 100644 --- a/generated-doc/out/basics/direct-style.md +++ b/generated-doc/out/basics/direct-style.md @@ -41,18 +41,18 @@ of use and performance of imperative programming. This is a departure from a pur [cats-effect](https://github.com/typelevel/cats-effect) or [ZIO](https://zio.dev), in favor of running effectful computations imperatively. -Note, however, that in all other aspects direct style Scala remains functional: using immutable data structures, +Note, however, that in all other aspects direct-style Scala remains functional: using immutable data structures, higher order functions, typeclasses, restricting effects, separating code and data, favoring function composition, etc. Ox uses the above mentioned virtual threads in Java 21 to implement a safe approach to concurrency, combined with Go-like channels for inter-thread communication. 
Moreover, Ox supports and proposes an approach to error handling, along with multiple utility functions providing safe resiliency, resource management, scheduling and others. -The overarching goal of Ox is enabling safe direct style programming using the power of the Scala 3 language. While +The overarching goal of Ox is enabling safe direct-style programming using the power of the Scala 3 language. While still in its early days, a lot of functionality is available in ox today! -## Other direct style Scala projects +## Other direct-style Scala projects -The wider goal of direct style Scala is enabling teams to deliver working software quickly and with confidence. Our +The wider goal of direct-style Scala is enabling teams to deliver working software quickly and with confidence. Our other projects, including [sttp client](https://sttp.softwaremill.com) and [tapir](https://tapir.softwaremill.com), also include integrations directly tailored towards direct style. diff --git a/generated-doc/out/basics/error-handling.md b/generated-doc/out/basics/error-handling.md index 44a520d7..f1180b66 100644 --- a/generated-doc/out/basics/error-handling.md +++ b/generated-doc/out/basics/error-handling.md @@ -9,7 +9,7 @@ Ox uses two channels, through which errors can be signalled: Exceptions are always appropriately handled by computation combinators, such as the high-level concurrency operations [`par`](../high-level-concurrency/par.md) and [`race`](../high-level-concurrency/race.md), as well as by -[scopes](../structured-concurrency/fork-join.md) and [channels](../channels/index.md). +[scopes](../structured-concurrency/fork-join.md) and [streams](../streaming/index.md). The general rule for computation combinators is that using them should throw exactly the same exceptions, as if the provided code was executed without them. That is, no additional exceptions might be thrown, and no exceptions are diff --git a/generated-doc/out/basics/quick-example.md b/generated-doc/out/basics/quick-example.md index 95d3ddc9..a4f61d57 100644 --- a/generated-doc/out/basics/quick-example.md +++ b/generated-doc/out/basics/quick-example.md @@ -7,6 +7,7 @@ variants and functionalities! import ox.* import ox.either.ok import ox.channels.* +import ox.flow.Flow import ox.resilience.* import ox.scheduling.* import scala.concurrent.duration.* @@ -23,14 +24,13 @@ val result2: Either[Throwable, Int] = either.catching(timeout(1.second)(computat // structured concurrency & supervision supervised { - forkUser { + forkUser: sleep(1.second) println("Hello!") - } - forkUser { + + forkUser: sleep(500.millis) throw new RuntimeException("boom!") - } } // on exception, ends the scope & re-throws @@ -38,12 +38,12 @@ supervised { def computationR: Int = ??? 
retry(RetryConfig.backoff(3, 100.millis, 5.minutes, Jitter.Equal))(computationR) -// create channels & transform them using high-level operations -supervised { - Source.iterate(0)(_ + 1) // natural numbers - .transform(_.filter(_ % 2 == 0).map(_ + 1).take(10)) - .foreach(n => println(n.toString)) -} +// create a flow & transform using high-level operations +Flow.iterate(0)(_ + 1) // natural numbers + .filter(_ % 2 == 0) + .map(_ + 1) + .take(10) + .runForeach(n => println(n.toString)) // select from a number of channels val c = Channel.rendezvous[Int] diff --git a/generated-doc/out/basics/start-here.md b/generated-doc/out/basics/start-here.md index 26a9c572..b25a853b 100644 --- a/generated-doc/out/basics/start-here.md +++ b/generated-doc/out/basics/start-here.md @@ -4,10 +4,10 @@ ```scala // sbt dependency -"com.softwaremill.ox" %% "core" % "0.4.0" +"com.softwaremill.ox" %% "core" % "0.5.0" // scala-cli dependency -//> using dep com.softwaremill.ox::core:0.4.0 +//> using dep com.softwaremill.ox::core:0.5.0 ``` ## Scope of the Ox project @@ -52,7 +52,7 @@ We offer commercial support for Ox and related technologies, as well as developm * [Two types of futures](https://softwaremill.com/two-types-of-futures/) * [Supervision, Kafka and Java 21: what’s new in Ox](https://softwaremill.com/supervision-kafka-and-java-21-whats-new-in-ox/) * [Designing a (yet another) retry API](https://softwaremill.com/designing-a-yet-another-retry-api/) -* [Handling errors in direct style Scala](https://softwaremill.com/handling-errors-in-direct-style-scala/) +* [Handling errors in direct-style Scala](https://softwaremill.com/handling-errors-in-direct-style-scala/) ## Inspiration & building blocks diff --git a/generated-doc/out/best-practices.md b/generated-doc/out/best-practices.md index f184c988..f8116f48 100644 --- a/generated-doc/out/best-practices.md +++ b/generated-doc/out/best-practices.md @@ -1,6 +1,6 @@ # Best practices -While working on ox and integrating ox into our other open-source projects, we found a couple of patterns and best +While working on Ox and integrating ox into our other open-source projects, we found a couple of patterns and best practices which might be useful for anybody starting their journey with direct-style Scala and ox. ## Make scopes as small as possible @@ -8,7 +8,7 @@ practices which might be useful for anybody starting their journey with direct-s If you end up using concurrency scopes such as `supervised`, make sure that their lifetime is as short as possible. In some cases it might be necessary to start a "global" scope (e.g. for application-wide, long-running tasks), but even if so, don't let the global scope leak to any other parts of your code, and isolate its usage, e.g. using -[actors](channels/actors.md). +[actors](actors.md). For all other tasks, create short-lived scopes, which handle a single request, message from a queue or a single job instance. @@ -16,7 +16,7 @@ instance. ## Integrate with callback-based APIs using channels Callback-based APIs, including "reactive" ones, are by their nature non-structured, and don't play well with -structured concurrency. For such cases, [channels](channels/index.md) are an ideal tool. Sending or receiving to/from +structured concurrency. For such cases, [channels](streaming/channels.md) are an ideal tool. Sending or receiving to/from a channel doesn't require any context, and can be done from any thread. 
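+
+For illustration, a minimal sketch of this pattern (the `MessageListener` trait and its `onMessage` callback are hypothetical stand-ins for an external, callback-based API, not part of Ox):
+
+```scala
+import ox.{fork, supervised}
+import ox.channels.Channel
+
+// a hypothetical callback-based API
+trait MessageListener:
+  def onMessage(handler: String => Unit): Unit
+
+def process(listener: MessageListener): Unit =
+  val incoming = Channel.bufferedDefault[String]
+  // the callback may be invoked from any thread owned by the external library;
+  // sending to a channel needs no scope or special context
+  listener.onMessage(msg => incoming.send(msg))
+
+  // receiving & processing the data can then happen in a structured way
+  supervised:
+    fork { for _ <- 1 to 10 do println(s"Got: ${incoming.receive()}") }.join()
+```
+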
On the other hand, processing the data that is on the channel often involves concurrency and creating thread, which can be then done in a structured way. @@ -30,15 +30,14 @@ of structured concurrency is to localise thread creation as much as possible, an thread as an effect. `using Ox` partially circumvents this guarantee, hence use this with caution, and pay attention not to pass it through several layers of method calls, which might make the code hard to understand. -## Use `mapAsView` instead of `map` +## Use flows instead of channels -The `map` and `filter` channel operations must be run within a concurrency scope, and perform their processing on a -freshly created fork. While this still performs well, as creating virtual threads & channels is cheap, it might incur -an unnecessary overhead. If the mapping/filtering logic doesn't have effects, and isn't blocking, it's often sufficient -to create views of channels. +Transforming channels directly might lead to excessive concurrency, as each transformation typically starts a +background fork, processing the data and sending it to a new channel. While this still performs well, as creating +virtual threads & channels is cheap, it might incur an unnecessary overhead. -In a channel view, the processing logic is run lazily, on the thread that performs the `receive` operation. Channel -views can be created using `.mapAsView`, `.filterAsView`, and `.collectAsView`. +Instead, you can use [flows](streaming/flows.md) and their high-level API, which allows inserting asynchronous +boundaries when necessary, but otherwise runs the subsequent processing stages on the same thread. ## Avoid returning `Fork` diff --git a/generated-doc/out/channels/backpressure.md b/generated-doc/out/channels/backpressure.md deleted file mode 100644 index ad1253c4..00000000 --- a/generated-doc/out/channels/backpressure.md +++ /dev/null @@ -1,4 +0,0 @@ -# Backpressure - -Channels are back-pressured, as the `.send` operation is blocking until there's a receiver thread available, or if -there's enough space in the buffer. The processing space is hence bound by the total size of channel buffers. diff --git a/generated-doc/out/channels/channel-closed.md b/generated-doc/out/channels/channel-closed.md deleted file mode 100644 index 9ff1d1dc..00000000 --- a/generated-doc/out/channels/channel-closed.md +++ /dev/null @@ -1,36 +0,0 @@ -# Handling closed channels - -By default, `Sink.send` and `Source.receive` methods will throw a `ChannelClosedException`, if the channel is already -closed: - -```scala -enum ChannelClosedException(cause: Option[Throwable]) extends Exception(cause.orNull): - case Error(cause: Throwable) extends ChannelClosedException(Some(cause)) - case Done() extends ChannelClosedException(None) -``` - -Alternatively, you can call `Sink.sendSafe` or `Source.receiveSafe`, which return union types: - -```scala -trait Source[+T]: - def receive(): T - def receiveSafe(): T | ChannelClosed - -trait Sink[-T]: - def send(value: T): Unit - def sendSafe(value: T): Unit | ChannelClosed - def done(): Unit - def doneSafe(): Unit | ChannelClosed - def error(cause: Throwable): Unit - def errorSafe(cause: Throwable): Unit | ChannelClosed - -sealed trait ChannelClosed -object ChannelClosed: - case class Error(reason: Option[Exception]) extends ChannelClosed - case object Done extends ChannelClosed -``` - -That is, the result of a `safe` operation might be a value, or information that the channel is closed. 
- -Using extensions methods from `ChannelClosedUnion` it's possible to convert such union types to `Either`s, `Try`s or -exceptions, as well as `map` over such results. diff --git a/generated-doc/out/channels/discharging.md b/generated-doc/out/channels/discharging.md deleted file mode 100644 index 03908305..00000000 --- a/generated-doc/out/channels/discharging.md +++ /dev/null @@ -1,30 +0,0 @@ -# Discharging channels - -Values of a source can be discharged using methods such as `.foreach`, `.toList`, `.pipeTo` or `.drain`: - -```scala -import ox.supervised -import ox.channels.Source - -supervised { - val s = Source.fromValues(1, 2, 3) - s.toList: List[Int] // List(1, 2, 3) -} -``` - -These methods are blocking, as they drain the channel until no more values are available (when the channel is done). - -## Closed channels (done / error) - -If the channel encounters an error, the discharging method will throws a `ChannelClosedException.Error`. Similarly as -with `send` and `receive`, there's a `safe` variant for each discharing method, which returns a union type, e.g.: - -```scala -import ox.supervised -import ox.channels.{ChannelClosed, Source} - -supervised { - val s = Source.fromValues(1, 2, 3) - s.toList: List[Int] | ChannelClosed.Error // List(1, 2, 3) -} -``` diff --git a/generated-doc/out/channels/errors.md b/generated-doc/out/channels/errors.md deleted file mode 100644 index 7d91edb0..00000000 --- a/generated-doc/out/channels/errors.md +++ /dev/null @@ -1,9 +0,0 @@ -# Error propagation - -Errors are only propagated downstream, ultimately reaching the point where the source is discharged, typically leading -to an exception being thrown there. This, in turn, should end the enclosing [scope](../structured-concurrency/fork-join.md), and cancel any -other forks, that are still running and handling the upstream processing stages. - -The approach we decided to take (only propagating errors downstream) is one of the two possible designs - -with the other being re-throwing an exception when it's encountered. -Please see [the respective ADR](../adr/0001-error-propagation-in-channels.md) for a discussion. diff --git a/generated-doc/out/channels/index.md b/generated-doc/out/channels/index.md deleted file mode 100644 index 56ef0d7c..00000000 --- a/generated-doc/out/channels/index.md +++ /dev/null @@ -1,30 +0,0 @@ -# Channels - -A channel is like a queue (values can be sent/received), but additionally channels support: - -* completion (a source can be `done`) -* downstream error propagation -* `select`ing exactly one channel clause to complete, where clauses include send and receive operations - -Creating a channel is a light-weight operation: - -```scala -import ox.channels.* -val c = Channel.bufferedDefault[String] -``` - -This uses the default buffer size (16). It's also possible to create channels with other buffer sizes, as well as -rendezvous or unlimited channels: - -```scala -import ox.channels.* -val c1 = Channel.rendezvous[String] -val c2 = Channel.buffered[String](5) -val c3 = Channel.unlimited[String] -``` - -In rendezvous channels, a sender and receiver must "meet" to exchange a value. Hence, `.send` always blocks, unless -there's another thread waiting on a `.receive`. In buffered channels, `.send` only blocks when the buffer is full. -In an unlimited channel, sending never blocks. - -Channels implement two traits: `Source` and `Sink`. 
diff --git a/generated-doc/out/channels/io.md b/generated-doc/out/channels/io.md deleted file mode 100644 index 21b212f8..00000000 --- a/generated-doc/out/channels/io.md +++ /dev/null @@ -1,102 +0,0 @@ -# Files and I/O - -Ox allows creating a `Source` which reads from a file or `InpuStream`, as well as directing an existing source into a file or an `OutputStream`. These methods work only with a `Source[Chunk[Byte]]`. Ox takes care of closing files/streams after processing and on errors. - -## InputStream and OutputStream - -### Source.fromInputStream - -An `InputStream` can be converted to a `Source[Chunk[Byte]]`: - -```scala -import ox.channels.Source -import ox.supervised -import java.io.ByteArrayInputStream -import java.io.InputStream - -val inputStream: InputStream = new ByteArrayInputStream("some input".getBytes) -supervised { - Source - .fromInputStream(inputStream) // Source[Chunk[Byte]] - .decodeStringUtf8 - .map(_.toUpperCase) - .foreach(println) // "SOME INPUT" -} -``` - -You can define a custom chunk size instead of using the default: - - -```scala -import ox.channels.Source -import ox.supervised -import java.io.ByteArrayInputStream -import java.io.InputStream - -val inputStream: InputStream = new ByteArrayInputStream("some input".getBytes) -supervised { - Source - .fromInputStream(inputStream, chunkSize = 4) // Source[Chunk[Byte]] - .decodeStringUtf8 - .map(_.toUpperCase) - .foreach(println) // "SOME", " INPUT" -} -``` - -### source.toOutputStream - -A `Source[Chunk[Byte]]` can be directed to write to an `OutputStream`: - -```scala -import ox.channels.Source -import ox.supervised -import java.io.ByteArrayOutputStream - -val outputStream = new ByteArrayOutputStream() -supervised { - val source = Source.fromIterable(List("text1,", "text2")) - source - .encodeUtf8 - .toOutputStream(outputStream) -} -outputStream.toString // "TEXT1,TEXT2" -``` - -## Files - -### Source.fromFile - -You can obtain a `Source` of byte chunks read from a file for a given path: - -```scala -import ox.channels.Source -import ox.supervised -import java.nio.file.Paths - -supervised { - Source - .fromFile(Paths.get("/path/to/my/file.txt")) - .linesUtf8 - .map(_.toUpperCase) - .toList // List("FILE_LINE1", "FILE_LINE2") -} -``` - -Similarly to `.fromInputStream`, you can define custom chunk size using `Source.fromFile(path: Path, chunkSize: Int)`. - -### source.toFile - -A `Source[Chunk[Byte]]` can be written to a file under a given path: - -```scala -import ox.channels.Source -import ox.supervised -import java.nio.file.Paths - -supervised { - val source = Source.fromIterable(List("text1,", "text2")) - source - .encodeUtf8 - .toFile(Paths.get("/path/to/my/target/file.txt")) -} -``` diff --git a/generated-doc/out/channels/sinks.md b/generated-doc/out/channels/sinks.md deleted file mode 100644 index cc259da1..00000000 --- a/generated-doc/out/channels/sinks.md +++ /dev/null @@ -1,23 +0,0 @@ -# Sinks - -Data can be sent to a channel using `Sink.send`. Once no more data items are available, completion can be signalled -downstream using `Sink.done`. If there's an error when producing data, this can be signalled using `Sink.error`: - -```scala -import ox.{fork, supervised} -import ox.channels.* - -val c = Channel.rendezvous[String] -supervised { - fork { - c.send("Hello") - c.send("World") - c.done() - } - - // TODO: receive -} -``` - -`.send` blocks the thread, hence usually channels are shared across forks to communicate data between them. 
As Ox is -designed to work with Java 21+ and Virtual Threads, blocking is a cheap operation that might be done frequently. diff --git a/generated-doc/out/channels/sources.md b/generated-doc/out/channels/sources.md deleted file mode 100644 index 530cfe52..00000000 --- a/generated-doc/out/channels/sources.md +++ /dev/null @@ -1,31 +0,0 @@ -# Sources - -A source can be used to receive elements from a channel. - -```scala -trait Source[+T]: - def receive(): T -``` - -Same as `.send`, the `.receive` method might block the current thread. - -## Creating sources - -Sources can be created by instantiating a new channel, or using one of the many factory methods on the `Source` -companion object, e.g.: - -```scala -import ox.supervised -import ox.channels.Source - -import scala.concurrent.duration.* - -supervised { - Source.fromValues(1, 2, 3) - Source.tick(1.second, "x") - Source.iterate(0)(_ + 1) // natural numbers -} -``` - -Each such source creates a fork, which takes care of sending the elements to the channel, once capacity is available. -If the enclosing `supervised` scope ends, each such fork is cancelled. diff --git a/generated-doc/out/compare-gears.md b/generated-doc/out/compare-gears.md index 176fb99b..223bc81d 100644 --- a/generated-doc/out/compare-gears.md +++ b/generated-doc/out/compare-gears.md @@ -6,7 +6,7 @@ differences, listed below. Originally [posted on Reddit](https://www.reddit.com/r/scala/comments/1cdfaki/comment/l1c0pcn/): -1. The fundamental difference is in the timeframes and perspectives of the projects. With Ox, we are trying to provide people with the tools necessary to write direct style Scala now (2024). On the other hand, Gears is more of a research project, and coupled to Project Caprese, which will still run until 2028. That's not to say that Gears won't have a stable release before then - we're not aware of Gears's release plan - however the development goals of both projects seem different. Gears is more in an exploratory phase, while in Ox we are looking at a shorter time-to-market. +1. The fundamental difference is in the timeframes and perspectives of the projects. With Ox, we are trying to provide people with the tools necessary to write direct-style Scala now (2024). On the other hand, Gears is more of a research project, and coupled to Project Caprese, which will still run until 2028. That's not to say that Gears won't have a stable release before then - we're not aware of Gears's release plan - however the development goals of both projects seem different. Gears is more in an exploratory phase, while in Ox we are looking at a shorter time-to-market. 2. Ox only targets the JVM 21+, while Gears targets JVM 21+ and Native. While we don't rule out adding Native support, if it will be possible, it's not our immediate goal, because of (1). 3. This also influences features such as capture checking. It seems Gears will want to use the capture checker pretty early. We're hoping to do the same in Ox at some point, but that's only after the capture checker is relatively complete, and available in a stable (maybe LTS?) Scala release. So this might still take some time. 4. The scope of Ox is a bit wider than just concurrency: we're also looking at resiliency and general direct style utilities. One could of course debate, is the specialised-library approach taken by Gears better, or the more broad one taken by Ox. But I don't think there's a universal answer to that. 
diff --git a/generated-doc/out/conf.py b/generated-doc/out/conf.py index a664cc49..5efa5c62 100644 --- a/generated-doc/out/conf.py +++ b/generated-doc/out/conf.py @@ -69,9 +69,9 @@ # built documents. # # The short X.Y version. -version = u'0.0.x' +version = u'@VERSION@' # The full version, including alpha/beta/rc tags. -release = u'0.0.x' +release = u'@VERSION@' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/generated-doc/out/dictionary.md b/generated-doc/out/dictionary.md index 9b89c14f..e5bbc0fc 100644 --- a/generated-doc/out/dictionary.md +++ b/generated-doc/out/dictionary.md @@ -37,3 +37,9 @@ Errors: Other: * **computation combinator**: a method which takes user-provided functions and manages their execution, e.g. using concurrency, interruption, and appropriately handling errors; examples include `par`, `race`, `retry`, `timeout` + +Channels: +* **values** can be **sent** to a channel, or **received** from a channel + +Flows: +* when **run**, a flow **emits** **elements** \ No newline at end of file diff --git a/generated-doc/out/high-level-concurrency/race.md b/generated-doc/out/high-level-concurrency/race.md index 69401858..f193b261 100644 --- a/generated-doc/out/high-level-concurrency/race.md +++ b/generated-doc/out/high-level-concurrency/race.md @@ -1,9 +1,9 @@ # Race two computations -A number of computations can be raced against each other using the `race` method, for example: +A number of computations can be raced with each other using the `raceSuccess` method, for example: ```scala -import ox.{race, sleep} +import ox.{raceSuccess, sleep} import scala.concurrent.duration.* def computation1: Int = @@ -14,28 +14,28 @@ def computation2: Int = sleep(1.second) 2 -val result: Int = race(computation1, computation2) +val result: Int = raceSuccess(computation1, computation2) // 2 ``` -The losing computation is interrupted. `race` waits until both branches finish; this also applies to the losing one, +The losing computation is interrupted. `raceSuccess` waits until both branches finish; this also applies to the losing one, which might take a while to clean up after interruption. It is also possible to race a sequence of computations, given as `Seq[() => T]`. ## Race variants -* `race` returns the first result, or re-throws the last exception -* `raceResult` returns the first result, or re-throws the first exception (the first computation which finishes in any +* `raceSuccess` returns the first success, or if all fail, re-throws the first exception +* `raceResult` returns the first success, or if any fail, re-throws the first exception (the first computation which finishes in any way is the "winner") ## Using application errors -Some values might be considered as application errors. In a computation returns such an error, `race` continues waiting -if there are other computations in progress, same as when an exception is thrown. Ultimately, `race` either throws -the first exception, or the first application error that has been reported (whichever comes first). +Some values might be considered as application errors. If a computation returns such an error, `raceSuccess` continues waiting +if there are other computations in progress, same as when an exception is thrown. Ultimately, if no result is successful, +`raceSuccess` either throws the first exception, or the first application error that has been reported (whichever comes first). 
-It's possible to use an arbitrary [error mode](../basics/error-handling.md) by providing it as the initial argument to `race`. +It's possible to use an arbitrary [error mode](../basics/error-handling.md) by providing it as the initial argument to `raceSuccess`. Alternatively, a built-in version using `Either` is available as `raceEither`: ```scala diff --git a/generated-doc/out/index.md b/generated-doc/out/index.md index 694d1632..9017660b 100644 --- a/generated-doc/out/index.md +++ b/generated-doc/out/index.md @@ -42,6 +42,19 @@ In addition to this documentation, ScalaDocs can be browsed at [https://javadoc. structured-concurrency/error-handling-scopes structured-concurrency/fork-local structured-concurrency/interruptions + +.. toctree:: + :maxdepth: 2 + :caption: Streaming + + streaming/index + streaming/channels + streaming/transforming-channels + streaming/selecting-from-channels + streaming/flows + streaming/io + streaming/errors + streaming/backpressure .. toctree:: :maxdepth: 2 @@ -53,24 +66,9 @@ In addition to this documentation, ScalaDocs can be browsed at [https://javadoc. scheduled resources control-flow + actors utility -.. toctree:: - :maxdepth: 2 - :caption: Channels - - channels/index - channels/sinks - channels/sources - channels/channel-closed - channels/transforming-sources - channels/discharging - channels/select - channels/errors - channels/backpressure - channels/actors - channels/io - .. toctree:: :maxdepth: 2 :caption: Integrations diff --git a/generated-doc/out/kafka.md b/generated-doc/out/kafka.md index 1995c321..c2b5e538 100644 --- a/generated-doc/out/kafka.md +++ b/generated-doc/out/kafka.md @@ -3,54 +3,47 @@ Dependency: ```scala -"com.softwaremill.ox" %% "kafka" % "0.4.0" +"com.softwaremill.ox" %% "kafka" % "0.5.0" ``` -`Source`s which read from a Kafka topic, mapping stages and drains which publish to Kafka topics are available through -the `KafkaSource`, `KafkaStage` and `KafkaDrain` objects. In all cases either a manually constructed instance of a +`Flow`s which read from a Kafka topic, mapping stages and drains which publish to Kafka topics are available through +the `KafkaFlow`, `KafkaStage` and `KafkaDrain` objects. In all cases either a manually constructed instance of a `KafkaProducer` / `KafkaConsumer` is needed, or `ProducerSettings` / `ConsumerSetttings` need to be provided with the bootstrap servers, consumer group id, key / value serializers, etc. To read from a Kafka topic, use: ```scala -import ox.channels.ChannelClosed -import ox.kafka.{ConsumerSettings, KafkaSource, ReceivedMessage} +import ox.kafka.{ConsumerSettings, KafkaFlow, ReceivedMessage} import ox.kafka.ConsumerSettings.AutoOffsetReset -import ox.supervised -supervised { - val settings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest) - val topic = "my_topic" +val settings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest) +val topic = "my_topic" - val source = KafkaSource.subscribe(settings, topic) - - source.receive(): ReceivedMessage[String, String] | ChannelClosed -} +val source = KafkaFlow.subscribe(settings, topic) + .runForeach { (msg: ReceivedMessage[String, String]) => ??? 
} ``` To publish data to a Kafka topic: ```scala -import ox.channels.Source +import ox.flow.Flow import ox.kafka.{ProducerSettings, KafkaDrain} -import ox.{pipe, supervised} +import ox.pipe import org.apache.kafka.clients.producer.ProducerRecord -supervised { - val settings = ProducerSettings.default.bootstrapServers("localhost:9092") - Source - .fromIterable(List("a", "b", "c")) - .mapAsView(msg => ProducerRecord[String, String]("my_topic", msg)) - .pipe(KafkaDrain.publish(settings)) -} +val settings = ProducerSettings.default.bootstrapServers("localhost:9092") +Flow + .fromIterable(List("a", "b", "c")) + .map(msg => ProducerRecord[String, String]("my_topic", msg)) + .pipe(KafkaDrain.runPublish(settings)) ``` Quite often data to be published to a topic (`topic1`) is computed basing on data received from another topic (`topic2`). In such a case, it's possible to commit messages from `topic2`, after the messages to `topic1` are successfully published. -In order to do so, a `Source[SendPacket]` needs to be created. The definition of `SendPacket` is: +In order to do so, a `Flow[SendPacket]` needs to be created. The definition of `SendPacket` is: ```scala import org.apache.kafka.clients.producer.ProducerRecord @@ -61,27 +54,25 @@ case class SendPacket[K, V](send: List[ProducerRecord[K, V]], commit: List[Recei The `send` list contains the messages to be sent (each message is a Kafka `ProducerRecord`). The `commit` list contains the messages, basing on which the data to be sent was computed. These are the received messages, as produced by a -`KafkaSource`. When committing, for each topic-partition that appears in the received messages, the maximum offset is +`KafkaFlow`. When committing, for each topic-partition that appears in the received messages, the maximum offset is computed. For example: ```scala -import ox.kafka.{ConsumerSettings, KafkaDrain, KafkaSource, ProducerSettings, SendPacket} +import ox.kafka.{ConsumerSettings, KafkaDrain, KafkaFlow, ProducerSettings, SendPacket} import ox.kafka.ConsumerSettings.AutoOffsetReset -import ox.{pipe, supervised} +import ox.pipe import org.apache.kafka.clients.producer.ProducerRecord -supervised { - val consumerSettings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest) - val producerSettings = ProducerSettings.default.bootstrapServers("localhost:9092") - val sourceTopic = "source_topic" - val destTopic = "dest_topic" - - KafkaSource - .subscribe(consumerSettings, sourceTopic) - .map(in => (in.value.toLong * 2, in)) - .map((value, original) => SendPacket(ProducerRecord[String, String](destTopic, value.toString), original)) - .pipe(KafkaDrain.publishAndCommit(producerSettings)) -} +val consumerSettings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest) +val producerSettings = ProducerSettings.default.bootstrapServers("localhost:9092") +val sourceTopic = "source_topic" +val destTopic = "dest_topic" + +KafkaFlow + .subscribe(consumerSettings, sourceTopic) + .map(in => (in.value.toLong * 2, in)) + .map((value, original) => SendPacket(ProducerRecord[String, String](destTopic, value.toString), original)) + .pipe(KafkaDrain.runPublishAndCommit(producerSettings)) ``` The offsets are committed every second in a background process. @@ -89,19 +80,16 @@ The offsets are committed every second in a background process. 
To publish data as a mapping stage: ```scala -import ox.channels.Source +import ox.flow.Flow import ox.kafka.ProducerSettings import ox.kafka.KafkaStage.* -import ox.supervised import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata} -supervised { - val settings = ProducerSettings.default.bootstrapServers("localhost:9092") - val metadatas: Source[RecordMetadata] = Source - .fromIterable(List("a", "b", "c")) - .mapAsView(msg => ProducerRecord[String, String]("my_topic", msg)) - .mapPublish(settings) +val settings = ProducerSettings.default.bootstrapServers("localhost:9092") +val metadatas: Flow[RecordMetadata] = Flow + .fromIterable(List("a", "b", "c")) + .map(msg => ProducerRecord[String, String]("my_topic", msg)) + .mapPublish(settings) - // process the metadatas source further -} +// process & run the metadatas flow further ``` diff --git a/generated-doc/out/mdc-logback.md b/generated-doc/out/mdc-logback.md index df05ae79..99fb4365 100644 --- a/generated-doc/out/mdc-logback.md +++ b/generated-doc/out/mdc-logback.md @@ -3,7 +3,7 @@ Dependency: ```scala -"com.softwaremill.ox" %% "mdc-logback" % "0.4.0" +"com.softwaremill.ox" %% "mdc-logback" % "0.5.0" ``` Ox provides support for setting inheritable MDC (mapped diagnostic context) values, when using the [Logback](https://logback.qos.ch) diff --git a/generated-doc/out/streaming/backpressure.md b/generated-doc/out/streaming/backpressure.md new file mode 100644 index 00000000..25876112 --- /dev/null +++ b/generated-doc/out/streaming/backpressure.md @@ -0,0 +1,3 @@ +# Backpressure + +Channels and running flows are back-pressured. The `Channel.send` operation is blocking until there's a receiver thread available, or if there's enough space in the buffer. The processing space is hence bound by the total size of channel buffers. diff --git a/generated-doc/out/streaming/channels.md b/generated-doc/out/streaming/channels.md new file mode 100644 index 00000000..e64e21cc --- /dev/null +++ b/generated-doc/out/streaming/channels.md @@ -0,0 +1,119 @@ +# Channels + +A channel is like a queue (values can be sent/received), but additionally channels support: + +* completion (a source can be `done`) +* downstream error propagation +* `select`ing exactly one channel clause to complete, where clauses include send and receive operations + +Creating a channel is a light-weight operation: + +```scala +import ox.channels.* +val c = Channel.bufferedDefault[String] +``` + +This uses the default buffer size (16). It's also possible to create channels with other buffer sizes, as well as +rendezvous or unlimited channels: + +```scala +import ox.channels.* +val c1 = Channel.rendezvous[String] +val c2 = Channel.buffered[String](5) +val c3 = Channel.unlimited[String] +``` + +In rendezvous channels, a sender and receiver must "meet" to exchange a value. Hence, `.send` always blocks, unless +there's another thread waiting on a `.receive`. In buffered channels, `.send` only blocks when the buffer is full. +In an unlimited channel, sending never blocks. + +Channels implement two traits: `Source` and `Sink`. + +## Sinks + +Data can be sent to a channel using `Sink.send`. Once no more data items are available, completion can be signalled +downstream using `Sink.done`. 
If there's an error when producing data, this can be signalled using `Sink.error`:
+
+```scala
+import ox.{fork, supervised}
+import ox.channels.*
+
+val c = Channel.rendezvous[String]
+supervised:
+  fork:
+    c.send("Hello")
+    c.send("World")
+    c.done()
+
+  // TODO: receive
+```
+
+`.send` blocks the thread, hence usually channels are shared across forks to communicate data between them. As Ox is
+designed to work with Java 21+ and Virtual Threads, blocking is a cheap operation that might be done frequently.
+
+## Sources
+
+A source can be used to receive elements from a channel.
+
+```scala
+trait Source[+T]:
+  def receive(): T
+```
+
+Same as `.send`, the `.receive` method might block the current thread.
+
+### Creating sources
+
+Sources can be created by instantiating a new channel. There are also some basic factory methods on the `Source` companion object. Finally, a [flow](flows.md) can be run to a channel if needed, e.g.:
+
+```scala
+import ox.supervised
+import ox.channels.Source
+import ox.flow.Flow
+
+import scala.concurrent.duration.*
+
+supervised:
+  Source.fromValues(1, 2, 3)
+  Flow.tick(1.second, "x").runToChannel()
+  Flow.iterate(0)(_ + 1).runToChannel() // natural numbers
+```
+
+Typically, for each source created as shown above, a fork is started, which sends the elements to the channel when capacity is available. If the enclosing `supervised` scope ends, each such fork is cancelled.
+
+## Handling closed channels
+
+By default, `Sink.send` and `Source.receive` methods will throw a `ChannelClosedException` if the channel is already
+closed:
+
+```scala
+enum ChannelClosedException(cause: Option[Throwable]) extends Exception(cause.orNull):
+  case Error(cause: Throwable) extends ChannelClosedException(Some(cause))
+  case Done() extends ChannelClosedException(None)
+```
+
+Alternatively, you can call `Sink.sendSafe` or `Source.receiveSafe`, which return union types:
+
+```scala
+trait Source[+T]:
+  def receive(): T
+  def receiveSafe(): T | ChannelClosed
+
+trait Sink[-T]:
+  def send(value: T): Unit
+  def sendSafe(value: T): Unit | ChannelClosed
+  def done(): Unit
+  def doneSafe(): Unit | ChannelClosed
+  def error(cause: Throwable): Unit
+  def errorSafe(cause: Throwable): Unit | ChannelClosed
+
+sealed trait ChannelClosed
+object ChannelClosed:
+  case class Error(reason: Option[Exception]) extends ChannelClosed
+  case object Done extends ChannelClosed
+```
+
+That is, the result of a `safe` operation might be a value, or information that the channel is closed.
+
+Using extension methods from `ChannelClosedUnion`, it's possible to convert such union types to `Either`s, `Try`s or
+exceptions, as well as `map` over such results.
diff --git a/generated-doc/out/streaming/errors.md b/generated-doc/out/streaming/errors.md
new file mode 100644
index 00000000..8eb90cd8
--- /dev/null
+++ b/generated-doc/out/streaming/errors.md
@@ -0,0 +1,9 @@
+# Error propagation
+
+Errors are only propagated downstream, ultimately reaching the point where the flow is run / source is discharged. This leads to an exception being thrown there.
+
+When running flows, any [scopes](../structured-concurrency/fork-join.md) started as part of executing the flow's stages should have completed before the exception is re-thrown by the `run...` method.
+
+For channel-transforming operations, once the exception reaches the enclosing scope, any forks should become interrupted, including any that are still running and are handling the upstream processing stages.
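+
+As a small sketch of what this means in practice (the exception and values below are arbitrary), an error raised in an upstream stage only surfaces once the flow is run:
+
+```scala
+import ox.flow.Flow
+
+// describing the flow doesn't throw anything yet - flows are lazy
+val failing = Flow
+  .fromValues(1, 2, 3)
+  .map(n => if n == 2 then throw new RuntimeException("boom!") else n)
+
+// the error travels downstream and is re-thrown by the run method
+try failing.runToList()
+catch case e: RuntimeException => println(s"Failed with: ${e.getMessage}")
+```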
+ +The approach we decided to take (only propagating errors downstream) is one of the two possible designs - with the other being re-throwing an exception when it's encountered. Please see [the respective ADR](../adr/0001-error-propagation-in-channels.md) for a discussion. diff --git a/generated-doc/out/streaming/flows.md b/generated-doc/out/streaming/flows.md new file mode 100644 index 00000000..37576cef --- /dev/null +++ b/generated-doc/out/streaming/flows.md @@ -0,0 +1,146 @@ +# Flows + +A `Flow[T]` describes an asynchronous data transformation pipeline. When run, it emits elements of type `T`. + +Flows are lazy, evaluation (and any effects) happen only when the flow is run. Flows might be finite or infinite; in the latter case running a flow never ends normally; it might be interrupted, though. Finally, any exceptions that occur when evaluating the flow's logic will be thrown when running the flow, after any cleanup logic completes. + +## Creating flows + +There's a number of methods on the `Flow` companion object that can be used to create a flow: + +```scala +import ox.flow.Flow +import scala.concurrent.duration.* + +Flow.fromValues(1, 2, 3) // a finite flow +Flow.tick(1.second, "x") // an infinite flow, emitting "x" every second +Flow.iterate(0)(_ + 1) // natural numbers +``` + +Note that creating a flow as above doesn't emit any elements, or execute any of the flow's logic. Only when run, the elements are emitted and any effects that are part of the flow's stages happen. + +Flows can be also created by providing arbitrary element-emitting logic: + +```scala +import ox.flow.Flow + +def isNoon(): Boolean = ??? + +Flow.usingEmit: emit => + emit(1) + for i <- 4 to 50 do emit(i) + if isNoon() then emit(42) +``` + +Finally, flows can be created using [channel](channels.md) `Source`s: + +```scala +import ox.channels.Channel +import ox.flow.Flow +import ox.{fork, supervised} + +val ch = Channel.bufferedDefault[Int] +supervised: + fork: + ch.send(1) + ch.send(15) + ch.send(-2) + ch.done() + + Flow.fromSource(ch) // TODO: transform the flow further & run +``` + +## Transforming flows: basics + +Multiple transformation stages can be added to a flow, each time returning a new `Flow` instance, describing the extended pipeline. As before, no elements are emitted or transformed until the flow is run, as flows are lazy. There's a number of pre-defined transformation stages, many of them similar in function to corresponding methods on Scala's collections: + +```scala +import ox.flow.Flow + +Flow.fromValues(1, 2, 3, 5, 6) + .map(_ * 2) + .filter(_ % 2 == 0) + .take(3) + .zip(Flow.repeat("a number")) + .interleave(Flow.repeat((0, "also a number"))) + // etc., TODO: run the flow +``` + +You can also define arbitrary element-emitting logic, using each incoming element using `.mapUsingEmit`, similarly to `Flow.usingEmit` above. + +## Running flows + +Flows have to be run, for any processing to happen. This can be done with one of the `.run...` methods. For example: + +```scala +import ox.flow.Flow +import scala.concurrent.duration.* + +Flow.fromValues(1, 2, 3).runToList() // List(1, 2, 3) +Flow.fromValues(1, 2, 3).runForeach(println) +Flow.tick(1.second, "x").runDrain() // never finishes +``` + +Running a flow is a blocking operation. Unless asynchronous boundaries are present (explicit or implicit, more on this below), the entire processing happens on the calling thread. 
For example such a pipeline: + +```scala +import ox.flow.Flow + +Flow.fromValues(1, 2, 3, 5, 6) + .map(_ * 2) + .filter(_ % 2 == 0) + .runToList() +``` + +Processes the elements one-by-one on the thread that is invoking the run method. + +## Transforming flows: concurrency + +A number of flow transformations introduces asynchronous boundaries. For example, `.mapPar(parallelism)(mappingFunction)` describes a flow, which runs the pipeline defined so far in the background, emitting elements to a [channel](channels.md). Another [fork](../structured-concurrency/fork-join.md) reads these elements and runs up to `parallelism` invocations of `mappingFunction` concurrently. Mapped elements are then emitted by the returned flow. + +Behind the scenes, an `Ox` concurrency scope is created along with a number of forks. In case of any exceptions, everything is cleaned up before the flow propagates the exceptions. The `.mapPar` logic ensures that any exceptions from the preceding pipeline are propagated through the channel. + +Some other stages which introduce concurrency include `.merge`, `.interleave`, `.groupedWithin` and [I/O](io.md) stages. The created channels serve as buffers between the pipeline stages, and their capacity is defined by the `BufferCapacity` in scope (a default instance is available, if not provided explicitly). + +Explicit asynchronous boundaries can be inserted using `.async()`. This might be useful if producing the next element to emit, and consuming the previous should run concurrently; or if the processing times of the consumer varies, and the producer should buffer up elements. + +## Interoperability with channels + +Flows can be created from channels, and run to channels. For example: + +```scala +import ox.Ox +import ox.channels.{BufferCapacity, Source} +import ox.flow.Flow + +def transformChannel(ch: Source[String])(using Ox, BufferCapacity): Source[Int] = + Flow.fromSource(ch) + .mapConcat(_.split(" ")) + .mapConcat(_.toIntOption) + .filter(_ % 2 == 0) + .runToChannel() +``` + +The method above needs to be run within a concurrency scope, as `.runToChannel()` creates a background fork which runs the pipeline described by the flow, and emits its elements onto the returned channel. + +## Text transformations + +When dealing with flows of `Chunk[Byte]` or `String`s, you can leverage following built-in combinators for useful transformations: + +* `encodeUtf8` encodes a `Flow[String]` into a `Flow[Chunk[Byte]]` +* `linesUtf8` decodes a `Flow[Chunk[Byte]]` into a `Flow[String]`. Assumes that the input represents text with line breaks. The `String` elements emitted by resulting `Flow[String]` represent text lines. +* `decodeStringUtf8` to decode a `Flow[Chunk[Byte]]` into a `Flow[String]`, without handling line breaks, just processing input bytes as UTF-8 characters, even if a multi-byte character is divided into two chunks. + +Such operations may be useful when dealing with I/O like files, `InputStream`, etc. See [I/O](io.md). 
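+
+For example, a sketch of line-based decoding (assuming `Chunk` is available as `ox.Chunk` with a `fromArray` constructor, used here only to build the input in-line):
+
+```scala
+import ox.Chunk
+import ox.flow.Flow
+
+Flow
+  .fromValues(Chunk.fromArray("hello\nworld".getBytes("UTF-8")))
+  .linesUtf8
+  .map(_.toUpperCase)
+  .runToList() // List("HELLO", "WORLD")
+```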
+
+## Logging
+
+Ox does not have any integrations with logging libraries, but it provides a simple way to log elements emitted by flows using the `.tap` method:
+
+```scala
+import ox.flow.Flow
+
+Flow.fromValues(1, 2, 3)
+  .tap(n => println(s"Received: $n"))
+  .runToList()
+```
\ No newline at end of file
diff --git a/generated-doc/out/streaming/index.md b/generated-doc/out/streaming/index.md
new file mode 100644
index 00000000..c70712c0
--- /dev/null
+++ b/generated-doc/out/streaming/index.md
@@ -0,0 +1,48 @@
+# Streaming APIs
+
+Ox provides two complementary APIs for defining streaming data transformation pipelines.
+
+The first API uses an **imperative style** and is implemented using [channels](channels.md). As part of the code which defines how the data should be transformed, you can use the (blocking) `receive()` and `send()` methods on channels. You'll also often directly use [`Ox` concurrency scopes](../structured-concurrency/index.md) and [`fork`s](../structured-concurrency/fork-join.md). For example:
+
+```scala
+import ox.*
+import ox.channels.*
+
+def parseNumbers(incoming: Source[String])(using Ox, BufferCapacity): Source[Int] =
+  val results = BufferCapacity.newChannel[Int]
+  forkPropagate(results) {
+    repeatWhile:
+      incoming.receiveOrClosed() match
+        case ChannelClosed.Done => results.doneOrClosed(); false
+        case ChannelClosed.Error(r) => results.errorOrClosed(r); false
+        case t: String =>
+          t.split(" ").flatMap(_.toIntOption).foreach: n =>
+            println(s"Got: $n")
+            results.send(n)
+          true
+  }
+  results
+```
+
+The second API uses a **functional style**, implemented as [flows](flows.md). A flow lets you stack multiple data transformations using high-level methods such as `map`, `mapPar`, `grouped`, `async`, `merge` and more. For example:
+
+```scala
+import ox.channels.BufferCapacity
+import ox.flow.*
+
+def invokeService(n: Int): String = ???
+
+def sendParsedNumbers(incoming: Flow[String])(using BufferCapacity): Unit =
+  incoming
+    .mapConcat(_.split(" ").flatMap(_.toIntOption))
+    .tap(n => println(s"Got: $n"))
+    .mapPar(8)(invokeService)
+    .runForeach(r => println(s"Result: $r"))
+```
+
+A flow **describes** the operations to perform; actual data processing starts only when one of its `run` methods is invoked. That is, a flow is lazily evaluated. As part of implementing the individual transformation stages of a flow, channels, concurrency scopes and forks are used. Flows are a higher-level API, built on top of channels and forks.
+
+While channels implement a "hot streams" approach to defining data transformation pipelines, flows correspond to "cold streams".
+
+You can use both approaches in a single pipeline, depending on which approach better fits the task at hand. It's straightforward to convert a channel to a flow, and to run a flow to a channel.
+
diff --git a/generated-doc/out/streaming/io.md b/generated-doc/out/streaming/io.md
new file mode 100644
index 00000000..a8ed493c
--- /dev/null
+++ b/generated-doc/out/streaming/io.md
@@ -0,0 +1,88 @@
+# Files and I/O
+
+Ox allows creating a `Flow` which reads from a file or `InputStream`, as well as running a flow into a file or an `OutputStream`. The latter methods are available for `Flow[Chunk[Byte]]`. Ox takes care of closing files/streams after processing and on errors.
+ +## InputStream and OutputStream + +### Flow.fromInputStream + +A `Flow[Chunk[Byte]]` can be created using a `InputStream`: + +```scala +import ox.flow.Flow +import java.io.ByteArrayInputStream +import java.io.InputStream + +val inputStream: InputStream = new ByteArrayInputStream("some input".getBytes) + +Flow + .fromInputStream(inputStream) // Flow[Chunk[Byte]] + .decodeStringUtf8 + .map(_.toUpperCase) + .runForeach(println) // "SOME INPUT" +``` + +You can define a custom chunk size instead of using the default: + +```scala +import ox.flow.Flow +import java.io.ByteArrayInputStream +import java.io.InputStream + +val inputStream: InputStream = new ByteArrayInputStream("some input".getBytes) +Flow + .fromInputStream(inputStream, chunkSize = 4) // Flow[Chunk[Byte]] + .decodeStringUtf8 + .map(_.toUpperCase) + .runForeach(println) // "SOME", " INPUT" +``` + +### flow.toOutputStream + +A `Flow[Chunk[Byte]]` can be run to write to an `OutputStream`: + +```scala +import ox.flow.Flow +import java.io.ByteArrayOutputStream + +val outputStream = new ByteArrayOutputStream() + +val flow = Flow.fromIterable(List("text1,", "text2")) +flow + .encodeUtf8 + .runToOutputStream(outputStream) + +outputStream.toString // "TEXT1,TEXT2" +``` + +## Files + +### Flow.fromFile + +You can obtain a `Flow` of byte chunks read from a file for a given path: + +```scala +import ox.flow.Flow +import java.nio.file.Paths + +Flow + .fromFile(Paths.get("/path/to/my/file.txt")) + .linesUtf8 + .map(_.toUpperCase) + .runToList() // List("FILE_LINE1", "FILE_LINE2") +``` + +Similarly to `.fromInputStream`, you can define custom chunk size using `Flow.fromFile(path: Path, chunkSize: Int)`. + +### flow.toFile + +A `Flow[Chunk[Byte]]` can be written to a file under a given path: + +```scala +import ox.flow.Flow +import java.nio.file.Paths + +Flow.fromIterable(List("text1,", "text2")) + .encodeUtf8 + .runToFile(Paths.get("/path/to/my/target/file.txt")) +``` diff --git a/generated-doc/out/channels/select.md b/generated-doc/out/streaming/selecting-from-channels.md similarity index 98% rename from generated-doc/out/channels/select.md rename to generated-doc/out/streaming/selecting-from-channels.md index 6f89457a..8920ec74 100644 --- a/generated-doc/out/channels/select.md +++ b/generated-doc/out/streaming/selecting-from-channels.md @@ -24,6 +24,7 @@ As an example, this can be used as follows: ```scala import ox.supervised import ox.channels.* +import ox.flow.Flow import scala.annotation.tailrec import scala.concurrent.duration.* @@ -31,7 +32,7 @@ import scala.concurrent.duration.* case object Tick def consumer(strings: Source[String]): Nothing = supervised { - val tick = Source.tick(1.second, Tick) + val tick = Flow.tick(1.second, Tick).runToChannel() @tailrec def doConsume(acc: Int): Nothing = diff --git a/generated-doc/out/channels/transforming-sources.md b/generated-doc/out/streaming/transforming-channels.md similarity index 53% rename from generated-doc/out/channels/transforming-sources.md rename to generated-doc/out/streaming/transforming-channels.md index 4ad6d1d6..5bae71d9 100644 --- a/generated-doc/out/channels/transforming-sources.md +++ b/generated-doc/out/streaming/transforming-channels.md @@ -1,21 +1,19 @@ -# Transforming sources +# Transforming channels ## Transforming eagerly Sources can be transformed by receiving values, manipulating them and sending to other channels - this provides the highest flexibility and allows creating arbitrary channel topologies. 
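+
+For example, a hand-written transformation stage might look as follows (a sketch only: completion and errors are forwarded manually, and `forkPropagate` is used so that exceptions thrown while processing are propagated to the output channel):
+
+```scala
+import ox.*
+import ox.channels.*
+
+def lengths(incoming: Source[String])(using Ox, BufferCapacity): Source[Int] =
+  val results = BufferCapacity.newChannel[Int]
+  forkPropagate(results) {
+    repeatWhile:
+      incoming.receiveOrClosed() match
+        case ChannelClosed.Done => results.doneOrClosed(); false
+        case ChannelClosed.Error(r) => results.errorOrClosed(r); false
+        case s: String => results.send(s.length); true
+  }
+  results
+```
+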
However, there's a number of common operations that are built-in as methods on `Source`, which allow transforming the
-source. For example:
+Some basic channel-transforming operations are available as methods on `Source`. For example:
 
 ```scala
 import ox.supervised
 import ox.channels.{Channel, Source}
 
-supervised {
+supervised:
   val c = Channel.rendezvous[String]
   val c2: Source[Int] = c.map(s => s.length())
-}
 ```
 
 The `.map` needs to be run within a scope, as it starts a new virtual thread (using `fork`), which:
@@ -26,24 +24,25 @@ The `.map` needs to be run within a scope, as it starts a new virtual thread (us
 The new channel is returned to the user as the return value of `.map`.
 
-Some other available combinators include `.filter`, `.take`, `.zip(otherSource)`, `.merge(otherSource)` etc.
-
 To run multiple transformations within one virtual thread / fork, the `.transform` method is available:
 
 ```scala
 import ox.supervised
 import ox.channels.Source
 
-supervised {
-  Source.iterate(0)(_ + 1) // natural numbers
+supervised:
+  Source.fromIterable(1 to 1000)
     .transform(_.filter(_ % 2 == 0).map(_ + 1).take(10)) // take the 10 first even numbers, incremented by 1
     .foreach(n => println(n.toString))
-}
+```
+
+```{note}
+For more advanced transformation options, use [flows](flows.md).
 ```
 
 ## Capacity of transformation stages
 
-Most source transformation methods create new channels, on which the transformed values are produced. The capacity of
+Most source and some flow transformation methods create new channels, on which the transformed values are produced. The capacity of
 these channels by default is 16 (buffered). This can be overridden by providing `BufferCapacity` given, e.g.:
 
 ```scala
@@ -71,32 +70,31 @@ fork. Hence, creating views doesn't need to be run within a scope, and creating
 the view itself doesn't consume any elements from the source on which it is run.
 
-## Text transformations
+## Discharging channels
 
-When dealing with Sources with chunks of bytes or Strings, you can leverage following built-in combinators for useful transformations:
+Values of a source can be discharged using methods such as `.foreach`, `.toList`, `.pipeTo` or `.drain`:
 
-* `encodeUtf8` encodes a `Source[String]` into a `Source[Chunk[Byte]]`
-* `linesUtf8` decodes a `Source[Chunk[Byte]]` into a `Source[String]`. Assumes that the input represents text with line breaks. The `String` elements emitted by resulting `Source[String]` represent text lines.
-* `decodeStringUtf8` to decode a `Source[Chunk[Byte]]` into a `Source[String]`, without handling line breaks, just processing input bytes as UTF-8 characters, even if a multi-byte character is divided into two chunks.
+```scala
+import ox.supervised
+import ox.channels.Source
 
-Such operations may be useful when dealing with I/O like files, `InputStream`, etc.. See [examples here](io.md).
+supervised:
+  val s = Source.fromValues(1, 2, 3)
+  s.toList: List[Int] // List(1, 2, 3)
+```
+
+These methods are blocking, as they drain the channel until no more values are available (when the channel is done).
 
-## Logging
+### Closed channels (done / error)
 
-Ox does not have any integrations with logging libraries, but it provides a simple way to log elements flowing through channels
-using the `.tap` (eagerly evaluated) or `.tapAsView` (lazily evaluated) methods.
+If the channel encounters an error, the discharging method will throw a `ChannelClosedException.Error`.
+As with `send` and `receive`, there's a `safe` variant for each discharging method, which returns a union type, e.g.:
 
 ```scala
 import ox.supervised
-import ox.channels.Source
+import ox.channels.{ChannelClosed, Source}
 
-supervised {
-  Source.fromValues(1, 2, 3)
-    .tap(n => println(s"Received: $n")) // prints as soon as the element is sent from the source
-    .toList
-
-  Source.fromValues(1, 2, 3)
-    .tapAsView(n => println(s"Received: $n")) // prints when the element is consumed by `toList`
-    .toList
-}
-```
\ No newline at end of file
+supervised:
+  val s = Source.fromValues(1, 2, 3)
+  s.toList: List[Int] | ChannelClosed.Error // List(1, 2, 3)
+```
diff --git a/generated-doc/out/structured-concurrency/index.md b/generated-doc/out/structured-concurrency/index.md
index 5bb9236f..9ce6aa3f 100644
--- a/generated-doc/out/structured-concurrency/index.md
+++ b/generated-doc/out/structured-concurrency/index.md
@@ -12,7 +12,7 @@ started. The scope only finishes once all threads started within finish (either
 it isn't possible to "leak" threads outside of a method. Threads become more a method's implementation detail, rather
 than an effect.
 
-These characteristics make structured concurrency an ideal candidate to make concurrency safer in direct style
+These characteristics make structured concurrency an ideal candidate to make concurrency safer in direct-style
 programming, while keeping blocking-like method calls. Structured concurrency enables local reasoning on the threading
 effects, which is also one of the prime tenets of functional programming!
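+
+A minimal illustration of these properties, using the `supervised` scope and `fork` described in the following chapters (the computed values are placeholders):
+
+```scala
+import ox.{fork, supervised}
+
+def compute(): Int = supervised {
+  val a = fork { 2 } // started within the scope
+  val b = fork { 3 }
+  a.join() + b.join() // the scope ends only once both forks complete
+}
+```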