Skip to content

Commit

Permalink
Add detection of non-compacted session topic (#1044)
Browse files Browse the repository at this point in the history
  • Loading branch information
bmaidics authored May 28, 2024
1 parent 78878be commit 2429256
Show file tree
Hide file tree
Showing 39 changed files with 1,606 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2391,6 +2391,11 @@ function handle_kafka_begin_merged_extension(buffer, offset, ext_subtree)
local ack_mode = kafka_ext_ack_modes[slice_ack_mode_id:le_int()]
ext_subtree:add_le(fields.kafka_ext_ack_mode_id, slice_ack_mode_id)
ext_subtree:add(fields.kafka_ext_ack_mode, ack_mode)
-- configs
local configs_offset = ack_mode_offset + ack_mode_length
local configs_length = resolve_length_of_array(buffer, configs_offset)
dissect_and_add_kafka_config_struct_array(buffer, configs_offset, ext_subtree, fields.kafka_ext_config_array_length,
fields.kafka_ext_config_array_size)
end

function handle_kafka_begin_init_producer_id_extension(buffer, offset, ext_subtree)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Frame 1: 303 bytes on wire (2424 bits), 303 bytes captured (2424 bits)
Frame 1: 311 bytes on wire (2488 bits), 311 bytes captured (2488 bits)
Ethernet II, Src: Send_00 (20:53:45:4e:44:00), Dst: Receive_00 (20:52:45:43:56:00)
Internet Protocol Version 6, Src: fe80::3f3f:0:0:2, Dst: fe80::3f3f:0:0:3
Transmission Control Protocol, Src Port: 0, Dst Port: 7114, Seq: 0, Ack: 1, Len: 229
Transmission Control Protocol, Src Port: 0, Dst Port: 7114, Seq: 0, Ack: 1, Len: 237
Zilla Frame
Frame Type ID: 0x00000001
Frame Type: BEGIN
Expand Down Expand Up @@ -68,18 +68,21 @@ Zilla Frame
Delta Type: NONE (0)
Ack Mode ID: -1
Ack Mode: IN_SYNC_REPLICAS
Configs (0 items)
Length: 4
Size: 0

Frame 2: 211 bytes on wire (1688 bits), 211 bytes captured (1688 bits)
Ethernet II, Src: Send_00 (20:53:45:4e:44:00), Dst: Receive_00 (20:52:45:43:56:00)
Internet Protocol Version 6, Src: fe80::3f3f:0:0:2, Dst: fe80::3f3f:0:0:3
Transmission Control Protocol, Src Port: 0, Dst Port: 7114, Seq: 229, Ack: 1, Len: 137
Transmission Control Protocol, Src Port: 0, Dst Port: 7114, Seq: 237, Ack: 1, Len: 137
Zilla Frame
Frame Type ID: 0x40000002
Frame Type: WINDOW
Protocol Type ID: 0x00000000
Protocol Type:
Worker: 0
Offset: 0x000000c0
Offset: 0x000000c8
Origin ID: 0x0000000100000002
Origin Namespace: test
Origin Binding: app0
Expand All @@ -103,17 +106,17 @@ Zilla Frame
Progress: 0
Progress/Maximum: 0/8192

Frame 3: 303 bytes on wire (2424 bits), 303 bytes captured (2424 bits)
Frame 3: 311 bytes on wire (2488 bits), 311 bytes captured (2488 bits)
Ethernet II, Src: Send_00 (20:53:45:4e:44:00), Dst: Receive_00 (20:52:45:43:56:00)
Internet Protocol Version 6, Src: fe80::3f3f:0:0:3, Dst: fe80::3f3f:0:0:2
Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 1, Ack: 366, Len: 229
Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 1, Ack: 374, Len: 237
Zilla Frame
Frame Type ID: 0x00000001
Frame Type: BEGIN
Protocol Type ID: 0x00000000
Protocol Type:
Worker: 0
Offset: 0x00000120
Offset: 0x00000128
Origin ID: 0x0000000100000002
Origin Namespace: test
Origin Binding: app0
Expand Down Expand Up @@ -173,18 +176,21 @@ Zilla Frame
Delta Type: NONE (0)
Ack Mode ID: -1
Ack Mode: IN_SYNC_REPLICAS
Configs (0 items)
Length: 4
Size: 0

Frame 4: 320 bytes on wire (2560 bits), 320 bytes captured (2560 bits)
Ethernet II, Src: Send_00 (20:53:45:4e:44:00), Dst: Receive_00 (20:52:45:43:56:00)
Internet Protocol Version 6, Src: fe80::3f3f:0:0:3, Dst: fe80::3f3f:0:0:2
Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 230, Ack: 366, Len: 246
Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 238, Ack: 374, Len: 246
Zilla Frame
Frame Type ID: 0x00000005
Frame Type: FLUSH
Protocol Type ID: 0x00000000
Protocol Type:
Worker: 0
Offset: 0x000001e0
Offset: 0x000001f0
Origin ID: 0x0000000100000002
Origin Namespace: test
Origin Binding: app0
Expand Down Expand Up @@ -247,14 +253,14 @@ Zilla Frame
Frame 5: 211 bytes on wire (1688 bits), 211 bytes captured (1688 bits)
Ethernet II, Src: Send_00 (20:53:45:4e:44:00), Dst: Receive_00 (20:52:45:43:56:00)
Internet Protocol Version 6, Src: fe80::3f3f:0:0:3, Dst: fe80::3f3f:0:0:2
Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 476, Ack: 366, Len: 137
Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 484, Ack: 374, Len: 137
Zilla Frame
Frame Type ID: 0x40000002
Frame Type: WINDOW
Protocol Type ID: 0x00000000
Protocol Type:
Worker: 0
Offset: 0x000002b0
Offset: 0x000002c0
Origin ID: 0x0000000100000002
Origin Namespace: test
Origin Binding: app0
Expand All @@ -281,14 +287,14 @@ Zilla Frame
Frame 6: 194 bytes on wire (1552 bits), 194 bytes captured (1552 bits)
Ethernet II, Src: Send_00 (20:53:45:4e:44:00), Dst: Receive_00 (20:52:45:43:56:00)
Internet Protocol Version 6, Src: fe80::3f3f:0:0:2, Dst: fe80::3f3f:0:0:3
Transmission Control Protocol, Src Port: 0, Dst Port: 7114, Seq: 366, Ack: 613, Len: 120
Transmission Control Protocol, Src Port: 0, Dst Port: 7114, Seq: 374, Ack: 621, Len: 120
Zilla Frame
Frame Type ID: 0x00000003
Frame Type: END
Protocol Type ID: 0x00000000
Protocol Type:
Worker: 0
Offset: 0x00000310
Offset: 0x00000320
Origin ID: 0x0000000100000002
Origin Namespace: test
Origin Binding: app0
Expand All @@ -309,14 +315,14 @@ Zilla Frame
Frame 7: 194 bytes on wire (1552 bits), 194 bytes captured (1552 bits)
Ethernet II, Src: Send_00 (20:53:45:4e:44:00), Dst: Receive_00 (20:52:45:43:56:00)
Internet Protocol Version 6, Src: fe80::3f3f:0:0:3, Dst: fe80::3f3f:0:0:2
Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 613, Ack: 486, Len: 120
Transmission Control Protocol, Src Port: 7114, Dst Port: 0, Seq: 621, Ack: 494, Len: 120
Zilla Frame
Frame Type ID: 0x00000003
Frame Type: END
Protocol Type ID: 0x00000000
Protocol Type:
Worker: 0
Offset: 0x00000360
Offset: 0x00000370
Origin ID: 0x0000000100000002
Origin Namespace: test
Origin Binding: app0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.aklivity.zilla.runtime.binding.kafka.internal.stream;

import static io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaCapabilities.FETCH_ONLY;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaCapabilities.PRODUCE_AND_FETCH;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaCapabilities.PRODUCE_ONLY;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType.HISTORICAL;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType.LIVE;
Expand All @@ -39,6 +40,7 @@
import org.agrona.collections.MutableInteger;
import org.agrona.collections.MutableLong;
import org.agrona.collections.MutableReference;
import org.agrona.collections.Object2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding;
Expand Down Expand Up @@ -1033,6 +1035,7 @@ private final class KafkaMergedStream
private final KafkaIsolation isolation;
private final KafkaDeltaType deltaType;
private final KafkaAckMode ackMode;
private final Object2ObjectHashMap<String16FW, String> configs;

private KafkaOffsetType maximumOffset;
private List<KafkaMergedFilter> filters;
Expand Down Expand Up @@ -1107,6 +1110,7 @@ private final class KafkaMergedStream
this.isolation = isolation;
this.deltaType = deltaType;
this.ackMode = ackMode;
this.configs = new Object2ObjectHashMap<>();
}

