From ae08fa5864153e0438c048a97c9ccb0edc416742 Mon Sep 17 00:00:00 2001 From: Adam Warski Date: Tue, 4 Feb 2025 09:25:01 +0100 Subject: [PATCH] Remove channel ops (hot streaming) (#93) --- README.md | 105 ++------------ channel-ops/pom.xml | 38 ----- .../com/softwaremill/jox/ops/SourceOps.java | 134 ------------------ .../jox/ops/SourceOpsCollectTest.java | 102 ------------- .../jox/ops/SourceOpsTickTest.java | 51 ------- pom.xml | 1 - 6 files changed, 13 insertions(+), 418 deletions(-) delete mode 100644 channel-ops/pom.xml delete mode 100644 channel-ops/src/main/java/com/softwaremill/jox/ops/SourceOps.java delete mode 100644 channel-ops/src/test/java/com/softwaremill/jox/ops/SourceOpsCollectTest.java delete mode 100644 channel-ops/src/test/java/com/softwaremill/jox/ops/SourceOpsTickTest.java diff --git a/README.md b/README.md index 618771d..6191efc 100644 --- a/README.md +++ b/README.md @@ -449,16 +449,20 @@ e.g. import com.softwaremill.jox.structured.JoxScopeExecutionException; import com.softwaremill.jox.structured.Scopes; -... -try { - Scopes.supervised(scope -> { - throw new TestException("x"); - }); -} catch (JoxScopeExecutionException e) { - e.unwrapAndThrow(OtherException.class, TestException.class, YetAnotherException.class); - throw e; +public class Demo { + public static void main(String[] args) throws ExecutionException, InterruptedException { + /* ... */ + try { + Scopes.supervised(scope -> { + throw new TestException("x"); + }); + } catch (JoxScopeExecutionException e) { + e.unwrapAndThrow(OtherException.class, TestException.class, YetAnotherException.class); + throw e; + } + /* ... */ + } } -... ``` #### Other types of scopes & forks @@ -546,89 +550,6 @@ public class Demo { // result = 5 ``` -## (Hot) Streaming - -### Dependency - -Maven: - -```xml - - - com.softwaremill.jox - channel-ops - 0.3.1 - -``` - -Gradle: - -```groovy -implementation 'com.softwaremill.jox:channel-ops:0.3.1' -``` - -### Usage - -Using this module you can run operations on streams which require starting background threads. To do that, -you need to pass an active concurrency scope (started using `supervised`) to the `SourceOps` constructor. - -Each method from `SourceOps` causes a new fork (virtual thread) to be started, which starts running its logic -immediately (producing elements / consuming and transforming elements from the given source). Thus, this is an -implementation of "hot streams". - -#### Creating streams - -Sources from iterables, or tick-sources, can be created by calling methods on `SourceOps`: - -```java -import java.util.concurrent.ExecutionException; - -import static com.softwaremill.jox.structured.Scopes.supervised; - -public class Demo { - public static void main(String[] args) throws ExecutionException, InterruptedException { - supervised(scope -> { - new SourceOps(scope) - .tick(500, "tick") - .toSource() - .forEach(v -> System.out.println(v)); - return null; // unreachable, as `tick` produces infinitely many elements - }); - } -} -``` - -A tick-source can also be used in the usual way, by calling `.receive` on it, or by using it in `select`'s clauses. - -#### Transforming streams - -Streams can be transformed by calling the appropriate methods on the object returned by -`SourceOps.forSource(scope, source)`. - -`collect` combines the functionality of `map` and `filter`: elements are mapped, and when the mapping function returns -`null`, the element is skipped: - -```java -import java.util.List; -import java.util.concurrent.ExecutionException; - -import static com.softwaremill.jox.structured.Scopes.supervised; - -public class Demo { - public static void main(String[] args) throws ExecutionException, InterruptedException { - var result = supervised(scope -> new SourceOps(scope) - .fromIterable(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) - .collect(n -> { - if (n % 2 == 0) return null; - else return n * 10; - }) - .toSource().toList()); - System.out.println("result = " + result); - } -} -// result = [10, 30, 50, 70, 90] -``` - ## (Lazy) Streaming - Flows ### Dependency diff --git a/channel-ops/pom.xml b/channel-ops/pom.xml deleted file mode 100644 index 41b3fa9..0000000 --- a/channel-ops/pom.xml +++ /dev/null @@ -1,38 +0,0 @@ - - - 4.0.0 - - - com.softwaremill.jox - parent - 0.3.1 - - - channel-ops - 0.3.1 - jar - - - - org.junit.jupiter - junit-jupiter - test - - - org.awaitility - awaitility - test - - - com.softwaremill.jox - channels - 0.3.1 - - - com.softwaremill.jox - structured - 0.3.1 - - - diff --git a/channel-ops/src/main/java/com/softwaremill/jox/ops/SourceOps.java b/channel-ops/src/main/java/com/softwaremill/jox/ops/SourceOps.java deleted file mode 100644 index 04d54d0..0000000 --- a/channel-ops/src/main/java/com/softwaremill/jox/ops/SourceOps.java +++ /dev/null @@ -1,134 +0,0 @@ -package com.softwaremill.jox.ops; - -import com.softwaremill.jox.*; -import com.softwaremill.jox.structured.Scope; - -import java.util.Iterator; -import java.util.function.Function; - -public class SourceOps { - private final Scope scope; - private final int defaultCapacity; - - public SourceOps(Scope scope) { - this(scope, 16); - } - - public SourceOps(Scope scope, int defaultCapacity) { - this.scope = scope; - this.defaultCapacity = defaultCapacity; - } - - public static ForSource forSource(Scope scope, Source s) { - var sourceOps = new SourceOps(scope); - return sourceOps.new ForSource<>(s); - } - - public class ForSource { - private final Source source; - - ForSource(Source source) { - this.source = source; - } - - public Source toSource() { - return source; - } - - /** - * Applies the given mapping function {@code f} to each element received from this source, and sends the - * results to the returned channel. If {@code f} returns {@code null}, the value will be skipped. - *

- * Errors from this channel are propagated to the returned channel. Any exceptions that occur when invoking - * {@code f} are propagated as errors to the returned channel as well. - *

- * For a lazily-evaluated version, see {@link Channel#collectAsView(Function)}. - * - * @param f The mapping function. - * @return Ops on a source, onto which results of the mapping function will be sent. - */ - public ForSource collect(Function f) { - var c2 = new Channel(defaultCapacity); - scope.fork(() -> { - var repeat = true; - while (repeat) { - switch (source.receiveOrClosed()) { - case ChannelDone cd -> { - c2.doneOrClosed(); - repeat = false; - } - case ChannelError ce -> { - c2.errorOrClosed(ce.cause()); - repeat = false; - } - case Object t -> { - try { - var u = f.apply((T) t); - if (u != null) { - repeat = !(c2.sendOrClosed(u) instanceof ChannelClosed); - } // else skip & continue - } catch (Exception e) { - c2.errorOrClosed(e); - } - } - } - } - return null; - }); - return new ForSource(c2); - } - } - - // - - public ForSource fromIterator(Iterator i) { - var c = new Channel(defaultCapacity); - scope.fork(() -> { - try { - while (i.hasNext()) { - c.sendOrClosed(i.next()); - } - c.doneOrClosed(); - } catch (Exception e) { - c.errorOrClosed(e); - } - return null; - }); - return new ForSource(c); - } - - public ForSource fromIterable(Iterable i) { - return fromIterator(i.iterator()); - } - - /** - * Creates a rendezvous channel (without a buffer, regardless of the default capacity), to which the given value is - * sent repeatedly, at least {@code intervalMillis}ms apart between each two elements. The first value is sent - * immediately. - *

- * The interval is measured between the subsequent invocations of the {@code send(value)} method. Hence, if there's - * a slow consumer, the next tick can be sent right after the previous one is received (if it was received later - * than the inter-tick interval duration). However, ticks don't accumulate, e.g. when the consumer is so slow that - * multiple intervals pass between {@code send} invocations. - *

- * Must be run within a scope, since a child fork is created which sends the ticks, and waits until the next tick - * can be sent. - * - * @param intervalMillis The temporal spacing between subsequent ticks. - * @param tickValue The value to send to the channel on every tick. - * @return Ops on a source to which the tick values are sent. - */ - public ForSource tick(long intervalMillis, T tickValue) { - var c = new Channel(); - scope.fork(() -> { - while (true) { - var start = System.nanoTime(); - c.sendOrClosed(tickValue); - var end = System.nanoTime(); - var sleep = intervalMillis * 1_000_000 - (end - start); - if (sleep > 0) Thread.sleep(sleep / 1_000_000, (int) sleep % 1_000_000); - } - }); - return new ForSource(c); - } -} diff --git a/channel-ops/src/test/java/com/softwaremill/jox/ops/SourceOpsCollectTest.java b/channel-ops/src/test/java/com/softwaremill/jox/ops/SourceOpsCollectTest.java deleted file mode 100644 index 5ec7be2..0000000 --- a/channel-ops/src/test/java/com/softwaremill/jox/ops/SourceOpsCollectTest.java +++ /dev/null @@ -1,102 +0,0 @@ -package com.softwaremill.jox.ops; - -import com.softwaremill.jox.Channel; -import com.softwaremill.jox.ChannelDone; -import com.softwaremill.jox.Source; -import com.softwaremill.jox.structured.Scopes; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertInstanceOf; - -public class SourceOpsCollectTest { - @Test - void testMapOverSource() throws Exception { - Scopes.supervised(scope -> { - Channel c = new Channel(); - scope.fork(() -> { - c.send(1); - c.send(2); - c.send(3); - c.done(); - return null; - }); - - Source s = SourceOps.forSource(scope, c).collect(x -> x * 10).toSource(); - - assertEquals(10, s.receive()); - assertEquals(20, s.receive()); - assertEquals(30, s.receive()); - assertInstanceOf(ChannelDone.class, s.receiveOrClosed()); - return null; - }); - } - - @Test - void testCollectOverSource() throws Exception { - Scopes.supervised(scope -> { - Channel c = new Channel(); - scope.fork(() -> { - c.send(1); - c.send(2); - c.send(3); - c.send(4); - c.send(5); - c.done(); - return null; - }); - - Source s = SourceOps.forSource(scope, c).collect(x -> { - if (x % 2 == 0) return x * 10; - else return null; - }).toSource(); - - assertEquals(20, s.receive()); - assertEquals(40, s.receive()); - assertInstanceOf(ChannelDone.class, s.receiveOrClosed()); - return null; - }); - } - - @Test - void testCollectOverSourceStressTest() throws Exception { - for (int i = 0; i < 100000; i++) { - Scopes.supervised(scope -> { - Channel c = new Channel(); - scope.fork(() -> { - c.send(1); - c.done(); - return null; - }); - - Source s = SourceOps.forSource(scope, c).collect(x -> x * 10).toSource(); - - assertEquals(10, s.receive()); - assertInstanceOf(ChannelDone.class, s.receiveOrClosed()); - return null; - }); - } - } - - @Test - void testCollectOverSourceUsingForSyntax() throws Exception { - Scopes.supervised(scope -> { - Channel c = new Channel(); - scope.fork(() -> { - c.send(1); - c.send(2); - c.send(3); - c.done(); - return null; - }); - - Source s = SourceOps.forSource(scope, c).collect(x -> x * 2).toSource(); - - assertEquals(2, s.receive()); - assertEquals(4, s.receive()); - assertEquals(6, s.receive()); - assertInstanceOf(ChannelDone.class, s.receiveOrClosed()); - return null; - }); - } -} diff --git a/channel-ops/src/test/java/com/softwaremill/jox/ops/SourceOpsTickTest.java b/channel-ops/src/test/java/com/softwaremill/jox/ops/SourceOpsTickTest.java deleted file mode 100644 index 5f27da3..0000000 --- a/channel-ops/src/test/java/com/softwaremill/jox/ops/SourceOpsTickTest.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.softwaremill.jox.ops; - -import com.softwaremill.jox.structured.Scopes; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class SourceOpsTickTest { - @Test - void testTickRegularly() throws Exception { - Scopes.supervised(scope -> { - long start = System.currentTimeMillis(); - var c = new SourceOps(scope).tick(100, "tick").toSource(); - - assertEquals("tick", c.receive()); - long elapsed = System.currentTimeMillis() - start; - assertTrue(elapsed >= 0L && elapsed <= 50L); - - assertEquals("tick", c.receive()); - elapsed = System.currentTimeMillis() - start; - assertTrue(elapsed >= 100L && elapsed <= 150L); - - assertEquals("tick", c.receive()); - elapsed = System.currentTimeMillis() - start; - assertTrue(elapsed >= 200L && elapsed <= 250L); - - return null; - }); - } - - @Test - void testTickImmediatelyInCaseOfSlowConsumerAndThenResumeNormal() throws Exception { - Scopes.supervised(scope -> { - long start = System.currentTimeMillis(); - var c = new SourceOps(scope).tick(100, "tick").toSource(); - - // Simulating a slow consumer - Thread.sleep(200); - assertEquals("tick", c.receive()); // a tick should be waiting - long elapsed = System.currentTimeMillis() - start; - assertTrue(elapsed >= 200L && elapsed <= 250L); - - assertEquals("tick", c.receive()); // and immediately another, as the interval between send-s has passed - elapsed = System.currentTimeMillis() - start; - assertTrue(elapsed >= 200L && elapsed <= 250L); - - return null; - }); - } -} diff --git a/pom.xml b/pom.xml index 56a7aa7..9e8e391 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,6 @@ channels structured bench - channel-ops flows