diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaProxyFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaProxyFactory.java index d0e333955b..0a0e4a3ee4 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaProxyFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaProxyFactory.java @@ -75,7 +75,7 @@ public void attach( MqttKafkaBindingConfig kafkaBinding = new MqttKafkaBindingConfig(binding); bindings.put(binding.id, kafkaBinding); - factories.values().forEach(streamFactory -> streamFactory.onAttached(binding.id, binding.name)); + factories.values().forEach(streamFactory -> streamFactory.onAttached(binding.id)); } @Override diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java index ee716f1a8a..617bdf62de 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java @@ -267,6 +267,7 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory private final MqttQoS publishQosMax; private final String groupIdPrefixFormat; private final Function supplyNamespace; + private final Function supplyLocalName; private String serverRef; private int reconnectAttempt; @@ -318,6 +319,7 @@ public MqttKafkaSessionFactory( this.offsetMetadataHelper = new KafkaOffsetMetadataHelper(new UnsafeBuffer(new byte[context.writeBuffer().capacity()])); this.groupIdPrefixFormat = config.groupIdPrefixFormat(); this.supplyNamespace = context::supplyNamespace; + this.supplyLocalName = context::supplyLocalName; } @Override @@ -357,12 +359,12 @@ public MessageConsumer newStream( @Override public void onAttached( - long bindingId, - String bindingName) + long bindingId) { MqttKafkaBindingConfig binding = supplyBinding.apply(bindingId); this.serverRef = binding.options.serverRef; - this.groupIdPrefix = String.format(groupIdPrefixFormat, supplyNamespace.apply(bindingId), bindingName); + this.groupIdPrefix = + String.format(groupIdPrefixFormat, supplyNamespace.apply(bindingId), supplyLocalName.apply(bindingId)); if (willAvailable && coreIndex == 0) { diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaStreamFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaStreamFactory.java index 7e8d99d8e4..bbc9e9c44c 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaStreamFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaStreamFactory.java @@ -30,8 +30,7 @@ default void detach( } default void onAttached( - long bindingId, - String bindingName) + long bindingId) { } diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java index ce9bd2afa2..956d8cf1a9 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java @@ -179,6 +179,7 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory private final Object2IntHashMap qosLevels; private final String groupIdPrefixFormat; private final Function supplyNamespace; + private final Function supplyLocalName; private int reconnectAttempt; private String groupIdPrefix; @@ -216,12 +217,12 @@ public MqttKafkaSubscribeFactory( this.qosLevels.put("2", 2); this.groupIdPrefixFormat = config.groupIdPrefixFormat(); this.supplyNamespace = context::supplyNamespace; + this.supplyLocalName = context::supplyLocalName; } @Override public void onAttached( - long bindingId, - String bindingName) + long bindingId) { if (bootstrapAvailable) { @@ -238,7 +239,8 @@ public void onAttached( this.qosNames.put(0, new String16FW("0")); this.qosNames.put(1, new String16FW("1")); this.qosNames.put(2, new String16FW("2")); - this.groupIdPrefix = String.format(groupIdPrefixFormat, supplyNamespace.apply(bindingId), bindingName); + this.groupIdPrefix = + String.format(groupIdPrefixFormat, supplyNamespace.apply(bindingId), supplyLocalName.apply(bindingId)); } @Override