Skip to content

Commit

Permalink
call onComplete only after last message has been requested and sent
Browse files Browse the repository at this point in the history
fixes #87
  • Loading branch information
C-Otto committed Jan 15, 2024
1 parent 8ce7f0f commit 3130481
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ private class PaymentStatusSubscription implements Subscription {
private final Subscriber<? super InstantWithString> subscriber;
private final List<InstantWithString> messagesForSubscriber;
private long requested;
private boolean completed;

public PaymentStatusSubscription(
Subscriber<? super InstantWithString> subscriber,
Expand All @@ -190,7 +191,11 @@ public void cancel() {
}

public void onComplete() {
subscriber.onComplete();
if (messagesForSubscriber.isEmpty()) {
subscriber.onComplete();
} else {
completed = true;
}
}

public void onNext(InstantWithString message) {
Expand All @@ -206,6 +211,9 @@ private void sendRequestedMessages() {
messagesForSubscriber.remove(0);
requested--;
}
if (completed) {
subscriber.onComplete();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,24 @@ public class ReactiveStreamReader<T> implements Subscriber<T> {
private final Integer expectedMessages;

private boolean done;
private final long messagesToRequest;

@CheckForNull
private Subscription subscription;

public ReactiveStreamReader(int expectedMessages) {
this(expectedMessages, Long.MAX_VALUE);
}

public ReactiveStreamReader(int expectedMessages, long messagesToRequest) {
this.expectedMessages = expectedMessages;
this.messagesToRequest = messagesToRequest;
}

@SuppressWarnings("PMD.NullAssignment")
public ReactiveStreamReader() {
this.expectedMessages = null;
messagesToRequest = Long.MAX_VALUE;
}

public static <T> List<T> readMessages(Publisher<T> publisher, int expectedMessages) {
Expand All @@ -45,7 +52,7 @@ public static <T> List<T> readAll(Publisher<T> publisher) {
return instance.getMessages();
}

private List<T> getMessages() {
public List<T> getMessages() {
await().atMost(2, SECONDS).until(() -> done);
if (expectedMessages != null) {
return messages.subList(0, expectedMessages);
Expand All @@ -56,16 +63,14 @@ private List<T> getMessages() {
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
subscription.request(messagesToRequest);
}

@Override
public void onNext(T message) {
messages.add(message);
if (expectedMessages != null && messages.size() == expectedMessages) {
done = true;
} else {
Objects.requireNonNull(subscription).request(1);
}
}

Expand All @@ -78,4 +83,8 @@ public void onError(Throwable throwable) {
public void onComplete() {
done = true;
}

public void requestAnotherMessage() {
Objects.requireNonNull(subscription).request(1);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.cotto.lndmanagej.pickhardtpayments.model;

import de.cotto.lndmanagej.ReactiveStreamReader;
import de.cotto.lndmanagej.model.Coins;
import de.cotto.lndmanagej.model.EdgeWithLiquidityInformation;
import de.cotto.lndmanagej.model.FailureCode;
Expand Down Expand Up @@ -134,6 +135,19 @@ void info() {
.contains("hallo!");
}

@Test
void last_message_requested_after_completion_triggered() {
PaymentStatus paymentStatus = PaymentStatus.createFor(PAYMENT_HASH);
paymentStatus.info("message");
ReactiveStreamReader<InstantWithString> instance = new ReactiveStreamReader<>(3, 2);
paymentStatus.subscribe(instance);
paymentStatus.failed("failed");
instance.requestAnotherMessage();
assertThat(instance.getMessages())
.map(InstantWithString::string)
.contains("failed");
}

@Nested
class Failed {
@Test
Expand Down

0 comments on commit 3130481

Please sign in to comment.