Skip to content

Commit

Permalink
support channel server assignments (#264)
Browse files Browse the repository at this point in the history
* Allow server assignments to channels in @AsyncOperation

* added comment.

* fix import

* fix broken javadoc in AsyncOperation

* merge master changes

* validate server refs in channels

* validate server refs in channels

* make servers property in OperationData Nullable
  • Loading branch information
tvahrst authored Nov 10, 2023
1 parent 1e245e1 commit 754474f
Show file tree
Hide file tree
Showing 19 changed files with 275 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,15 @@ public ProducerOperationDataScanner producerOperationDataScanner(
public AsyncListenerAnnotationScanner asyncListenerAnnotationScanner(
ComponentClassScanner componentClassScanner,
SchemasService schemasService,
AsyncApiDocketService asyncApiDocketService,
List<OperationBindingProcessor> operationBindingProcessors,
List<MessageBindingProcessor> messageBindingProcessors) {
return new AsyncListenerAnnotationScanner(
componentClassScanner, schemasService, operationBindingProcessors, messageBindingProcessors);
componentClassScanner,
schemasService,
asyncApiDocketService,
operationBindingProcessors,
messageBindingProcessors);
}

@Bean
Expand All @@ -94,9 +99,14 @@ public AsyncListenerAnnotationScanner asyncListenerAnnotationScanner(
public AsyncPublisherAnnotationScanner asyncPublisherAnnotationScanner(
ComponentClassScanner componentClassScanner,
SchemasService schemasService,
AsyncApiDocketService asyncApiDocketService,
List<OperationBindingProcessor> operationBindingProcessors,
List<MessageBindingProcessor> messageBindingProcessors) {
return new AsyncPublisherAnnotationScanner(
componentClassScanner, schemasService, operationBindingProcessors, messageBindingProcessors);
componentClassScanner,
schemasService,
asyncApiDocketService,
operationBindingProcessors,
messageBindingProcessors);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@

import com.asyncapi.v2._6_0.model.channel.ChannelItem;
import com.asyncapi.v2._6_0.model.channel.operation.Operation;
import com.asyncapi.v2._6_0.model.server.Server;
import com.asyncapi.v2.binding.channel.ChannelBinding;
import com.asyncapi.v2.binding.operation.OperationBinding;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner;
import io.github.stavshamir.springwolf.asyncapi.types.OperationData;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.HeaderReference;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -31,6 +33,14 @@ public abstract class AbstractOperationDataScanner implements ChannelsScanner {

protected abstract SchemasService getSchemaService();

protected abstract AsyncApiDocketService getAsyncApiDocketService();

/**
* provides a list of all {@link OperationData} items detected by the concrete
* subclass.
*
* @return
*/
protected abstract List<OperationData> getOperationData();

protected abstract OperationData.OperationType getOperationType();
Expand All @@ -57,6 +67,15 @@ private boolean allFieldsAreNonNull(OperationData operationData) {
return allNonNull;
}

/**
* Creates an asyncapi {@link ChannelItem} using the given list of {@link OperationData}. Expects, that all {@link OperationData}
* items belong to the same channel. Most properties of the resulting {@link ChannelItem} are extracted from the
* first {@link OperationData} item in the list, assuming that all {@link OperationData} contains the same channel
* informations.
*
* @param operationDataList List of all {@link OperationData} items for a single channel.
* @return the resulting {@link ChannelItem}
*/
private ChannelItem buildChannel(List<OperationData> operationDataList) {
// All bindings in the group are assumed to be the same
// AsyncApi does not support multiple bindings on a single channel
Expand All @@ -68,6 +87,7 @@ private ChannelItem buildChannel(List<OperationData> operationDataList) {
Map<String, Object> chBinding = channelBinding != null ? new HashMap<>(channelBinding) : null;
String operationId = operationDataList.get(0).getChannelName() + "_" + this.getOperationType().operationName;
String description = operationDataList.get(0).getDescription();
List<String> servers = operationDataList.get(0).getServers();

if (description.isEmpty()) {
description = "Auto-generated description";
Expand All @@ -84,6 +104,14 @@ private ChannelItem buildChannel(List<OperationData> operationDataList) {
channelBuilder = switch (getOperationType()) {
case PUBLISH -> channelBuilder.publish(operation);
case SUBSCRIBE -> channelBuilder.subscribe(operation);};

// Only set servers if servers are defined. Avoid setting an emtpy list
// because this would generate empty server entries for each channel in the resulting
// async api.
if (servers != null && !servers.isEmpty()) {
validateServers(servers, operationId);
channelBuilder.servers(servers);
}
return channelBuilder.build();
}

Expand Down Expand Up @@ -148,4 +176,31 @@ private void processAsyncMessageAnnotation(Message annotationMessage, Message.Me
}
}
}

/**
* validates the given list of server names (for a specific operation) with the servers defined in the 'servers' part of
* the current AsyncApi.
*
* @param serversFromOperation the server names defined for the current operation
* @param operationId operationId of the current operation - used for exception messages
* @throws IllegalArgumentException if server from operation is not present in AsyncApi's servers definition.
*/
void validateServers(List<String> serversFromOperation, String operationId) {
if (!serversFromOperation.isEmpty()) {
Map<String, Server> asyncApiServers =
getAsyncApiDocketService().getAsyncApiDocket().getServers();
if (asyncApiServers == null || asyncApiServers.isEmpty()) {
throw new IllegalArgumentException(String.format(
"Operation '%s' defines server refs (%s) but there are no servers defined in this AsyncAPI.",
operationId, serversFromOperation));
}
for (String server : serversFromOperation) {
if (!asyncApiServers.containsKey(server)) {
throw new IllegalArgumentException(String.format(
"Operation '%s' defines unknown server ref '%s'. This AsyncApi defines these server(s): %s",
operationId, server, asyncApiServers.keySet()));
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ protected List<OperationData> getOperationData() {
return new ArrayList<>(asyncApiDocketService.getAsyncApiDocket().getConsumers());
}

@Override
protected AsyncApiDocketService getAsyncApiDocketService() {
return this.asyncApiDocketService;
}

@Override
protected OperationData.OperationType getOperationType() {
return OperationData.OperationType.PUBLISH;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ protected SchemasService getSchemaService() {
return this.schemasService;
}

@Override
protected AsyncApiDocketService getAsyncApiDocketService() {
return this.asyncApiDocketService;
}

@Override
protected List<OperationData> getOperationData() {
return new ArrayList<>(asyncApiDocketService.getAsyncApiDocket().getProducers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaderSchema;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders;
import org.springframework.lang.NonNull;
import org.springframework.util.StringUtils;
import org.springframework.util.StringValueResolver;

Expand Down Expand Up @@ -122,4 +123,17 @@ private static Message parseMessage(AsyncOperation asyncOperation) {

return messageBuilder.build();
}

/**
* extracts servers array from the given AsyncOperation, resolves placeholdes with spring variables and
* return a List of server names.
*
* @param op the given AsyncOperation
* @param resolver the StringValueResolver to resolve placeholders
* @return List of server names
*/
@NonNull
public static List<String> getServers(AsyncOperation op, StringValueResolver resolver) {
return Arrays.stream(op.servers()).map(resolver::resolveStringValue).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.github.stavshamir.springwolf.asyncapi.types.ConsumerData;
import io.github.stavshamir.springwolf.asyncapi.types.OperationData;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -30,6 +31,7 @@ public class AsyncListenerAnnotationScanner extends AbstractOperationDataScanner
private StringValueResolver resolver;
private final ComponentClassScanner componentClassScanner;
private final SchemasService schemasService;
private final AsyncApiDocketService asyncApiDocketService;

private final List<OperationBindingProcessor> operationBindingProcessors;

Expand All @@ -45,6 +47,11 @@ protected SchemasService getSchemaService() {
return this.schemasService;
}

@Override
protected AsyncApiDocketService getAsyncApiDocketService() {
return asyncApiDocketService;
}

@Override
protected List<OperationData> getOperationData() {
return componentClassScanner.scan().stream()
Expand Down Expand Up @@ -87,9 +94,11 @@ private ConsumerData toConsumerData(
Class<?> payloadType = op.payloadType() != Object.class
? op.payloadType()
: SpringPayloadAnnotationTypeExtractor.getPayloadType(method);

return ConsumerData.builder()
.channelName(resolver.resolveStringValue(op.channelName()))
.description(resolver.resolveStringValue(op.description()))
.servers(AsyncAnnotationScannerUtil.getServers(op, resolver))
.headers(AsyncAnnotationScannerUtil.getAsyncHeaders(op, resolver))
.payloadType(payloadType)
.operationBinding(operationBindings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@
*/
AsyncMessage message() default @AsyncMessage();

/**
* The servers on which this channel is available, list of names of Server Objects.
* If servers is empty then this channel is available on all servers defined in the Async API.
* Mapped to ({@link OperationData#getServers()}
* @return
*/
String[] servers() default {};

/**
* Mapped to {@link OperationData#getPayloadType()}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.github.stavshamir.springwolf.asyncapi.types.OperationData;
import io.github.stavshamir.springwolf.asyncapi.types.ProducerData;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -30,6 +31,7 @@ public class AsyncPublisherAnnotationScanner extends AbstractOperationDataScanne
private StringValueResolver resolver;
private final ComponentClassScanner componentClassScanner;
private final SchemasService schemasService;
private final AsyncApiDocketService asyncApiDocketService;

private final List<OperationBindingProcessor> operationBindingProcessors;

Expand All @@ -45,6 +47,11 @@ protected SchemasService getSchemaService() {
return this.schemasService;
}

@Override
protected AsyncApiDocketService getAsyncApiDocketService() {
return asyncApiDocketService;
}

@Override
protected List<OperationData> getOperationData() {
return componentClassScanner.scan().stream()
Expand Down Expand Up @@ -90,6 +97,7 @@ private ProducerData toConsumerData(
return ProducerData.builder()
.channelName(resolver.resolveStringValue(op.channelName()))
.description(resolver.resolveStringValue(op.description()))
.servers(AsyncAnnotationScannerUtil.getServers(op, resolver))
.headers(AsyncAnnotationScannerUtil.getAsyncHeaders(op, resolver))
.payloadType(payloadType)
.operationBinding(operationBindings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.Singular;
import org.springframework.lang.Nullable;

import java.util.List;
import java.util.Map;

/**
Expand All @@ -33,6 +36,14 @@ public class ConsumerData implements OperationData {
*/
protected String description;

/**
* Optional, List of server names the channel is assigned to. If empty, the
* channel is available on all defined servers.
*/
@Nullable
@Singular("server")
protected List<String> servers;

/**
* The channel binding of the producer.
* <br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import com.asyncapi.v2.binding.operation.OperationBinding;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders;
import org.springframework.lang.Nullable;

import java.util.List;
import java.util.Map;

/**
Expand All @@ -23,6 +25,13 @@ public interface OperationData {
*/
String getDescription();

/**
* Optional, List of server names the channel is assigned to. If empty, the
* channel is available on all defined servers. May be null.
*/
@Nullable
List<String> getServers();

/**
* The channel binding.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.Singular;
import org.springframework.lang.Nullable;

import java.util.List;
import java.util.Map;

/**
Expand All @@ -33,6 +36,14 @@ public class ProducerData implements OperationData {
*/
protected String description;

/**
* Optional, List of server names the channel is assigned to. If empty, the
* channel is available on all defined servers.
*/
@Nullable
@Singular("server")
protected List<String> servers;

/**
* The channel binding of the producer.
* <br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private AsyncAPI getAsyncAPITestObject() {

ChannelItem newUserChannel = ChannelItem.builder()
.description("This channel is used to exchange messages about users signing up")
.servers(List.of("production"))
.subscribe(newUserOperation)
.build();

Expand Down
Loading

0 comments on commit 754474f

Please sign in to comment.