Skip to content

Commit

Permalink
Channel reference in the ClosedChannel (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
Blef666 authored Jan 23, 2025
1 parent c874a2d commit ca4fbeb
Show file tree
Hide file tree
Showing 13 changed files with 54 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;

public class SourceOpsCollectTest {
@Test
Expand All @@ -26,7 +27,7 @@ void testMapOverSource() throws Exception {
assertEquals(10, s.receive());
assertEquals(20, s.receive());
assertEquals(30, s.receive());
assertEquals(new ChannelDone(), s.receiveOrClosed());
assertInstanceOf(ChannelDone.class, s.receiveOrClosed());
return null;
});
}
Expand All @@ -52,7 +53,7 @@ void testCollectOverSource() throws Exception {

assertEquals(20, s.receive());
assertEquals(40, s.receive());
assertEquals(new ChannelDone(), s.receiveOrClosed());
assertInstanceOf(ChannelDone.class, s.receiveOrClosed());
return null;
});
}
Expand All @@ -71,7 +72,7 @@ void testCollectOverSourceStressTest() throws Exception {
Source<Integer> s = SourceOps.forSource(scope, c).collect(x -> x * 10).toSource();

assertEquals(10, s.receive());
assertEquals(new ChannelDone(), s.receiveOrClosed());
assertInstanceOf(ChannelDone.class, s.receiveOrClosed());
return null;
});
}
Expand All @@ -94,7 +95,7 @@ void testCollectOverSourceUsingForSyntax() throws Exception {
assertEquals(2, s.receive());
assertEquals(4, s.receive());
assertEquals(6, s.receive());
assertEquals(new ChannelDone(), s.receiveOrClosed());
assertInstanceOf(ChannelDone.class, s.receiveOrClosed());
return null;
});
}
Expand Down
4 changes: 2 additions & 2 deletions channels/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ public void done() {

@Override
public Object doneOrClosed() {
return closeOrClosed(new ChannelDone());
return closeOrClosed(new ChannelDone(this));
}

@Override
Expand All @@ -722,7 +722,7 @@ public void error(Throwable reason) {

@Override
public Object errorOrClosed(Throwable reason) {
return closeOrClosed(new ChannelError(reason));
return closeOrClosed(new ChannelError(reason, this));
}

private Object closeOrClosed(ChannelClosed channelClosed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
*/
public sealed interface ChannelClosed permits ChannelDone, ChannelError {
ChannelClosedException toException();
Channel<?> channel();
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.softwaremill.jox;

public record ChannelDone() implements ChannelClosed {
public record ChannelDone(Channel<?> channel) implements ChannelClosed {
@Override
public ChannelClosedException toException() {
return new ChannelDoneException();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.softwaremill.jox;

public record ChannelError(Throwable cause) implements ChannelClosed {
public record ChannelError(Throwable cause, Channel<?> channel) implements ChannelClosed {
@Override
public ChannelClosedException toException() {
return new ChannelErrorException(cause);
Expand Down
14 changes: 10 additions & 4 deletions channels/src/main/java/com/softwaremill/jox/Select.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public class Select {
* <p>
* If no clauses are given, throws {@link ChannelDoneException}.
*
* @param clauses The clauses, from which one will be selected. Not {@code null}.
* @param clauses The clauses, from which one will be selected. Array must not be empty or {@code null} and
* can't contain {@code null} values.
* @return The value returned by the selected clause.
* @throws ChannelClosedException When any of the channels is closed (done or in error).
*/
Expand All @@ -72,16 +73,21 @@ public static <U> U select(SelectClause<? extends U>... clauses) throws Interrup
* <p>
* If no clauses are given, returns {@link ChannelDone}.
*
* @param clauses The clauses, from which one will be selected. Not {@code null}.
* @param clauses The clauses, from which one will be selected. Array must not be empty or {@code null} and
* can't contain {@code null} values.
* @return Either the value returned by the selected clause, or {@link ChannelClosed}, when any of the channels
* is closed (done or in error).
*/
@SafeVarargs
public static <U> Object selectOrClosed(SelectClause<? extends U>... clauses) throws InterruptedException {
while (true) {
if (clauses.length == 0) {
if (clauses == null || clauses.length == 0) {
// no clauses given
return new ChannelDone();
throw new IllegalArgumentException("No clauses given");
}
if (Arrays.stream(clauses).anyMatch(Objects::isNull)){
// null clauses given
throw new IllegalArgumentException("Null clauses are not supported");
}

var r = doSelectOrClosed(clauses);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,29 @@
import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

import static com.softwaremill.jox.TestUtil.*;
import static com.softwaremill.jox.TestUtil.forkCancelable;
import static com.softwaremill.jox.TestUtil.scoped;
import static org.junit.jupiter.api.Assertions.*;

public class ChannelClosedTest {
@Test
void testClosed_noValues_whenError() {
void testClosed_noValues_whenError() throws InterruptedException {
// given
Channel<Integer> c = new Channel<>();
RuntimeException reason = new RuntimeException();

// when
c.error(new RuntimeException());
c.error(reason);

// then
assertTrue(c.isClosedForReceive());
assertTrue(c.isClosedForSend());
assertEquals(new ChannelError(reason, c), c.receiveOrClosed());
}

@Test
void testClosed_noValues_whenDone() {
void testClosed_noValues_whenDone() throws InterruptedException {
// given
Channel<Integer> c = new Channel<>();

Expand All @@ -34,6 +35,7 @@ void testClosed_noValues_whenDone() {
// then
assertTrue(c.isClosedForReceive());
assertTrue(c.isClosedForSend());
assertEquals(new ChannelDone(c), c.receiveOrClosed());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,10 @@ void pendingReceivesShouldGetNotifiedThatChannelIsDone() throws InterruptedExcep
c.done();

// then
assertEquals(new ChannelDone(), f.get());
assertEquals(new ChannelDone(c), f.get());

// should be rejected immediately
assertEquals(new ChannelDone(), c.receiveOrClosed());
assertEquals(new ChannelDone(c), c.receiveOrClosed());
});
}

Expand Down
23 changes: 18 additions & 5 deletions channels/src/test/java/com/softwaremill/jox/SelectReceiveTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static com.softwaremill.jox.Select.selectOrClosed;
import static com.softwaremill.jox.TestUtil.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class SelectReceiveTest {
@Test
Expand Down Expand Up @@ -139,7 +140,7 @@ void testSelectWhenDone() throws InterruptedException {
Object received = selectOrClosed(ch1.receiveClause(), ch2.receiveClause());

// then
assertEquals(new ChannelDone(), received);
assertEquals(new ChannelDone(ch1), received);
}

@TestWithCapacities
Expand Down Expand Up @@ -219,12 +220,24 @@ void testBufferExpandedWhenSelecting() throws InterruptedException {
}

@Test
void testSelectFromNone() throws InterruptedException {
assertEquals(new ChannelDone(), selectOrClosed());
void testSelectFromNone() {
assertThrows(IllegalArgumentException.class, Select::selectOrClosed);
}

@Test
public void testSelect_immediate_withError() throws InterruptedException {
void testSelectFromNullableList() throws InterruptedException {
// given
Channel<String> ch1 = new Channel<>(1);
Channel<String> ch2 = new Channel<>(1);
ch1.send("v");

// when
assertThrows(IllegalArgumentException.class,
() -> select(ch1.receiveClause(), null, ch2.receiveClause()));
}

@Test
void testSelect_immediate_withError() throws InterruptedException {
// given
Channel<String> ch1 = new Channel<>(2);
ch1.send("x");
Expand All @@ -237,6 +250,6 @@ public void testSelect_immediate_withError() throws InterruptedException {
var result = selectOrClosed(ch1.receiveClause(), ch2.receiveClause());

// then
assertEquals(new ChannelError(e), result);
assertEquals(new ChannelError(e, ch2), result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void testSelectWhenDone() throws InterruptedException {
Object received = selectOrClosed(ch1.sendClause("v1"), ch2.sendClause("v2"));

// then
assertEquals(new ChannelDone(), received);
assertEquals(new ChannelDone(ch2), received);
}

@TestWithCapacities
Expand Down
4 changes: 2 additions & 2 deletions flows/src/main/java/com/softwaremill/jox/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ public Flow<List<T>> groupedWeightedWithin(long minWeight, Duration duration, Fu
if (!buffer.isEmpty()) outputChannel.send(buffer);
outputChannel.done();
yield false;
case ChannelError(Throwable cause):
case ChannelError(Throwable cause, Channel<?> _):
// source returned error, propagate it and finish
if (timeoutFork != null) timeoutFork.cancelNow();
outputChannel.error(cause);
Expand Down Expand Up @@ -1319,7 +1319,7 @@ public <U> Flow<U> mapPar(int parallelism, ThrowingFunction<T, U> f) {
results.done();
return null;
}
case ChannelError(Throwable e) -> throw new IllegalStateException("inProgress should never be closed with an error", e);
case ChannelError(Throwable e, Channel<?> _) -> throw new IllegalStateException("inProgress should never be closed with an error", e);
case Object fork -> {
//noinspection unchecked
Optional<U> result = ((Fork<Optional<U>>) fork).join();
Expand Down
2 changes: 1 addition & 1 deletion flows/src/main/java/com/softwaremill/jox/flows/Flows.java
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ public static <T> Flow<T> interleaveAll(List<Flow<T>> flows, int segmentSize, bo
currentSourceIndex = (currentSourceIndex + 1) % availableSources.size();
elementsRead = 0;
}
} else if (received instanceof ChannelError(Throwable cause)) {
} else if (received instanceof ChannelError(Throwable cause, Channel<?> _)) {
// if any source fails, propagate the error
results.errorOrClosed(cause);
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.softwaremill.jox.flows;

import com.softwaremill.jox.Channel;
import com.softwaremill.jox.ChannelError;
import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.JoxScopeExecutionException;
Expand Down Expand Up @@ -144,7 +145,7 @@ void shouldPropagateErrorsInMappingFunction() throws Exception {
assertEquals("b", s.receive());
assertEquals("c", s.receive());
var result = s.receiveOrClosed();
if (result instanceof ChannelError(Throwable error)) {
if (result instanceof ChannelError(Throwable error, Channel<?> _)) {
assertEquals(boom, error);
}
return null;
Expand Down

0 comments on commit ca4fbeb

Please sign in to comment.