diff --git a/runtime/binding-asyncapi/pom.xml b/runtime/binding-asyncapi/pom.xml
index 0f8d8e23b1..207e34dd3a 100644
--- a/runtime/binding-asyncapi/pom.xml
+++ b/runtime/binding-asyncapi/pom.xml
@@ -79,6 +79,12 @@
${project.version}
provided
+
+ io.aklivity.zilla
+ binding-http-kafka
+ ${project.version}
+ provided
+
io.aklivity.zilla
binding-tls
@@ -157,6 +163,13 @@
${project.version}
test
+
+ io.aklivity.zilla
+ binding-http-kafka
+ test-jar
+ ${project.version}
+ test
+
org.openjdk.jmh
jmh-core
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java
index e8b130ec05..5151ab36c1 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java
@@ -199,7 +199,8 @@ public void attach(
NamespaceConfig v = entry.getValue();
List bindings = v.bindings.stream()
.filter(b -> b.type.equals("mqtt") || b.type.equals("http") ||
- b.type.equals("kafka") && b.kind == CACHE_CLIENT || b.type.equals("mqtt-kafka"))
+ b.type.equals("kafka") && b.kind == CACHE_CLIENT || b.type.equals("mqtt-kafka") ||
+ b.type.equals("http-kafka"))
.collect(toList());
extractResolveId(k, bindings);
extractNamespace(k, bindings);
@@ -224,7 +225,8 @@ private void attachProxyBinding(
Object2ObjectHashMap::new));
namespaceGenerator.init(binding);
- final NamespaceConfig composite = namespaceGenerator.generateProxy(binding, asyncapis, schemaIdsByApiId::get);
+ final List labels = configs.stream().map(c -> c.apiLabel).collect(toList());
+ final NamespaceConfig composite = namespaceGenerator.generateProxy(binding, asyncapis, schemaIdsByApiId::get, labels);
composite.readURL = binding.readURL;
attach.accept(composite);
updateNamespace(configs, composite, new ArrayList<>(asyncapis.values()));
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiConditionConfigAdapter.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiConditionConfigAdapter.java
index 8d659d7141..cc3e31656c 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiConditionConfigAdapter.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiConditionConfigAdapter.java
@@ -42,8 +42,11 @@ public JsonObject adaptToJson(
JsonObjectBuilder object = Json.createObjectBuilder();
object.add(API_ID_NAME, asyncapiCondition.apiId);
- object.add(OPERATION_ID_NAME, asyncapiCondition.operationId);
+ if (asyncapiCondition.operationId != null)
+ {
+ object.add(OPERATION_ID_NAME, asyncapiCondition.operationId);
+ }
return object.build();
}
@@ -51,8 +54,14 @@ public JsonObject adaptToJson(
public ConditionConfig adaptFromJson(
JsonObject object)
{
- String apiId = object.getString(API_ID_NAME);
- String operationId = object.getString(OPERATION_ID_NAME);
+ String apiId = object.containsKey(API_ID_NAME)
+ ? object.getString(API_ID_NAME)
+ : null;
+
+ String operationId = object.containsKey(OPERATION_ID_NAME)
+ ? object.getString(OPERATION_ID_NAME)
+ : null;
+
return new AsyncapiConditionConfig(apiId, operationId);
}
}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java
new file mode 100644
index 0000000000..63c83bdad8
--- /dev/null
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpKafkaProxy.java
@@ -0,0 +1,343 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.binding.asyncapi.internal.config;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiChannel;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiMessage;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiOperation;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiReply;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiChannelView;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiCorrelationIdView;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiMessageView;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiSchemaView;
+import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaConditionConfig;
+import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithConfig;
+import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithConfigBuilder;
+import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfig;
+import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfigBuilder;
+import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchFilterConfig;
+import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchMergeConfig;
+import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceAsyncHeaderConfig;
+import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceConfig;
+import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceConfigBuilder;
+import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
+import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder;
+
+public class AsyncapiHttpKafkaProxy extends AsyncapiProxy
+{
+ private static final String CORRELATION_ID = "\\{correlationId\\}";
+ private static final String PARAMETERS = "\\{(?!correlationId)(\\w+)\\}";
+ private static final String HEADER_LOCATION = "([^/]+)$";
+ private static final String ASYNCAPI_KAFKA_PROTOCOL_NAME = "kafka";
+ private static final String ASYNCAPI_HTTP_PROTOCOL_NAME = "http";
+ private static final String ASYNCAPI_SEND_ACTION_NAME = "send";
+ private static final String ASYNCAPI_RECEIVE_ACTION_NAME = "receive";
+ private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\{([^}]+)\\}");
+ private static final Pattern HEADER_LOCATION_PATTERN = Pattern.compile(HEADER_LOCATION);
+
+ private final Matcher parameters = PARAMETER_PATTERN.matcher("");
+ private final Matcher headerLocation = HEADER_LOCATION_PATTERN.matcher("");
+
+ protected AsyncapiHttpKafkaProxy(
+ String qname,
+ Map asyncapis)
+ {
+ super("http-kafka", qname, asyncapis);
+ }
+
+ @Override
+ protected BindingConfigBuilder injectProxyRoutes(
+ BindingConfigBuilder binding,
+ List routes)
+ {
+ inject:
+ for (AsyncapiRouteConfig route : routes)
+ {
+ final Asyncapi kafkaAsyncapi = asyncapis.get(route.with.apiId);
+
+ for (AsyncapiConditionConfig condition : route.when)
+ {
+ final Asyncapi httpAsyncapi = asyncapis.get(condition.apiId);
+ if (httpAsyncapi.servers.values().stream().anyMatch(s -> !s.protocol.startsWith(ASYNCAPI_HTTP_PROTOCOL_NAME)))
+ {
+ break inject;
+ }
+ final AsyncapiOperation whenOperation = httpAsyncapi.operations.get(condition.operationId);
+ if (whenOperation == null)
+ {
+ for (Map.Entry e : httpAsyncapi.operations.entrySet())
+ {
+ AsyncapiOperation withOperation = route.with.operationId != null ?
+ kafkaAsyncapi.operations.get(route.with.operationId) : kafkaAsyncapi.operations.get(e.getKey());
+ if (withOperation != null)
+ {
+ binding = addHttpKafkaRoute(binding, kafkaAsyncapi, httpAsyncapi, e.getValue(), withOperation);
+ }
+ }
+ }
+ else
+ {
+ AsyncapiOperation withOperation = kafkaAsyncapi.operations.get(route.with.operationId);
+ binding = addHttpKafkaRoute(binding, kafkaAsyncapi, httpAsyncapi, whenOperation, withOperation);
+ }
+ }
+ }
+ return binding;
+ }
+
+ private BindingConfigBuilder addHttpKafkaRoute(
+ BindingConfigBuilder binding,
+ Asyncapi kafkaAsyncapi,
+ Asyncapi httpAsyncapi,
+ AsyncapiOperation whenOperation,
+ AsyncapiOperation withOperation)
+ {
+
+ final AsyncapiChannelView channel = AsyncapiChannelView.of(httpAsyncapi.channels, whenOperation.channel);
+ String path = channel.address();
+ String method = whenOperation.bindings.get("http").method;
+ final List paramNames = findParams(path);
+
+ AsyncapiChannelView httpChannel = AsyncapiChannelView.of(httpAsyncapi.channels, whenOperation.channel);
+
+ boolean async = httpChannel.messages().values()
+ .stream().anyMatch(asyncapiMessage ->
+ {
+ AsyncapiMessageView message =
+ AsyncapiMessageView.of(httpAsyncapi.components.messages, asyncapiMessage);
+ return message.correlationId() != null;
+ });
+
+ if (async)
+ {
+ for (AsyncapiOperation operation : httpAsyncapi.operations.values())
+ {
+ AsyncapiChannelView channelView = AsyncapiChannelView.of(httpAsyncapi.channels, operation.channel);
+ if (parameters.reset(channelView.address()).find())
+ {
+ AsyncapiReply reply = withOperation.reply;
+ if (reply != null)
+ {
+ final RouteConfigBuilder> asyncRouteBuilder = binding.route();
+ binding = addAsyncOperation(asyncRouteBuilder, httpAsyncapi, kafkaAsyncapi, operation,
+ withOperation);
+ }
+ }
+ }
+ }
+
+ final RouteConfigBuilder> routeBuilder = binding.route();
+ routeBuilder
+ .exit(qname)
+ .when(HttpKafkaConditionConfig::builder)
+ .method(method)
+ .path(path)
+ .build()
+ .inject(r -> injectHttpKafkaRouteWith(r, httpAsyncapi, kafkaAsyncapi, whenOperation,
+ withOperation, paramNames));
+ binding = routeBuilder.build();
+ return binding;
+ }
+
+ private BindingConfigBuilder addAsyncOperation(
+ RouteConfigBuilder> routeBuilder,
+ Asyncapi httpAsyncapi,
+ Asyncapi kafkaAsyncapi,
+ AsyncapiOperation httpOperation,
+ AsyncapiOperation kafkaOperation)
+ {
+ final AsyncapiChannelView channel = AsyncapiChannelView.of(httpAsyncapi.channels, httpOperation.channel);
+ String path = channel.address();
+ String method = httpOperation.bindings.get("http").method;
+ final List paramNames = findParams(path);
+ return routeBuilder
+ .exit(qname)
+ .when(HttpKafkaConditionConfig::builder)
+ .method(method)
+ .path(path)
+ .build()
+ .inject(r -> injectAsyncProduceHttpKafkaRouteWith(r, httpAsyncapi, kafkaAsyncapi, httpOperation,
+ kafkaOperation, paramNames))
+ .build();
+ }
+
+ @Override
+ public BindingConfigBuilder injectProxyOptions(
+ BindingConfigBuilder binding,
+ AsyncapiOptionsConfig options)
+ {
+ return binding;
+ }
+
+ private List findParams(
+ String item)
+ {
+ List paramNames = new ArrayList<>();
+ Matcher matcher = parameters.reset(item);
+ while (matcher.find())
+ {
+ paramNames.add(parameters.group(1));
+ }
+ return paramNames;
+ }
+
+ private RouteConfigBuilder injectHttpKafkaRouteWith(
+ RouteConfigBuilder route,
+ Asyncapi httpAsyncapi,
+ Asyncapi kafkaAsyncapi,
+ AsyncapiOperation httpOperation,
+ AsyncapiOperation kafkaOperation,
+ List paramNames)
+ {
+ final HttpKafkaWithConfigBuilder newWith = HttpKafkaWithConfig.builder();
+ final AsyncapiChannelView channel = AsyncapiChannelView
+ .of(kafkaAsyncapi.channels, kafkaOperation.channel);
+ final String topic = channel.address();
+
+ switch (kafkaOperation.action)
+ {
+ case "receive":
+ newWith.fetch(HttpKafkaWithFetchConfig.builder()
+ .topic(topic)
+ .inject(with -> injectHttpKafkaRouteFetchWith(with, httpAsyncapi, httpOperation, paramNames))
+ .build());
+ break;
+ case "send":
+ newWith.produce(HttpKafkaWithProduceConfig.builder()
+ .topic(topic)
+ .inject(w -> injectHttpKafkaRouteProduceWith(w, httpOperation, kafkaOperation, httpAsyncapi,
+ kafkaAsyncapi.channels, paramNames))
+ .build());
+ break;
+ }
+
+ route.with(newWith.build());
+
+ return route;
+ }
+
+ private RouteConfigBuilder injectAsyncProduceHttpKafkaRouteWith(
+ RouteConfigBuilder route,
+ Asyncapi httpAsyncapi,
+ Asyncapi kafkaAsyncapi,
+ AsyncapiOperation httpOperation,
+ AsyncapiOperation kafkaOperation,
+ List paramNames)
+ {
+ final HttpKafkaWithConfigBuilder newWith = HttpKafkaWithConfig.builder();
+ final AsyncapiChannelView channel = AsyncapiChannelView.of(kafkaAsyncapi.channels, kafkaOperation.channel);
+ final String topic = channel.address();
+
+ newWith.produce(HttpKafkaWithProduceConfig.builder()
+ .topic(topic)
+ .inject(w -> injectHttpKafkaRouteProduceWith(w, httpOperation, kafkaOperation, httpAsyncapi,
+ kafkaAsyncapi.channels, paramNames))
+ .build());
+ route.with(newWith.build());
+
+ return route;
+ }
+
+ private HttpKafkaWithFetchConfigBuilder injectHttpKafkaRouteFetchWith(
+ HttpKafkaWithFetchConfigBuilder fetch,
+ Asyncapi httpAsyncapi,
+ AsyncapiOperation httpOperation,
+ List paramNames)
+ {
+ final AsyncapiChannelView channel = AsyncapiChannelView.of(httpAsyncapi.channels, httpOperation.channel);
+ merge:
+ for (Map.Entry message : channel.messages().entrySet())
+ {
+ AsyncapiMessageView messageView = AsyncapiMessageView.of(httpAsyncapi.components.messages, message.getValue());
+ AsyncapiSchemaView schema = AsyncapiSchemaView.of(httpAsyncapi.components.schemas, messageView.payload());
+
+ if (schema != null && "array".equals(schema.getType()))
+ {
+ fetch.merged(HttpKafkaWithFetchMergeConfig.builder()
+ .contentType("application/json")
+ .initial("[]")
+ .path("/-")
+ .build());
+ break merge;
+ }
+ }
+
+ if (!paramNames.isEmpty())
+ {
+ fetch.filters(List.of(HttpKafkaWithFetchFilterConfig.builder()
+ .key(String.format("${params.%s}", paramNames.get(paramNames.size() - 1)))
+ .build()));
+ }
+
+ return fetch;
+ }
+
+ private HttpKafkaWithProduceConfigBuilder injectHttpKafkaRouteProduceWith(
+ HttpKafkaWithProduceConfigBuilder produce,
+ AsyncapiOperation httpOperation,
+ AsyncapiOperation kafkaOperation,
+ Asyncapi httpAsyncapi,
+ Map kafkaChannels,
+ List paramNames)
+ {
+ final String key = !paramNames.isEmpty() ? String.format("${params.%s}",
+ paramNames.get(paramNames.size() - 1)) : "${idempotencyKey}";
+
+ produce.acks("in_sync_replicas").key(key);
+
+ AsyncapiChannelView httpChannel = AsyncapiChannelView.of(httpAsyncapi.channels, httpOperation.channel);
+
+ httpChannel.messages().values()
+ .forEach(asyncapiMessage ->
+ {
+ AsyncapiMessageView message = AsyncapiMessageView.of(httpAsyncapi.components.messages, asyncapiMessage);
+ if (message.correlationId() != null)
+ {
+ AsyncapiCorrelationIdView correlationId =
+ AsyncapiCorrelationIdView.of(httpAsyncapi.components.correlationIds, message.correlationId());
+ if (headerLocation.reset(correlationId.location()).find())
+ {
+ String headerName = headerLocation.group(1);
+ String location = message.headers().properties.get(headerName).format;
+ location = location.replaceAll(CORRELATION_ID, "\\${correlationId}");
+ location = location.replaceAll(PARAMETERS, "\\${params.$1}");
+ produce.async(HttpKafkaWithProduceAsyncHeaderConfig.builder()
+ .name("location")
+ .value(location)
+ .build());
+ }
+ }
+ });
+
+ AsyncapiReply reply = kafkaOperation.reply;
+ if (reply != null)
+ {
+ AsyncapiChannelView channel = AsyncapiChannelView.of(kafkaChannels, reply.channel);
+ produce.replyTo(channel.address());
+ }
+
+ produce.build();
+
+ return produce;
+ }
+}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiMqttKafkaProxy.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiMqttKafkaProxy.java
new file mode 100644
index 0000000000..3365d92b62
--- /dev/null
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiMqttKafkaProxy.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.binding.asyncapi.internal.config;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiOperation;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiChannelView;
+import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaConditionConfig;
+import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaConditionKind;
+import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaOptionsConfig;
+import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaWithConfig;
+import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttQoS;
+import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
+import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder;
+
+public class AsyncapiMqttKafkaProxy extends AsyncapiProxy
+{
+ private static final String ASYNCAPI_KAFKA_PROTOCOL_NAME = "kafka";
+ private static final String ASYNCAPI_MQTT_PROTOCOL_NAME = "mqtt";
+ private static final String ASYNCAPI_SEND_ACTION_NAME = "send";
+ private static final String ASYNCAPI_RECEIVE_ACTION_NAME = "receive";
+
+ protected AsyncapiMqttKafkaProxy(
+ String qname,
+ Map asyncapis)
+ {
+ super("mqtt-kafka", qname, asyncapis);
+ }
+
+ @Override
+ protected BindingConfigBuilder injectProxyRoutes(
+ BindingConfigBuilder binding,
+ List routes)
+ {
+ inject:
+ for (AsyncapiRouteConfig route : routes)
+ {
+ final RouteConfigBuilder> routeBuilder = binding.route();
+
+ final Asyncapi kafkaAsyncapi = asyncapis.get(route.with.apiId);
+
+ final AsyncapiOperation withOperation = kafkaAsyncapi.operations.get(route.with.operationId);
+ final String messages = AsyncapiChannelView.of(kafkaAsyncapi.channels, withOperation.channel).address();
+
+ for (AsyncapiConditionConfig condition : route.when)
+ {
+ final Asyncapi mqttAsyncapi = asyncapis.get(condition.apiId);
+ if (mqttAsyncapi.servers.values().stream().anyMatch(s -> !s.protocol.startsWith(ASYNCAPI_MQTT_PROTOCOL_NAME)))
+ {
+ break inject;
+ }
+ final AsyncapiOperation whenOperation = mqttAsyncapi.operations.get(condition.operationId);
+ final AsyncapiChannelView channel = AsyncapiChannelView.of(mqttAsyncapi.channels, whenOperation.channel);
+ final MqttKafkaConditionKind kind = whenOperation.action.equals(ASYNCAPI_SEND_ACTION_NAME) ?
+ MqttKafkaConditionKind.PUBLISH : MqttKafkaConditionKind.SUBSCRIBE;
+ String topic = channel.address();
+
+ routeBuilder
+ .when(MqttKafkaConditionConfig::builder)
+ .topic(topic)
+ .kind(kind)
+ .build()
+ .with(MqttKafkaWithConfig::builder)
+ .messages(messages.replaceAll("\\{([^{}]*)\\}", "\\${params.$1}"))
+ .build()
+ .exit(qname);
+ }
+ binding = routeBuilder.build();
+ }
+ return binding;
+ }
+
+ @Override
+ public BindingConfigBuilder injectProxyOptions(
+ BindingConfigBuilder binding,
+ AsyncapiOptionsConfig options)
+ {
+ String sessions = "";
+ String messages = "";
+ String retained = "";
+ for (Asyncapi asyncapi : asyncapis.values())
+ {
+ if (asyncapi.channels.containsKey(options.mqttKafka.channels.sessions))
+ {
+ sessions = asyncapi.channels.get(options.mqttKafka.channels.sessions).address;
+ }
+
+ if (asyncapi.channels.containsKey(options.mqttKafka.channels.messages))
+ {
+ messages = asyncapi.channels.get(options.mqttKafka.channels.messages).address;
+ }
+
+ if (asyncapi.channels.containsKey(options.mqttKafka.channels.retained))
+ {
+ retained = asyncapi.channels.get(options.mqttKafka.channels.retained).address;
+ }
+ }
+ return binding
+ .options(MqttKafkaOptionsConfig::builder)
+ .topics()
+ .sessions(sessions)
+ .messages(messages)
+ .retained(retained)
+ .build()
+ .publish()
+ .qosMax(MqttQoS.EXACTLY_ONCE.name().toLowerCase())
+ .build()
+ .clients(Collections.emptyList())
+ .build();
+ }
+}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiNamespaceGenerator.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiNamespaceGenerator.java
index ddf75ede2c..25feb3cb34 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiNamespaceGenerator.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiNamespaceGenerator.java
@@ -92,7 +92,8 @@ public NamespaceConfig generate(
public NamespaceConfig generateProxy(
BindingConfig binding,
Map asyncapis,
- ToLongFunction resolveApiId)
+ ToLongFunction resolveApiId,
+ List labels)
{
return null;
}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProxy.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProxy.java
new file mode 100644
index 0000000000..728e25f05b
--- /dev/null
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProxy.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.binding.asyncapi.internal.config;
+
+import java.util.List;
+import java.util.Map;
+
+import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
+import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
+
+public abstract class AsyncapiProxy
+{
+ protected final String type;
+ protected final String qname;
+ protected final Map asyncapis;
+
+ protected AsyncapiProxy(
+ String type,
+ String qname,
+ Map asyncapis)
+ {
+ this.type = type;
+ this.qname = qname;
+ this.asyncapis = asyncapis;
+ }
+
+ protected abstract BindingConfigBuilder injectProxyRoutes(
+ BindingConfigBuilder binding,
+ List routes);
+
+ public abstract BindingConfigBuilder injectProxyOptions(
+ BindingConfigBuilder binding,
+ AsyncapiOptionsConfig options);
+}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProxyNamespaceGenerator.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProxyNamespaceGenerator.java
index 2234a66621..9039313cdf 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProxyNamespaceGenerator.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProxyNamespaceGenerator.java
@@ -17,38 +17,33 @@
import static io.aklivity.zilla.runtime.engine.config.KindConfig.PROXY;
import static java.util.Collections.emptyList;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
-import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiOperation;
-import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiChannelView;
-import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaConditionConfig;
-import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaConditionKind;
-import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaOptionsConfig;
-import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaWithConfig;
-import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttQoS;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
-import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfig;
-import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder;
+import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder;
public class AsyncapiProxyNamespaceGenerator extends AsyncapiNamespaceGenerator
{
- private static final String ASYNCAPI_SEND_ACTION_NAME = "send";
- private static final String ASYNCAPI_RECEIVE_ACTION_NAME = "receive";
private static final String ASYNCAPI_KAFKA_PROTOCOL_NAME = "kafka";
private static final String ASYNCAPI_MQTT_PROTOCOL_NAME = "mqtt";
+ private static final String ASYNCAPI_HTTP_PROTOCOL_NAME = "http";
public NamespaceConfig generateProxy(
BindingConfig binding,
Map asyncapis,
- ToLongFunction resolveApiId)
+ ToLongFunction resolveApiId,
+ List labels)
{
AsyncapiOptionsConfig options = binding.options != null ? (AsyncapiOptionsConfig) binding.options : EMPTY_OPTION;
List routes = binding.routes.stream()
@@ -59,95 +54,72 @@ public NamespaceConfig generateProxy(
final List metricRefs = binding.telemetryRef != null ?
binding.telemetryRef.metricRefs : emptyList();
- String sessions = "";
- String messages = "";
- String retained = "";
- for (Asyncapi asyncapi : asyncapis.values())
- {
- if (asyncapi.channels.containsKey(options.mqttKafka.channels.sessions))
- {
- sessions = asyncapi.channels.get(options.mqttKafka.channels.sessions).address;
- }
-
- if (asyncapi.channels.containsKey(options.mqttKafka.channels.messages))
- {
- messages = asyncapi.channels.get(options.mqttKafka.channels.messages).address;
- }
+ final Map> routesByProtocol = new HashMap<>();
- if (asyncapi.channels.containsKey(options.mqttKafka.channels.retained))
- {
- retained = asyncapi.channels.get(options.mqttKafka.channels.retained).address;
- }
- }
-
- return NamespaceConfig.builder()
- .name(String.format("%s/%s", qname, "mqtt-kafka"))
- .inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty()))
- .binding()
- .name("mqtt_kafka_proxy0")
- .type("mqtt-kafka")
- .kind(PROXY)
- .inject(b -> this.injectMetrics(b, metricRefs, "mqtt-kafka"))
- .options(MqttKafkaOptionsConfig::builder)
- .topics()
- .sessions(sessions)
- .messages(messages)
- .retained(retained)
- .build()
- .publish()
- .qosMax(MqttQoS.EXACTLY_ONCE.name().toLowerCase())
- .build()
- .clients(Collections.emptyList())
- .build()
- .inject(b -> this.injectMqttKafkaRoutes(b, routes))
- .build()
- .build();
- }
-
- public BindingConfigBuilder injectMqttKafkaRoutes(
- BindingConfigBuilder binding,
- List routes)
- {
inject:
for (AsyncapiRouteConfig route : routes)
{
- final RouteConfigBuilder> routeBuilder = binding.route();
-
final Asyncapi kafkaAsyncapi = asyncapis.get(route.with.apiId);
-
if (kafkaAsyncapi.servers.values().stream().anyMatch(s -> !s.protocol.startsWith(ASYNCAPI_KAFKA_PROTOCOL_NAME)))
{
break inject;
}
- final AsyncapiOperation withOperation = kafkaAsyncapi.operations.get(route.with.operationId);
- final String messages = AsyncapiChannelView.of(kafkaAsyncapi.channels, withOperation.channel).address();
-
for (AsyncapiConditionConfig condition : route.when)
{
- final Asyncapi mqttAsyncapi = asyncapis.get(condition.apiId);
- if (mqttAsyncapi.servers.values().stream().anyMatch(s -> !s.protocol.startsWith(ASYNCAPI_MQTT_PROTOCOL_NAME)))
+ final Asyncapi asyncapi = asyncapis.get(condition.apiId);
+ if (asyncapi.servers.values().stream().anyMatch(s ->
+ !s.protocol.startsWith(ASYNCAPI_MQTT_PROTOCOL_NAME) &&
+ !s.protocol.startsWith(ASYNCAPI_HTTP_PROTOCOL_NAME)))
{
break inject;
}
- final AsyncapiOperation whenOperation = mqttAsyncapi.operations.get(condition.operationId);
- final AsyncapiChannelView channel = AsyncapiChannelView.of(mqttAsyncapi.channels, whenOperation.channel);
- final MqttKafkaConditionKind kind = whenOperation.action.equals(ASYNCAPI_SEND_ACTION_NAME) ?
- MqttKafkaConditionKind.PUBLISH : MqttKafkaConditionKind.SUBSCRIBE;
- String topic = channel.address();
+ final String conditionProtocol = asyncapi.servers.values().stream().findFirst().get().protocol;
+ routesByProtocol.computeIfAbsent(conditionProtocol, c -> new ArrayList<>()).add(route);
+ }
+ }
+
+ final String namespace = String.join("+", labels);
+ NamespaceConfigBuilder builder = NamespaceConfig.builder()
+ .name(String.format("%s/%s", qname, namespace))
+ .inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty()));
+
+ routesByProtocol.forEach((k, r) ->
+ {
+ final AsyncapiProxy proxy = resolveProxy(k);
+ builder.binding()
+ .name(String.format("%s_proxy0", proxy.type))
+ .type(proxy.type)
+ .kind(PROXY)
+ .inject(b -> this.injectMetrics(b, metricRefs, proxy.type))
+ .inject(b -> proxy.injectProxyOptions(b, options))
+ .inject(b -> proxy.injectProxyRoutes(b, r))
+ .build();
+ });
- routeBuilder
- .when(MqttKafkaConditionConfig::builder)
- .topic(topic)
- .kind(kind)
- .build()
- .with(MqttKafkaWithConfig::builder)
- .messages(messages.replaceAll("\\{([^{}]*)\\}", "\\${params.$1}"))
- .build()
- .exit(qname);
+
+ return builder.build();
+ }
+
+ private AsyncapiProxy resolveProxy(
+ String protocol)
+ {
+ Pattern pattern = Pattern.compile("(http|mqtt)");
+ Matcher matcher = pattern.matcher(protocol);
+ AsyncapiProxy proxy = null;
+ if (matcher.find())
+ {
+ switch (matcher.group())
+ {
+ case "http":
+ proxy = new AsyncapiHttpKafkaProxy(qname, asyncapis);
+ break;
+ case "mqtt":
+ proxy = new AsyncapiMqttKafkaProxy(qname, asyncapis);
+ break;
}
- binding = routeBuilder.build();
}
- return binding;
+ return proxy;
}
+
}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiComponents.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiComponents.java
index 5f751c1b67..185abee13e 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiComponents.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiComponents.java
@@ -21,6 +21,7 @@ public class AsyncapiComponents
public Map securitySchemes;
public Map messages;
public Map schemas;
+ public Map correlationIds;
public Map messageTraits;
public Map serverVariables;
}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiCorrelationId.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiCorrelationId.java
new file mode 100644
index 0000000000..44569ad89d
--- /dev/null
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiCorrelationId.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.binding.asyncapi.internal.model;
+
+import jakarta.json.bind.annotation.JsonbProperty;
+
+public class AsyncapiCorrelationId
+{
+ public String location;
+
+ @JsonbProperty("$ref")
+ public String ref;
+}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiItem.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiItem.java
index aed859d19c..1184b669fe 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiItem.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiItem.java
@@ -27,6 +27,7 @@ public class AsyncapiItem
public String description;
public Integer minimum;
public Integer maximum;
+ public String format;
@JsonbProperty("enum")
public List values;
@JsonbProperty("$ref")
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiMessage.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiMessage.java
index 4a0f51d277..b002de02b7 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiMessage.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiMessage.java
@@ -24,6 +24,7 @@ public class AsyncapiMessage
public String contentType;
public AsyncapiSchema payload;
public List traits;
+ public AsyncapiCorrelationId correlationId;
@JsonbProperty("$ref")
public String ref;
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiCorrelationIdView.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiCorrelationIdView.java
new file mode 100644
index 0000000000..d74d06482f
--- /dev/null
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiCorrelationIdView.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.binding.asyncapi.internal.view;
+
+import java.util.Map;
+
+import jakarta.json.bind.annotation.JsonbPropertyOrder;
+
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiCorrelationId;
+
+@JsonbPropertyOrder({
+ "type",
+ "items",
+ "properties",
+ "required"
+})
+public final class AsyncapiCorrelationIdView extends AsyncapiResolvable
+{
+ private final AsyncapiCorrelationId correlationId;
+ private final Map correlationIds;
+
+ public String refKey()
+ {
+ return key;
+ }
+ public AsyncapiCorrelationId correlationId()
+ {
+ return correlationId;
+ }
+
+ public String location()
+ {
+ return correlationId.location;
+ }
+
+ public static AsyncapiCorrelationIdView of(
+ Map correlationIds,
+ AsyncapiCorrelationId correlationId)
+ {
+ return new AsyncapiCorrelationIdView(correlationIds, correlationId);
+ }
+
+ private AsyncapiCorrelationIdView(
+ Map correlationIds,
+ AsyncapiCorrelationId correlationId)
+ {
+ super(correlationIds, "#/components/correlationIds/(\\w+)");
+ if (correlationId.ref != null)
+ {
+ correlationId = resolveRef(correlationId.ref);
+ }
+
+ this.correlationIds = correlationIds;
+ this.correlationId = correlationId;
+ }
+}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiMessageView.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiMessageView.java
index 6ee6094e17..232ba13676 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiMessageView.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiMessageView.java
@@ -17,6 +17,7 @@
import java.util.List;
import java.util.Map;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiCorrelationId;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiMessage;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiSchema;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiTrait;
@@ -49,6 +50,11 @@ public List traits()
return message.traits;
}
+ public AsyncapiCorrelationId correlationId()
+ {
+ return message.correlationId;
+ }
+
public static AsyncapiMessageView of(
Map messages,
AsyncapiMessage asyncapiMessage)
diff --git a/runtime/binding-asyncapi/src/main/moditect/module-info.java b/runtime/binding-asyncapi/src/main/moditect/module-info.java
index 48fdc65612..142d6a324f 100644
--- a/runtime/binding-asyncapi/src/main/moditect/module-info.java
+++ b/runtime/binding-asyncapi/src/main/moditect/module-info.java
@@ -21,6 +21,7 @@
requires io.aklivity.zilla.runtime.binding.http;
requires io.aklivity.zilla.runtime.binding.kafka;
requires io.aklivity.zilla.runtime.binding.mqtt.kafka;
+ requires io.aklivity.zilla.runtime.binding.http.kafka;
requires io.aklivity.zilla.runtime.binding.tcp;
requires io.aklivity.zilla.runtime.binding.tls;
requires io.aklivity.zilla.runtime.catalog.inline;
diff --git a/runtime/binding-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/proxy/AsyncapiIT.java b/runtime/binding-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/proxy/AsyncapiIT.java
index ce7ed7df1b..3df61527da 100644
--- a/runtime/binding-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/proxy/AsyncapiIT.java
+++ b/runtime/binding-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/proxy/AsyncapiIT.java
@@ -57,4 +57,26 @@ public void shouldPublish() throws Exception
{
k3po.finish();
}
+
+ @Test
+ @Configuration("proxy.http.kafka.yaml")
+ @Specification({
+ "${asyncapi}/proxy.http.create.pet/client",
+ "${asyncapi}/proxy.kafka.create.pet/server"
+ })
+ public void shouldCreatePet() throws Exception
+ {
+ k3po.finish();
+ }
+
+ @Test
+ @Configuration("proxy.http.kafka.async.yaml")
+ @Specification({
+ "${asyncapi}/proxy.http.async.verify.customer/client",
+ "${asyncapi}/proxy.kafka.async.verify.customer/server"
+ })
+ public void shouldVerifyCustomerAsync() throws Exception
+ {
+ k3po.finish();
+ }
}
diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.async.yaml b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.async.yaml
new file mode 100644
index 0000000000..889a2995a7
--- /dev/null
+++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.async.yaml
@@ -0,0 +1,262 @@
+#
+# Copyright 2021-2023 Aklivity Inc.
+#
+# Aklivity licenses this file to you under the Apache License,
+# version 2.0 (the "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+---
+name: test
+catalogs:
+ catalog0:
+ type: test
+ options:
+ subject: petstore
+ schema: |
+ asyncapi: 3.0.0
+ info:
+ title: AsyncAPI Petstore
+ license:
+ name: MIT
+ version: 1.0.0
+ servers:
+ plain:
+ host: localhost:8080
+ protocol: http
+ protocolVersion: '2.0'
+ defaultContentType: application/json
+
+ channels:
+ customer:
+ address: /customer
+ messages:
+ customer:
+ $ref: '#/components/messages/customer'
+ showCustomerById:
+ address: /customer;cid={correlationId}
+ messages:
+ customer:
+ $ref: '#/components/messages/customer'
+
+ operations:
+ createCustomer:
+ action: send
+ bindings:
+ http:
+ method: POST
+ channel:
+ $ref: '#/channels/customer'
+ getVerifiedCustomer:
+ action: receive
+ bindings:
+ http:
+ method: GET
+ query:
+ type: object
+ properties:
+ limit:
+ type: number
+ channel:
+ $ref: '#/channels/showCustomerById'
+
+ components:
+ correlationIds:
+ customerCorrelationId:
+ location: '$message.header#/CorrelId'
+ schemas:
+ Customer:
+ type: object
+ properties:
+ id:
+ type: integer
+ format: int64
+ example: 100000
+ username:
+ type: string
+ example: fehguy
+ status:
+ type: string
+ description: Verification Status
+ example: approved
+ enum:
+ - pending
+ - approved
+ - denied
+ address:
+ type: array
+ items:
+ $ref: "#/components/schemas/Address"
+ Address:
+ type: object
+ properties:
+ street:
+ type: string
+ example: 437 Lytton
+ city:
+ type: string
+ example: Palo Alto
+ state:
+ type: string
+ example: CA
+ zip:
+ type: string
+ example: "94301"
+
+ messages:
+ customer:
+ name: Customer
+ title: Customer
+ summary: Information about a Customer.
+ contentType: application/json
+ correlationId:
+ $ref: '#/components/correlationIds/customerCorrelationId'
+ headers:
+ type: object
+ properties:
+ CorrelId:
+ type: string
+ format: /customer;cid={correlationId}
+ payload:
+ $ref: "#/components/schemas/Customer"
+
+ catalog1:
+ type: test
+ options:
+ subject: sensor
+ schema: |
+ asyncapi: 3.0.0
+ info:
+ title: Petstore Kafka API
+ version: 1.0.0
+ defaultContentType: application/json
+ servers:
+ host-connections:
+ host: 'localhost:9092'
+ protocol: kafka-secure
+ description: Test broker
+ tags:
+ - name: 'kind:remote'
+ description: This server is a remote server. Not exposed by the application.
+ - name: 'visibility:private'
+ description: This resource is private and only available to certain users.
+ channels:
+ customers:
+ address: "petstore-customers"
+ messages:
+ customer:
+ $ref: "#/components/messages/customer"
+ empty:
+ $ref: "#/components/messages/empty"
+ description: The topic on which pet values may be produced and consumed.
+ verifiedCustomers:
+ address: "petstore-verified-customers"
+ messages:
+ customer:
+ $ref: "#/components/messages/customer"
+ empty:
+ $ref: "#/components/messages/empty"
+ description: The topic on which pet values may be produced and consumed.
+ operations:
+ createCustomer:
+ action: send
+ channel:
+ $ref: "#/channels/customers"
+ reply:
+ channel:
+ $ref: "#/channels/verifiedCustomers"
+ summary: >-
+ Add a pet.
+ messages:
+ - $ref: "#/channels/customers/messages/customer"
+ components:
+ correlationIds:
+ customerVerifyCorrelationId:
+ description: >
+ This correlation ID is used for message tracing and messages
+ correlation. This correlation ID is generated at runtime based on the
+ `VERIFY_ID` and sent to the RESPONSE message.
+ location: $message.header#/VERIFY_ID
+ messages:
+ empty:
+ name: EmptyMessage
+ payload:
+ type: "null"
+ customer:
+ name: Customer
+ title: Customer
+ summary: Information about a Customer.
+ contentType: application/json
+ payload:
+ $ref: "#/components/schemas/Customer"
+ schemas:
+ Customer:
+ type: object
+ properties:
+ id:
+ type: integer
+ format: int64
+ example: 100000
+ username:
+ type: string
+ example: fehguy
+ status:
+ type: string
+ description: Verification Status
+ example: approved
+ enum:
+ - pending
+ - approved
+ - denied
+ address:
+ type: array
+ items:
+ $ref: "#/components/schemas/Address"
+ Address:
+ type: object
+ properties:
+ street:
+ type: string
+ example: 437 Lytton
+ city:
+ type: string
+ example: Palo Alto
+ state:
+ type: string
+ example: CA
+ zip:
+ type: string
+ example: "94301"
+
+bindings:
+ asyncapi_proxy0:
+ type: asyncapi
+ kind: proxy
+ options:
+ specs:
+ http_api:
+ catalog:
+ catalog0:
+ subject: petstore
+ version: latest
+ kafka_api:
+ catalog:
+ catalog1:
+ subject: petstore
+ version: latest
+ routes:
+ - when:
+ - api-id: http_api
+ operation-id: createCustomer
+ exit: asyncapi_kafka0
+ with:
+ api-id: kafka_api
+ operation-id: createCustomer
diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml
new file mode 100644
index 0000000000..e31c9b3fdf
--- /dev/null
+++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/proxy.http.kafka.yaml
@@ -0,0 +1,215 @@
+#
+# Copyright 2021-2023 Aklivity Inc.
+#
+# Aklivity licenses this file to you under the Apache License,
+# version 2.0 (the "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+---
+name: test
+catalogs:
+ catalog0:
+ type: test
+ options:
+ subject: petstore
+ schema: |
+ asyncapi: 3.0.0
+ info:
+ title: AsyncAPI Petstore
+ license:
+ name: MIT
+ version: 1.0.0
+ servers:
+ plain:
+ host: localhost:8080
+ protocol: http
+ protocolVersion: '2.0'
+ defaultContentType: application/json
+
+ channels:
+ pets:
+ address: /pets
+ messages:
+ pet:
+ $ref: '#/components/messages/pet'
+ showPetById:
+ address: /pets/{id}
+ messages:
+ pet:
+ $ref: '#/components/messages/pet'
+
+ operations:
+ createPets:
+ action: send
+ bindings:
+ http:
+ method: POST
+ channel:
+ $ref: '#/channels/pets'
+ listPets:
+ action: receive
+ bindings:
+ http:
+ method: GET
+ channel:
+ $ref: '#/channels/pets'
+ getPets:
+ action: receive
+ bindings:
+ http:
+ method: GET
+ query:
+ type: object
+ properties:
+ limit:
+ type: number
+ channel:
+ $ref: '#/channels/showPetById'
+
+ components:
+ schemas:
+ petPayload:
+ type: object
+ properties:
+ id:
+ type: integer
+ minimum: 0
+ description: Pet id.
+ name:
+ type: string
+ description: Pet name.
+ tag:
+ type: string
+ description: Tag.
+ messages:
+ pet:
+ name: Pet
+ title: Pet
+ summary: >-
+ Inform about Pet.
+ contentType: application/json
+ payload:
+ $ref: '#/components/schemas/petPayload'
+
+ catalog1:
+ type: test
+ options:
+ subject: petstore
+ schema: |
+ asyncapi: 3.0.0
+ info:
+ title: Petstore Kafka API
+ version: 1.0.0
+ defaultContentType: application/json
+ servers:
+ host-connections:
+ host: 'localhost:9092'
+ protocol: kafka-secure
+ description: Test broker
+ tags:
+ - name: 'kind:remote'
+ description: This server is a remote server. Not exposed by the application.
+ - name: 'visibility:private'
+ description: This resource is private and only available to certain users.
+ channels:
+ petstore:
+ address: 'petstore'
+ messages:
+ pet:
+ $ref: '#/components/messages/pet'
+ description: The topic on which pet values may be produced and consumed.
+ operations:
+ listPets:
+ action: receive
+ channel:
+ $ref: '#/channels/petstore'
+ summary: >-
+ List all pets.
+ traits:
+ - $ref: '#/components/operationTraits/kafka'
+ messages:
+ - $ref: '#/channels/petstore/messages/pet'
+ createPets:
+ action: send
+ channel:
+ $ref: '#/channels/petstore'
+ summary: >-
+ Create a pet.
+ traits:
+ - $ref: '#/components/operationTraits/kafka'
+ messages:
+ - $ref: '#/channels/petstore/messages/pet'
+ components:
+ messages:
+ pet:
+ name: Pet
+ title: Pet
+ summary: >-
+ Inform about Pet.
+ contentType: application/json
+ traits:
+ - $ref: '#/components/messageTraits/commonHeaders'
+ payload:
+ $ref: '#/components/schemas/petPayload'
+ schemas:
+ petPayload:
+ type: object
+ properties:
+ id:
+ type: integer
+ minimum: 0
+ description: Pet id.
+ name:
+ type: string
+ description: Pet name.
+ tag:
+ type: string
+ description: Tag.
+ messageTraits:
+ commonHeaders:
+ headers:
+ type: object
+ properties:
+ my-app-header:
+ type: integer
+ minimum: 0
+ maximum: 100
+ operationTraits:
+ kafka:
+ bindings:
+ kafka:
+ clientId:
+ type: string
+ enum:
+ - my-app-id
+bindings:
+ asyncapi_proxy0:
+ type: asyncapi
+ kind: proxy
+ options:
+ specs:
+ http_api:
+ catalog:
+ catalog0:
+ subject: petstore
+ version: latest
+ kafka_api:
+ catalog:
+ catalog1:
+ subject: petstore
+ version: latest
+ routes:
+ - when:
+ - api-id: http_api
+ exit: asyncapi_kafka0
+ with:
+ api-id: kafka_api
diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/server.http.yaml b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/server.http.yaml
index 5931f33bf1..4cc7790852 100644
--- a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/server.http.yaml
+++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/server.http.yaml
@@ -79,7 +79,7 @@ catalogs:
components:
correlationIds:
petsCorrelationId:
- location: '$message.header#/idempotency-key'
+ location: '$message.header#/CorrelId'
schemas:
petPayload:
type: object
@@ -101,6 +101,14 @@ catalogs:
summary: >-
Inform about Pet.
contentType: application/json
+ correlationId:
+ $ref: '#/components/correlationIds/petsCorrelationId'
+ headers:
+ type: object
+ properties:
+ CorrelId:
+ type: string
+ format: /pets;cid={correlationId}
payload:
$ref: '#/components/schemas/petPayload'
bindings:
diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.async.verify.customer/client.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.async.verify.customer/client.rpt
new file mode 100644
index 0000000000..10c47464e7
--- /dev/null
+++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.async.verify.customer/client.rpt
@@ -0,0 +1,89 @@
+#
+# Copyright 2021-2023 Aklivity Inc.
+#
+# Aklivity licenses this file to you under the Apache License,
+# version 2.0 (the "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+connect "zilla://streams/asyncapi_proxy0"
+ option zilla:window 8192
+ option zilla:transmission "half-duplex"
+
+write zilla:begin.ext ${asyncapi:beginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .operationId("createCustomer")
+ .extension(http:beginEx()
+ .typeId(zilla:id("http"))
+ .header(":method", "POST")
+ .header(":scheme", "http")
+ .header(":path", "/customer")
+ .header(":authority", "localhost:8080")
+ .header("content-type", "application/json")
+ .header("content-length", "155")
+ .header("prefer", "respond-async")
+ .header("idempotency-key", "3f96592e-c8f1-4167-8c46-85f2aabb70a5")
+ .build())
+ .build()}
+connected
+
+write "{ \"id\": 100000, \"username\": \"fehguy\", \"status\": \"approved\", \"address\": [ { \"street\": \"437 Lytton\", \"city\": \"Palo Alto\", \"state\": \"CA\", \"zip\": \"94301\" } ] }"
+write close
+
+read zilla:begin.ext ${asyncapi:matchBeginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .extension(http:beginEx()
+ .typeId(zilla:id("http"))
+ .header(":status", "202")
+ .header("content-length", "0")
+ .header("location", "/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed")
+ .build())
+ .build()}
+
+read closed
+
+read notify RECEIVED_ASYNC_RESPONSE
+
+connect await RECEIVED_ASYNC_RESPONSE
+ "zilla://streams/asyncapi_proxy0"
+ option zilla:window 8192
+ option zilla:transmission "half-duplex"
+
+
+write zilla:begin.ext ${asyncapi:beginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .operationId("getVerifiedCustomer")
+ .extension(http:beginEx()
+ .typeId(zilla:id("http"))
+ .header(":method", "GET")
+ .header(":scheme", "http")
+ .header(":authority", "localhost:8080")
+ .header(":path", "/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed")
+ .header("prefer", "respond-async")
+ .build())
+ .build()}
+connected
+
+write close
+
+read zilla:begin.ext ${asyncapi:matchBeginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .extension(http:beginEx()
+ .typeId(zilla:id("http"))
+ .header(":status", "204")
+ .build())
+ .build()}
+
+read closed
diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.async.verify.customer/server.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.async.verify.customer/server.rpt
new file mode 100644
index 0000000000..b08597d817
--- /dev/null
+++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.async.verify.customer/server.rpt
@@ -0,0 +1,88 @@
+#
+# Copyright 2021-2023 Aklivity Inc.
+#
+# Aklivity licenses this file to you under the Apache License,
+# version 2.0 (the "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+accept "zilla://streams/asyncapi_proxy0"
+ option zilla:window 8192
+ option zilla:transmission "half-duplex"
+accepted
+
+read zilla:begin.ext ${asyncapi:beginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .operationId("createCustomer")
+ .extension(http:beginEx()
+ .typeId(zilla:id("http"))
+ .header(":method", "POST")
+ .header(":scheme", "http")
+ .header(":path", "/customer")
+ .header(":authority", "localhost:8080")
+ .header("content-type", "application/json")
+ .header("content-length", "155")
+ .header("prefer", "respond-async")
+ .header("idempotency-key", "3f96592e-c8f1-4167-8c46-85f2aabb70a5")
+ .build())
+ .build()}
+connected
+
+read "{ \"id\": 100000, \"username\": \"fehguy\", \"status\": \"approved\", \"address\": [ { \"street\": \"437 Lytton\", \"city\": \"Palo Alto\", \"state\": \"CA\", \"zip\": \"94301\" } ] }"
+read closed
+
+write zilla:begin.ext ${asyncapi:beginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .operationId("createCustomer")
+ .extension(http:beginEx()
+ .typeId(zilla:id("http"))
+ .header(":status", "202")
+ .header("content-length", "0")
+ .header("Location", "/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed")
+ .build())
+ .build()}
+write flush
+
+write close
+
+accepted
+
+read zilla:begin.ext ${asyncapi:matchBeginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .operationId("getVerifiedCustomer")
+ .extension(http:beginEx()
+ .typeId(zilla:id("http"))
+ .header(":method", "GET")
+ .header(":scheme", "http")
+ .header(":authority", "localhost:8080")
+ .header(":path", "/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed")
+ .header("prefer", "respond-async")
+ .build())
+ .build()}
+connected
+
+read closed
+
+write zilla:begin.ext ${asyncapi:beginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .operationId("getVerifiedCustomer")
+ .extension(http:beginEx()
+ .typeId(zilla:id("http"))
+ .header(":status", "204")
+ .build())
+ .build()}
+
+write flush
+write close
diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/client.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/client.rpt
new file mode 100644
index 0000000000..b55b624bb8
--- /dev/null
+++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/client.rpt
@@ -0,0 +1,47 @@
+#
+# Copyright 2021-2023 Aklivity Inc.
+#
+# Aklivity licenses this file to you under the Apache License,
+# version 2.0 (the "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+connect "zilla://streams/asyncapi_proxy0"
+ option zilla:window 8192
+ option zilla:transmission "half-duplex"
+
+write zilla:begin.ext ${asyncapi:beginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .operationId("createPets")
+ .extension(http:beginEx()
+ .typeId(zilla:id("http"))
+ .header(":method", "POST")
+ .header(":scheme", "http")
+ .header(":path", "/pets")
+ .header(":authority", "localhost:8080")
+ .header("content-type", "application/json")
+ .header("content-length", "39")
+ .build())
+ .build()}
+connected
+
+write "{\"id\": 1, \"name\": \"Dog\", \"tag\": \"test\"}"
+write close
+
+read zilla:begin.ext ${asyncapi:matchBeginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .extension(http:beginEx()
+ .typeId(zilla:id("http"))
+ .header(":status", "204")
+ .build())
+ .build()}
diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/server.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/server.rpt
new file mode 100644
index 0000000000..e13782ead4
--- /dev/null
+++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.http.create.pet/server.rpt
@@ -0,0 +1,51 @@
+#
+# Copyright 2021-2023 Aklivity Inc.
+#
+# Aklivity licenses this file to you under the Apache License,
+# version 2.0 (the "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+accept "zilla://streams/asyncapi_proxy0"
+ option zilla:window 8192
+ option zilla:transmission "half-duplex"
+accepted
+
+read zilla:begin.ext ${openapi:beginEx()
+ .typeId(zilla:id("openapi"))
+ .apiId(0)
+ .operationId("createPets")
+ .extension(http:beginEx()
+ .typeId(zilla:id("http"))
+ .header(":method", "POST")
+ .header(":scheme", "http")
+ .header(":path", "/pets")
+ .header(":authority", "localhost:8080")
+ .header("content-type", "application/json")
+ .header("content-length", "39")
+ .build())
+ .build()}
+
+connected
+
+read "{\"id\": 1, \"name\": \"Dog\", \"tag\": \"test\"}"
+read closed
+
+write zilla:begin.ext ${openapi:beginEx()
+ .typeId(zilla:id("openapi"))
+ .apiId(0)
+ .operationId("createPets")
+ .extension(http:beginEx()
+ .typeId(zilla:id("http"))
+ .header(":status", "204")
+ .build())
+ .build()}
+write flush
diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.async.verify.customer/client.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.async.verify.customer/client.rpt
new file mode 100644
index 0000000000..2810462ef7
--- /dev/null
+++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.async.verify.customer/client.rpt
@@ -0,0 +1,135 @@
+#
+# Copyright 2021-2023 Aklivity Inc.
+#
+# Aklivity licenses this file to you under the Apache License,
+# version 2.0 (the "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+property deltaMillis 0L
+property newTimestamp ${kafka:timestamp() + deltaMillis}
+
+connect "zilla://streams/asyncapi_kafka0"
+ option zilla:window 8192
+ option zilla:transmission "duplex"
+
+write zilla:begin.ext ${asyncapi:beginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .extension(kafka:beginEx()
+ .typeId(zilla:id("kafka"))
+ .merged()
+ .capabilities("PRODUCE_ONLY")
+ .topic("petstore-customers")
+ .partition(-1, -2)
+ .ackMode("IN_SYNC_REPLICAS")
+ .build()
+ .build())
+ .build()}
+
+connected
+
+write option zilla:flags "init"
+write zilla:data.ext ${kafka:dataEx()
+ .typeId(zilla:id("kafka"))
+ .merged()
+ .produce()
+ .timestamp(newTimestamp)
+ .partition(-1, -1)
+ .build()
+ .build()}
+write zilla:data.empty
+write flush
+
+write option zilla:flags "none"
+write "{ \"id\": 100000, \"username\": \"fehguy\", \"status\": \"approved\", \"address\": [ { \"street\": \"437 Lytton\", \"city\": \"Palo Alto\", \"state\": \"CA\", \"zip\": \"94301\" } ] }"
+write flush
+
+write notify SEND_ASYNC_REQUEST
+
+write option zilla:flags "fin"
+write zilla:data.empty
+write flush
+
+write close
+read closed
+
+write notify SENT_ASYNC_REQUEST
+
+connect await SENT_ASYNC_REQUEST
+ "zilla://streams/asyncapi_kafka0"
+ option zilla:window 8192
+ option zilla:transmission "duplex"
+
+write zilla:begin.ext ${asyncapi:beginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .extension(kafka:beginEx()
+ .typeId(zilla:id("kafka"))
+ .merged()
+ .capabilities("FETCH_ONLY")
+ .topic("petstore-verified-customers")
+ .partition(-1, -2)
+ .filter()
+ .header("zilla:correlation-id", "3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed")
+ .build()
+ .build()
+ .build())
+ .build()}
+
+connected
+
+read advised zilla:flush
+
+read notify RECEIVED_ASYNC_FLUSH
+
+read closed
+write close
+
+connect await RECEIVED_ASYNC_FLUSH
+ "zilla://streams/asyncapi_kafka0"
+ option zilla:window 8192
+ option zilla:transmission "duplex"
+
+write zilla:begin.ext ${asyncapi:beginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .extension(kafka:beginEx()
+ .typeId(zilla:id("kafka"))
+ .merged()
+ .capabilities("FETCH_ONLY")
+ .topic("petstore-verified-customers")
+ .partition(-1, -2)
+ .filter()
+ .header("zilla:correlation-id", "3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed")
+ .build()
+ .build()
+ .build())
+ .build()}
+
+connected
+
+read zilla:data.ext ${kafka:matchDataEx()
+ .typeId(zilla:id("kafka"))
+ .merged()
+ .fetch()
+ .partition(0, 1, 2)
+ .progress(0, 2)
+ .progress(1, 1)
+ .key("92d0bf92-63e0-4cfc-ae73-71dee92d1544")
+ .header(":status", "204")
+ .header("zilla:correlation-id", "3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed")
+ .build()
+ .build()}
+read zilla:data.null
+
+read closed
+write close
diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.async.verify.customer/server.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.async.verify.customer/server.rpt
new file mode 100644
index 0000000000..bb40b517e9
--- /dev/null
+++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.async.verify.customer/server.rpt
@@ -0,0 +1,122 @@
+#
+# Copyright 2021-2023 Aklivity Inc.
+#
+# Aklivity licenses this file to you under the Apache License,
+# version 2.0 (the "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+accept "zilla://streams/asyncapi_kafka0"
+ option zilla:window 8192
+ option zilla:transmission "duplex"
+
+accepted
+
+read zilla:begin.ext ${asyncapi:matchBeginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .extension(kafka:beginEx()
+ .typeId(zilla:id("kafka"))
+ .merged()
+ .capabilities("PRODUCE_ONLY")
+ .topic("petstore-customers")
+ .partition(-1, -2)
+ .ackMode("IN_SYNC_REPLICAS")
+ .build()
+ .build())
+ .build()}
+
+connected
+
+read option zilla:flags "init"
+read zilla:data.ext ${kafka:matchDataEx()
+ .typeId(zilla:id("kafka"))
+ .merged()
+ .produce()
+ .partition(-1, -1)
+ .build()
+ .build()}
+read zilla:data.empty
+
+read option zilla:flags "none"
+read "{ \"id\": 100000, \"username\": \"fehguy\", \"status\": \"approved\", \"address\": [ { \"street\": \"437 Lytton\", \"city\": \"Palo Alto\", \"state\": \"CA\", \"zip\": \"94301\" } ] }"
+
+read await SEND_ASYNC_REQUEST
+
+read option zilla:flags "fin"
+read zilla:data.empty
+
+read closed
+write close
+
+accepted
+
+read zilla:begin.ext ${asyncapi:matchBeginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .extension(kafka:beginEx()
+ .typeId(zilla:id("kafka"))
+ .merged()
+ .capabilities("FETCH_ONLY")
+ .topic("petstore-verified-customers")
+ .partition(-1, -2)
+ .filter()
+ .header("zilla:correlation-id", "3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed")
+ .build()
+ .build()
+ .build())
+ .build()}
+
+connected
+
+write advise zilla:flush
+
+write notify SEND_ASYNC_REQUEST
+
+write close
+read closed
+
+accepted
+
+read zilla:begin.ext ${asyncapi:matchBeginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .extension(kafka:beginEx()
+ .typeId(zilla:id("kafka"))
+ .merged()
+ .capabilities("FETCH_ONLY")
+ .topic("petstore-verified-customers")
+ .partition(-1, -2)
+ .filter()
+ .header("zilla:correlation-id", "3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed")
+ .build()
+ .build()
+ .build())
+ .build()}
+
+connected
+
+write zilla:data.ext ${kafka:dataEx()
+ .typeId(zilla:id("kafka"))
+ .merged()
+ .fetch()
+ .partition(0, 1, 2)
+ .progress(0, 2)
+ .progress(1, 1)
+ .key("92d0bf92-63e0-4cfc-ae73-71dee92d1544")
+ .header(":status", "204")
+ .header("zilla:correlation-id", "3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed")
+ .build()
+ .build()}
+write flush
+
+write close
+read closed
diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/client.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/client.rpt
new file mode 100644
index 0000000000..c4f0de4bd3
--- /dev/null
+++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/client.rpt
@@ -0,0 +1,61 @@
+#
+# Copyright 2021-2023 Aklivity Inc.
+#
+# Aklivity licenses this file to you under the Apache License,
+# version 2.0 (the "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+property deltaMillis 0L
+property newTimestamp ${kafka:timestamp() + deltaMillis}
+
+connect "zilla://streams/asyncapi_kafka0"
+ option zilla:window 8192
+ option zilla:transmission "duplex"
+
+write zilla:begin.ext ${asyncapi:beginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .extension(kafka:beginEx()
+ .typeId(zilla:id("kafka"))
+ .merged()
+ .capabilities("PRODUCE_ONLY")
+ .topic("petstore")
+ .partition(-1, -2)
+ .ackMode("IN_SYNC_REPLICAS")
+ .build()
+ .build())
+ .build()}
+
+connected
+
+write option zilla:flags "init"
+write zilla:data.ext ${kafka:dataEx()
+ .typeId(zilla:id("kafka"))
+ .merged()
+ .produce()
+ .timestamp(newTimestamp)
+ .partition(-1, -1)
+ .build()
+ .build()}
+write zilla:data.empty
+write flush
+
+write option zilla:flags "none"
+write "{\"id\": 1, \"name\": \"Dog\", \"tag\": \"test\"}"
+write flush
+
+write option zilla:flags "fin"
+write zilla:data.empty
+write flush
+
+write close
+read closed
diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/server.rpt b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/server.rpt
new file mode 100644
index 0000000000..69f3faf594
--- /dev/null
+++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/proxy.kafka.create.pet/server.rpt
@@ -0,0 +1,56 @@
+#
+# Copyright 2021-2023 Aklivity Inc.
+#
+# Aklivity licenses this file to you under the Apache License,
+# version 2.0 (the "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+accept "zilla://streams/asyncapi_kafka0"
+ option zilla:window 8192
+ option zilla:transmission "duplex"
+
+accepted
+
+read zilla:begin.ext ${asyncapi:matchBeginEx()
+ .typeId(zilla:id("asyncapi"))
+ .apiId(0)
+ .extension(kafka:beginEx()
+ .typeId(zilla:id("kafka"))
+ .merged()
+ .capabilities("PRODUCE_ONLY")
+ .topic("petstore")
+ .partition(-1, -2)
+ .ackMode("IN_SYNC_REPLICAS")
+ .build()
+ .build())
+ .build()}
+
+connected
+
+read option zilla:flags "init"
+read zilla:data.ext ${kafka:matchDataEx()
+ .typeId(zilla:id("kafka"))
+ .merged()
+ .produce()
+ .partition(-1, -1)
+ .build()
+ .build()}
+read zilla:data.empty
+
+read option zilla:flags "none"
+read "{\"id\": 1, \"name\": \"Dog\", \"tag\": \"test\"}"
+
+read option zilla:flags "fin"
+read zilla:data.empty
+
+read closed
+write close