Skip to content

Commit

Permalink
[improve][broker] Improve the extensibility of the TopicBundleAssignm…
Browse files Browse the repository at this point in the history
…entStrategy interface class (apache#23773)
  • Loading branch information
rayluoluo committed Jan 7, 2025
1 parent 306d3fc commit b92a611
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignme
private PulsarService pulsar;
@Override
public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {
<<<<<<< Updated upstream
<<<<<<< Updated upstream
NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString()));
=======
Expand All @@ -39,6 +40,9 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac
=======
NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString()));
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
>>>>>>> Stashed changes
=======
NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName));
>>>>>>> Stashed changes
if (topicName.getDomain().equals(TopicDomain.non_persistent)) {
bundle.setHasNonPersistentTopic(true);
Expand All @@ -48,6 +52,7 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac

@Override
<<<<<<< Updated upstream
<<<<<<< Updated upstream
=======
<<<<<<< HEAD
<<<<<<< HEAD
Expand Down Expand Up @@ -83,6 +88,10 @@ public void init(PulsarService pulsarService) {
public void init(PulsarService pulsarService) {
this.pulsar = pulsarService;
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
public long calculateBundleHashCode(TopicName topicName) {
return getBundleHashFunc().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
>>>>>>> Stashed changes
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ public CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn)
}

public long getLongHashCode(String name) {
<<<<<<< Updated upstream
<<<<<<< Updated upstream
return this.topicBundleAssignmentStrategy.getHashCode(name);
=======
Expand All @@ -305,6 +306,9 @@ public long getLongHashCode(String name) {
=======
return this.topicBundleAssignmentStrategy.getHashCode(name);
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
>>>>>>> Stashed changes
=======
return this.topicBundleAssignmentStrategy.calculateBundleHashCode(TopicName.get(name));
>>>>>>> Stashed changes
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public interface TopicBundleAssignmentStrategy {
NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles);

<<<<<<< Updated upstream
<<<<<<< Updated upstream
long getHashCode(String name);
=======
Expand All @@ -39,6 +40,11 @@ default long calculateBundleHashCode(TopicName topicName) {
=======
long getHashCode(String name);
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
>>>>>>> Stashed changes
=======
default long calculateBundleHashCode(TopicName topicName) {
return Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
}
>>>>>>> Stashed changes

void init(PulsarService pulsarService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac
}

@Override
<<<<<<< Updated upstream
<<<<<<< HEAD
<<<<<<< HEAD
public long calculateBundleHashCode(TopicName topicName) {
Expand All @@ -96,6 +97,9 @@ public long getHashCode(String name) {
=======
public long getHashCode(String name) {
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
public long calculateBundleHashCode(TopicName topicName) {
>>>>>>> Stashed changes
return 0;
}

Expand Down Expand Up @@ -141,6 +145,7 @@ public static class RoundRobinBundleAssigner implements TopicBundleAssignmentStr

@Override
public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {
<<<<<<< Updated upstream
<<<<<<< HEAD
<<<<<<< HEAD
NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName));
Expand All @@ -150,13 +155,17 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac
=======
NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString()));
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName));
>>>>>>> Stashed changes
if (topicName.getDomain().equals(TopicDomain.non_persistent)) {
bundle.setHasNonPersistentTopic(true);
}
return bundle;
}

@Override
<<<<<<< Updated upstream
<<<<<<< HEAD
<<<<<<< HEAD
public long calculateBundleHashCode(TopicName topicName) {
Expand All @@ -171,6 +180,10 @@ public long getHashCode(String name) {
// use topic name without partition id to decide the first hash value
TopicName topicName = TopicName.get(name);
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
public long calculateBundleHashCode(TopicName topicName) {
// use topic name without partition id to decide the first hash value
>>>>>>> Stashed changes
long currentPartitionTopicHash =
pulsar.getNamespaceService().getNamespaceBundleFactory().getHashFunc()
.hashString(topicName.getPartitionedTopicName(), Charsets.UTF_8).padToLong();
Expand Down

0 comments on commit b92a611

Please sign in to comment.