Skip to content

Commit

Permalink
Merge pull request #2280 from ozangunalp/mutiny_2.4.0
Browse files Browse the repository at this point in the history
Bump Smallrye Mutiny from 2.1.0 to 2.4.0
  • Loading branch information
cescoffier authored Sep 11, 2023
2 parents d22b256 + acbb2f6 commit 5943f38
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@

<micrometer.version>1.11.3</micrometer.version>

<mutiny.version>2.1.0</mutiny.version>
<mutiny.version>2.4.0</mutiny.version>
<artemis.version>2.30.0</artemis.version>
<commons-io.version>2.13.0</commons-io.version>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,22 +552,25 @@ private Uni<ConsumerRecords<K, V>> poll() {

<T> Multi<T> process(Set<String> topics, Function<Multi<ConsumerRecord<K, V>>, Multi<T>> plugFunction) {
return Multi.createFrom().deferred(() -> {
if (!polling.compareAndSet(false, true)) {
return Multi.createFrom().failure(new IllegalStateException("Consumer already in use"));
}
getOrCreateConsumer().subscribe(topics, this);
return getConsumeMulti().plug(plugFunction);
});
}

<T> Multi<T> processBatch(Set<String> topics, Function<ConsumerRecords<K, V>, Multi<T>> consumerRecordFunction) {
return Multi.createFrom().deferred(() -> {
if (!polling.compareAndSet(false, true)) {
return Multi.createFrom().failure(new IllegalStateException("Consumer already in use"));
}
getOrCreateConsumer().subscribe(topics, this);
return getProcessingMulti(consumerRecordFunction);
});
}

private <T> Multi<T> getProcessingMulti(Function<ConsumerRecords<K, V>, Multi<T>> consumerRecordFunction) {
if (!polling.compareAndSet(false, true)) {
return Multi.createFrom().failure(new IllegalStateException("Consumer already in use"));
}
return poll().repeat().indefinitely()
.onItem().transformToMulti(consumerRecordFunction)
.concatenate()
Expand All @@ -577,9 +580,6 @@ private <T> Multi<T> getProcessingMulti(Function<ConsumerRecords<K, V>, Multi<T>
}

private Multi<ConsumerRecord<K, V>> getConsumeMulti() {
if (!polling.compareAndSet(false, true)) {
return Multi.createFrom().failure(new IllegalStateException("Consumer already in use"));
}
return poll().repeat().indefinitely()
.onItem().transformToMulti(cr -> Multi.createFrom().items(StreamSupport.stream(cr.spliterator(), false)))
.concatenate()
Expand Down Expand Up @@ -625,6 +625,9 @@ private Multi<ConsumerRecord<K, V>> getConsumeMulti() {
public ConsumerTask<K, V> fromOffsets(Map<TopicPartition, Long> offsets,
Function<Multi<ConsumerRecord<K, V>>, Multi<ConsumerRecord<K, V>>> plugFunction) {
return new ConsumerTask<>(Multi.createFrom().deferred(() -> {
if (!polling.compareAndSet(false, true)) {
return Multi.createFrom().failure(new IllegalStateException("Consumer already in use"));
}
getOrCreateConsumer().unsubscribe();
Set<TopicPartition> topicPartitions = offsets.keySet();
getOrCreateConsumer().assign(topicPartitions);
Expand Down Expand Up @@ -699,6 +702,9 @@ public ConsumerTask<K, V> fromOffsets(Map<TopicPartition, Long> offsets, long nu
public ConsumerTask<K, V> fromTopics(Set<String> topics,
Function<Multi<ConsumerRecord<K, V>>, Multi<ConsumerRecord<K, V>>> plugFunction) {
return new ConsumerTask<>(Multi.createFrom().deferred(() -> {
if (!polling.compareAndSet(false, true)) {
return Multi.createFrom().failure(new IllegalStateException("Consumer already in use"));
}
getOrCreateConsumer().subscribe(topics, this);
return getConsumeMulti().plug(plugFunction);
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,9 @@ public SELF awaitCompletion(BiConsumer<Throwable, Boolean> assertion, Duration d
*
* @return self
*/
public SELF stop() {
subscriber.cancel();
public synchronized SELF stop() {
subscriber.onComplete();
subscriber.cancel();
return self();
}

Expand Down

0 comments on commit 5943f38

Please sign in to comment.