Skip to content

Commit

Permalink
Remove channel ops (hot streaming) (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Feb 4, 2025
1 parent 2fdcd3d commit ae08fa5
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 418 deletions.
105 changes: 13 additions & 92 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -449,16 +449,20 @@ e.g.
import com.softwaremill.jox.structured.JoxScopeExecutionException;
import com.softwaremill.jox.structured.Scopes;

...
try {
Scopes.supervised(scope -> {
throw new TestException("x");
});
} catch (JoxScopeExecutionException e) {
e.unwrapAndThrow(OtherException.class, TestException.class, YetAnotherException.class);
throw e;
public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/* ... */
try {
Scopes.supervised(scope -> {
throw new TestException("x");
});
} catch (JoxScopeExecutionException e) {
e.unwrapAndThrow(OtherException.class, TestException.class, YetAnotherException.class);
throw e;
}
/* ... */
}
}
...
```

#### Other types of scopes & forks
Expand Down Expand Up @@ -546,89 +550,6 @@ public class Demo {
// result = 5
```

## (Hot) Streaming

### Dependency

Maven:

```xml

<dependency>
<groupId>com.softwaremill.jox</groupId>
<artifactId>channel-ops</artifactId>
<version>0.3.1</version>
</dependency>
```

Gradle:

```groovy
implementation 'com.softwaremill.jox:channel-ops:0.3.1'
```

### Usage

Using this module you can run operations on streams which require starting background threads. To do that,
you need to pass an active concurrency scope (started using `supervised`) to the `SourceOps` constructor.

Each method from `SourceOps` causes a new fork (virtual thread) to be started, which starts running its logic
immediately (producing elements / consuming and transforming elements from the given source). Thus, this is an
implementation of "hot streams".

#### Creating streams

Sources from iterables, or tick-sources, can be created by calling methods on `SourceOps`:

```java
import java.util.concurrent.ExecutionException;

import static com.softwaremill.jox.structured.Scopes.supervised;

public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
supervised(scope -> {
new SourceOps(scope)
.tick(500, "tick")
.toSource()
.forEach(v -> System.out.println(v));
return null; // unreachable, as `tick` produces infinitely many elements
});
}
}
```

A tick-source can also be used in the usual way, by calling `.receive` on it, or by using it in `select`'s clauses.

#### Transforming streams

Streams can be transformed by calling the appropriate methods on the object returned by
`SourceOps.forSource(scope, source)`.

`collect` combines the functionality of `map` and `filter`: elements are mapped, and when the mapping function returns
`null`, the element is skipped:

```java
import java.util.List;
import java.util.concurrent.ExecutionException;

import static com.softwaremill.jox.structured.Scopes.supervised;

public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
var result = supervised(scope -> new SourceOps(scope)
.fromIterable(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.collect(n -> {
if (n % 2 == 0) return null;
else return n * 10;
})
.toSource().toList());
System.out.println("result = " + result);
}
}
// result = [10, 30, 50, 70, 90]
```

## (Lazy) Streaming - Flows

### Dependency
Expand Down
38 changes: 0 additions & 38 deletions channel-ops/pom.xml

This file was deleted.

134 changes: 0 additions & 134 deletions channel-ops/src/main/java/com/softwaremill/jox/ops/SourceOps.java

This file was deleted.

This file was deleted.

Loading

0 comments on commit ae08fa5

Please sign in to comment.