diff --git a/incubator/command-dump/src/main/resources/io/aklivity/zilla/runtime/command/dump/internal/airline/zilla.lua b/incubator/command-dump/src/main/resources/io/aklivity/zilla/runtime/command/dump/internal/airline/zilla.lua index 9cf59d0b9f..60bf401234 100644 --- a/incubator/command-dump/src/main/resources/io/aklivity/zilla/runtime/command/dump/internal/airline/zilla.lua +++ b/incubator/command-dump/src/main/resources/io/aklivity/zilla/runtime/command/dump/internal/airline/zilla.lua @@ -2391,6 +2391,11 @@ function handle_kafka_begin_merged_extension(buffer, offset, ext_subtree) local ack_mode = kafka_ext_ack_modes[slice_ack_mode_id:le_int()] ext_subtree:add_le(fields.kafka_ext_ack_mode_id, slice_ack_mode_id) ext_subtree:add(fields.kafka_ext_ack_mode, ack_mode) + -- configs + local configs_offset = ack_mode_offset + ack_mode_length + local configs_length = resolve_length_of_array(buffer, configs_offset) + dissect_and_add_kafka_config_struct_array(buffer, configs_offset, ext_subtree, fields.kafka_ext_config_array_length, + fields.kafka_ext_config_array_size) end function handle_kafka_begin_init_producer_id_extension(buffer, offset, ext_subtree) diff --git a/incubator/command-dump/src/test/resources/io/aklivity/zilla/runtime/command/dump/internal/KafkaMergedApplicationIT_shouldFetchMergedFilterSync.txt b/incubator/command-dump/src/test/resources/io/aklivity/zilla/runtime/command/dump/internal/KafkaMergedApplicationIT_shouldFetchMergedFilterSync.txt index 4c91348e1e..f2a4c27876 100644 --- a/incubator/command-dump/src/test/resources/io/aklivity/zilla/runtime/command/dump/internal/KafkaMergedApplicationIT_shouldFetchMergedFilterSync.txt +++ b/incubator/command-dump/src/test/resources/io/aklivity/zilla/runtime/command/dump/internal/KafkaMergedApplicationIT_shouldFetchMergedFilterSync.txt @@ -1,7 +1,7 @@ -Frame 1: 303 bytes on wire (2424 bits), 303 bytes captured (2424 bits) +Frame 1: 311 bytes on wire (2488 bits), 311 bytes captured (2488 bits) Ethernet II, Src: Send_00 (20:53:45:4e:44:00), Dst: Receive_00 (20:52:45:43:56:00) Internet Protocol Version 6, Src: fe80::3f3f:0:0:2, Dst: fe80::3f3f:0:0:3 -Transmission Control Protocol, Src Port: 0, Dst Port: 7114, Seq: 0, Ack: 1, Len: 229 +Transmission Control Protocol, Src Port: 0, Dst Port: 7114, Seq: 0, Ack: 1, Len: 237 Zilla Frame Frame Type ID: 0x00000001 Frame Type: BEGIN @@ -68,18 +68,21 @@ Zilla Frame Delta Type: NONE (0) Ack Mode ID: -1 Ack Mode: IN_SYNC_REPLICAS + Configs (0 items) + Length: 4 + Size: 0 Frame 2: 211 bytes on wire (1688 bits), 211 bytes captured (1688 bits) Ethernet II, Src: Send_00 (20:53:45:4e:44:00), Dst: Receive_00 (20:52:45:43:56:00) Internet Protocol Version 6, Src: fe80::3f3f:0:0:2, Dst: fe80::3f3f:0:0:3 -Transmission Control Protocol, Src Port: 0, Dst Port: 7114, Seq: 229, Ack: 1, Len: 137 +Transmission Control Protocol, Src Port: 0, Dst Port: 7114, Seq: 237, Ack: 1, Len: 137 Zilla Frame Frame Type ID: 0x40000002 Frame Type: WINDOW Protocol Type ID: 0x00000000 Protocol Type: Worker: 0 - Offset: 0x000000c0 + Offset: 0x000000c8 Origin ID: 0x0000000100000002 Origin Namespace: test Origin Binding: app0 @@ -103,17 +106,17 @@ Zilla Frame Progress: 0 Progress/Maximum: 0/8192 -Frame 3: 303 bytes on wire (2424 bits), 303 bytes captured (2424 bits) +Frame 3: 311 bytes on wire (2488 bits), 311 bytes captured (2488 bits) Ethernet II, Src: Send_00 (20:53:45:4e:44:00), Dst: Receive_00 (20:52:45:43:56:00) Internet Protocol Version 6, Src: fe80::3f3f:0:0:3, Dst: fe80::3f3f:0:0:2 -Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 1, Ack: 366, Len: 229 +Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 1, Ack: 374, Len: 237 Zilla Frame Frame Type ID: 0x00000001 Frame Type: BEGIN Protocol Type ID: 0x00000000 Protocol Type: Worker: 0 - Offset: 0x00000120 + Offset: 0x00000128 Origin ID: 0x0000000100000002 Origin Namespace: test Origin Binding: app0 @@ -173,18 +176,21 @@ Zilla Frame Delta Type: NONE (0) Ack Mode ID: -1 Ack Mode: IN_SYNC_REPLICAS + Configs (0 items) + Length: 4 + Size: 0 Frame 4: 320 bytes on wire (2560 bits), 320 bytes captured (2560 bits) Ethernet II, Src: Send_00 (20:53:45:4e:44:00), Dst: Receive_00 (20:52:45:43:56:00) Internet Protocol Version 6, Src: fe80::3f3f:0:0:3, Dst: fe80::3f3f:0:0:2 -Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 230, Ack: 366, Len: 246 +Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 238, Ack: 374, Len: 246 Zilla Frame Frame Type ID: 0x00000005 Frame Type: FLUSH Protocol Type ID: 0x00000000 Protocol Type: Worker: 0 - Offset: 0x000001e0 + Offset: 0x000001f0 Origin ID: 0x0000000100000002 Origin Namespace: test Origin Binding: app0 @@ -247,14 +253,14 @@ Zilla Frame Frame 5: 211 bytes on wire (1688 bits), 211 bytes captured (1688 bits) Ethernet II, Src: Send_00 (20:53:45:4e:44:00), Dst: Receive_00 (20:52:45:43:56:00) Internet Protocol Version 6, Src: fe80::3f3f:0:0:3, Dst: fe80::3f3f:0:0:2 -Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 476, Ack: 366, Len: 137 +Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 484, Ack: 374, Len: 137 Zilla Frame Frame Type ID: 0x40000002 Frame Type: WINDOW Protocol Type ID: 0x00000000 Protocol Type: Worker: 0 - Offset: 0x000002b0 + Offset: 0x000002c0 Origin ID: 0x0000000100000002 Origin Namespace: test Origin Binding: app0 @@ -281,14 +287,14 @@ Zilla Frame Frame 6: 194 bytes on wire (1552 bits), 194 bytes captured (1552 bits) Ethernet II, Src: Send_00 (20:53:45:4e:44:00), Dst: Receive_00 (20:52:45:43:56:00) Internet Protocol Version 6, Src: fe80::3f3f:0:0:2, Dst: fe80::3f3f:0:0:3 -Transmission Control Protocol, Src Port: 0, Dst Port: 7114, Seq: 366, Ack: 613, Len: 120 +Transmission Control Protocol, Src Port: 0, Dst Port: 7114, Seq: 374, Ack: 621, Len: 120 Zilla Frame Frame Type ID: 0x00000003 Frame Type: END Protocol Type ID: 0x00000000 Protocol Type: Worker: 0 - Offset: 0x00000310 + Offset: 0x00000320 Origin ID: 0x0000000100000002 Origin Namespace: test Origin Binding: app0 @@ -309,14 +315,14 @@ Zilla Frame Frame 7: 194 bytes on wire (1552 bits), 194 bytes captured (1552 bits) Ethernet II, Src: Send_00 (20:53:45:4e:44:00), Dst: Receive_00 (20:52:45:43:56:00) Internet Protocol Version 6, Src: fe80::3f3f:0:0:3, Dst: fe80::3f3f:0:0:2 -Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 613, Ack: 486, Len: 120 +Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 621, Ack: 494, Len: 120 Zilla Frame Frame Type ID: 0x00000003 Frame Type: END Protocol Type ID: 0x00000000 Protocol Type: Worker: 0 - Offset: 0x00000360 + Offset: 0x00000370 Origin ID: 0x0000000100000002 Origin Namespace: test Origin Binding: app0 diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java index 9280cfef77..21c91c6eb3 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java @@ -16,6 +16,7 @@ package io.aklivity.zilla.runtime.binding.kafka.internal.stream; import static io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaCapabilities.FETCH_ONLY; +import static io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaCapabilities.PRODUCE_AND_FETCH; import static io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaCapabilities.PRODUCE_ONLY; import static io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType.HISTORICAL; import static io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType.LIVE; @@ -39,6 +40,7 @@ import org.agrona.collections.MutableInteger; import org.agrona.collections.MutableLong; import org.agrona.collections.MutableReference; +import org.agrona.collections.Object2ObjectHashMap; import org.agrona.concurrent.UnsafeBuffer; import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding; @@ -1033,6 +1035,7 @@ private final class KafkaMergedStream private final KafkaIsolation isolation; private final KafkaDeltaType deltaType; private final KafkaAckMode ackMode; + private final Object2ObjectHashMap configs; private KafkaOffsetType maximumOffset; private List filters; @@ -1107,6 +1110,7 @@ private final class KafkaMergedStream this.isolation = isolation; this.deltaType = deltaType; this.ackMode = ackMode; + this.configs = new Object2ObjectHashMap<>(); } private void onMergedMessage( @@ -1618,7 +1622,7 @@ else if (capabilities == PRODUCE_ONLY) else { doBegin(sender, originId, routedId, replyId, replySeq, replyAck, replyMax, - traceId, authorization, affinity, EMPTY_EXTENSION); + traceId, authorization, affinity, beginExToKafka(beginExToKafkaMergedProduceAndFetch())); } doUnmergedFetchReplyWindowsIfNecessary(traceId); @@ -1659,6 +1663,13 @@ private Consumer beginExToKafkaMergedFetchOnly() .latestOffset(v) .metadata(metadataRW.length() > 0 ? metadataRW.toString() : null)); }); + String cleanupPolicy = configs.get(CONFIG_NAME_CLEANUP_POLICY); + if (cleanupPolicy != null) + { + builder.configsItem(c -> + c.name(CONFIG_NAME_CLEANUP_POLICY) + .value(cleanupPolicy)); + } }; } @@ -1668,6 +1679,29 @@ private Consumer beginExToKafkaMergedProduceOnly() { builder.capabilities(c -> c.set(PRODUCE_ONLY)).topic(topic); leadersByPartitionId.intForEach((k, v) -> builder.partitionsItem(i -> i.partitionId(k))); + String cleanupPolicy = configs.get(CONFIG_NAME_CLEANUP_POLICY); + if (cleanupPolicy != null) + { + builder.configsItem(c -> + c.name(CONFIG_NAME_CLEANUP_POLICY) + .value(cleanupPolicy)); + } + }; + } + + private Consumer beginExToKafkaMergedProduceAndFetch() + { + return builder -> + { + builder.capabilities(c -> c.set(PRODUCE_AND_FETCH)) + .topic(topic); + String cleanupPolicy = configs.get(CONFIG_NAME_CLEANUP_POLICY); + if (cleanupPolicy != null) + { + builder.configsItem(c -> + c.name(CONFIG_NAME_CLEANUP_POLICY) + .value(cleanupPolicy)); + } }; } @@ -1952,6 +1986,13 @@ private void onTopicConfigChanged( long traceId, ArrayFW configs) { + configs.forEach(c -> + { + if (c.name().equals(CONFIG_NAME_CLEANUP_POLICY)) + { + this.configs.put(CONFIG_NAME_CLEANUP_POLICY, c.value().asString()); + } + }); metaStream.doMetaInitialBeginIfNecessary(traceId); } diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheMergedIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheMergedIT.java index 05fda6b413..9708c0ce06 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheMergedIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheMergedIT.java @@ -80,6 +80,16 @@ public void shouldFetchMergedMessagesWithHeaderFilterAfterCompaction() throws Ex k3po.finish(); } + @Test + @Configuration("cache.options.merged.yaml") + @Specification({ + "${app}/merged.produce.and.fetch.get.cleanup.policy/client", + "${app}/unmerged.produce.and.fetch.get.cleanup.policy/server"}) + public void shouldProduceAndFetchMergedGetCompaction() throws Exception + { + k3po.finish(); + } + @Ignore("requires k3po parallel reads") @Test @Configuration("cache.options.merged.yaml") diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/MqttKafkaEventContext.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/MqttKafkaEventContext.java new file mode 100644 index 0000000000..f62228de03 --- /dev/null +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/MqttKafkaEventContext.java @@ -0,0 +1,74 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mqtt.kafka.internal; + +import static io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.MqttKafkaEventType.NON_COMPACT_SESSIONS_TOPIC; + +import java.nio.ByteBuffer; +import java.time.Clock; + +import org.agrona.concurrent.AtomicBuffer; +import org.agrona.concurrent.UnsafeBuffer; + +import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.String16FW; +import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.EventFW; +import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.MqttKafkaEventExFW; +import io.aklivity.zilla.runtime.engine.EngineContext; +import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer; + +public class MqttKafkaEventContext +{ + private static final int EVENT_BUFFER_CAPACITY = 2048; + + private final AtomicBuffer eventBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY)); + private final AtomicBuffer extensionBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY)); + private final EventFW.Builder eventRW = new EventFW.Builder(); + private final MqttKafkaEventExFW.Builder mqttKafkaEventExRW = new MqttKafkaEventExFW.Builder(); + private final int mqttTypeId; + private final int nonCompactSessionsTopicEventId; + private final MessageConsumer eventWriter; + private final Clock clock; + + public MqttKafkaEventContext( + EngineContext context) + { + this.mqttTypeId = context.supplyTypeId(MqttKafkaBinding.NAME); + this.nonCompactSessionsTopicEventId = context.supplyEventId("binding.mqtt.kafka.non.compact.sessions.topic"); + this.eventWriter = context.supplyEventWriter(); + this.clock = context.clock(); + } + + public void onMqttConnectionReset( + long traceId, + long bindingId, + String16FW reason) + { + MqttKafkaEventExFW extension = mqttKafkaEventExRW + .wrap(extensionBuffer, 0, extensionBuffer.capacity()) + .nonCompactSessionsTopic(e -> e + .typeId(NON_COMPACT_SESSIONS_TOPIC.value()) + .reason(reason)) + .build(); + EventFW event = eventRW + .wrap(eventBuffer, 0, eventBuffer.capacity()) + .id(nonCompactSessionsTopicEventId) + .timestamp(clock.millis()) + .traceId(traceId) + .namespacedId(bindingId) + .extension(extension.buffer(), extension.offset(), extension.limit()) + .build(); + eventWriter.accept(mqttTypeId, event.buffer(), event.offset(), event.limit()); + } +} diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/MqttKafkaEventFormatter.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/MqttKafkaEventFormatter.java new file mode 100644 index 0000000000..8c66ebbe31 --- /dev/null +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/MqttKafkaEventFormatter.java @@ -0,0 +1,65 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mqtt.kafka.internal; + +import org.agrona.DirectBuffer; + +import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.String16FW; +import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.EventFW; +import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.MqttKafkaEventExFW; +import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.MqttKafkaResetMqttConnectionExFW; +import io.aklivity.zilla.runtime.engine.Configuration; +import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi; + +public final class MqttKafkaEventFormatter implements EventFormatterSpi +{ + private static final String NON_COMPACT_SESSIONS_TOPIC_FORMAT = "NON COMPACT SESSIONS TOPIC - %s"; + + private final EventFW eventRO = new EventFW(); + private final MqttKafkaEventExFW mqttKafkaEventExRO = new MqttKafkaEventExFW(); + + MqttKafkaEventFormatter( + Configuration config) + { + } + + public String format( + DirectBuffer buffer, + int index, + int length) + { + final EventFW event = eventRO.wrap(buffer, index, index + length); + final MqttKafkaEventExFW extension = mqttKafkaEventExRO + .wrap(event.extension().buffer(), event.extension().offset(), event.extension().limit()); + String result = null; + switch (extension.kind()) + { + case NON_COMPACT_SESSIONS_TOPIC: + { + MqttKafkaResetMqttConnectionExFW ex = extension.nonCompactSessionsTopic(); + result = String.format(NON_COMPACT_SESSIONS_TOPIC_FORMAT, asString(ex.reason())); + break; + } + } + return result; + } + + private static String asString( + String16FW stringFW) + { + String s = stringFW.asString(); + return s == null ? "" : s; + } +} diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/MqttKafkaEventFormatterFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/MqttKafkaEventFormatterFactory.java new file mode 100644 index 0000000000..a5b625d1c8 --- /dev/null +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/MqttKafkaEventFormatterFactory.java @@ -0,0 +1,34 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mqtt.kafka.internal; + +import io.aklivity.zilla.runtime.engine.Configuration; +import io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi; + +public final class MqttKafkaEventFormatterFactory implements EventFormatterFactorySpi +{ + @Override + public MqttKafkaEventFormatter create( + Configuration config) + { + return new MqttKafkaEventFormatter(config); + } + + @Override + public String type() + { + return MqttKafkaBinding.NAME; + } +} diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java index b7731c36d9..7ab987ddba 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java @@ -53,6 +53,7 @@ import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaRouteConfig; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.InstanceId; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.MqttKafkaConfiguration; +import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.MqttKafkaEventContext; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.config.MqttKafkaBindingConfig; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.config.MqttKafkaHeaderHelper; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.stream.MqttKafkaPublishMetadata.KafkaGroup; @@ -63,6 +64,7 @@ import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.Flyweight; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.KafkaAckMode; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.KafkaCapabilities; +import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.KafkaConfigFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.KafkaEvaluation; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.KafkaHeaderFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.KafkaKeyFW; @@ -94,6 +96,7 @@ import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.KafkaGroupBeginExFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.KafkaGroupFlushExFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.KafkaInitProducerIdBeginExFW; +import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.KafkaMergedBeginExFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.KafkaMergedDataExFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.KafkaMergedFlushExFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.KafkaMetaDataExFW; @@ -167,6 +170,12 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory public static final int MQTT_NOT_AUTHORIZED = 0x87; public static final int MQTT_IMPLEMENTATION_SPECIFIC_ERROR = 0x83; public static final String MQTT_INVALID_SESSION_TIMEOUT_REASON = "Invalid session expiry interval"; + public static final String16FW MQTT_NON_COMPACT_SESSIONS_TOPIC = new String16FW("Sessions Kafka topic in non-compacted"); + private static final KafkaConfigFW CONFIG_COMPACT_CLEANUP_POLICY = new KafkaConfigFW.Builder() + .wrap(new UnsafeBuffer(new byte[25]), 0, 25) + .name("cleanup.policy") + .value("compact") + .build(); static { @@ -268,6 +277,7 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory private final String groupIdPrefixFormat; private final Function supplyNamespace; private final Function supplyLocalName; + private final MqttKafkaEventContext events; private String serverRef; private int reconnectAttempt; @@ -320,6 +330,7 @@ public MqttKafkaSessionFactory( this.groupIdPrefixFormat = config.groupIdPrefixFormat(); this.supplyNamespace = context::supplyNamespace; this.supplyLocalName = context::supplyLocalName; + this.events = new MqttKafkaEventContext(context); } @Override @@ -2907,7 +2918,7 @@ private void onKafkaMessage( } } - private void onKafkaBegin( + protected void onKafkaBegin( BeginFW begin) { final long sequence = begin.sequence(); @@ -3156,7 +3167,7 @@ private void doKafkaEnd( } } - private void doKafkaAbort( + protected void doKafkaAbort( long traceId, long authorization) { @@ -3415,6 +3426,59 @@ private KafkaSessionStateProxy( super(originId, routedId, delegate); } + @Override + protected void onKafkaBegin( + BeginFW begin) + { + final long sequence = begin.sequence(); + final long acknowledge = begin.acknowledge(); + final int maximum = begin.maximum(); + final long traceId = begin.traceId(); + final long authorization = begin.authorization(); + final long affinity = begin.affinity(); + final OctetsFW extension = begin.extension(); + + assert acknowledge <= sequence; + assert sequence >= replySeq; + assert acknowledge >= replyAck; + + replySeq = sequence; + replyAck = acknowledge; + replyMax = maximum; + state = MqttKafkaState.openingReply(state); + + assert replyAck <= replySeq; + + final KafkaBeginExFW kafkaBeginEx = extension.get(kafkaBeginExRO::tryWrap); + + onKafkaBegin: + { + if (kafkaBeginEx != null) + { + assert kafkaBeginEx.kind() == KafkaBeginExFW.KIND_MERGED; + final KafkaMergedBeginExFW kafkaMergedBeginEx = kafkaBeginEx.merged(); + + final KafkaConfigFW cleanupPolicyConfig = + kafkaMergedBeginEx.configs().matchFirst(c -> c.equals(CONFIG_COMPACT_CLEANUP_POLICY)); + + if (cleanupPolicyConfig == null) + { + Flyweight mqttResetEx = mqttSessionResetExRW.wrap(sessionExtBuffer, 0, sessionExtBuffer.capacity()) + .typeId(mqttTypeId) + .reasonCode(MQTT_IMPLEMENTATION_SPECIFIC_ERROR) + .build(); + delegate.doMqttWindow(authorization, traceId, 0, 0, 0); + delegate.doMqttReset(traceId, mqttResetEx); + events.onMqttConnectionReset(traceId, routedId, MQTT_NON_COMPACT_SESSIONS_TOPIC); + doKafkaAbort(traceId, authorization); + break onKafkaBegin; + } + } + + super.onKafkaBegin(begin); + } + } + @Override protected void onKafkaDataImpl( DataFW data) diff --git a/runtime/binding-mqtt-kafka/src/main/moditect/module-info.java b/runtime/binding-mqtt-kafka/src/main/moditect/module-info.java index e08083a7ce..2648cb37d9 100644 --- a/runtime/binding-mqtt-kafka/src/main/moditect/module-info.java +++ b/runtime/binding-mqtt-kafka/src/main/moditect/module-info.java @@ -30,4 +30,6 @@ provides io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi with io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.config.MqttKafkaOptionsConfigAdapter; + provides io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi + with io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.MqttKafkaEventFormatterFactory; } diff --git a/runtime/binding-mqtt-kafka/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi b/runtime/binding-mqtt-kafka/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi new file mode 100644 index 0000000000..6bb2339795 --- /dev/null +++ b/runtime/binding-mqtt-kafka/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi @@ -0,0 +1 @@ +io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.MqttKafkaEventFormatterFactory diff --git a/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionProxyIT.java b/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionProxyIT.java index 945b86d372..6808a4587b 100644 --- a/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionProxyIT.java +++ b/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionProxyIT.java @@ -143,6 +143,19 @@ public void shouldSubscribeSaveSubscriptionsInSession() throws Exception k3po.finish(); } + @Test + @Configuration("proxy.log.event.yaml") + @Configure(name = WILL_AVAILABLE_NAME, value = "false") + @Configure(name = PUBLISH_MAX_QOS_NAME, value = "1") + @Configure(name = PUBLISH_MAX_QOS_NAME, value = "0") + @Specification({ + "${mqtt}/session.reject.non.compacted.sessions.topic/client", + "${kafka}/session.reject.non.compacted.sessions.topic/server"}) + public void shouldRejectSessionNonCompactedSessionsTopic() throws Exception + { + k3po.finish(); + } + @Test @Configuration("proxy.yaml") @Configure(name = WILL_AVAILABLE_NAME, value = "false") diff --git a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v4/SessionIT.java b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v4/SessionIT.java index efb11c07a2..a8360ce722 100644 --- a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v4/SessionIT.java +++ b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v4/SessionIT.java @@ -189,6 +189,16 @@ public void shouldClientTakeOverSession() throws Exception k3po.finish(); } + @Test + @Configuration("server.yaml") + @Specification({ + "${net}/session.reject.non.compacted.sessions.topic/client", + "${app}/session.reject.non.compacted.sessions.topic/server"}) + public void shouldRejectSessionNonCompactedSessionsTopic() throws Exception + { + k3po.finish(); + } + @Test @Configuration("server.route.non.default.yaml") @Specification({ diff --git a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/SessionIT.java b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/SessionIT.java index 640d6bc3d7..19fdde8dd6 100644 --- a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/SessionIT.java +++ b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/SessionIT.java @@ -84,6 +84,16 @@ public void shouldSubscribeSaveSubscriptionsInSession() throws Exception k3po.finish(); } + @Test + @Configuration("server.yaml") + @Specification({ + "${net}/session.subscribe/client", + "${app}/session.subscribe.invalid.state/server"}) + public void shouldSubscribeInvalidSessionState() throws Exception + { + k3po.finish(); + } + @Test @Configuration("server.yaml") @Specification({ @@ -266,6 +276,16 @@ public void shouldSubscribeAndPublishToNonDefaultRoute() throws Exception k3po.finish(); } + @Test + @Configuration("server.yaml") + @Specification({ + "${net}/session.reject.non.compacted.sessions.topic/client", + "${app}/session.reject.non.compacted.sessions.topic/server"}) + public void shouldRejectSessionNonCompactedSessionsTopic() throws Exception + { + k3po.finish(); + } + @Test @Configuration("server.yaml") @Specification({ diff --git a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java index c2b215b83f..692cc2c6fd 100644 --- a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java +++ b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java @@ -36,6 +36,7 @@ import io.aklivity.zilla.specs.binding.kafka.internal.types.KafkaAckMode; import io.aklivity.zilla.specs.binding.kafka.internal.types.KafkaCapabilities; import io.aklivity.zilla.specs.binding.kafka.internal.types.KafkaConditionFW; +import io.aklivity.zilla.specs.binding.kafka.internal.types.KafkaConfigFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.KafkaDeltaFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.KafkaDeltaType; import io.aklivity.zilla.specs.binding.kafka.internal.types.KafkaDeltaTypeFW; @@ -1165,6 +1166,14 @@ public KafkaMergedBeginExBuilder ackMode( return this; } + public KafkaMergedBeginExBuilder config( + String name, + String value) + { + mergedBeginExRW.configsItem(c -> c.name(name).value(value)); + return this; + } + public KafkaBeginExBuilder build() { final KafkaMergedBeginExFW mergedBeginEx = mergedBeginExRW.build(); @@ -5802,6 +5811,7 @@ public final class KafkaMergedBeginExMatcherBuilder private KafkaEvaluation evaluation; private KafkaAckMode ackMode; private Array32FW.Builder filtersRW; + private Array32FW.Builder configsFW; private KafkaMergedBeginExMatcherBuilder() { diff --git a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl index 0377305825..dd8c53577a 100644 --- a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl +++ b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl @@ -243,6 +243,7 @@ scope kafka KafkaIsolation isolation = READ_COMMITTED; KafkaDeltaType deltaType = NONE; KafkaAckMode ackMode = IN_SYNC_REPLICAS; + KafkaConfig[] configs; } union KafkaMergedDataEx switch (uint8) diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.and.fetch.get.cleanup.policy/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.and.fetch.get.cleanup.policy/client.rpt new file mode 100644 index 0000000000..2b4c6b385c --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.and.fetch.get.cleanup.policy/client.rpt @@ -0,0 +1,42 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 16 + option zilla:transmission "duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_AND_FETCH") + .topic("test") + .partition(0, 1) + .filter() + .header("header1", "value1") + .build() + .build() + .build()} + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_AND_FETCH") + .topic("test") + .config("cleanup.policy", "compact") + .build() + .build()} + +connected diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.and.fetch.get.cleanup.policy/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.and.fetch.get.cleanup.policy/server.rpt new file mode 100644 index 0000000000..337b0932ee --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.and.fetch.get.cleanup.policy/server.rpt @@ -0,0 +1,47 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +accept "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_AND_FETCH") + .topic("test") + .partition(0, 1) + .filter() + .header("header1", "value1") + .build() + .build() + .build()} + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_AND_FETCH") + .topic("test") + .config("cleanup.policy", "compact") + .build() + .build()} + +connected diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.and.fetch.get.cleanup.policy/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.and.fetch.get.cleanup.policy/client.rpt new file mode 100644 index 0000000000..a2652e56f7 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.and.fetch.get.cleanup.policy/client.rpt @@ -0,0 +1,152 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .describe() + .topic("test") + .config("cleanup.policy") + .config("max.message.bytes") + .config("segment.bytes") + .config("segment.index.bytes") + .config("segment.ms") + .config("retention.bytes") + .config("retention.ms") + .config("delete.retention.ms") + .config("min.compaction.lag.ms") + .config("max.compaction.lag.ms") + .config("min.cleanable.dirty.ratio") + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .describe() + .topic("test") + .config("cleanup.policy") + .config("max.message.bytes") + .config("segment.bytes") + .config("segment.index.bytes") + .config("segment.ms") + .config("retention.bytes") + .config("retention.ms") + .config("delete.retention.ms") + .config("min.compaction.lag.ms") + .config("max.compaction.lag.ms") + .config("min.cleanable.dirty.ratio") + .build() + .build()} + +read zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .describe() + .config("cleanup.policy", "compact") + .config("max.message.bytes", 16) + .config("segment.bytes", 1073741824) + .config("segment.index.bytes", 64) + .config("segment.ms", 604800000) + .config("retention.bytes", -1) + .config("retention.ms", 604800000) + .config("delete.retention.ms", 86400000) + .config("min.compaction.lag.ms", 0) + .config("max.compaction.lag.ms", 9223372036854775807) + .config("min.cleanable.dirty.ratio", 0.0) + .build() + .build()} + +read notify RECEIVED_CONFIG + +connect await RECEIVED_CONFIG + "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +read zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 1) + .build() + .build()} +read notify PARTITION_COUNT_1 + +connect await PARTITION_COUNT_1 + "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .fetch() + .topic("test") + .partition(0, -2) + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .fetch() + .topic("test") + .partition(0, 1, 2) + .build() + .build()} + + +connect await PARTITION_COUNT_1 + "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.and.fetch.get.cleanup.policy/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.and.fetch.get.cleanup.policy/server.rpt new file mode 100644 index 0000000000..93fb0ab0ba --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.and.fetch.get.cleanup.policy/server.rpt @@ -0,0 +1,151 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +accept "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .describe() + .topic("test") + .config("cleanup.policy") + .config("max.message.bytes") + .config("segment.bytes") + .config("segment.index.bytes") + .config("segment.ms") + .config("retention.bytes") + .config("retention.ms") + .config("delete.retention.ms") + .config("min.compaction.lag.ms") + .config("max.compaction.lag.ms") + .config("min.cleanable.dirty.ratio") + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .describe() + .topic("test") + .config("cleanup.policy") + .config("max.message.bytes") + .config("segment.bytes") + .config("segment.index.bytes") + .config("segment.ms") + .config("retention.bytes") + .config("retention.ms") + .config("delete.retention.ms") + .config("min.compaction.lag.ms") + .config("max.compaction.lag.ms") + .config("min.cleanable.dirty.ratio") + .build() + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .describe() + .config("cleanup.policy", "compact") + .config("max.message.bytes", 16) + .config("segment.bytes", 1073741824) + .config("segment.index.bytes", 64) + .config("segment.ms", 604800000) + .config("retention.bytes", -1) + .config("retention.ms", 604800000) + .config("delete.retention.ms", 86400000) + .config("min.compaction.lag.ms", 0) + .config("max.compaction.lag.ms", 9223372036854775807) + .config("min.cleanable.dirty.ratio", 0.0) + .build() + .build()} +write flush + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 1) + .build() + .build()} +write flush + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .fetch() + .topic("test") + .partition(0, -2) + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .fetch() + .topic("test") + .partition(0, 1, 2) + .build() + .build()} +write flush + + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} +write flush diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/MergedIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/MergedIT.java index 12a994c0fd..6aa5454608 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/MergedIT.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/MergedIT.java @@ -714,6 +714,16 @@ public void shouldUnmergedFetchServerSentResetAndAbortWithMessage() throws Excep k3po.finish(); } + @Test + @Specification({ + "${app}/unmerged.produce.and.fetch.get.cleanup.policy/client", + "${app}/unmerged.produce.and.fetch.get.cleanup.policy/server"}) + public void shouldProduceAndFetchUnmergedGetCompaction() throws Exception + { + k3po.finish(); + } + + @Test @Specification({ "${app}/merged.fetch.filter.not.header/client", @@ -903,6 +913,15 @@ public void shouldProduceUnmergedMessageValueByGettingPartitionId() throws Excep k3po.finish(); } + @Test + @Specification({ + "${app}/merged.produce.and.fetch.get.cleanup.policy/client", + "${app}/merged.produce.and.fetch.get.cleanup.policy/server"}) + public void shouldProduceAndFetchMergedGetCompaction() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${app}/merged.fetch.unsubscribe/client", diff --git a/specs/binding-mqtt-kafka.spec/src/main/resources/META-INF/zilla/mqtt_kafka.idl b/specs/binding-mqtt-kafka.spec/src/main/resources/META-INF/zilla/mqtt_kafka.idl index 74a792a173..5b5cd547f2 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/resources/META-INF/zilla/mqtt_kafka.idl +++ b/specs/binding-mqtt-kafka.spec/src/main/resources/META-INF/zilla/mqtt_kafka.idl @@ -30,4 +30,22 @@ scope mqtt_kafka int8 length; int16[length] packetIds = null; } + + scope event + { + enum MqttKafkaEventType (uint8) + { + NON_COMPACT_SESSIONS_TOPIC (1) + } + + struct MqttKafkaResetMqttConnectionEx extends core::stream::Extension + { + string16 reason; + } + + union MqttKafkaEventEx switch (MqttKafkaEventType) + { + case NON_COMPACT_SESSIONS_TOPIC: MqttKafkaResetMqttConnectionEx nonCompactSessionsTopic; + } + } } \ No newline at end of file diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/config/proxy.log.event.yaml b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/config/proxy.log.event.yaml new file mode 100644 index 0000000000..f6c94af5cc --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/config/proxy.log.event.yaml @@ -0,0 +1,37 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +--- +name: test +telemetry: + exporters: + exporter0: + type: test + options: + events: + - qname: test.kafka0 + id: binding.mqtt.kafka.non.compact.sessions.topic + message: NON COMPACT SESSIONS TOPIC - Sessions Kafka topic in non-compacted +bindings: + mqtt0: + type: mqtt-kafka + kind: proxy + options: + server: mqtt-1.example.com:1883 + topics: + sessions: mqtt-sessions + messages: mqtt-messages + retained: mqtt-retained + exit: kafka0 diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.reject.non.compacted.sessions.topic/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.reject.non.compacted.sessions.topic/client.rpt new file mode 100644 index 0000000000..a8498c5c61 --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.reject.non.compacted.sessions.topic/client.rpt @@ -0,0 +1,113 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_AND_FETCH") + .topic("mqtt-sessions") + .groupId("mqtt-clients") + .filter() + .key("client-1#migrate") + .headerNot("sender-id", "sender-1") + .build() + .build() + .build()} + +connected + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .deferred(0) + .partition(-1, -1) + .key("client-1#migrate") + .hashKey("client-1") + .header("sender-id", "sender-1") + .build() + .build()} +write zilla:data.empty +write flush +write notify SENT_INIT_MIGRATE + +write close +read closed + + +connect await SENT_INIT_MIGRATE + "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .group() + .groupId("zilla:test-mqtt0-client-1-session") + .protocol("highlander") + .timeout(1000) + .build() + .build()} + +connected + +read advised zilla:flush ${kafka:matchFlushEx() + .typeId(zilla:id("kafka")) + .group() + .leaderId("consumer-1") + .memberId("consumer-1") + .members("consumer-1") + .build() + .build()} +read notify RECEIVED_LEADER + +write zilla:data.empty + + +connect await RECEIVED_LEADER + "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_AND_FETCH") + .topic("mqtt-sessions") + .groupId("mqtt-clients") + .filter() + .key("client-1") + .build() + .filter() + .key("client-1#migrate") + .headerNot("sender-id", "sender-1") + .build() + .build() + .build()} + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_AND_FETCH") + .topic("mqtt-sessions") + .config("cleanup.policy", "delete") + .build() + .build()} + +connected \ No newline at end of file diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.reject.non.compacted.sessions.topic/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.reject.non.compacted.sessions.topic/server.rpt new file mode 100644 index 0000000000..f84db986bc --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.reject.non.compacted.sessions.topic/server.rpt @@ -0,0 +1,108 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_AND_FETCH") + .topic("mqtt-sessions") + .groupId("mqtt-clients") + .filter() + .key("client-1#migrate") + .headerNot("sender-id", "sender-1") + .build() + .build() + .build()} + +connected + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .deferred(0) + .partition(-1, -1) + .key("client-1#migrate") + .hashKey("client-1") + .header("sender-id", "sender-1") + .build() + .build()} +read zilla:data.empty + +read closed +write close + + +accepted + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .group() + .groupId("zilla:test-mqtt0-client-1-session") + .protocol("highlander") + .timeout(1000) + .build() + .build()} + +connected + +# This is the second prerequisite +write advise zilla:flush ${kafka:flushEx() + .typeId(zilla:id("kafka")) + .group() + .leaderId("consumer-1") + .memberId("consumer-1") + .members("consumer-1") + .build() + .build()} +write flush + +read zilla:data.empty + + +accepted + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_AND_FETCH") + .topic("mqtt-sessions") + .groupId("mqtt-clients") + .filter() + .key("client-1") + .build() + .filter() + .key("client-1#migrate") + .headerNot("sender-id", "sender-1") + .build() + .build() + .build()} + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_AND_FETCH") + .topic("mqtt-sessions") + .config("cleanup.policy", "delete") + .build() + .build()} + +connect abort diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/session.reject.non.compacted.sessions.topic/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/session.reject.non.compacted.sessions.topic/client.rpt new file mode 100644 index 0000000000..17138ad814 --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/session.reject.non.compacted.sessions.topic/client.rpt @@ -0,0 +1,46 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/mqtt0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .expiry(1) + .capabilities("REDIRECT") + .clientId("client-1") + .build() + .build()} + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .expiry(1) + .subscribeQosMax(2) + .publishQosMax(2) + .capabilities("RETAIN", "SUBSCRIPTION_IDS", "WILDCARD") + .clientId("client-1") + .build() + .build()} + +connected + +read zilla:reset.ext ${mqtt:resetEx() + .typeId(zilla:id("mqtt")) + .reasonCode(131) + .build()} +write aborted diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/session.reject.non.compacted.sessions.topic/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/session.reject.non.compacted.sessions.topic/server.rpt new file mode 100644 index 0000000000..3b4ff464d6 --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/session.reject.non.compacted.sessions.topic/server.rpt @@ -0,0 +1,49 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/mqtt0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .expiry(1) + .capabilities("REDIRECT") + .clientId("client-1") + .build() + .build()} + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .expiry(1) + .subscribeQosMax(2) + .publishQosMax(2) + .capabilities("RETAIN", "SUBSCRIPTION_IDS", "WILDCARD") + .clientId("client-1") + .build() + .build()} + +connected + +write zilla:reset.ext ${mqtt:resetEx() + .typeId(zilla:id("mqtt")) + .reasonCode(131) + .build()} + +read abort diff --git a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java index ad86104769..76b2d69056 100644 --- a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java +++ b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java @@ -638,6 +638,15 @@ public void shouldSubscribeSaveSubscriptionsInSession() throws Exception k3po.finish(); } + @Test + @Specification({ + "${kafka}/session.reject.non.compacted.sessions.topic/client", + "${kafka}/session.reject.non.compacted.sessions.topic/server"}) + public void shouldRejectSessionNonCompactedSessionsTopic() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${kafka}/session.subscribe.via.session.state/client", diff --git a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/MqttIT.java b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/MqttIT.java index 15cfc5bd6b..006a056988 100644 --- a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/MqttIT.java +++ b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/MqttIT.java @@ -523,6 +523,15 @@ public void shouldSubscribeSaveSubscriptionsInSession() throws Exception k3po.finish(); } + @Test + @Specification({ + "${mqtt}/session.reject.non.compacted.sessions.topic/client", + "${mqtt}/session.reject.non.compacted.sessions.topic/server"}) + public void shouldRejectSessionNonCompactedSessionsTopic() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${mqtt}/session.subscribe.via.session.state/client", diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/session.reject.non.compacted.sessions.topic/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/session.reject.non.compacted.sessions.topic/client.rpt new file mode 100644 index 0000000000..3b56ea3017 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/session.reject.non.compacted.sessions.topic/client.rpt @@ -0,0 +1,44 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .clientId("client") + .build() + .build()} + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .subscribeQosMax(2) + .publishQosMax(2) + .capabilities("RETAIN", "SUBSCRIPTION_IDS", "WILDCARD") + .clientId("client") + .build() + .build()} + +connected + +read zilla:reset.ext ${mqtt:resetEx() + .typeId(zilla:id("mqtt")) + .reasonCode(131) + .build()} +write aborted diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/session.reject.non.compacted.sessions.topic/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/session.reject.non.compacted.sessions.topic/server.rpt new file mode 100644 index 0000000000..4d3496049c --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/session.reject.non.compacted.sessions.topic/server.rpt @@ -0,0 +1,47 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .clientId("client") + .build() + .build()} + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .subscribeQosMax(2) + .publishQosMax(2) + .capabilities("RETAIN", "SUBSCRIPTION_IDS", "WILDCARD") + .clientId("client") + .build() + .build()} + +connected + +write zilla:reset.ext ${mqtt:resetEx() + .typeId(zilla:id("mqtt")) + .reasonCode(131) + .build()} + +read abort diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/session.subscribe.invalid.state/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/session.subscribe.invalid.state/client.rpt new file mode 100644 index 0000000000..7b68fbd642 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/session.subscribe.invalid.state/client.rpt @@ -0,0 +1,79 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .clientId("client") + .build() + .build()} + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .subscribeQosMax(2) + .publishQosMax(2) + .packetSizeMax(66560) + .capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS") + .clientId("client") + .build() + .build()} + +connected + +read zilla:data.empty + +write zilla:data.ext ${mqtt:dataEx() + .typeId(zilla:id("mqtt")) + .session() + .kind("STATE") + .build() + .build()} + +write ${mqtt:session() + .subscription("sensor/one", 1, "AT_MOST_ONCE", "SEND_RETAINED") + .build()} +write flush + +read ${mqtt:session() + .subscription("sensor/one", 1, "AT_MOST_ONCE", "SEND_RETAINED") + .build()} + + +read ${mqtt:session() + .subscription("sensor/one", 1, "AT_MOST_ONCE", "SEND_RETAINED") + .subscription("sensor/one", 1, "AT_MOST_ONCE", "SEND_RETAINED") + .build()} + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .subscribe() + .clientId("client") + .filter("sensor/one", 1, "AT_MOST_ONCE", "SEND_RETAINED") + .build() + .build()} + +connected + diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/session.subscribe.invalid.state/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/session.subscribe.invalid.state/server.rpt new file mode 100644 index 0000000000..5aa5ffbd9a --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/session.subscribe.invalid.state/server.rpt @@ -0,0 +1,80 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .clientId("client") + .build() + .build()} + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .subscribeQosMax(2) + .publishQosMax(2) + .packetSizeMax(66560) + .capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS") + .clientId("client") + .build() + .build()} + +connected + +write zilla:data.empty +write flush + +read zilla:data.ext ${mqtt:dataEx() + .typeId(zilla:id("mqtt")) + .session() + .kind("STATE") + .build() + .build()} + +read ${mqtt:session() + .subscription("sensor/one", 1, "AT_MOST_ONCE", "SEND_RETAINED") + .build()} + +write ${mqtt:session() + .subscription("sensor/one", 1, "AT_MOST_ONCE", "SEND_RETAINED") + .build()} +write flush + + +write ${mqtt:session() + .subscription("sensor/one", 1, "AT_MOST_ONCE", "SEND_RETAINED") + .subscription("sensor/one", 1, "AT_MOST_ONCE", "SEND_RETAINED") + .build()} +write flush + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .subscribe() + .clientId("client") + .filter("sensor/one", 1, "AT_MOST_ONCE", "SEND_RETAINED") + .build() + .build()} + +connected diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/session.reject.non.compacted.sessions.topic/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/session.reject.non.compacted.sessions.topic/client.rpt new file mode 100644 index 0000000000..6619adae1c --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/session.reject.non.compacted.sessions.topic/client.rpt @@ -0,0 +1,35 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write [0x10 0x11] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x04] # protocol version + [0x02] # flags = clean start + [0x00 0x3c] # keep alive = 60s + [0x00 0x06] "client" # client id + +read [0x20 0x02] # CONNACK + [0x00] # flags = none + [0x83] # reason code = = implementation specific error + +read closed diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/session.reject.non.compacted.sessions.topic/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/session.reject.non.compacted.sessions.topic/server.rpt new file mode 100644 index 0000000000..f56fbe87b7 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/session.reject.non.compacted.sessions.topic/server.rpt @@ -0,0 +1,36 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted +connected + +read [0x10 0x11] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x04] # protocol version + [0x02] # flags = clean start + [0x00 0x3c] # keep alive = 60s + [0x00 0x06] "client" # client id + +write [0x20 0x02] # CONNACK + [0x00] # flags = none + [0x83] # reason code = = implementation specific error + +write close diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/session.reject.non.compacted.sessions.topic/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/session.reject.non.compacted.sessions.topic/client.rpt new file mode 100644 index 0000000000..6d5075e5e6 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/session.reject.non.compacted.sessions.topic/client.rpt @@ -0,0 +1,37 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write [0x10 0x13] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x05] # protocol version + [0x02] # flags = clean start + [0x00 0x3c] # keep alive = 60s + [0x00] # properties = none + [0x00 0x06] "client" # client id + +read [0x20 0x03] # CONNACK + [0x00] # flags = none + [0x83] # reason code = implementation specific error + [0x00] # properties + +read closed diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/session.reject.non.compacted.sessions.topic/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/session.reject.non.compacted.sessions.topic/server.rpt new file mode 100644 index 0000000000..c53c986d69 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/session.reject.non.compacted.sessions.topic/server.rpt @@ -0,0 +1,38 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted +connected + +read [0x10 0x13] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x05] # protocol version + [0x02] # flags = clean start + [0x00 0x3c] # keep alive = 60s + [0x00] # properties = none + [0x00 0x06] "client" # client id + +write [0x20 0x03] # CONNACK + [0x00] # flags = none + [0x83] # reason code = implementation specific error + [0x00] # properties + +write close diff --git a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/SessionIT.java b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/SessionIT.java index 46b6f84e6b..213924acba 100644 --- a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/SessionIT.java +++ b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/SessionIT.java @@ -209,6 +209,15 @@ public void shouldRedirectAfterConnack() throws Exception k3po.finish(); } + @Test + @Specification({ + "${app}/session.subscribe.invalid.state/client", + "${app}/session.subscribe.invalid.state/server"}) + public void shouldSubscribeInvalidSessionState() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${app}/session.subscribe.multiple.isolated/client", @@ -227,6 +236,15 @@ public void shouldSubscribeAndPublishToNonDefaultRoute() throws Exception k3po.finish(); } + @Test + @Specification({ + "${app}/session.reject.non.compacted.sessions.topic/client", + "${app}/session.reject.non.compacted.sessions.topic/server"}) + public void shouldRejectSessionNonCompactedSessionsTopic() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${app}/session.invalid.session.timeout.after.connack/client", diff --git a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/SessionIT.java b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/SessionIT.java index 844dccd4de..a9855be4c4 100644 --- a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/SessionIT.java +++ b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/SessionIT.java @@ -145,4 +145,13 @@ public void shouldSubscribeAndPublishToNonDefaultRoute() throws Exception { k3po.finish(); } + @Test + @Specification({ + "${net}/session.reject.non.compacted.sessions.topic/client", + "${net}/session.reject.non.compacted.sessions.topic/server"}) + public void shouldRejectSessionNonCompactedSessionsTopic() throws Exception + { + k3po.finish(); + } + } diff --git a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/SessionIT.java b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/SessionIT.java index a0da7b4841..c9dcfa4a66 100644 --- a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/SessionIT.java +++ b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/SessionIT.java @@ -192,6 +192,15 @@ public void shouldSubscribeAndPublishToNonDefaultRoute() throws Exception k3po.finish(); } + @Test + @Specification({ + "${net}/session.reject.non.compacted.sessions.topic/client", + "${net}/session.reject.non.compacted.sessions.topic/server"}) + public void shouldRejectSessionNonCompactedSessionsTopic() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${net}/session.invalid.session.timeout.after.connack/client",