From 405c718db3f9d05439b8a0326cb3a595cfeda9c3 Mon Sep 17 00:00:00 2001 From: AJ Date: Thu, 30 May 2024 12:39:56 -0400 Subject: [PATCH] refactor the compact session topic event --- .../binding/mqtt/kafka/internal/MqttKafkaEventFormatter.java | 4 +--- .../mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java | 2 +- .../specs/binding/mqtt/kafka/config/proxy.log.event.yaml | 3 ++- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/MqttKafkaEventFormatter.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/MqttKafkaEventFormatter.java index 8c66ebbe31..14b9081af9 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/MqttKafkaEventFormatter.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/MqttKafkaEventFormatter.java @@ -25,8 +25,6 @@ public final class MqttKafkaEventFormatter implements EventFormatterSpi { - private static final String NON_COMPACT_SESSIONS_TOPIC_FORMAT = "NON COMPACT SESSIONS TOPIC - %s"; - private final EventFW eventRO = new EventFW(); private final MqttKafkaEventExFW mqttKafkaEventExRO = new MqttKafkaEventExFW(); @@ -49,7 +47,7 @@ public String format( case NON_COMPACT_SESSIONS_TOPIC: { MqttKafkaResetMqttConnectionExFW ex = extension.nonCompactSessionsTopic(); - result = String.format(NON_COMPACT_SESSIONS_TOPIC_FORMAT, asString(ex.reason())); + result = String.format("%s", asString(ex.reason())); break; } } diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java index 7ab987ddba..a68ec1c94e 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java @@ -170,7 +170,7 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory public static final int MQTT_NOT_AUTHORIZED = 0x87; public static final int MQTT_IMPLEMENTATION_SPECIFIC_ERROR = 0x83; public static final String MQTT_INVALID_SESSION_TIMEOUT_REASON = "Invalid session expiry interval"; - public static final String16FW MQTT_NON_COMPACT_SESSIONS_TOPIC = new String16FW("Sessions Kafka topic in non-compacted"); + public static final String16FW MQTT_NON_COMPACT_SESSIONS_TOPIC = new String16FW("The Sessions Kafka topic is non-compacted"); private static final KafkaConfigFW CONFIG_COMPACT_CLEANUP_POLICY = new KafkaConfigFW.Builder() .wrap(new UnsafeBuffer(new byte[25]), 0, 25) .name("cleanup.policy") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/config/proxy.log.event.yaml b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/config/proxy.log.event.yaml index f6c94af5cc..4d2bef19db 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/config/proxy.log.event.yaml +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/config/proxy.log.event.yaml @@ -23,7 +23,8 @@ telemetry: events: - qname: test.kafka0 id: binding.mqtt.kafka.non.compact.sessions.topic - message: NON COMPACT SESSIONS TOPIC - Sessions Kafka topic in non-compacted + name: BINDING_MQTT_KAFKA_NON_COMPACT_SESSIONS_TOPIC + message: The Sessions Kafka topic is non-compacted bindings: mqtt0: type: mqtt-kafka