From 14fca7696ef6d6a7c8e6420d30176b90fae86533 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Tue, 4 Jun 2024 10:30:42 +0200 Subject: [PATCH] Fix mqtt-kafka non compact test --- .../internal/stream/MqttKafkaSessionFactory.java | 15 +++++++++++++++ .../client.rpt | 5 ++++- .../server.rpt | 5 ++++- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java index 7ab987ddba..990ab929e2 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java @@ -3470,6 +3470,7 @@ protected void onKafkaBegin( delegate.doMqttWindow(authorization, traceId, 0, 0, 0); delegate.doMqttReset(traceId, mqttResetEx); events.onMqttConnectionReset(traceId, routedId, MQTT_NON_COMPACT_SESSIONS_TOPIC); + doKafkaWindow(traceId, authorization, 0, 0, 0, 0, 0); doKafkaAbort(traceId, authorization); break onKafkaBegin; } @@ -3630,6 +3631,20 @@ protected void doKafkaBegin( delegate.sessionId, server, capabilities); } + private void doKafkaWindow( + long traceId, + long authorization, + long budgetId, + int capabilities, + long replySeq, + long replyAck, + int replyMax) + { + + doWindow(kafka, originId, routedId, replyId, replySeq, replyAck, replyMax, + traceId, authorization, budgetId, replyPad, 0, capabilities); + } + private void cancelWillSignal( long authorization, long traceId) diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.reject.non.compacted.sessions.topic/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.reject.non.compacted.sessions.topic/client.rpt index a8498c5c61..0a25a46830 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.reject.non.compacted.sessions.topic/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.reject.non.compacted.sessions.topic/client.rpt @@ -110,4 +110,7 @@ read zilla:begin.ext ${kafka:beginEx() .build() .build()} -connected \ No newline at end of file +connected + +write abort +read aborted diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.reject.non.compacted.sessions.topic/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.reject.non.compacted.sessions.topic/server.rpt index f84db986bc..91a9e09489 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.reject.non.compacted.sessions.topic/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.reject.non.compacted.sessions.topic/server.rpt @@ -105,4 +105,7 @@ write zilla:begin.ext ${kafka:beginEx() .build() .build()} -connect abort +connected + +read aborted +write abort