diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java index cd04fea67021e..0e2d72d2b3a99 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java @@ -27,6 +27,7 @@ public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignme private PulsarService pulsar; @Override public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { +<<<<<<< Updated upstream <<<<<<< Updated upstream NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString())); ======= @@ -39,6 +40,9 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac ======= NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString())); >>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf +>>>>>>> Stashed changes +======= + NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName)); >>>>>>> Stashed changes if (topicName.getDomain().equals(TopicDomain.non_persistent)) { bundle.setHasNonPersistentTopic(true); @@ -48,6 +52,7 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac @Override <<<<<<< Updated upstream +<<<<<<< Updated upstream ======= <<<<<<< HEAD <<<<<<< HEAD @@ -83,6 +88,10 @@ public void init(PulsarService pulsarService) { public void init(PulsarService pulsarService) { this.pulsar = pulsarService; >>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf +======= + public long calculateBundleHashCode(TopicName topicName) { + return getBundleHashFunc().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong(); +>>>>>>> Stashed changes } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index 39508ed4b418b..91515ba569ef0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -293,6 +293,7 @@ public CompletableFuture getFullBundleAsync(NamespaceName fqnn) } public long getLongHashCode(String name) { +<<<<<<< Updated upstream <<<<<<< Updated upstream return this.topicBundleAssignmentStrategy.getHashCode(name); ======= @@ -305,6 +306,9 @@ public long getLongHashCode(String name) { ======= return this.topicBundleAssignmentStrategy.getHashCode(name); >>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf +>>>>>>> Stashed changes +======= + return this.topicBundleAssignmentStrategy.calculateBundleHashCode(TopicName.get(name)); >>>>>>> Stashed changes } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java index 0757f11499fc7..4c3821e704d19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java @@ -25,6 +25,7 @@ public interface TopicBundleAssignmentStrategy { NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles); +<<<<<<< Updated upstream <<<<<<< Updated upstream long getHashCode(String name); ======= @@ -39,6 +40,11 @@ default long calculateBundleHashCode(TopicName topicName) { ======= long getHashCode(String name); >>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf +>>>>>>> Stashed changes +======= + default long calculateBundleHashCode(TopicName topicName) { + return Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong(); + } >>>>>>> Stashed changes void init(PulsarService pulsarService); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java index f1dfcc06e19eb..83b8057621f5f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java @@ -87,6 +87,7 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac } @Override +<<<<<<< Updated upstream <<<<<<< HEAD <<<<<<< HEAD public long calculateBundleHashCode(TopicName topicName) { @@ -96,6 +97,9 @@ public long getHashCode(String name) { ======= public long getHashCode(String name) { >>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf +======= + public long calculateBundleHashCode(TopicName topicName) { +>>>>>>> Stashed changes return 0; } @@ -141,6 +145,7 @@ public static class RoundRobinBundleAssigner implements TopicBundleAssignmentStr @Override public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { +<<<<<<< Updated upstream <<<<<<< HEAD <<<<<<< HEAD NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName)); @@ -150,6 +155,9 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac ======= NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString())); >>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf +======= + NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName)); +>>>>>>> Stashed changes if (topicName.getDomain().equals(TopicDomain.non_persistent)) { bundle.setHasNonPersistentTopic(true); } @@ -157,6 +165,7 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac } @Override +<<<<<<< Updated upstream <<<<<<< HEAD <<<<<<< HEAD public long calculateBundleHashCode(TopicName topicName) { @@ -171,6 +180,10 @@ public long getHashCode(String name) { // use topic name without partition id to decide the first hash value TopicName topicName = TopicName.get(name); >>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf +======= + public long calculateBundleHashCode(TopicName topicName) { + // use topic name without partition id to decide the first hash value +>>>>>>> Stashed changes long currentPartitionTopicHash = pulsar.getNamespaceService().getNamespaceBundleFactory().getHashFunc() .hashString(topicName.getPartitionedTopicName(), Charsets.UTF_8).padToLong();