From fc61d88e10aca745011f5cec85e353dad368c59b Mon Sep 17 00:00:00 2001 From: bmaidics Date: Wed, 22 May 2024 15:21:19 +0200 Subject: [PATCH 1/6] Deliver published payload after publish stream close due to session takeover --- .../internal/stream/MqttServerFactory.java | 10 +-- .../internal/stream/server/v5/PublishIT.java | 10 +++ .../publish.session.takeover/client.rpt | 79 +++++++++++++++++++ .../publish.session.takeover/server.rpt | 79 +++++++++++++++++++ .../v5/publish.session.takeover/client.rpt | 49 ++++++++++++ .../v5/publish.session.takeover/server.rpt | 50 ++++++++++++ .../mqtt/streams/application/PublishIT.java | 9 +++ .../mqtt/streams/network/v5/PublishIT.java | 9 +++ 8 files changed, 290 insertions(+), 5 deletions(-) create mode 100644 specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/client.rpt create mode 100644 specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/server.rpt create mode 100644 specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/client.rpt create mode 100644 specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/server.rpt diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java index 04e48899e3..24edb9b6a5 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java @@ -3172,7 +3172,7 @@ private MqttPublishStream resolvePublishStream( final long resolvedId = resolved.id; stream = publishes.computeIfAbsent(topicKey, s -> - new MqttPublishStream(routedId, resolvedId, topic, qos, binding.supplyModelConfig(topic))); + new MqttPublishStream(routedId, resolvedId, topic, topicKey, binding.supplyModelConfig(topic))); stream.doPublishBegin(traceId, affinity, qos); } else @@ -5669,7 +5669,7 @@ private class MqttPublishStream long originId, long routedId, String topic, - int qos, + long topicKey, ModelConfig config) { this.originId = originId; @@ -5677,7 +5677,7 @@ private class MqttPublishStream this.initialId = supplyInitialId.applyAsLong(routedId); this.replyId = supplyReplyId.applyAsLong(initialId); this.topic = topic; - this.topicKey = topicKey(topic, qos); + this.topicKey = topicKey; this.contentType = config != null ? supplyValidator.apply(config) : null; } @@ -5884,7 +5884,7 @@ private void onPublishWindow( debitorIndex = debitor.acquire(budgetId, initialId, MqttServer.this::decodeNetwork); } - if (MqttState.initialClosing(state)) + if (MqttState.initialClosing(state) && publishPayloadBytes == 0) { doPublishAppEnd(traceId); } @@ -6061,7 +6061,7 @@ private void doPublishReset( private void doPublishAppEnd( long traceId) { - if (!MqttState.initialClosed(state)) + if (!MqttState.initialClosed(state) && publishPayloadBytes == 0) { doCancelPublishExpiration(); publishes.remove(topicKey); diff --git a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java index 3f1727c378..2b67a301b2 100644 --- a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java +++ b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java @@ -66,6 +66,16 @@ public void shouldPublishOneMessage() throws Exception k3po.finish(); } + @Test + @Configuration("server.yaml") + @Specification({ + "${net}/publish.session.takeover/client", + "${app}/publish.session.takeover/server"}) + public void shouldPublishAfterSessionTakeover() throws Exception + { + k3po.finish(); + } + @Test @Configuration("server.yaml") @Specification({ diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/client.rpt new file mode 100644 index 0000000000..cf23dbd7fe --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/client.rpt @@ -0,0 +1,79 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .clientId("client") + .build() + .build()} + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .subscribeQosMax(2) + .publishQosMax(2) + .packetSizeMax(66560) + .capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS") + .clientId("client") + .build() + .build()} + +connected + +read zilla:data.empty +read notify RECEIVED_SESSION_STATE + +read notify SENT_FIRST_MESSAGE +read closed + + +connect await RECEIVED_SESSION_STATE + "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .publish() + .clientId("client") + .topic("sensor/one") + .build() + .build()} + +connected + +write zilla:data.ext ${mqtt:dataEx() + .typeId(zilla:id("mqtt")) + .publish() + .build() + .build()} + +write "message1" + +write zilla:data.ext ${mqtt:dataEx() + .typeId(zilla:id("mqtt")) + .publish() + .build() + .build()} + +write "test" diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/server.rpt new file mode 100644 index 0000000000..f433d7e2a2 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/server.rpt @@ -0,0 +1,79 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .clientId("client") + .build() + .build()} + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .subscribeQosMax(2) + .publishQosMax(2) + .packetSizeMax(66560) + .capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS") + .clientId("client") + .build() + .build()} + +connected + +write zilla:data.empty +write flush + +write await SENT_FIRST_MESSAGE +write close +write notify SENT_CLOSE + + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .publish() + .clientId("client") + .topic("sensor/one") + .build() + .build()} + +connected + +read zilla:data.ext ${mqtt:matchDataEx() + .typeId(zilla:id("mqtt")) + .publish() + .build() + .build()} + +read "message1" + +read zilla:data.ext ${mqtt:matchDataEx() + .typeId(zilla:id("mqtt")) + .publish() + .build() + .build()} + +read "test" diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/client.rpt new file mode 100644 index 0000000000..c01c0a3a86 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/client.rpt @@ -0,0 +1,49 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write [0x10 0x18] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x05] # protocol version + [0x02] # flags = clean start + [0x00 0x3c] # keep alive = 60s + [0x05] # properties + [0x27] 66560 # maximum packet size = 66560 + [0x00 0x06] "client" # client id + +read [0x20 0x03] # CONNACK + [0x00] # flags = none + [0x00] # reason code + [0x00] # properties + +write [0x30 0x15] # PUBLISH + [0x00 0x0a] "sensor/one" # topic name + [0x00] # properties + "message1" # payload + +read notify SENT_FIRST_MESSAGE +write await SENT_CLOSE +write [0x30 0x11] # PUBLISH + [0x00 0x0a] "sensor/one" # topic name + [0x00] # properties +write "test" # payload + diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/server.rpt new file mode 100644 index 0000000000..75882bdab2 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/server.rpt @@ -0,0 +1,50 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted +connected + +read [0x10 0x18] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x05] # protocol version + [0x02] # flags = clean start + [0x00 0x3c] # keep alive = 60s + [0x05] # properties + [0x27] 66560 # maximum packet size = 66560 + [0x00 0x06] "client" # client id + +write [0x20 0x03] # CONNACK + [0x00] # flags = none + [0x00] # reason code + [0x00] # properties = none + +read [0x30 0x15] # PUBLISH + [0x00 0x0a] "sensor/one" # topic name + [0x00] # properties + "message1" # payload + +read notify SENT_CLOSE +read [0x30 0x11] # PUBLISH + [0x00 0x0a] "sensor/one" # topic name + [0x00] # properties + "test" # payload + + diff --git a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/PublishIT.java b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/PublishIT.java index 69125918fc..ed026cb317 100644 --- a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/PublishIT.java +++ b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/PublishIT.java @@ -63,6 +63,15 @@ public void shouldSendMultipleMessages() throws Exception k3po.finish(); } + @Test + @Specification({ + "${app}/publish.session.takeover/client", + "${app}/publish.session.takeover/server"}) + public void shouldSendMessageAfterSessionTakeover() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${app}/publish.multiple.clients/client", diff --git a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java index 39d82f3648..fa791d4288 100644 --- a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java +++ b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java @@ -63,6 +63,15 @@ public void shouldSendMultipleMessages() throws Exception k3po.finish(); } + @Test + @Specification({ + "${net}/publish.session.takeover/client", + "${net}/publish.session.takeover/server"}) + public void shouldSendMessageAfterSessionTakeover() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${net}/publish.multiple.clients/client", From 1e27059e0c03c0b4ab1c345248e1a29bcbee4234 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Wed, 22 May 2024 15:34:38 +0200 Subject: [PATCH 2/6] fix --- .../runtime/binding/mqtt/internal/stream/MqttServerFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java index 24edb9b6a5..c54d4e4a1d 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java @@ -5884,7 +5884,7 @@ private void onPublishWindow( debitorIndex = debitor.acquire(budgetId, initialId, MqttServer.this::decodeNetwork); } - if (MqttState.initialClosing(state) && publishPayloadBytes == 0) + if (MqttState.initialClosing(state)) { doPublishAppEnd(traceId); } From ab77674a6b8c8302f87740cba68340fb35fac20b Mon Sep 17 00:00:00 2001 From: bmaidics Date: Thu, 23 May 2024 10:45:49 +0200 Subject: [PATCH 3/6] Adapt test --- .../streams/application/publish.session.takeover/client.rpt | 2 ++ .../streams/application/publish.session.takeover/server.rpt | 2 ++ .../streams/network/v5/publish.session.takeover/client.rpt | 6 ++++++ .../streams/network/v5/publish.session.takeover/server.rpt | 6 ++++++ 4 files changed, 16 insertions(+) diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/client.rpt index cf23dbd7fe..00731d1683 100644 --- a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/client.rpt +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/client.rpt @@ -77,3 +77,5 @@ write zilla:data.ext ${mqtt:dataEx() .build()} write "test" + +write abort diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/server.rpt index f433d7e2a2..8e6ebc7106 100644 --- a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/server.rpt +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/server.rpt @@ -77,3 +77,5 @@ read zilla:data.ext ${mqtt:matchDataEx() .build()} read "test" + +read aborted diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/client.rpt index c01c0a3a86..de026f10c2 100644 --- a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/client.rpt +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/client.rpt @@ -47,3 +47,9 @@ write [0x30 0x11] # PUBLISH [0x00] # properties write "test" # payload +read [0xe0 0x02] # DISCONNECT + [0x8e] # session taken over + [0x00] # properties = none + +read closed +write close diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/server.rpt index 75882bdab2..8d595ba2b2 100644 --- a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/server.rpt +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/server.rpt @@ -47,4 +47,10 @@ read [0x30 0x11] # PUBLISH [0x00] # properties "test" # payload +write [0xe0 0x02] # DISCONNECT + [0x8e] # session taken over + [0x00] # properties = none + +write close +read closed From 0ab0f9eab2bba714a419e245c22d5368f7f94e6f Mon Sep 17 00:00:00 2001 From: bmaidics Date: Mon, 27 May 2024 10:55:10 +0200 Subject: [PATCH 4/6] checkpoint --- .../mqtt/internal/stream/MqttServerFactory.java | 4 ++-- .../application/publish.session.takeover/server.rpt | 13 +++---------- .../network/v5/publish.session.takeover/client.rpt | 1 - 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java index c54d4e4a1d..d894f154df 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java @@ -1492,7 +1492,7 @@ private int decodePublishPayload( int reasonCode = SUCCESS; decode: - if (length >= 0) + if (length >= 0 && !MqttState.replyClosed(server.state)) { MqttServer.MqttPublishStream publisher = server.publishes.get(server.decodePublisherKey); @@ -6061,7 +6061,7 @@ private void doPublishReset( private void doPublishAppEnd( long traceId) { - if (!MqttState.initialClosed(state) && publishPayloadBytes == 0) + if (!MqttState.initialClosed(state)) { doCancelPublishExpiration(); publishes.remove(topicKey); diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/server.rpt index 8e6ebc7106..edd9548c1a 100644 --- a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/server.rpt +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/server.rpt @@ -45,7 +45,7 @@ connected write zilla:data.empty write flush -write await SENT_FIRST_MESSAGE +write await RECEIVED_FIRST_MESSAGE write close write notify SENT_CLOSE @@ -69,13 +69,6 @@ read zilla:data.ext ${mqtt:matchDataEx() .build()} read "message1" +read notify RECEIVED_FIRST_MESSAGE -read zilla:data.ext ${mqtt:matchDataEx() - .typeId(zilla:id("mqtt")) - .publish() - .build() - .build()} - -read "test" - -read aborted +read closed diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/client.rpt index de026f10c2..d76049a711 100644 --- a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/client.rpt +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.session.takeover/client.rpt @@ -40,7 +40,6 @@ write [0x30 0x15] # PUBLISH [0x00] # properties "message1" # payload -read notify SENT_FIRST_MESSAGE write await SENT_CLOSE write [0x30 0x11] # PUBLISH [0x00 0x0a] "sensor/one" # topic name From 9a75b45da3feef3e7a8d084327403198729a4193 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Mon, 27 May 2024 19:25:54 +0200 Subject: [PATCH 5/6] set decoder after close --- .../binding/mqtt/internal/stream/MqttServerFactory.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java index d894f154df..5eac1db3ac 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java @@ -1492,7 +1492,7 @@ private int decodePublishPayload( int reasonCode = SUCCESS; decode: - if (length >= 0 && !MqttState.replyClosed(server.state)) + if (length >= 0) { MqttServer.MqttPublishStream publisher = server.publishes.get(server.decodePublisherKey); @@ -4890,6 +4890,7 @@ private void closeStreams( { session.cleanupEnd(traceId); } + decoder = decodeIgnoreAll; } private void cleanupBudgetCreditor() From a7627cf3d9a72401cd694030cc446fb9b5f82a12 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Mon, 27 May 2024 19:30:17 +0200 Subject: [PATCH 6/6] test fix --- .../application/publish.session.takeover/client.rpt | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/client.rpt index 00731d1683..7d1bc307ab 100644 --- a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/client.rpt +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.session.takeover/client.rpt @@ -70,12 +70,4 @@ write zilla:data.ext ${mqtt:dataEx() write "message1" -write zilla:data.ext ${mqtt:dataEx() - .typeId(zilla:id("mqtt")) - .publish() - .build() - .build()} - -write "test" - -write abort +write close