private void onMergedMessage(
Expand Down Expand Up @@ -1618,7 +1622,7 @@ else if (capabilities == PRODUCE_ONLY)
else
{
doBegin(sender, originId, routedId, replyId, replySeq, replyAck, replyMax,
traceId, authorization, affinity, EMPTY_EXTENSION);
traceId, authorization, affinity, beginExToKafka(beginExToKafkaMergedProduceAndFetch()));
}

doUnmergedFetchReplyWindowsIfNecessary(traceId);
Expand Down Expand Up @@ -1659,6 +1663,13 @@ private Consumer<KafkaMergedBeginExFW.Builder> beginExToKafkaMergedFetchOnly()
.latestOffset(v)
.metadata(metadataRW.length() > 0 ? metadataRW.toString() : null));
});
String cleanupPolicy = configs.get(CONFIG_NAME_CLEANUP_POLICY);
if (cleanupPolicy != null)
{
builder.configsItem(c ->
c.name(CONFIG_NAME_CLEANUP_POLICY)
.value(cleanupPolicy));
}
};
}

Expand All @@ -1668,6 +1679,29 @@ private Consumer<KafkaMergedBeginExFW.Builder> beginExToKafkaMergedProduceOnly()
{
builder.capabilities(c -> c.set(PRODUCE_ONLY)).topic(topic);
leadersByPartitionId.intForEach((k, v) -> builder.partitionsItem(i -> i.partitionId(k)));
String cleanupPolicy = configs.get(CONFIG_NAME_CLEANUP_POLICY);
if (cleanupPolicy != null)
{
builder.configsItem(c ->
c.name(CONFIG_NAME_CLEANUP_POLICY)
.value(cleanupPolicy));
}
};
}

