Skip to content

Commit

Permalink
move the static reason into the event formatter
Browse files Browse the repository at this point in the history
  • Loading branch information
vordimous committed Jun 3, 2024
1 parent 1ba8a8e commit 30430b8
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.MqttKafkaEventExFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
Expand Down Expand Up @@ -52,14 +51,12 @@ public MqttKafkaEventContext(

public void onMqttConnectionReset(
long traceId,
long bindingId,
String16FW reason)
long bindingId)
{
MqttKafkaEventExFW extension = mqttKafkaEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.nonCompactSessionsTopic(e -> e
.typeId(NON_COMPACT_SESSIONS_TOPIC.value())
.reason(reason))
.typeId(NON_COMPACT_SESSIONS_TOPIC.value()))
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ public String format(
case NON_COMPACT_SESSIONS_TOPIC:
{
MqttKafkaResetMqttConnectionExFW ex = extension.nonCompactSessionsTopic();
result = String.format("%s", asString(ex.reason()));
result = String.format(
"The MQTT sessions Kafka topic is not log compacted. Update the cleanup policy to enable log compaction."
);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory
public static final int MQTT_NOT_AUTHORIZED = 0x87;
public static final int MQTT_IMPLEMENTATION_SPECIFIC_ERROR = 0x83;
public static final String MQTT_INVALID_SESSION_TIMEOUT_REASON = "Invalid session expiry interval";
public static final String16FW MQTT_NON_COMPACT_SESSIONS_TOPIC = new String16FW("The Sessions Kafka topic is non-compacted");
private static final KafkaConfigFW CONFIG_COMPACT_CLEANUP_POLICY = new KafkaConfigFW.Builder()
.wrap(new UnsafeBuffer(new byte[25]), 0, 25)
.name("cleanup.policy")
Expand Down Expand Up @@ -3469,7 +3468,7 @@ protected void onKafkaBegin(
.build();
delegate.doMqttWindow(authorization, traceId, 0, 0, 0);
delegate.doMqttReset(traceId, mqttResetEx);
events.onMqttConnectionReset(traceId, routedId, MQTT_NON_COMPACT_SESSIONS_TOPIC);
events.onMqttConnectionReset(traceId, routedId);
doKafkaAbort(traceId, authorization);
break onKafkaBegin;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ scope mqtt_kafka

struct MqttKafkaResetMqttConnectionEx extends core::stream::Extension
{
string16 reason;
}

union MqttKafkaEventEx switch (MqttKafkaEventType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ telemetry:
- qname: test.kafka0
id: binding.mqtt.kafka.non.compact.sessions.topic
name: BINDING_MQTT_KAFKA_NON_COMPACT_SESSIONS_TOPIC
message: The Sessions Kafka topic is non-compacted
message: The MQTT sessions Kafka topic is not log compacted. Update the cleanup policy to enable log compaction.
bindings:
mqtt0:
type: mqtt-kafka
Expand Down

0 comments on commit 30430b8

Please sign in to comment.