Commit

unify code formatting (#92)

Blef666 authored Feb 3, 2025
1 parent 8fe4f83 commit 2fdcd3d

Showing 47 changed files with 1,740 additions and 575 deletions.
1,232 changes: 1,232 additions & 0 deletions .editorconfig

Large diffs are not rendered by default.

112 changes: 73 additions & 39 deletions README.md
@@ -120,8 +120,8 @@ class Demo2 {
}
```

There is also a possibility to pass a `Channel`'s buffer size via `ScopedValue`.
The channel must be created via `Channel.withScopedBufferSize()` to pick up the value.

If no value is in scope, the default buffer size `Channel.DEFAULT_BUFFER_SIZE` is used.
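
For illustration, a minimal sketch (the class name is arbitrary; this relies on Java's `ScopedValue.where(...).run(...)` API, and assumes the channel created inside the scope picks up the bound buffer size):

```java
import com.softwaremill.jox.Channel;

public class ScopedBufferSizeDemo {

    public static void main(String[] args) {
        ScopedValue.where(Channel.BUFFER_SIZE, 8).run(() -> {
            var channel = Channel.withScopedBufferSize(); // created inside the scope, so buffer size = 8
            System.out.println(channel);
        });
    }
}
```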

@@ -432,8 +432,10 @@ Jox implements the "let it crash" model. When an error occurs, the entire scope
so that it can be properly handled. Moreover, no detail is lost: all exceptions are preserved, either as causes or as
suppressed exceptions.

As `JoxScopeExecutionException` is unchecked, we introduced a utility method,
`JoxScopeExecutionException#unwrapAndThrow`.
If the wrapped exception is an instance of any of the passed classes, this method unwraps the original exception and
rethrows it as a checked exception; the `throws` signature forces exception handling.
If the wrapped exception is not an instance of any of the passed classes, **nothing happens**.
All suppressed exceptions are rewritten from the `JoxScopeExecutionException` to the unwrapped exception.

@@ -442,6 +444,7 @@ The method does **not** rethrow `JoxScopeExecutionException` by default.
So it is advised to manually rethrow it after calling the `unwrapAndThrow` method.

e.g.

```java
import com.softwaremill.jox.structured.JoxScopeExecutionException;
import com.softwaremill.jox.structured.Scopes;
// ...
```

@@ -651,27 +654,31 @@ implementation 'com.softwaremill.jox:flows:tbd'

A `Flow<T>` describes an asynchronous data transformation pipeline. When run, it emits elements of type `T`.

Flows are lazy: evaluation (and any effects) happens only when the flow is run. Flows might be finite or infinite; in the
latter case, running a flow never ends normally, though it might be interrupted. Finally, any exceptions that occur when
evaluating the flow's logic will be thrown when running the flow, after any cleanup logic completes.

### Creating Flows

There are a number of methods in the `Flows` class which allow creating a `Flow`.

```java
import java.time.Duration;

import com.softwaremill.jox.flows.Flows;

public class Demo {

    public static void main(String[] args) {
        Flows.fromValues(1, 2, 3); // a finite flow
        Flows.tick(Duration.ofSeconds(1), "x"); // an infinite flow emitting "x" every second
        Flows.iterate(0, i -> i + 1); // an infinite flow iterating from 0
    }
}
```

Note that creating a flow as above doesn't emit any elements or execute any of the flow's logic. Only when the flow is
run are the elements emitted, and any effects that are part of the flow's stages happen.

Flows can also be created using `Channel` `Source`s:

@@ -721,19 +728,27 @@ public class Demo {
}
```

The `FlowEmit` instance is used to emit elements by the flow, that is, to process them further, as defined by the
downstream pipeline. This method only completes once the element is fully processed, and it might throw exceptions in
case there's a processing error.

As part of the callback, you can create a `Scope`, fork background computations, or run other flows asynchronously.
However, take care **not** to share the `FlowEmit` instance across threads. That is, instances of `FlowEmit` are
thread-unsafe and should only be used on the calling thread.
The lifetime of `FlowEmit` should not extend over the duration of the invocation of `usingEmit`.
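
To make this concrete, a minimal sketch of `Flows.usingEmit` (the class name is illustrative; this assumes `FlowEmit` exposes an `apply` method for emitting a single element, and that the callback may throw checked exceptions):

```java
import com.softwaremill.jox.flows.Flow;
import com.softwaremill.jox.flows.Flows;

public class UsingEmitDemo {

    public static void main(String[] args) throws Exception {
        // the callback receives a FlowEmit; each apply call pushes one element downstream
        Flow<Integer> flow = Flows.usingEmit(emit -> {
            emit.apply(1);
            emit.apply(2);
            emit.apply(3);
        });
        flow.runForeach(System.out::println); // runs the flow, printing 1, 2, 3
    }
}
```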

Any asynchronous communication is best done with `Channel`s. You can then manually forward any elements received from a
channel to `emit`, or use e.g. `FlowEmit.channelToEmit`.

### Transforming flows: basics

Multiple transformation stages can be added to a flow, each time returning a new `Flow` instance describing the
extended pipeline. As before, no elements are emitted or transformed until the flow is run, as flows are lazy. There's a
number of pre-defined transformation stages:

```java
import java.util.Map;

import com.softwaremill.jox.flows.Flows;

public class Demo {
@@ -749,7 +764,8 @@ public class Demo {
}
```

You can also define arbitrary element-emitting logic for each incoming element using `.mapUsingEmit`, similarly to
`Flows.usingEmit` above.

### Running flows

Expand All @@ -762,15 +778,16 @@ import com.softwaremill.jox.flows.Flows;

public class Demo {

    public static void main(String[] args) throws Exception {
        Flows.fromValues(1, 2, 3).runToList(); // List(1, 2, 3)
        Flows.fromValues(1, 2, 3).runForeach(System.out::println);
        Flows.tick(Duration.ofSeconds(1), "x").runDrain(); // never finishes
    }
}
```

Running a flow is a blocking operation. Unless asynchronous boundaries are present (explicit or implicit, more on this
below), the entire processing happens on the calling thread. For example, such a pipeline:

```java
import com.softwaremill.jox.flows.Flows;
@@ -785,19 +802,28 @@ public class Demo {
}
}
```

processes the elements one-by-one on the thread that is invoking the run method.

### Transforming flows: concurrency

A number of flow transformations introduce asynchronous boundaries. For example,
`.mapPar(int parallelism, Function<T,U> mappingFunction)` describes a flow which runs the pipeline defined so far in the
background, emitting elements to a channel. Another fork reads these elements and runs up to `parallelism` invocations
of `mappingFunction` concurrently. Mapped elements are then emitted by the returned flow.

Behind the scenes, a new concurrency `Scope` is created along with a number of forks. In case of any exceptions,
everything is cleaned up before the flow propagates the exceptions. The `.mapPar` logic ensures that any exceptions from
the preceding pipeline are propagated through the channel.
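
For illustration, a minimal sketch of `.mapPar` (the class name is arbitrary, and the mapping function stands in for a more expensive computation):

```java
import com.softwaremill.jox.flows.Flows;

public class MapParDemo {

    public static void main(String[] args) throws Exception {
        Flows.fromValues(1, 2, 3, 4, 5)
                .mapPar(4, i -> i * 10) // up to 4 concurrent invocations of the mapping function
                .runForeach(System.out::println);
    }
}
```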

Some other stages which introduce concurrency include `.merge`, `.interleave`, `.groupedWithin` and `I/O` stages. The
created channels serve as buffers between the pipeline stages, and their capacity is defined by the `ScopedValue`
`Channel.BUFFER_SIZE` in scope; otherwise, the default `Channel.DEFAULT_BUFFER_SIZE` is used.

Explicit asynchronous boundaries can be inserted using `.buffer()`. This might be useful if producing the next element
to emit and consuming the previous one should run concurrently, or if the processing times of the consumer vary and the
producer should buffer up elements.
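
A minimal sketch of an explicit boundary (illustrative values; whether buffering pays off depends on the actual processing times):

```java
import com.softwaremill.jox.flows.Flows;

public class BufferDemo {

    public static void main(String[] args) throws Exception {
        Flows.fromValues(1, 2, 3)
                .buffer() // producer and consumer now run concurrently, connected by a buffering channel
                .runForeach(System.out::println);
    }
}
```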

### Interoperability with channels

@@ -825,20 +851,28 @@ public class Demo {
}
```

The method above needs to be run within a concurrency scope, as `.runToChannel()` creates a background fork which runs
the pipeline described by the flow and emits its elements onto the returned channel.

### Text transformations and I/O operations

For smooth operations on `byte[]`, we've created a wrapper class, `ByteChunk`, and for smooth type handling, a dedicated
`ByteFlow`, a subtype of `Flow<ByteChunk>`.
To be able to use text and I/O operations, you need to create a `ByteFlow` or transform an existing flow into one. It can
be created via `Flows.fromByteArray` or `Flows.fromByteChunk`.
A `Flow` containing `byte[]` or `ByteChunk` elements can be transformed by using the `toByteFlow()` method. Any other
flow can be transformed by using `toByteFlow()` with a mapping function.

#### Text operations

* `encodeUtf8` encodes a `Flow<String>` into a `ByteFlow` (see the sketch after this list)
* `linesUtf8` decodes a `ByteFlow` into a `Flow<String>`. Assumes that the input represents text with line breaks. The
  `String` elements emitted by the resulting `Flow<String>` represent text lines.
* `decodeStringUtf8` decodes a `ByteFlow` into a `Flow<String>`, without handling line breaks, just processing input
  bytes as UTF-8 characters, even if a multi-byte character is divided into two chunks.
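
As a sketch of the text operations (assuming `encodeUtf8` is available directly on a flow of `String`s, and `linesUtf8` on the resulting `ByteFlow`; the class name is illustrative):

```java
import com.softwaremill.jox.flows.Flows;

public class TextOpsDemo {

    public static void main(String[] args) throws Exception {
        Flows.fromValues("line 1\nline 2", "\nline 3")
                .encodeUtf8() // Flow<String> -> ByteFlow
                .linesUtf8()  // ByteFlow -> Flow<String>, one element per text line
                .runForeach(System.out::println); // prints: line 1, line 2, line 3
    }
}
```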

#### I/O Operations

* `runToInputStream(UnsupervisedScope scope)` runs the given flow asynchronously into the returned `InputStream`
* `runToOutputStream(OutputStream outputStream)` runs the given flow into the provided `OutputStream`
* `runToFile(Path path)` runs the given flow into a file. If the file does not exist, it's created.
@@ -847,7 +881,8 @@ It is also possible to create a `Flow` from an `inputStream` or `path` using `Flows` factory methods.
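
For example, a minimal sketch of writing a flow to a file (the temp-file target is only for illustration; chaining `Flows.fromByteArray` straight into `runToFile` assumes the factory returns a `ByteFlow`):

```java
import java.nio.file.Files;
import java.nio.file.Path;

import com.softwaremill.jox.flows.Flows;

public class FileDemo {

    public static void main(String[] args) throws Exception {
        Path target = Files.createTempFile("jox-demo", ".txt"); // illustrative target file
        Flows.fromByteArray("hello, jox\n".getBytes())
                .runToFile(target); // writes the flow's bytes to the file
    }
}
```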

### Logging

Jox does not have any integrations with logging libraries, but it provides a simple way to log elements emitted by flows
using the `.tap` method:

```java
import com.softwaremill.jox.flows.Flows;
@@ -874,19 +909,18 @@ process. Hence, the scope should remain active as long as the publisher is used.
Internally, elements emitted by the flow are buffered, using a buffer of capacity given by the `Channel.BUFFER_SIZE` in
scope.

To obtain an `org.reactivestreams.Publisher` instance, you'll need to add the `reactive-streams` dependency and use
`org.reactivestreams.FlowAdapters`.


#### Publisher -> Flow

A `java.util.concurrent.Flow.Publisher` can be converted to a `Flow` using `Flow.fromPublisher`.

Internally, elements published to the subscription are buffered, using a buffer of capacity given by the
`Channel.BUFFER_SIZE` in scope. That's also the maximum number of elements that will be requested from the publisher at a time.

To convert an `org.reactivestreams.Publisher` instance, you'll need the same dependency as above and use
`org.reactivestreams.FlowAdapters`.
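
A minimal sketch, under the assumption that `Flow.fromPublisher` accepts just the publisher instance (here a JDK `SubmissionPublisher` stands in for a real source, and the class name is illustrative):

```java
import java.util.concurrent.SubmissionPublisher;

import com.softwaremill.jox.flows.Flow;

public class FromPublisherDemo {

    public static void main(String[] args) throws Exception {
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            // conversion only describes the flow; elements are consumed once the flow is run
            Flow<Integer> flow = Flow.fromPublisher(publisher);
            System.out.println(flow);
        }
    }
}
```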

## Feedback

2 changes: 1 addition & 1 deletion channels/src/main/java/com/softwaremill/jox/Channel.java
@@ -90,7 +90,7 @@ operations on these (previous) segments, and we'll end up wanting to remove such
/**
* Can be used with {@link Channel#withScopedBufferSize()} to pass buffer size value from scope.
* e.g. `ScopedValues.where(BUFFER_SIZE, 8).run(() -> Channel.withScopedBufferSize())` will create a channel with buffer size = 8
* **/
**/
public static final ScopedValue<Integer> BUFFER_SIZE = ScopedValue.newInstance();
public static final int DEFAULT_BUFFER_SIZE = 16;

@@ -5,5 +5,6 @@
*/
public sealed interface ChannelClosed permits ChannelDone, ChannelError {
ChannelClosedException toException();

Channel<?> channel();
}
1 change: 0 additions & 1 deletion channels/src/main/java/com/softwaremill/jox/Segment.java
@@ -2,7 +2,6 @@

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.atomic.AtomicInteger;

final class Segment {
static final int SEGMENT_SIZE = 32; // 2^5
2 changes: 1 addition & 1 deletion channels/src/main/java/com/softwaremill/jox/Select.java
@@ -85,7 +85,7 @@ public static <U> Object selectOrClosed(SelectClause<? extends U>... clauses) th
// no clauses given
throw new IllegalArgumentException("No clauses given");
}
if (Arrays.stream(clauses).anyMatch(Objects::isNull)){
if (Arrays.stream(clauses).anyMatch(Objects::isNull)) {
// null clauses given
throw new IllegalArgumentException("Null clauses are not supported");
}
@@ -7,8 +7,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;

import static java.util.concurrent.TimeUnit.SECONDS;
import static com.softwaremill.jox.TestUtil.*;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

@@ -6,10 +6,14 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static com.softwaremill.jox.TestUtil.*;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;

public class ChannelRendezvousTest {
@Test
Expand Down
