From 048e3091bbdcab57f7fa01b886eea841bc3dee9c Mon Sep 17 00:00:00 2001 From: bmaidics Date: Thu, 30 May 2024 17:37:06 +0200 Subject: [PATCH 1/6] http-kafka asyncapi --- runtime/binding-asyncapi/pom.xml | 13 + .../config/AsyncapiBindingConfig.java | 3 +- .../AsyncapiConditionConfigAdapter.java | 15 +- .../config/AsyncapiHttpKafkaProxy.java | 243 ++++++++++++++++++ .../config/AsyncapiMqttKafkaProxy.java | 124 +++++++++ .../config/AsyncapiNamespaceGenerator.java | 3 +- .../internal/config/AsyncapiProxy.java | 47 ++++ .../AsyncapiProxyNamespaceGenerator.java | 138 ++++------ .../internal/model/AsyncapiComponents.java | 1 + .../internal/model/AsyncapiCorrelationId.java | 25 ++ .../asyncapi/internal/model/AsyncapiItem.java | 1 + .../internal/model/AsyncapiMessage.java | 1 + .../view/AsyncapiCorrelationIdView.java | 70 +++++ .../internal/view/AsyncapiMessageView.java | 6 + .../internal/stream/proxy/AsyncapiIT.java | 22 ++ .../asyncapi/config/proxy.http.kafka.yaml | 231 +++++++++++++++++ .../binding/asyncapi/config/server.http.yaml | 10 +- .../asyncapi/proxy.http.create.pet/client.rpt | 47 ++++ .../asyncapi/proxy.http.create.pet/server.rpt | 50 ++++ .../proxy.kafka.create.pet/client.rpt | 60 +++++ .../proxy.kafka.create.pet/server.rpt | 55 ++++ 21 files changed, 1078 insertions(+), 87 deletions(-) create mode 100644 runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java create mode 100644 runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiMqttKafkaProxy.java create mode 100644 runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProxy.java create mode 100644 runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiCorrelationId.java create mode 100644 runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiCorrelationIdView.java create mode 100644 specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml create mode 100644 specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/client.rpt create mode 100644 specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/server.rpt create mode 100644 specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/client.rpt create mode 100644 specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/server.rpt diff --git a/runtime/binding-asyncapi/pom.xml b/runtime/binding-asyncapi/pom.xml index 0f8d8e23b1..207e34dd3a 100644 --- a/runtime/binding-asyncapi/pom.xml +++ b/runtime/binding-asyncapi/pom.xml @@ -79,6 +79,12 @@ ${project.version} provided + + io.aklivity.zilla + binding-http-kafka + ${project.version} + provided + io.aklivity.zilla binding-tls @@ -157,6 +163,13 @@ ${project.version} test + + io.aklivity.zilla + binding-http-kafka + test-jar + ${project.version} + test + org.openjdk.jmh jmh-core diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java index e8b130ec05..0db90ef60c 100644 --- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java @@ -224,7 +224,8 @@ private void attachProxyBinding( Object2ObjectHashMap::new)); namespaceGenerator.init(binding); - final NamespaceConfig composite = namespaceGenerator.generateProxy(binding, asyncapis, schemaIdsByApiId::get); + final List labels = configs.stream().map(c -> c.apiLabel).collect(toList()); + final NamespaceConfig composite = namespaceGenerator.generateProxy(binding, asyncapis, schemaIdsByApiId::get, labels); composite.readURL = binding.readURL; attach.accept(composite); updateNamespace(configs, composite, new ArrayList<>(asyncapis.values())); diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiConditionConfigAdapter.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiConditionConfigAdapter.java index 8d659d7141..cc3e31656c 100644 --- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiConditionConfigAdapter.java +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiConditionConfigAdapter.java @@ -42,8 +42,11 @@ public JsonObject adaptToJson( JsonObjectBuilder object = Json.createObjectBuilder(); object.add(API_ID_NAME, asyncapiCondition.apiId); - object.add(OPERATION_ID_NAME, asyncapiCondition.operationId); + if (asyncapiCondition.operationId != null) + { + object.add(OPERATION_ID_NAME, asyncapiCondition.operationId); + } return object.build(); } @@ -51,8 +54,14 @@ public JsonObject adaptToJson( public ConditionConfig adaptFromJson( JsonObject object) { - String apiId = object.getString(API_ID_NAME); - String operationId = object.getString(OPERATION_ID_NAME); + String apiId = object.containsKey(API_ID_NAME) + ? object.getString(API_ID_NAME) + : null; + + String operationId = object.containsKey(OPERATION_ID_NAME) + ? object.getString(OPERATION_ID_NAME) + : null; + return new AsyncapiConditionConfig(apiId, operationId); } } diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java new file mode 100644 index 0000000000..35dd61f7b5 --- /dev/null +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java @@ -0,0 +1,243 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.asyncapi.internal.config; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiChannel; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiMessage; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiOperation; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiReply; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiChannelView; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiCorrelationIdView; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiMessageView; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiSchemaView; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaConditionConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithConfigBuilder; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfigBuilder; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchFilterConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchMergeConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceAsyncHeaderConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder; + +public class AsyncapiHttpKafkaProxy extends AsyncapiProxy +{ + private static final String CORRELATION_ID = "\\{correlationId\\}"; + private static final String PARAMETERS = "\\{(?!correlationId)(\\w+)\\}"; + private static final String HEADER_LOCATION = "\".*/([^/]+)$\""; + private static final String ASYNCAPI_KAFKA_PROTOCOL_NAME = "kafka"; + private static final String ASYNCAPI_HTTP_PROTOCOL_NAME = "http"; + private static final String ASYNCAPI_SEND_ACTION_NAME = "send"; + private static final String ASYNCAPI_RECEIVE_ACTION_NAME = "receive"; + private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\{([^}]+)\\}"); + private static final Pattern CORRELATION_PATTERN = Pattern.compile(CORRELATION_ID); + private static final Pattern HEADER_LOCATION_PATTERN = Pattern.compile(HEADER_LOCATION); + + private final Matcher correlation = CORRELATION_PATTERN.matcher(""); + private final Matcher parameters = PARAMETER_PATTERN.matcher(""); + private final Matcher headerLocation = HEADER_LOCATION_PATTERN.matcher(""); + + protected AsyncapiHttpKafkaProxy( + String qname, + Map asyncapis) + { + super("mqtt-kafka", qname, asyncapis); + } + + @Override + protected BindingConfigBuilder injectProxyRoutes( + BindingConfigBuilder binding, + List routes) + { + inject: + for (AsyncapiRouteConfig route : routes) + { + final RouteConfigBuilder> routeBuilder = binding.route(); + + final Asyncapi kafkaAsyncapi = asyncapis.get(route.with.apiId); + + final AsyncapiOperation withOperation = kafkaAsyncapi.operations.get(route.with.operationId); + + for (AsyncapiConditionConfig condition : route.when) + { + final Asyncapi httpAsyncapi = asyncapis.get(condition.apiId); + if (httpAsyncapi.servers.values().stream().anyMatch(s -> !s.protocol.startsWith(ASYNCAPI_HTTP_PROTOCOL_NAME))) + { + break inject; + } + final AsyncapiOperation whenOperation = httpAsyncapi.operations.get(condition.operationId); + final AsyncapiChannelView channel = AsyncapiChannelView.of(httpAsyncapi.channels, whenOperation.channel); + String path = channel.address(); + String method = whenOperation.bindings.get("http").method; + final List paramNames = findParams(path); + + routeBuilder + .exit(qname) + .when(HttpKafkaConditionConfig::builder) + .method(method) + .path(path) + .build() + .inject(r -> injectHttpKafkaRouteWith(r, httpAsyncapi, kafkaAsyncapi, whenOperation, + withOperation, paramNames)) + .build(); + } + binding = routeBuilder.build(); + } + return binding; + } + + @Override + public BindingConfigBuilder injectProxyOptions( + BindingConfigBuilder binding, + AsyncapiOptionsConfig options) + { + return binding; + } + + private List findParams( + String item) + { + List paramNames = new ArrayList<>(); + Matcher matcher = parameters.reset(item); + while (matcher.find()) + { + paramNames.add(parameters.group(1)); + } + return paramNames; + } + + private RouteConfigBuilder injectHttpKafkaRouteWith( + RouteConfigBuilder route, + Asyncapi httpAsyncapi, + Asyncapi kafkaAsyncapi, + AsyncapiOperation httpOperation, + AsyncapiOperation kafkaOperation, + List paramNames) + { + final HttpKafkaWithConfigBuilder newWith = HttpKafkaWithConfig.builder(); + final AsyncapiChannelView channel = AsyncapiChannelView + .of(kafkaAsyncapi.channels, kafkaOperation.channel); + final String topic = channel.address(); + + switch (kafkaOperation.action) + { + case "receive": + newWith.fetch(HttpKafkaWithFetchConfig.builder() + .topic(topic) + .inject(with -> injectHttpKafkaRouteFetchWith(with, httpAsyncapi, httpOperation, paramNames)) + .build()); + break; + case "send": + newWith.produce(HttpKafkaWithProduceConfig.builder() + .topic(topic) + .inject(w -> injectHttpKafkaRouteProduceWith(w, httpOperation, kafkaOperation, httpAsyncapi, + kafkaAsyncapi.channels, paramNames)) + .build()); + break; + } + + route.with(newWith.build()); + + return route; + } + + private HttpKafkaWithFetchConfigBuilder injectHttpKafkaRouteFetchWith( + HttpKafkaWithFetchConfigBuilder fetch, + Asyncapi httpAsyncapi, + AsyncapiOperation httpOperation, + List paramNames) + { + final AsyncapiChannelView channel = AsyncapiChannelView.of(httpAsyncapi.channels, httpOperation.channel); + merge: + for (Map.Entry message : channel.messages().entrySet()) + { + AsyncapiMessageView messageView = AsyncapiMessageView.of(httpAsyncapi.components.messages, message.getValue()); + AsyncapiSchemaView schema = AsyncapiSchemaView.of(httpAsyncapi.components.schemas, messageView.payload()); + + if (schema != null && "array".equals(schema.getType())) + { + fetch.merged(HttpKafkaWithFetchMergeConfig.builder() + .contentType("application/json") + .initial("[]") + .path("/-") + .build()); + break merge; + } + } + + if (!paramNames.isEmpty()) + { + fetch.filters(List.of(HttpKafkaWithFetchFilterConfig.builder() + .key(String.format("${params.%s}", paramNames.get(paramNames.size() - 1))) + .build())); + } + + return fetch; + } + + private HttpKafkaWithProduceConfigBuilder injectHttpKafkaRouteProduceWith( + HttpKafkaWithProduceConfigBuilder produce, + AsyncapiOperation httpOperation, + AsyncapiOperation kafkaOperation, + Asyncapi httpAsyncapi, + Map kafkaChannels, + List paramNames) + { + final String key = !paramNames.isEmpty() ? String.format("${params.%s}", + paramNames.get(paramNames.size() - 1)) : "${idempotencyKey}"; + + produce.acks("in_sync_replicas").key(key); + + AsyncapiChannelView httpChannel = AsyncapiChannelView.of(httpAsyncapi.channels, httpOperation.channel); + + httpChannel.messages().values() + .forEach(asyncapiMessage -> + { + AsyncapiMessageView message = AsyncapiMessageView.of(httpAsyncapi.components.messages, asyncapiMessage); + AsyncapiCorrelationIdView correlationId = + AsyncapiCorrelationIdView.of(httpAsyncapi.components.correlationIds, message.correlationId()); + String headerName = headerLocation.reset(correlationId.location()).group(1); + String location = asyncapiMessage.headers.properties.get(headerName).format; + location = location.replaceAll(CORRELATION_ID, "\\${correlationId}"); + location = location.replaceAll(PARAMETERS, "\\${params.$1}"); + produce.async(HttpKafkaWithProduceAsyncHeaderConfig.builder() + .name("location") + .value(location) + .build()); + }); + + AsyncapiReply reply = kafkaOperation.reply; + if (reply != null) + { + AsyncapiChannelView channel = AsyncapiChannelView.of(kafkaChannels, reply.channel); + produce.replyTo(channel.address()); + } + + produce.build(); + + return produce; + } +} diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiMqttKafkaProxy.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiMqttKafkaProxy.java new file mode 100644 index 0000000000..863a7c3bfc --- /dev/null +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiMqttKafkaProxy.java @@ -0,0 +1,124 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.asyncapi.internal.config; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiOperation; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiChannelView; +import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaConditionConfig; +import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaConditionKind; +import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaOptionsConfig; +import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaWithConfig; +import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder; + +public class AsyncapiMqttKafkaProxy extends AsyncapiProxy +{ + private static final String ASYNCAPI_KAFKA_PROTOCOL_NAME = "kafka"; + private static final String ASYNCAPI_MQTT_PROTOCOL_NAME = "mqtt"; + private static final String ASYNCAPI_SEND_ACTION_NAME = "send"; + private static final String ASYNCAPI_RECEIVE_ACTION_NAME = "receive"; + + protected AsyncapiMqttKafkaProxy( + String qname, + Map asyncapis) + { + super("mqtt-kafka", qname, asyncapis); + } + + @Override + protected BindingConfigBuilder injectProxyRoutes( + BindingConfigBuilder binding, + List routes) + { + inject: + for (AsyncapiRouteConfig route : routes) + { + final RouteConfigBuilder> routeBuilder = binding.route(); + + final Asyncapi kafkaAsyncapi = asyncapis.get(route.with.apiId); + + final AsyncapiOperation withOperation = kafkaAsyncapi.operations.get(route.with.operationId); + final String messages = AsyncapiChannelView.of(kafkaAsyncapi.channels, withOperation.channel).address(); + + for (AsyncapiConditionConfig condition : route.when) + { + final Asyncapi mqttAsyncapi = asyncapis.get(condition.apiId); + if (mqttAsyncapi.servers.values().stream().anyMatch(s -> !s.protocol.startsWith(ASYNCAPI_MQTT_PROTOCOL_NAME))) + { + break inject; + } + final AsyncapiOperation whenOperation = mqttAsyncapi.operations.get(condition.operationId); + final AsyncapiChannelView channel = AsyncapiChannelView.of(mqttAsyncapi.channels, whenOperation.channel); + final MqttKafkaConditionKind kind = whenOperation.action.equals(ASYNCAPI_SEND_ACTION_NAME) ? + MqttKafkaConditionKind.PUBLISH : MqttKafkaConditionKind.SUBSCRIBE; + String topic = channel.address(); + + routeBuilder + .when(MqttKafkaConditionConfig::builder) + .topic(topic) + .kind(kind) + .build() + .with(MqttKafkaWithConfig::builder) + .messages(messages.replaceAll("\\{([^{}]*)\\}", "\\${params.$1}")) + .build() + .exit(qname); + } + binding = routeBuilder.build(); + } + return binding; + } + + @Override + public BindingConfigBuilder injectProxyOptions( + BindingConfigBuilder binding, + AsyncapiOptionsConfig options) + { + String sessions = ""; + String messages = ""; + String retained = ""; + for (Asyncapi asyncapi : asyncapis.values()) + { + if (asyncapi.channels.containsKey(options.mqttKafka.channels.sessions)) + { + sessions = asyncapi.channels.get(options.mqttKafka.channels.sessions).address; + } + + if (asyncapi.channels.containsKey(options.mqttKafka.channels.messages)) + { + messages = asyncapi.channels.get(options.mqttKafka.channels.messages).address; + } + + if (asyncapi.channels.containsKey(options.mqttKafka.channels.retained)) + { + retained = asyncapi.channels.get(options.mqttKafka.channels.retained).address; + } + } + return binding + .options(MqttKafkaOptionsConfig::builder) + .topics() + .sessions(sessions) + .messages(messages) + .retained(retained) + .build() + .clients(Collections.emptyList()) + .build(); + } +} diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiNamespaceGenerator.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiNamespaceGenerator.java index ddf75ede2c..25feb3cb34 100644 --- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiNamespaceGenerator.java +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiNamespaceGenerator.java @@ -92,7 +92,8 @@ public NamespaceConfig generate( public NamespaceConfig generateProxy( BindingConfig binding, Map asyncapis, - ToLongFunction resolveApiId) + ToLongFunction resolveApiId, + List labels) { return null; } diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProxy.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProxy.java new file mode 100644 index 0000000000..728e25f05b --- /dev/null +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProxy.java @@ -0,0 +1,47 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.asyncapi.internal.config; + +import java.util.List; +import java.util.Map; + +import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi; +import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; + +public abstract class AsyncapiProxy +{ + protected final String type; + protected final String qname; + protected final Map asyncapis; + + protected AsyncapiProxy( + String type, + String qname, + Map asyncapis) + { + this.type = type; + this.qname = qname; + this.asyncapis = asyncapis; + } + + protected abstract BindingConfigBuilder injectProxyRoutes( + BindingConfigBuilder binding, + List routes); + + public abstract BindingConfigBuilder injectProxyOptions( + BindingConfigBuilder binding, + AsyncapiOptionsConfig options); +} diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProxyNamespaceGenerator.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProxyNamespaceGenerator.java index 79cb502d77..9039313cdf 100644 --- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProxyNamespaceGenerator.java +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProxyNamespaceGenerator.java @@ -17,37 +17,33 @@ import static io.aklivity.zilla.runtime.engine.config.KindConfig.PROXY; import static java.util.Collections.emptyList; -import java.util.Collections; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.ToLongFunction; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig; import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi; -import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiOperation; -import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiChannelView; -import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaConditionConfig; -import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaConditionKind; -import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaOptionsConfig; -import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaWithConfig; import io.aklivity.zilla.runtime.engine.config.BindingConfig; -import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; -import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; public class AsyncapiProxyNamespaceGenerator extends AsyncapiNamespaceGenerator { - private static final String ASYNCAPI_SEND_ACTION_NAME = "send"; - private static final String ASYNCAPI_RECEIVE_ACTION_NAME = "receive"; private static final String ASYNCAPI_KAFKA_PROTOCOL_NAME = "kafka"; private static final String ASYNCAPI_MQTT_PROTOCOL_NAME = "mqtt"; + private static final String ASYNCAPI_HTTP_PROTOCOL_NAME = "http"; public NamespaceConfig generateProxy( BindingConfig binding, Map asyncapis, - ToLongFunction resolveApiId) + ToLongFunction resolveApiId, + List labels) { AsyncapiOptionsConfig options = binding.options != null ? (AsyncapiOptionsConfig) binding.options : EMPTY_OPTION; List routes = binding.routes.stream() @@ -58,92 +54,72 @@ public NamespaceConfig generateProxy( final List metricRefs = binding.telemetryRef != null ? binding.telemetryRef.metricRefs : emptyList(); - String sessions = ""; - String messages = ""; - String retained = ""; - for (Asyncapi asyncapi : asyncapis.values()) - { - if (asyncapi.channels.containsKey(options.mqttKafka.channels.sessions)) - { - sessions = asyncapi.channels.get(options.mqttKafka.channels.sessions).address; - } - - if (asyncapi.channels.containsKey(options.mqttKafka.channels.messages)) - { - messages = asyncapi.channels.get(options.mqttKafka.channels.messages).address; - } + final Map> routesByProtocol = new HashMap<>(); - if (asyncapi.channels.containsKey(options.mqttKafka.channels.retained)) - { - retained = asyncapi.channels.get(options.mqttKafka.channels.retained).address; - } - } - - return NamespaceConfig.builder() - .name(String.format("%s/%s", qname, "mqtt-kafka")) - .inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty())) - .binding() - .name("mqtt_kafka_proxy0") - .type("mqtt-kafka") - .kind(PROXY) - .inject(b -> this.injectMetrics(b, metricRefs, "mqtt-kafka")) - .options(MqttKafkaOptionsConfig::builder) - .topics() - .sessions(sessions) - .messages(messages) - .retained(retained) - .build() - .clients(Collections.emptyList()) - .build() - .inject(b -> this.injectMqttKafkaRoutes(b, routes)) - .build() - .build(); - } - - public BindingConfigBuilder injectMqttKafkaRoutes( - BindingConfigBuilder binding, - List routes) - { inject: for (AsyncapiRouteConfig route : routes) { - final RouteConfigBuilder> routeBuilder = binding.route(); - final Asyncapi kafkaAsyncapi = asyncapis.get(route.with.apiId); - if (kafkaAsyncapi.servers.values().stream().anyMatch(s -> !s.protocol.startsWith(ASYNCAPI_KAFKA_PROTOCOL_NAME))) { break inject; } - final AsyncapiOperation withOperation = kafkaAsyncapi.operations.get(route.with.operationId); - final String messages = AsyncapiChannelView.of(kafkaAsyncapi.channels, withOperation.channel).address(); - for (AsyncapiConditionConfig condition : route.when) { - final Asyncapi mqttAsyncapi = asyncapis.get(condition.apiId); - if (mqttAsyncapi.servers.values().stream().anyMatch(s -> !s.protocol.startsWith(ASYNCAPI_MQTT_PROTOCOL_NAME))) + final Asyncapi asyncapi = asyncapis.get(condition.apiId); + if (asyncapi.servers.values().stream().anyMatch(s -> + !s.protocol.startsWith(ASYNCAPI_MQTT_PROTOCOL_NAME) && + !s.protocol.startsWith(ASYNCAPI_HTTP_PROTOCOL_NAME))) { break inject; } - final AsyncapiOperation whenOperation = mqttAsyncapi.operations.get(condition.operationId); - final AsyncapiChannelView channel = AsyncapiChannelView.of(mqttAsyncapi.channels, whenOperation.channel); - final MqttKafkaConditionKind kind = whenOperation.action.equals(ASYNCAPI_SEND_ACTION_NAME) ? - MqttKafkaConditionKind.PUBLISH : MqttKafkaConditionKind.SUBSCRIBE; - String topic = channel.address(); + final String conditionProtocol = asyncapi.servers.values().stream().findFirst().get().protocol; + routesByProtocol.computeIfAbsent(conditionProtocol, c -> new ArrayList<>()).add(route); + } + } + + final String namespace = String.join("+", labels); + NamespaceConfigBuilder builder = NamespaceConfig.builder() + .name(String.format("%s/%s", qname, namespace)) + .inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty())); + + routesByProtocol.forEach((k, r) -> + { + final AsyncapiProxy proxy = resolveProxy(k); + builder.binding() + .name(String.format("%s_proxy0", proxy.type)) + .type(proxy.type) + .kind(PROXY) + .inject(b -> this.injectMetrics(b, metricRefs, proxy.type)) + .inject(b -> proxy.injectProxyOptions(b, options)) + .inject(b -> proxy.injectProxyRoutes(b, r)) + .build(); + }); - routeBuilder - .when(MqttKafkaConditionConfig::builder) - .topic(topic) - .kind(kind) - .build() - .with(MqttKafkaWithConfig::builder) - .messages(messages.replaceAll("\\{([^{}]*)\\}", "\\${params.$1}")) - .build() - .exit(qname); + + return builder.build(); + } + + private AsyncapiProxy resolveProxy( + String protocol) + { + Pattern pattern = Pattern.compile("(http|mqtt)"); + Matcher matcher = pattern.matcher(protocol); + AsyncapiProxy proxy = null; + if (matcher.find()) + { + switch (matcher.group()) + { + case "http": + proxy = new AsyncapiHttpKafkaProxy(qname, asyncapis); + break; + case "mqtt": + proxy = new AsyncapiMqttKafkaProxy(qname, asyncapis); + break; } - binding = routeBuilder.build(); } - return binding; + return proxy; } + } diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiComponents.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiComponents.java index 5f751c1b67..185abee13e 100644 --- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiComponents.java +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiComponents.java @@ -21,6 +21,7 @@ public class AsyncapiComponents public Map securitySchemes; public Map messages; public Map schemas; + public Map correlationIds; public Map messageTraits; public Map serverVariables; } diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiCorrelationId.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiCorrelationId.java new file mode 100644 index 0000000000..44569ad89d --- /dev/null +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiCorrelationId.java @@ -0,0 +1,25 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.asyncapi.internal.model; + +import jakarta.json.bind.annotation.JsonbProperty; + +public class AsyncapiCorrelationId +{ + public String location; + + @JsonbProperty("$ref") + public String ref; +} diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiItem.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiItem.java index aed859d19c..1184b669fe 100644 --- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiItem.java +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiItem.java @@ -27,6 +27,7 @@ public class AsyncapiItem public String description; public Integer minimum; public Integer maximum; + public String format; @JsonbProperty("enum") public List values; @JsonbProperty("$ref") diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiMessage.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiMessage.java index 4a0f51d277..b002de02b7 100644 --- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiMessage.java +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiMessage.java @@ -24,6 +24,7 @@ public class AsyncapiMessage public String contentType; public AsyncapiSchema payload; public List traits; + public AsyncapiCorrelationId correlationId; @JsonbProperty("$ref") public String ref; diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiCorrelationIdView.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiCorrelationIdView.java new file mode 100644 index 0000000000..405ac84638 --- /dev/null +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiCorrelationIdView.java @@ -0,0 +1,70 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.asyncapi.internal.view; + +import java.util.List; +import java.util.Map; + +import jakarta.json.bind.annotation.JsonbPropertyOrder; + +import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiCorrelationId; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiItem; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiSchema; +@JsonbPropertyOrder({ + "type", + "items", + "properties", + "required" +}) +public final class AsyncapiCorrelationIdView extends AsyncapiResolvable +{ + private final AsyncapiCorrelationId correlationId; + private final Map correlationIds; + + public String refKey() + { + return key; + } + public AsyncapiCorrelationId correlationId() + { + return correlationId; + } + + public String location() + { + return correlationId.location; + } + + public static AsyncapiCorrelationIdView of( + Map correlationIds, + AsyncapiCorrelationId correlationId) + { + return new AsyncapiCorrelationIdView(correlationIds, correlationId); + } + + private AsyncapiCorrelationIdView( + Map correlationIds, + AsyncapiCorrelationId correlationId) + { + super(correlationIds, "#/components/correlationIds/(\\w+)"); + if (correlationId.ref != null) + { + correlationId = resolveRef(correlationId.ref); + } + + this.correlationIds = correlationIds; + this.correlationId = correlationId; + } +} diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiMessageView.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiMessageView.java index 6ee6094e17..232ba13676 100644 --- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiMessageView.java +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiMessageView.java @@ -17,6 +17,7 @@ import java.util.List; import java.util.Map; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiCorrelationId; import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiMessage; import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiSchema; import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiTrait; @@ -49,6 +50,11 @@ public List traits() return message.traits; } + public AsyncapiCorrelationId correlationId() + { + return message.correlationId; + } + public static AsyncapiMessageView of( Map messages, AsyncapiMessage asyncapiMessage) diff --git a/runtime/binding-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/proxy/AsyncapiIT.java b/runtime/binding-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/proxy/AsyncapiIT.java index ce7ed7df1b..273837fe30 100644 --- a/runtime/binding-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/proxy/AsyncapiIT.java +++ b/runtime/binding-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/proxy/AsyncapiIT.java @@ -57,4 +57,26 @@ public void shouldPublish() throws Exception { k3po.finish(); } + + @Test + @Configuration("proxy.http.kafka.yaml") + @Specification({ + "${asyncapi}/proxy.mqtt.create.pet/client", + "${asyncapi}/proxy.kafka.create.pet/server" + }) + public void shouldCreatePet() throws Exception + { + k3po.finish(); + } + + @Test + @Configuration("proxy-async.yaml") + @Specification({ + "${asyncapi}/async.verify.customer/client", + "${asyncapi}/async.verify.customer/server" + }) + public void shouldVerifyCustomerAsync() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml new file mode 100644 index 0000000000..ac4d76e456 --- /dev/null +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml @@ -0,0 +1,231 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +catalogs: + catalog0: + type: test + options: + subject: petstore + schema: | + asyncapi: 3.0.0 + info: + title: AsyncAPI Petstore + license: + name: MIT + version: 1.0.0 + servers: + plain: + host: localhost:8080 + protocol: http + protocolVersion: '2.0' + defaultContentType: application/json + + channels: + pets: + address: /pets + messages: + pet: + $ref: '#/components/messages/pet' + showPetById: + address: /pets/{id} + messages: + pet: + $ref: '#/components/messages/pet' + + operations: + createPet: + action: send + bindings: + http: + method: POST + channel: + $ref: '#/channels/pets' + listPets: + action: receive + bindings: + http: + method: GET + channel: + $ref: '#/channels/pets' + getPets: + action: receive + bindings: + http: + method: GET + query: + type: object + properties: + limit: + type: number + channel: + $ref: '#/channels/showPetById' + + components: + schemas: + petPayload: + type: object + properties: + id: + type: integer + minimum: 0 + description: Pet id. + name: + type: string + description: Pet name. + tag: + type: string + description: Tag. + messages: + pet: + name: Pet + title: Pet + summary: >- + Inform about Pet. + contentType: application/json + payload: + $ref: '#/components/schemas/petPayload' + + catalog1: + type: test + options: + subject: sensor + schema: | + asyncapi: 3.0.0 + info: + title: Petstore Kafka API + version: 1.0.0 + defaultContentType: application/json + servers: + host-connections: + host: 'localhost:9092' + protocol: kafka-secure + description: Test broker + tags: + - name: 'kind:remote' + description: This server is a remote server. Not exposed by the application. + - name: 'visibility:private' + description: This resource is private and only available to certain users. + channels: + customers: + address: "petstore-customers" + messages: + customer: + $ref: "#/components/messages/customer" + empty: + $ref: "#/components/messages/empty" + description: The topic on which pet values may be produced and consumed. + verifiedCustomers: + address: "petstore-verified-customers" + messages: + customer: + $ref: "#/components/messages/customer" + empty: + $ref: "#/components/messages/empty" + description: The topic on which pet values may be produced and consumed. + operations: + createCustomer: + action: send + channel: + $ref: "#/channels/customers" + reply: + channel: + $ref: "#/channels/verifiedCustomers" + summary: >- + Add a pet. + messages: + - $ref: "#/channels/customers/messages/customer" + components: + correlationIds: + customerVerifyCorrelationId: + description: > + This correlation ID is used for message tracing and messages + correlation. This correlation ID is generated at runtime based on the + `VERIFY_ID` and sent to the RESPONSE message. + location: $message.header#/VERIFY_ID + messages: + empty: + name: EmptyMessage + payload: + type: "null" + customer: + name: Customer + title: Customer + summary: Information about a Customer. + contentType: application/json + payload: + $ref: "#/components/schemas/Customer" + schemas: + Customer: + type: object + properties: + id: + type: integer + format: int64 + example: 100000 + username: + type: string + example: fehguy + status: + type: string + description: Verification Status + example: approved + enum: + - pending + - approved + - denied + address: + type: array + items: + $ref: "#/components/schemas/Address" + Address: + type: object + properties: + street: + type: string + example: 437 Lytton + city: + type: string + example: Palo Alto + state: + type: string + example: CA + zip: + type: string + example: "94301" +bindings: + asyncapi_proxy0: + type: asyncapi + kind: proxy + options: + specs: + http_api: + catalog: + catalog0: + subject: petstore + version: latest + kafka_api: + catalog: + catalog1: + subject: petstore + version: latest + routes: + - when: + - api-id: http_api + exit: asyncapi_kafka0 + with: + api-id: kafka_api diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/server.http.yaml b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/server.http.yaml index 5931f33bf1..4cc7790852 100644 --- a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/server.http.yaml +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/server.http.yaml @@ -79,7 +79,7 @@ catalogs: components: correlationIds: petsCorrelationId: - location: '$message.header#/idempotency-key' + location: '$message.header#/CorrelId' schemas: petPayload: type: object @@ -101,6 +101,14 @@ catalogs: summary: >- Inform about Pet. contentType: application/json + correlationId: + $ref: '#/components/correlationIds/petsCorrelationId' + headers: + type: object + properties: + CorrelId: + type: string + format: /pets;cid={correlationId} payload: $ref: '#/components/schemas/petPayload' bindings: diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/client.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/client.rpt new file mode 100644 index 0000000000..815f900eef --- /dev/null +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/client.rpt @@ -0,0 +1,47 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/composite0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${asyncapi:beginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .operationId("createPets") + .extension(http:beginEx() + .typeId(zilla:id("http")) + .header(":method", "POST") + .header(":scheme", "http") + .header(":path", "/pets") + .header(":authority", "localhost:8080") + .header("content-type", "application/json") + .header("content-length", "39") + .build()) + .build()} +connected + +write "{\"id\": 1, \"name\": \"Dog\", \"tag\": \"test\"}" +write close + +read zilla:begin.ext ${asyncapi:matchBeginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .operationId("createPets") + .extension(http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "204") + .build()) + .build()} diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/server.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/server.rpt new file mode 100644 index 0000000000..9f7c3be1fe --- /dev/null +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/server.rpt @@ -0,0 +1,50 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/composite0" + option zilla:window 8192 + option zilla:transmission "half-duplex" +accepted + +read zilla:begin.ext ${openapi:beginEx() + .typeId(zilla:id("openapi")) + .apiId(0) + .operationId("createPets") + .extension(http:beginEx() + .typeId(zilla:id("http")) + .header(":method", "POST") + .header(":scheme", "http") + .header(":path", "/pets") + .header(":authority", "localhost:8080") + .header("content-type", "application/json") + .header("content-length", "39") + .build()) + .build()} + +connected + +read "{\"id\": 1, \"name\": \"Dog\", \"tag\": \"test\"}" +read closed + +write zilla:begin.ext ${openapi:beginEx() + .typeId(zilla:id("openapi")) + .apiId(0) + .operationId("createPets") + .extension(http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "204") + .build()) + .build()} +write flush diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/client.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/client.rpt new file mode 100644 index 0000000000..8725cfea43 --- /dev/null +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/client.rpt @@ -0,0 +1,60 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +connect "zilla://streams/asyncapi_client0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${asyncapi:beginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .extension(kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("petstore") + .partition(-1, -2) + .ackMode("IN_SYNC_REPLICAS") + .build() + .build()) + .build()} + +connected + +write option zilla:flags "init" +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .timestamp(newTimestamp) + .partition(-1, -1) + .build() + .build()} +write zilla:data.empty +write flush + +write option zilla:flags "none" +write "{\"id\": 1, \"name\": \"Dog\", \"tag\": \"test\"}" +write flush + +write option zilla:flags "fin" +write zilla:data.empty +write flush + +write close +read closed diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/server.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/server.rpt new file mode 100644 index 0000000000..b019ce8b61 --- /dev/null +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/server.rpt @@ -0,0 +1,55 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/asyncapi_client0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${asyncapi:matchBeginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .extension(kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("petstore") + .partition(-1, -2) + .ackMode("IN_SYNC_REPLICAS") + .build() + .build()) + .build()} + +connected + +read option zilla:flags "init" +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .partition(-1, -1) + .build() + .build()} +read zilla:data.empty + +read option zilla:flags "none" +read "{\"id\": 1, \"name\": \"Dog\", \"tag\": \"test\"}" + +read option zilla:flags "fin" +read zilla:data.empty + +read closed +write close From fb2e61f05480f6c25e92b1a7cd6738a9c98b16fe Mon Sep 17 00:00:00 2001 From: bmaidics Date: Fri, 31 May 2024 16:21:22 +0200 Subject: [PATCH 2/6] checkpoint --- .../config/AsyncapiBindingConfig.java | 3 +- .../config/AsyncapiHttpKafkaProxy.java | 115 ++++++-- .../view/AsyncapiCorrelationIdView.java | 4 +- .../internal/stream/proxy/AsyncapiIT.java | 8 +- .../config/proxy.http.kafka.async.yaml | 262 ++++++++++++++++++ .../asyncapi/config/proxy.http.kafka.yaml | 124 ++++----- .../client.rpt | 89 ++++++ .../server.rpt | 88 ++++++ .../asyncapi/proxy.http.create.pet/client.rpt | 18 +- .../asyncapi/proxy.http.create.pet/server.rpt | 17 +- .../client.rpt | 135 +++++++++ .../server.rpt | 122 ++++++++ .../proxy.kafka.create.pet/client.rpt | 17 +- .../proxy.kafka.create.pet/server.rpt | 17 +- 14 files changed, 889 insertions(+), 130 deletions(-) create mode 100644 specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.async.yaml create mode 100644 specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.async.verify.customer/client.rpt create mode 100644 specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.async.verify.customer/server.rpt create mode 100644 specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.async.verify.customer/client.rpt create mode 100644 specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.async.verify.customer/server.rpt diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java index 0db90ef60c..5151ab36c1 100644 --- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java @@ -199,7 +199,8 @@ public void attach( NamespaceConfig v = entry.getValue(); List bindings = v.bindings.stream() .filter(b -> b.type.equals("mqtt") || b.type.equals("http") || - b.type.equals("kafka") && b.kind == CACHE_CLIENT || b.type.equals("mqtt-kafka")) + b.type.equals("kafka") && b.kind == CACHE_CLIENT || b.type.equals("mqtt-kafka") || + b.type.equals("http-kafka")) .collect(toList()); extractResolveId(k, bindings); extractNamespace(k, bindings); diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java index 35dd61f7b5..39efec5d11 100644 --- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java @@ -47,7 +47,7 @@ public class AsyncapiHttpKafkaProxy extends AsyncapiProxy { private static final String CORRELATION_ID = "\\{correlationId\\}"; private static final String PARAMETERS = "\\{(?!correlationId)(\\w+)\\}"; - private static final String HEADER_LOCATION = "\".*/([^/]+)$\""; + private static final String HEADER_LOCATION = "([^/]+)$"; private static final String ASYNCAPI_KAFKA_PROTOCOL_NAME = "kafka"; private static final String ASYNCAPI_HTTP_PROTOCOL_NAME = "http"; private static final String ASYNCAPI_SEND_ACTION_NAME = "send"; @@ -64,7 +64,7 @@ protected AsyncapiHttpKafkaProxy( String qname, Map asyncapis) { - super("mqtt-kafka", qname, asyncapis); + super("http-kafka", qname, asyncapis); } @Override @@ -75,8 +75,6 @@ protected BindingConfigBuilder injectProxyRoutes( inject: for (AsyncapiRouteConfig route : routes) { - final RouteConfigBuilder> routeBuilder = binding.route(); - final Asyncapi kafkaAsyncapi = asyncapis.get(route.with.apiId); final AsyncapiOperation withOperation = kafkaAsyncapi.operations.get(route.with.operationId); @@ -94,21 +92,70 @@ protected BindingConfigBuilder injectProxyRoutes( String method = whenOperation.bindings.get("http").method; final List paramNames = findParams(path); + AsyncapiChannelView httpChannel = AsyncapiChannelView.of(httpAsyncapi.channels, whenOperation.channel); + + boolean async = httpChannel.messages().values() + .stream().anyMatch(asyncapiMessage -> + { + AsyncapiMessageView message = AsyncapiMessageView.of(httpAsyncapi.components.messages, asyncapiMessage); + return message.correlationId() != null; + }); + + if (async) + { + for (AsyncapiOperation operation : httpAsyncapi.operations.values()) + { + AsyncapiChannelView channelView = AsyncapiChannelView.of(httpAsyncapi.channels, operation.channel); + if (parameters.reset(channelView.address()).find()) + { + AsyncapiReply reply = withOperation.reply; + if (reply != null) + { + final RouteConfigBuilder> asyncRouteBuilder = binding.route(); + binding = addAsyncOperation(asyncRouteBuilder, httpAsyncapi, kafkaAsyncapi, operation, + withOperation); + } + } + } + } + + final RouteConfigBuilder> routeBuilder = binding.route(); routeBuilder .exit(qname) .when(HttpKafkaConditionConfig::builder) - .method(method) - .path(path) - .build() + .method(method) + .path(path) + .build() .inject(r -> injectHttpKafkaRouteWith(r, httpAsyncapi, kafkaAsyncapi, whenOperation, - withOperation, paramNames)) - .build(); + withOperation, paramNames)); + binding = routeBuilder.build(); } - binding = routeBuilder.build(); } return binding; } + private BindingConfigBuilder addAsyncOperation( + RouteConfigBuilder> routeBuilder, + Asyncapi httpAsyncapi, + Asyncapi kafkaAsyncapi, + AsyncapiOperation httpOperation, + AsyncapiOperation kafkaOperation) + { + final AsyncapiChannelView channel = AsyncapiChannelView.of(httpAsyncapi.channels, httpOperation.channel); + String path = channel.address(); + String method = httpOperation.bindings.get("http").method; + final List paramNames = findParams(path); + return routeBuilder + .exit(qname) + .when(HttpKafkaConditionConfig::builder) + .method(method) + .path(path) + .build() + .inject(r -> injectAsyncProduceHttpKafkaRouteWith(r, httpAsyncapi, kafkaAsyncapi, httpOperation, + kafkaOperation, paramNames)) + .build(); + } + @Override public BindingConfigBuilder injectProxyOptions( BindingConfigBuilder binding, @@ -164,6 +211,28 @@ private RouteConfigBuilder injectHttpKafkaRouteWith( return route; } + private RouteConfigBuilder injectAsyncProduceHttpKafkaRouteWith( + RouteConfigBuilder route, + Asyncapi httpAsyncapi, + Asyncapi kafkaAsyncapi, + AsyncapiOperation httpOperation, + AsyncapiOperation kafkaOperation, + List paramNames) + { + final HttpKafkaWithConfigBuilder newWith = HttpKafkaWithConfig.builder(); + final AsyncapiChannelView channel = AsyncapiChannelView.of(kafkaAsyncapi.channels, kafkaOperation.channel); + final String topic = channel.address(); + + newWith.produce(HttpKafkaWithProduceConfig.builder() + .topic(topic) + .inject(w -> injectHttpKafkaRouteProduceWith(w, httpOperation, kafkaOperation, httpAsyncapi, + kafkaAsyncapi.channels, paramNames)) + .build()); + route.with(newWith.build()); + + return route; + } + private HttpKafkaWithFetchConfigBuilder injectHttpKafkaRouteFetchWith( HttpKafkaWithFetchConfigBuilder fetch, Asyncapi httpAsyncapi, @@ -217,16 +286,22 @@ private HttpKafkaWithProduceConfigBuilder injectHttpKafkaRouteProduceWith .forEach(asyncapiMessage -> { AsyncapiMessageView message = AsyncapiMessageView.of(httpAsyncapi.components.messages, asyncapiMessage); - AsyncapiCorrelationIdView correlationId = - AsyncapiCorrelationIdView.of(httpAsyncapi.components.correlationIds, message.correlationId()); - String headerName = headerLocation.reset(correlationId.location()).group(1); - String location = asyncapiMessage.headers.properties.get(headerName).format; - location = location.replaceAll(CORRELATION_ID, "\\${correlationId}"); - location = location.replaceAll(PARAMETERS, "\\${params.$1}"); - produce.async(HttpKafkaWithProduceAsyncHeaderConfig.builder() - .name("location") - .value(location) - .build()); + if (message.correlationId() != null) + { + AsyncapiCorrelationIdView correlationId = + AsyncapiCorrelationIdView.of(httpAsyncapi.components.correlationIds, message.correlationId()); + if (headerLocation.reset(correlationId.location()).find()) + { + String headerName = headerLocation.group(1); + String location = message.headers().properties.get(headerName).format; + location = location.replaceAll(CORRELATION_ID, "\\${correlationId}"); + location = location.replaceAll(PARAMETERS, "\\${params.$1}"); + produce.async(HttpKafkaWithProduceAsyncHeaderConfig.builder() + .name("location") + .value(location) + .build()); + } + } }); AsyncapiReply reply = kafkaOperation.reply; diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiCorrelationIdView.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiCorrelationIdView.java index 405ac84638..d74d06482f 100644 --- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiCorrelationIdView.java +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiCorrelationIdView.java @@ -14,14 +14,12 @@ */ package io.aklivity.zilla.runtime.binding.asyncapi.internal.view; -import java.util.List; import java.util.Map; import jakarta.json.bind.annotation.JsonbPropertyOrder; import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiCorrelationId; -import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiItem; -import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiSchema; + @JsonbPropertyOrder({ "type", "items", diff --git a/runtime/binding-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/proxy/AsyncapiIT.java b/runtime/binding-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/proxy/AsyncapiIT.java index 273837fe30..3df61527da 100644 --- a/runtime/binding-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/proxy/AsyncapiIT.java +++ b/runtime/binding-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/proxy/AsyncapiIT.java @@ -61,7 +61,7 @@ public void shouldPublish() throws Exception @Test @Configuration("proxy.http.kafka.yaml") @Specification({ - "${asyncapi}/proxy.mqtt.create.pet/client", + "${asyncapi}/proxy.http.create.pet/client", "${asyncapi}/proxy.kafka.create.pet/server" }) public void shouldCreatePet() throws Exception @@ -70,10 +70,10 @@ public void shouldCreatePet() throws Exception } @Test - @Configuration("proxy-async.yaml") + @Configuration("proxy.http.kafka.async.yaml") @Specification({ - "${asyncapi}/async.verify.customer/client", - "${asyncapi}/async.verify.customer/server" + "${asyncapi}/proxy.http.async.verify.customer/client", + "${asyncapi}/proxy.kafka.async.verify.customer/server" }) public void shouldVerifyCustomerAsync() throws Exception { diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.async.yaml b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.async.yaml new file mode 100644 index 0000000000..889a2995a7 --- /dev/null +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.async.yaml @@ -0,0 +1,262 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +catalogs: + catalog0: + type: test + options: + subject: petstore + schema: | + asyncapi: 3.0.0 + info: + title: AsyncAPI Petstore + license: + name: MIT + version: 1.0.0 + servers: + plain: + host: localhost:8080 + protocol: http + protocolVersion: '2.0' + defaultContentType: application/json + + channels: + customer: + address: /customer + messages: + customer: + $ref: '#/components/messages/customer' + showCustomerById: + address: /customer;cid={correlationId} + messages: + customer: + $ref: '#/components/messages/customer' + + operations: + createCustomer: + action: send + bindings: + http: + method: POST + channel: + $ref: '#/channels/customer' + getVerifiedCustomer: + action: receive + bindings: + http: + method: GET + query: + type: object + properties: + limit: + type: number + channel: + $ref: '#/channels/showCustomerById' + + components: + correlationIds: + customerCorrelationId: + location: '$message.header#/CorrelId' + schemas: + Customer: + type: object + properties: + id: + type: integer + format: int64 + example: 100000 + username: + type: string + example: fehguy + status: + type: string + description: Verification Status + example: approved + enum: + - pending + - approved + - denied + address: + type: array + items: + $ref: "#/components/schemas/Address" + Address: + type: object + properties: + street: + type: string + example: 437 Lytton + city: + type: string + example: Palo Alto + state: + type: string + example: CA + zip: + type: string + example: "94301" + + messages: + customer: + name: Customer + title: Customer + summary: Information about a Customer. + contentType: application/json + correlationId: + $ref: '#/components/correlationIds/customerCorrelationId' + headers: + type: object + properties: + CorrelId: + type: string + format: /customer;cid={correlationId} + payload: + $ref: "#/components/schemas/Customer" + + catalog1: + type: test + options: + subject: sensor + schema: | + asyncapi: 3.0.0 + info: + title: Petstore Kafka API + version: 1.0.0 + defaultContentType: application/json + servers: + host-connections: + host: 'localhost:9092' + protocol: kafka-secure + description: Test broker + tags: + - name: 'kind:remote' + description: This server is a remote server. Not exposed by the application. + - name: 'visibility:private' + description: This resource is private and only available to certain users. + channels: + customers: + address: "petstore-customers" + messages: + customer: + $ref: "#/components/messages/customer" + empty: + $ref: "#/components/messages/empty" + description: The topic on which pet values may be produced and consumed. + verifiedCustomers: + address: "petstore-verified-customers" + messages: + customer: + $ref: "#/components/messages/customer" + empty: + $ref: "#/components/messages/empty" + description: The topic on which pet values may be produced and consumed. + operations: + createCustomer: + action: send + channel: + $ref: "#/channels/customers" + reply: + channel: + $ref: "#/channels/verifiedCustomers" + summary: >- + Add a pet. + messages: + - $ref: "#/channels/customers/messages/customer" + components: + correlationIds: + customerVerifyCorrelationId: + description: > + This correlation ID is used for message tracing and messages + correlation. This correlation ID is generated at runtime based on the + `VERIFY_ID` and sent to the RESPONSE message. + location: $message.header#/VERIFY_ID + messages: + empty: + name: EmptyMessage + payload: + type: "null" + customer: + name: Customer + title: Customer + summary: Information about a Customer. + contentType: application/json + payload: + $ref: "#/components/schemas/Customer" + schemas: + Customer: + type: object + properties: + id: + type: integer + format: int64 + example: 100000 + username: + type: string + example: fehguy + status: + type: string + description: Verification Status + example: approved + enum: + - pending + - approved + - denied + address: + type: array + items: + $ref: "#/components/schemas/Address" + Address: + type: object + properties: + street: + type: string + example: 437 Lytton + city: + type: string + example: Palo Alto + state: + type: string + example: CA + zip: + type: string + example: "94301" + +bindings: + asyncapi_proxy0: + type: asyncapi + kind: proxy + options: + specs: + http_api: + catalog: + catalog0: + subject: petstore + version: latest + kafka_api: + catalog: + catalog1: + subject: petstore + version: latest + routes: + - when: + - api-id: http_api + operation-id: createCustomer + exit: asyncapi_kafka0 + with: + api-id: kafka_api + operation-id: createCustomer diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml index ac4d76e456..954aeec6d2 100644 --- a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml @@ -48,7 +48,7 @@ catalogs: $ref: '#/components/messages/pet' operations: - createPet: + createPets: action: send bindings: http: @@ -121,92 +121,76 @@ catalogs: - name: 'visibility:private' description: This resource is private and only available to certain users. channels: - customers: - address: "petstore-customers" + petstore: + address: 'petstore' messages: - customer: - $ref: "#/components/messages/customer" - empty: - $ref: "#/components/messages/empty" - description: The topic on which pet values may be produced and consumed. - verifiedCustomers: - address: "petstore-verified-customers" - messages: - customer: - $ref: "#/components/messages/customer" - empty: - $ref: "#/components/messages/empty" + pet: + $ref: '#/components/messages/pet' description: The topic on which pet values may be produced and consumed. operations: - createCustomer: + listPets: + action: receive + channel: + $ref: '#/channels/petstore' + summary: >- + List all pets. + traits: + - $ref: '#/components/operationTraits/kafka' + messages: + - $ref: '#/channels/petstore/messages/pet' + createPets: action: send channel: - $ref: "#/channels/customers" - reply: - channel: - $ref: "#/channels/verifiedCustomers" + $ref: '#/channels/petstore' summary: >- - Add a pet. + Create a pet. + traits: + - $ref: '#/components/operationTraits/kafka' messages: - - $ref: "#/channels/customers/messages/customer" + - $ref: '#/channels/petstore/messages/pet' components: - correlationIds: - customerVerifyCorrelationId: - description: > - This correlation ID is used for message tracing and messages - correlation. This correlation ID is generated at runtime based on the - `VERIFY_ID` and sent to the RESPONSE message. - location: $message.header#/VERIFY_ID messages: - empty: - name: EmptyMessage - payload: - type: "null" - customer: - name: Customer - title: Customer - summary: Information about a Customer. + pet: + name: Pet + title: Pet + summary: >- + Inform about Pet. contentType: application/json + traits: + - $ref: '#/components/messageTraits/commonHeaders' payload: - $ref: "#/components/schemas/Customer" + $ref: '#/components/schemas/petPayload' schemas: - Customer: + petPayload: type: object properties: id: type: integer - format: int64 - example: 100000 - username: - type: string - example: fehguy - status: - type: string - description: Verification Status - example: approved - enum: - - pending - - approved - - denied - address: - type: array - items: - $ref: "#/components/schemas/Address" - Address: - type: object - properties: - street: - type: string - example: 437 Lytton - city: - type: string - example: Palo Alto - state: + minimum: 0 + description: Pet id. + name: type: string - example: CA - zip: + description: Pet name. + tag: type: string - example: "94301" + description: Tag. + messageTraits: + commonHeaders: + headers: + type: object + properties: + my-app-header: + type: integer + minimum: 0 + maximum: 100 + operationTraits: + kafka: + bindings: + kafka: + clientId: + type: string + enum: + - my-app-id bindings: asyncapi_proxy0: type: asyncapi @@ -226,6 +210,8 @@ bindings: routes: - when: - api-id: http_api + operation-id: createPets exit: asyncapi_kafka0 with: api-id: kafka_api + operation-id: createPets diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.async.verify.customer/client.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.async.verify.customer/client.rpt new file mode 100644 index 0000000000..10c47464e7 --- /dev/null +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.async.verify.customer/client.rpt @@ -0,0 +1,89 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/asyncapi_proxy0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${asyncapi:beginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .operationId("createCustomer") + .extension(http:beginEx() + .typeId(zilla:id("http")) + .header(":method", "POST") + .header(":scheme", "http") + .header(":path", "/customer") + .header(":authority", "localhost:8080") + .header("content-type", "application/json") + .header("content-length", "155") + .header("prefer", "respond-async") + .header("idempotency-key", "3f96592e-c8f1-4167-8c46-85f2aabb70a5") + .build()) + .build()} +connected + +write "{ \"id\": 100000, \"username\": \"fehguy\", \"status\": \"approved\", \"address\": [ { \"street\": \"437 Lytton\", \"city\": \"Palo Alto\", \"state\": \"CA\", \"zip\": \"94301\" } ] }" +write close + +read zilla:begin.ext ${asyncapi:matchBeginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .extension(http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "202") + .header("content-length", "0") + .header("location", "/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed") + .build()) + .build()} + +read closed + +read notify RECEIVED_ASYNC_RESPONSE + +connect await RECEIVED_ASYNC_RESPONSE + "zilla://streams/asyncapi_proxy0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + + +write zilla:begin.ext ${asyncapi:beginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .operationId("getVerifiedCustomer") + .extension(http:beginEx() + .typeId(zilla:id("http")) + .header(":method", "GET") + .header(":scheme", "http") + .header(":authority", "localhost:8080") + .header(":path", "/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed") + .header("prefer", "respond-async") + .build()) + .build()} +connected + +write close + +read zilla:begin.ext ${asyncapi:matchBeginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .extension(http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "204") + .build()) + .build()} + +read closed diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.async.verify.customer/server.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.async.verify.customer/server.rpt new file mode 100644 index 0000000000..b08597d817 --- /dev/null +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.async.verify.customer/server.rpt @@ -0,0 +1,88 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/asyncapi_proxy0" + option zilla:window 8192 + option zilla:transmission "half-duplex" +accepted + +read zilla:begin.ext ${asyncapi:beginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .operationId("createCustomer") + .extension(http:beginEx() + .typeId(zilla:id("http")) + .header(":method", "POST") + .header(":scheme", "http") + .header(":path", "/customer") + .header(":authority", "localhost:8080") + .header("content-type", "application/json") + .header("content-length", "155") + .header("prefer", "respond-async") + .header("idempotency-key", "3f96592e-c8f1-4167-8c46-85f2aabb70a5") + .build()) + .build()} +connected + +read "{ \"id\": 100000, \"username\": \"fehguy\", \"status\": \"approved\", \"address\": [ { \"street\": \"437 Lytton\", \"city\": \"Palo Alto\", \"state\": \"CA\", \"zip\": \"94301\" } ] }" +read closed + +write zilla:begin.ext ${asyncapi:beginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .operationId("createCustomer") + .extension(http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "202") + .header("content-length", "0") + .header("Location", "/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed") + .build()) + .build()} +write flush + +write close + +accepted + +read zilla:begin.ext ${asyncapi:matchBeginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .operationId("getVerifiedCustomer") + .extension(http:beginEx() + .typeId(zilla:id("http")) + .header(":method", "GET") + .header(":scheme", "http") + .header(":authority", "localhost:8080") + .header(":path", "/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed") + .header("prefer", "respond-async") + .build()) + .build()} +connected + +read closed + +write zilla:begin.ext ${asyncapi:beginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .operationId("getVerifiedCustomer") + .extension(http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "204") + .build()) + .build()} + +write flush +write close diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/client.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/client.rpt index 815f900eef..b55b624bb8 100644 --- a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/client.rpt +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/client.rpt @@ -1,19 +1,20 @@ # -# Copyright 2021-2023 Aklivity Inc +# Copyright 2021-2023 Aklivity Inc. # -# Licensed under the Aklivity Community License (the "License"); you may not use -# this file except in compliance with the License. You may obtain a copy of the -# License at +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: # -# https://www.aklivity.io/aklivity-community-license/ +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. # -connect "zilla://streams/composite0" +connect "zilla://streams/asyncapi_proxy0" option zilla:window 8192 option zilla:transmission "half-duplex" @@ -39,7 +40,6 @@ write close read zilla:begin.ext ${asyncapi:matchBeginEx() .typeId(zilla:id("asyncapi")) .apiId(0) - .operationId("createPets") .extension(http:beginEx() .typeId(zilla:id("http")) .header(":status", "204") diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/server.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/server.rpt index 9f7c3be1fe..e13782ead4 100644 --- a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/server.rpt +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/server.rpt @@ -1,19 +1,20 @@ # -# Copyright 2021-2023 Aklivity Inc +# Copyright 2021-2023 Aklivity Inc. # -# Licensed under the Aklivity Community License (the "License"); you may not use -# this file except in compliance with the License. You may obtain a copy of the -# License at +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: # -# https://www.aklivity.io/aklivity-community-license/ +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. # -accept "zilla://streams/composite0" +accept "zilla://streams/asyncapi_proxy0" option zilla:window 8192 option zilla:transmission "half-duplex" accepted diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.async.verify.customer/client.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.async.verify.customer/client.rpt new file mode 100644 index 0000000000..2810462ef7 --- /dev/null +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.async.verify.customer/client.rpt @@ -0,0 +1,135 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +connect "zilla://streams/asyncapi_kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${asyncapi:beginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .extension(kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("petstore-customers") + .partition(-1, -2) + .ackMode("IN_SYNC_REPLICAS") + .build() + .build()) + .build()} + +connected + +write option zilla:flags "init" +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .timestamp(newTimestamp) + .partition(-1, -1) + .build() + .build()} +write zilla:data.empty +write flush + +write option zilla:flags "none" +write "{ \"id\": 100000, \"username\": \"fehguy\", \"status\": \"approved\", \"address\": [ { \"street\": \"437 Lytton\", \"city\": \"Palo Alto\", \"state\": \"CA\", \"zip\": \"94301\" } ] }" +write flush + +write notify SEND_ASYNC_REQUEST + +write option zilla:flags "fin" +write zilla:data.empty +write flush + +write close +read closed + +write notify SENT_ASYNC_REQUEST + +connect await SENT_ASYNC_REQUEST + "zilla://streams/asyncapi_kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${asyncapi:beginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .extension(kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("petstore-verified-customers") + .partition(-1, -2) + .filter() + .header("zilla:correlation-id", "3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed") + .build() + .build() + .build()) + .build()} + +connected + +read advised zilla:flush + +read notify RECEIVED_ASYNC_FLUSH + +read closed +write close + +connect await RECEIVED_ASYNC_FLUSH + "zilla://streams/asyncapi_kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${asyncapi:beginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .extension(kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("petstore-verified-customers") + .partition(-1, -2) + .filter() + .header("zilla:correlation-id", "3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed") + .build() + .build() + .build()) + .build()} + +connected + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .partition(0, 1, 2) + .progress(0, 2) + .progress(1, 1) + .key("92d0bf92-63e0-4cfc-ae73-71dee92d1544") + .header(":status", "204") + .header("zilla:correlation-id", "3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed") + .build() + .build()} +read zilla:data.null + +read closed +write close diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.async.verify.customer/server.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.async.verify.customer/server.rpt new file mode 100644 index 0000000000..bb40b517e9 --- /dev/null +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.async.verify.customer/server.rpt @@ -0,0 +1,122 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/asyncapi_kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${asyncapi:matchBeginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .extension(kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("petstore-customers") + .partition(-1, -2) + .ackMode("IN_SYNC_REPLICAS") + .build() + .build()) + .build()} + +connected + +read option zilla:flags "init" +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .partition(-1, -1) + .build() + .build()} +read zilla:data.empty + +read option zilla:flags "none" +read "{ \"id\": 100000, \"username\": \"fehguy\", \"status\": \"approved\", \"address\": [ { \"street\": \"437 Lytton\", \"city\": \"Palo Alto\", \"state\": \"CA\", \"zip\": \"94301\" } ] }" + +read await SEND_ASYNC_REQUEST + +read option zilla:flags "fin" +read zilla:data.empty + +read closed +write close + +accepted + +read zilla:begin.ext ${asyncapi:matchBeginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .extension(kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("petstore-verified-customers") + .partition(-1, -2) + .filter() + .header("zilla:correlation-id", "3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed") + .build() + .build() + .build()) + .build()} + +connected + +write advise zilla:flush + +write notify SEND_ASYNC_REQUEST + +write close +read closed + +accepted + +read zilla:begin.ext ${asyncapi:matchBeginEx() + .typeId(zilla:id("asyncapi")) + .apiId(0) + .extension(kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("petstore-verified-customers") + .partition(-1, -2) + .filter() + .header("zilla:correlation-id", "3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed") + .build() + .build() + .build()) + .build()} + +connected + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .partition(0, 1, 2) + .progress(0, 2) + .progress(1, 1) + .key("92d0bf92-63e0-4cfc-ae73-71dee92d1544") + .header(":status", "204") + .header("zilla:correlation-id", "3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed") + .build() + .build()} +write flush + +write close +read closed diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/client.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/client.rpt index 8725cfea43..c4f0de4bd3 100644 --- a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/client.rpt +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/client.rpt @@ -1,22 +1,23 @@ # -# Copyright 2021-2023 Aklivity Inc +# Copyright 2021-2023 Aklivity Inc. # -# Licensed under the Aklivity Community License (the "License"); you may not use -# this file except in compliance with the License. You may obtain a copy of the -# License at +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: # -# https://www.aklivity.io/aklivity-community-license/ +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. # property deltaMillis 0L property newTimestamp ${kafka:timestamp() + deltaMillis} -connect "zilla://streams/asyncapi_client0" +connect "zilla://streams/asyncapi_kafka0" option zilla:window 8192 option zilla:transmission "duplex" diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/server.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/server.rpt index b019ce8b61..69f3faf594 100644 --- a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/server.rpt +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/server.rpt @@ -1,19 +1,20 @@ # -# Copyright 2021-2023 Aklivity Inc +# Copyright 2021-2023 Aklivity Inc. # -# Licensed under the Aklivity Community License (the "License"); you may not use -# this file except in compliance with the License. You may obtain a copy of the -# License at +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: # -# https://www.aklivity.io/aklivity-community-license/ +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. # -accept "zilla://streams/asyncapi_client0" +accept "zilla://streams/asyncapi_kafka0" option zilla:window 8192 option zilla:transmission "duplex" From a261d20dfbe71b7e3eb56967b050b7fc9be62b41 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Mon, 3 Jun 2024 13:07:51 +0200 Subject: [PATCH 3/6] Add http-kafka module --- runtime/binding-asyncapi/src/main/moditect/module-info.java | 1 + 1 file changed, 1 insertion(+) diff --git a/runtime/binding-asyncapi/src/main/moditect/module-info.java b/runtime/binding-asyncapi/src/main/moditect/module-info.java index 48fdc65612..142d6a324f 100644 --- a/runtime/binding-asyncapi/src/main/moditect/module-info.java +++ b/runtime/binding-asyncapi/src/main/moditect/module-info.java @@ -21,6 +21,7 @@ requires io.aklivity.zilla.runtime.binding.http; requires io.aklivity.zilla.runtime.binding.kafka; requires io.aklivity.zilla.runtime.binding.mqtt.kafka; + requires io.aklivity.zilla.runtime.binding.http.kafka; requires io.aklivity.zilla.runtime.binding.tcp; requires io.aklivity.zilla.runtime.binding.tls; requires io.aklivity.zilla.runtime.catalog.inline; From ca4a591828adb8bf1b82592fd4625c554b339504 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Mon, 3 Jun 2024 13:15:49 +0200 Subject: [PATCH 4/6] Remove unused fields --- .../asyncapi/internal/config/AsyncapiHttpKafkaProxy.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java index 39efec5d11..31b31b3a4e 100644 --- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java @@ -53,10 +53,8 @@ public class AsyncapiHttpKafkaProxy extends AsyncapiProxy private static final String ASYNCAPI_SEND_ACTION_NAME = "send"; private static final String ASYNCAPI_RECEIVE_ACTION_NAME = "receive"; private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\{([^}]+)\\}"); - private static final Pattern CORRELATION_PATTERN = Pattern.compile(CORRELATION_ID); private static final Pattern HEADER_LOCATION_PATTERN = Pattern.compile(HEADER_LOCATION); - private final Matcher correlation = CORRELATION_PATTERN.matcher(""); private final Matcher parameters = PARAMETER_PATTERN.matcher(""); private final Matcher headerLocation = HEADER_LOCATION_PATTERN.matcher(""); From c62fe9a677dac05b2f105f64a534d1ba3b4e8764 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Mon, 3 Jun 2024 16:46:49 +0200 Subject: [PATCH 5/6] Add operations if they've the same name in both API --- .../config/AsyncapiHttpKafkaProxy.java | 101 +++++++++++------- .../asyncapi/config/proxy.http.kafka.yaml | 2 - 2 files changed, 64 insertions(+), 39 deletions(-) diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java index 31b31b3a4e..63c83bdad8 100644 --- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java @@ -75,8 +75,6 @@ protected BindingConfigBuilder injectProxyRoutes( { final Asyncapi kafkaAsyncapi = asyncapis.get(route.with.apiId); - final AsyncapiOperation withOperation = kafkaAsyncapi.operations.get(route.with.operationId); - for (AsyncapiConditionConfig condition : route.when) { final Asyncapi httpAsyncapi = asyncapis.get(condition.apiId); @@ -85,50 +83,79 @@ protected BindingConfigBuilder injectProxyRoutes( break inject; } final AsyncapiOperation whenOperation = httpAsyncapi.operations.get(condition.operationId); - final AsyncapiChannelView channel = AsyncapiChannelView.of(httpAsyncapi.channels, whenOperation.channel); - String path = channel.address(); - String method = whenOperation.bindings.get("http").method; - final List paramNames = findParams(path); - - AsyncapiChannelView httpChannel = AsyncapiChannelView.of(httpAsyncapi.channels, whenOperation.channel); - - boolean async = httpChannel.messages().values() - .stream().anyMatch(asyncapiMessage -> - { - AsyncapiMessageView message = AsyncapiMessageView.of(httpAsyncapi.components.messages, asyncapiMessage); - return message.correlationId() != null; - }); - - if (async) + if (whenOperation == null) { - for (AsyncapiOperation operation : httpAsyncapi.operations.values()) + for (Map.Entry e : httpAsyncapi.operations.entrySet()) { - AsyncapiChannelView channelView = AsyncapiChannelView.of(httpAsyncapi.channels, operation.channel); - if (parameters.reset(channelView.address()).find()) + AsyncapiOperation withOperation = route.with.operationId != null ? + kafkaAsyncapi.operations.get(route.with.operationId) : kafkaAsyncapi.operations.get(e.getKey()); + if (withOperation != null) { - AsyncapiReply reply = withOperation.reply; - if (reply != null) - { - final RouteConfigBuilder> asyncRouteBuilder = binding.route(); - binding = addAsyncOperation(asyncRouteBuilder, httpAsyncapi, kafkaAsyncapi, operation, - withOperation); - } + binding = addHttpKafkaRoute(binding, kafkaAsyncapi, httpAsyncapi, e.getValue(), withOperation); } } } + else + { + AsyncapiOperation withOperation = kafkaAsyncapi.operations.get(route.with.operationId); + binding = addHttpKafkaRoute(binding, kafkaAsyncapi, httpAsyncapi, whenOperation, withOperation); + } + } + } + return binding; + } - final RouteConfigBuilder> routeBuilder = binding.route(); - routeBuilder - .exit(qname) - .when(HttpKafkaConditionConfig::builder) - .method(method) - .path(path) - .build() - .inject(r -> injectHttpKafkaRouteWith(r, httpAsyncapi, kafkaAsyncapi, whenOperation, - withOperation, paramNames)); - binding = routeBuilder.build(); + private BindingConfigBuilder addHttpKafkaRoute( + BindingConfigBuilder binding, + Asyncapi kafkaAsyncapi, + Asyncapi httpAsyncapi, + AsyncapiOperation whenOperation, + AsyncapiOperation withOperation) + { + + final AsyncapiChannelView channel = AsyncapiChannelView.of(httpAsyncapi.channels, whenOperation.channel); + String path = channel.address(); + String method = whenOperation.bindings.get("http").method; + final List paramNames = findParams(path); + + AsyncapiChannelView httpChannel = AsyncapiChannelView.of(httpAsyncapi.channels, whenOperation.channel); + + boolean async = httpChannel.messages().values() + .stream().anyMatch(asyncapiMessage -> + { + AsyncapiMessageView message = + AsyncapiMessageView.of(httpAsyncapi.components.messages, asyncapiMessage); + return message.correlationId() != null; + }); + + if (async) + { + for (AsyncapiOperation operation : httpAsyncapi.operations.values()) + { + AsyncapiChannelView channelView = AsyncapiChannelView.of(httpAsyncapi.channels, operation.channel); + if (parameters.reset(channelView.address()).find()) + { + AsyncapiReply reply = withOperation.reply; + if (reply != null) + { + final RouteConfigBuilder> asyncRouteBuilder = binding.route(); + binding = addAsyncOperation(asyncRouteBuilder, httpAsyncapi, kafkaAsyncapi, operation, + withOperation); + } + } } } + + final RouteConfigBuilder> routeBuilder = binding.route(); + routeBuilder + .exit(qname) + .when(HttpKafkaConditionConfig::builder) + .method(method) + .path(path) + .build() + .inject(r -> injectHttpKafkaRouteWith(r, httpAsyncapi, kafkaAsyncapi, whenOperation, + withOperation, paramNames)); + binding = routeBuilder.build(); return binding; } diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml index 954aeec6d2..205ac08128 100644 --- a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml @@ -210,8 +210,6 @@ bindings: routes: - when: - api-id: http_api - operation-id: createPets exit: asyncapi_kafka0 with: api-id: kafka_api - operation-id: createPets From b355311590864c8ddce36b14a0d41e7909b37d45 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Wed, 5 Jun 2024 11:55:03 +0200 Subject: [PATCH 6/6] feedback --- .../zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml index 205ac08128..e31c9b3fdf 100644 --- a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml +++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml @@ -103,7 +103,7 @@ catalogs: catalog1: type: test options: - subject: sensor + subject: petstore schema: | asyncapi: 3.0.0 info: