diff --git a/core/src/main/java/kafka/autobalancer/model/AbstractInstanceUpdater.java b/core/src/main/java/kafka/autobalancer/model/AbstractInstanceUpdater.java index 3619081adc..6a2cf7a51e 100644 --- a/core/src/main/java/kafka/autobalancer/model/AbstractInstanceUpdater.java +++ b/core/src/main/java/kafka/autobalancer/model/AbstractInstanceUpdater.java @@ -31,6 +31,15 @@ public abstract class AbstractInstanceUpdater { protected Map metricSampleMap = new HashMap<>(); protected long lastUpdateTimestamp = 0L; protected MetricVersion metricVersion = defaultVersion(); + private final long createTimestamp; + + public AbstractInstanceUpdater() { + this.createTimestamp = System.currentTimeMillis(); + } + + public long createTimestamp() { + return createTimestamp; + } public boolean update(Iterable> metricsMap, long time) { lock.lock(); diff --git a/core/src/main/java/kafka/autobalancer/model/ClusterModel.java b/core/src/main/java/kafka/autobalancer/model/ClusterModel.java index 768276dd44..959e0352a2 100644 --- a/core/src/main/java/kafka/autobalancer/model/ClusterModel.java +++ b/core/src/main/java/kafka/autobalancer/model/ClusterModel.java @@ -32,7 +32,7 @@ public class ClusterModel { protected final Logger logger; private static final String DEFAULT_RACK_ID = "rack_default"; private static final long DEFAULT_MAX_TOLERATED_METRICS_DELAY_MS = 60000L; - + private static final long DEFAULT_METRICS_DELAY_EXEMPTION_TIME_MS = 60000L; /* * Guard the access on cluster structure (read/add/remove for brokers, replicas) */ @@ -78,6 +78,10 @@ Map calculateBrokerLatestMetricsTime() { Map replicaMap = entry.getValue(); for (Map.Entry tpEntry : replicaMap.entrySet()) { TopicPartitionReplicaUpdater replicaUpdater = tpEntry.getValue(); + if (System.currentTimeMillis() - replicaUpdater.createTimestamp() <= DEFAULT_METRICS_DELAY_EXEMPTION_TIME_MS) { + // exempt the newly created partition + continue; + } metricsTimeMap.put(brokerId, Math.min(metricsTimeMap.get(brokerId), replicaUpdater.getLastUpdateTimestamp())); } } diff --git a/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java b/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java index 13a84fec82..4c3bd19433 100644 --- a/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java +++ b/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java @@ -392,7 +392,7 @@ public void testMetricsTime() { Map metricsTimeMap = clusterModel.calculateBrokerLatestMetricsTime(); Assertions.assertEquals(1, metricsTimeMap.size()); - Assertions.assertEquals(now - 2000, metricsTimeMap.get(brokerId)); + Assertions.assertEquals(now, metricsTimeMap.get(brokerId)); } @Test