private Consumer<KafkaMergedBeginExFW.Builder> beginExToKafkaMergedProduceAndFetch()
{
return builder ->
{
builder.capabilities(c -> c.set(PRODUCE_AND_FETCH))
.topic(topic);
String cleanupPolicy = configs.get(CONFIG_NAME_CLEANUP_POLICY);
if (cleanupPolicy != null)
{
builder.configsItem(c ->
c.name(CONFIG_NAME_CLEANUP_POLICY)
.value(cleanupPolicy));
}
};
}

Expand Down Expand Up @@ -1952,6 +1986,13 @@ private void onTopicConfigChanged(
long traceId,
ArrayFW<KafkaConfigFW> configs)
{
configs.forEach(c ->
{
if (c.name().equals(CONFIG_NAME_CLEANUP_POLICY))
{
this.configs.put(CONFIG_NAME_CLEANUP_POLICY, c.value().asString());
}
});
metaStream.doMetaInitialBeginIfNecessary(traceId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ public void shouldFetchMergedMessagesWithHeaderFilterAfterCompaction() throws Ex
k3po.finish();
}

@Test
@Configuration("cache.options.merged.yaml")
@Specification({
"${app}/merged.produce.and.fetch.get.cleanup.policy/client",
"${app}/unmerged.produce.and.fetch.get.cleanup.policy/server"})
public void shouldProduceAndFetchMergedGetCompaction() throws Exception
{
k3po.finish();
}

@Ignore("requires k3po parallel reads")
@Test
@Configuration("cache.options.merged.yaml")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.mqtt.kafka.internal;

import static io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.MqttKafkaEventType.NON_COMPACT_SESSIONS_TOPIC;

import java.nio.ByteBuffer;
import java.time.Clock;

import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.MqttKafkaEventExFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer;

public class MqttKafkaEventContext
{
private static final int EVENT_BUFFER_CAPACITY = 2048;

private final AtomicBuffer eventBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
private final AtomicBuffer extensionBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
private final EventFW.Builder eventRW = new EventFW.Builder();
private final MqttKafkaEventExFW.Builder mqttKafkaEventExRW = new MqttKafkaEventExFW.Builder();
private final int mqttTypeId;
private final int nonCompactSessionsTopicEventId;
private final MessageConsumer eventWriter;
private final Clock clock;

public MqttKafkaEventContext(
EngineContext context)
{
this.mqttTypeId = context.supplyTypeId(MqttKafkaBinding.NAME);
this.nonCompactSessionsTopicEventId = context.supplyEventId("binding.mqtt.kafka.non.compact.sessions.topic");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}

public void onMqttConnectionReset(
long traceId,
long bindingId,
String16FW reason)
{
MqttKafkaEventExFW extension = mqttKafkaEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.nonCompactSessionsTopic(e -> e
.typeId(NON_COMPACT_SESSIONS_TOPIC.value())
.reason(reason))
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(nonCompactSessionsTopicEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
.extension(extension.buffer(), extension.offset(), extension.limit())
.build();
eventWriter.accept(mqttTypeId, event.buffer(), event.offset(), event.limit());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.mqtt.kafka.internal;

import org.agrona.DirectBuffer;

import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.MqttKafkaEventExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.MqttKafkaResetMqttConnectionExFW;
import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi;

public final class MqttKafkaEventFormatter implements EventFormatterSpi
{
private static final String NON_COMPACT_SESSIONS_TOPIC_FORMAT = "NON COMPACT SESSIONS TOPIC - %s";

private final EventFW eventRO = new EventFW();
private final MqttKafkaEventExFW mqttKafkaEventExRO = new MqttKafkaEventExFW();

MqttKafkaEventFormatter(
Configuration config)
{
}

public String format(
DirectBuffer buffer,
int index,
int length)
{
final EventFW event = eventRO.wrap(buffer, index, index + length);
final MqttKafkaEventExFW extension = mqttKafkaEventExRO
.wrap(event.extension().buffer(), event.extension().offset(), event.extension().limit());
String result = null;
switch (extension.kind())
{
case NON_COMPACT_SESSIONS_TOPIC:
{
MqttKafkaResetMqttConnectionExFW ex = extension.nonCompactSessionsTopic();
result = String.format(NON_COMPACT_SESSIONS_TOPIC_FORMAT, asString(ex.reason()));
break;
}
}
return result;
}

private static String asString(
String16FW stringFW)
{
String s = stringFW.asString();
return s == null ? "" : s;
}
}
Loading

0 comments on commit 2429256

Please sign in to comment.