Skip to content

Commit

Permalink
Remove binding.name parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
bmaidics committed May 21, 2024
1 parent 5982129 commit cf813d3
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void attach(
MqttKafkaBindingConfig kafkaBinding = new MqttKafkaBindingConfig(binding);
bindings.put(binding.id, kafkaBinding);

factories.values().forEach(streamFactory -> streamFactory.onAttached(binding.id, binding.name));
factories.values().forEach(streamFactory -> streamFactory.onAttached(binding.id));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory
private final MqttQoS publishQosMax;
private final String groupIdPrefixFormat;
private final Function<Long, String> supplyNamespace;
private final Function<Long, String> supplyLocalName;

private String serverRef;
private int reconnectAttempt;
Expand Down Expand Up @@ -318,6 +319,7 @@ public MqttKafkaSessionFactory(
this.offsetMetadataHelper = new KafkaOffsetMetadataHelper(new UnsafeBuffer(new byte[context.writeBuffer().capacity()]));
this.groupIdPrefixFormat = config.groupIdPrefixFormat();
this.supplyNamespace = context::supplyNamespace;
this.supplyLocalName = context::supplyLocalName;
}

@Override
Expand Down Expand Up @@ -357,12 +359,12 @@ public MessageConsumer newStream(

@Override
public void onAttached(
long bindingId,
String bindingName)
long bindingId)
{
MqttKafkaBindingConfig binding = supplyBinding.apply(bindingId);
this.serverRef = binding.options.serverRef;
this.groupIdPrefix = String.format(groupIdPrefixFormat, supplyNamespace.apply(bindingId), bindingName);
this.groupIdPrefix =
String.format(groupIdPrefixFormat, supplyNamespace.apply(bindingId), supplyLocalName.apply(bindingId));

if (willAvailable && coreIndex == 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ default void detach(
}

default void onAttached(
long bindingId,
String bindingName)
long bindingId)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory
private final Object2IntHashMap<String> qosLevels;
private final String groupIdPrefixFormat;
private final Function<Long, String> supplyNamespace;
private final Function<Long, String> supplyLocalName;

private int reconnectAttempt;
private String groupIdPrefix;
Expand Down Expand Up @@ -216,12 +217,12 @@ public MqttKafkaSubscribeFactory(
this.qosLevels.put("2", 2);
this.groupIdPrefixFormat = config.groupIdPrefixFormat();
this.supplyNamespace = context::supplyNamespace;
this.supplyLocalName = context::supplyLocalName;
}

@Override
public void onAttached(
long bindingId,
String bindingName)
long bindingId)
{
if (bootstrapAvailable)
{
Expand All @@ -238,7 +239,8 @@ public void onAttached(
this.qosNames.put(0, new String16FW("0"));
this.qosNames.put(1, new String16FW("1"));
this.qosNames.put(2, new String16FW("2"));
this.groupIdPrefix = String.format(groupIdPrefixFormat, supplyNamespace.apply(bindingId), bindingName);
this.groupIdPrefix =
String.format(groupIdPrefixFormat, supplyNamespace.apply(bindingId), supplyLocalName.apply(bindingId));
}

@Override
Expand Down

0 comments on commit cf813d3

Please sign in to comment.