Skip to content

Commit

Permalink
feat(core): handle multiple payloads in methodLevelListener
Browse files Browse the repository at this point in the history
  • Loading branch information
timonback committed Nov 13, 2023
1 parent 34c1a26 commit e4545cd
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static io.github.stavshamir.springwolf.asyncapi.MessageHelper.toMessageObjectOrComposition;
Expand All @@ -44,11 +42,6 @@ public abstract class AbstractClassLevelListenerScanner<

private final SchemasService schemasService;

private static final Comparator<Map.Entry<String, ChannelItem>> byPublishOperationName =
Comparator.comparing(it -> it.getValue().getPublish().getOperationId());
private static final Supplier<Set<Map.Entry<String, ChannelItem>>> channelItemSupplier =
() -> new TreeSet<>(byPublishOperationName);

/**
* This annotation is used on class level
*
Expand Down Expand Up @@ -100,17 +93,17 @@ protected AsyncHeaders buildHeaders(Method method) {
@Override
public Map<String, ChannelItem> scan() {
Set<Class<?>> components = componentClassScanner.scan();
Set<Map.Entry<String, ChannelItem>> channels = mapToChannels(components);
List<Map.Entry<String, ChannelItem>> channels = mapToChannels(components);
return ChannelMerger.merge(new ArrayList<>(channels));
}

private Set<Map.Entry<String, ChannelItem>> mapToChannels(Set<Class<?>> components) {
private List<Map.Entry<String, ChannelItem>> mapToChannels(Set<Class<?>> components) {
return components.stream()
.filter(this::isClassAnnotated)
.map(this::mapClassToChannel)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toCollection(channelItemSupplier));
.collect(Collectors.toList());
}

private boolean isClassAnnotated(Class<?> component) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.asyncapi.v2.binding.channel.ChannelBinding;
import com.asyncapi.v2.binding.message.MessageBinding;
import com.asyncapi.v2.binding.operation.OperationBinding;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelMerger;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.classes.ComponentClassScanner;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
Expand All @@ -21,11 +22,12 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;

@Slf4j
Expand All @@ -38,11 +40,17 @@ public abstract class AbstractMethodLevelListenerScanner<T extends Annotation> i

@Override
public Map<String, ChannelItem> scan() {
return componentClassScanner.scan().stream()
Set<Class<?>> components = componentClassScanner.scan();
List<Map.Entry<String, ChannelItem>> channels = mapToChannels(components);
return ChannelMerger.merge(channels);
}

private List<Map.Entry<String, ChannelItem>> mapToChannels(Set<Class<?>> components) {
return components.stream()
.map(this::getAnnotatedMethods)
.flatMap(Collection::stream)
.map(this::mapMethodToChannel)
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue, (el1, el2) -> el1));
.collect(Collectors.toList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public Exchange exampleTopicExchange() {

@Bean
public Queue exampleTopicQueue() {
return new Queue("example-topic-queue");
return new Queue("multi-payload-queue");
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,13 @@ public void bindingsExample(AnotherPayloadDto payload) {
log.info("Received new message in example-bindings-queue: {}", payload.toString());
}

@RabbitListener(queues = "example-topic-queue")
@RabbitListener(queues = "multi-payload-queue")
public void bindingsBeanExample(AnotherPayloadDto payload) {
log.info("Received new message in example-topic-queue: {}", payload.toString());
log.info("Received new message in multi-payload-queue (AnotherPayloadDto): {}", payload.toString());
}

@RabbitListener(queues = "multi-payload-queue")
public void bindingsBeanExample(ExamplePayloadDto payload) {
log.info("Received new message in multi-payload-queue (ExamplePayloadDto): {}", payload.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@
}
}
},
"example-topic-queue": {
"example-topic-routing-key": {
"publish": {
"operationId": "example-topic-queue_publish_bindingsBeanExample",
"operationId": "example-topic-routing-key_publish_bindingsExample",
"description": "Auto-generated description",
"bindings": {
"amqp": {
Expand Down Expand Up @@ -274,26 +274,26 @@
"amqp": {
"is": "routingKey",
"exchange": {
"name": "example-topic-exchange",
"name": "example-bindings-exchange-name",
"type": "topic",
"durable": true,
"autoDelete": false,
"vhost": "/"
},
"queue": {
"name": "example-topic-queue",
"durable": true,
"exclusive": false,
"autoDelete": false,
"name": "example-bindings-queue",
"durable": false,
"exclusive": true,
"autoDelete": true,
"vhost": "/"
},
"bindingVersion": "0.2.0"
}
}
},
"example-topic-routing-key": {
"multi-payload-queue": {
"publish": {
"operationId": "example-topic-routing-key_publish_bindingsExample",
"operationId": "multi-payload-queue_publish_bindingsBeanExample",
"description": "Auto-generated description",
"bindings": {
"amqp": {
Expand All @@ -304,37 +304,57 @@
}
},
"message": {
"schemaFormat": "application/vnd.oai.openapi+json;version=3.0.0",
"name": "io.github.stavshamir.springwolf.example.amqp.dtos.AnotherPayloadDto",
"title": "AnotherPayloadDto",
"payload": {
"$ref": "#/components/schemas/io.github.stavshamir.springwolf.example.amqp.dtos.AnotherPayloadDto"
},
"headers": {
"$ref": "#/components/schemas/HeadersNotDocumented"
},
"bindings": {
"amqp": {
"bindingVersion": "0.2.0"
"oneOf": [
{
"schemaFormat": "application/vnd.oai.openapi+json;version=3.0.0",
"name": "io.github.stavshamir.springwolf.example.amqp.dtos.AnotherPayloadDto",
"title": "AnotherPayloadDto",
"payload": {
"$ref": "#/components/schemas/io.github.stavshamir.springwolf.example.amqp.dtos.AnotherPayloadDto"
},
"headers": {
"$ref": "#/components/schemas/HeadersNotDocumented"
},
"bindings": {
"amqp": {
"bindingVersion": "0.2.0"
}
}
},
{
"schemaFormat": "application/vnd.oai.openapi+json;version=3.0.0",
"name": "io.github.stavshamir.springwolf.example.amqp.dtos.ExamplePayloadDto",
"title": "ExamplePayloadDto",
"payload": {
"$ref": "#/components/schemas/io.github.stavshamir.springwolf.example.amqp.dtos.ExamplePayloadDto"
},
"headers": {
"$ref": "#/components/schemas/HeadersNotDocumented"
},
"bindings": {
"amqp": {
"bindingVersion": "0.2.0"
}
}
}
}
]
}
},
"bindings": {
"amqp": {
"is": "routingKey",
"exchange": {
"name": "example-bindings-exchange-name",
"name": "example-topic-exchange",
"type": "topic",
"durable": true,
"autoDelete": false,
"vhost": "/"
},
"queue": {
"name": "example-bindings-queue",
"durable": false,
"exclusive": true,
"autoDelete": true,
"name": "multi-payload-queue",
"durable": true,
"exclusive": false,
"autoDelete": false,
"vhost": "/"
},
"bindingVersion": "0.2.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@
}
}
},
"example-topic-queue": {
"example-topic-routing-key": {
"publish": {
"operationId": "example-topic-queue_publish_bindingsBeanExample",
"operationId": "example-topic-routing-key_publish_bindingsExample",
"description": "Auto-generated description",
"bindings": {
"amqp": {
Expand Down Expand Up @@ -192,26 +192,26 @@
"amqp": {
"is": "routingKey",
"exchange": {
"name": "example-topic-exchange",
"name": "example-bindings-exchange-name",
"type": "topic",
"durable": true,
"autoDelete": false,
"vhost": "/"
},
"queue": {
"name": "example-topic-queue",
"durable": true,
"exclusive": false,
"autoDelete": false,
"name": "example-bindings-queue",
"durable": false,
"exclusive": true,
"autoDelete": true,
"vhost": "/"
},
"bindingVersion": "0.2.0"
}
}
},
"example-topic-routing-key": {
"multi-payload-queue": {
"publish": {
"operationId": "example-topic-routing-key_publish_bindingsExample",
"operationId": "multi-payload-queue_publish_bindingsBeanExample",
"description": "Auto-generated description",
"bindings": {
"amqp": {
Expand All @@ -222,37 +222,57 @@
}
},
"message": {
"schemaFormat": "application/vnd.oai.openapi+json;version=3.0.0",
"name": "io.github.stavshamir.springwolf.example.amqp.dtos.AnotherPayloadDto",
"title": "AnotherPayloadDto",
"payload": {
"$ref": "#/components/schemas/io.github.stavshamir.springwolf.example.amqp.dtos.AnotherPayloadDto"
},
"headers": {
"$ref": "#/components/schemas/HeadersNotDocumented"
},
"bindings": {
"amqp": {
"bindingVersion": "0.2.0"
"oneOf": [
{
"schemaFormat": "application/vnd.oai.openapi+json;version=3.0.0",
"name": "io.github.stavshamir.springwolf.example.amqp.dtos.AnotherPayloadDto",
"title": "AnotherPayloadDto",
"payload": {
"$ref": "#/components/schemas/io.github.stavshamir.springwolf.example.amqp.dtos.AnotherPayloadDto"
},
"headers": {
"$ref": "#/components/schemas/HeadersNotDocumented"
},
"bindings": {
"amqp": {
"bindingVersion": "0.2.0"
}
}
},
{
"schemaFormat": "application/vnd.oai.openapi+json;version=3.0.0",
"name": "io.github.stavshamir.springwolf.example.amqp.dtos.ExamplePayloadDto",
"title": "ExamplePayloadDto",
"payload": {
"$ref": "#/components/schemas/io.github.stavshamir.springwolf.example.amqp.dtos.ExamplePayloadDto"
},
"headers": {
"$ref": "#/components/schemas/HeadersNotDocumented"
},
"bindings": {
"amqp": {
"bindingVersion": "0.2.0"
}
}
}
}
]
}
},
"bindings": {
"amqp": {
"is": "routingKey",
"exchange": {
"name": "example-bindings-exchange-name",
"name": "example-topic-exchange",
"type": "topic",
"durable": true,
"autoDelete": false,
"vhost": "/"
},
"queue": {
"name": "example-bindings-queue",
"durable": false,
"exclusive": true,
"autoDelete": true,
"name": "multi-payload-queue",
"durable": true,
"exclusive": false,
"autoDelete": false,
"vhost": "/"
},
"bindingVersion": "0.2.0"
Expand Down
Loading

0 comments on commit e4545cd

Please sign in to comment.