Skip to content

KAFKA-19432: Add an ERROR log message if broker.heartbeat.interval.ms is too large #20046

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,11 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be less than or equal to replica.lag.time.max.ms" +
" to prevent frequent changes in ISR")

if (brokerHeartbeatIntervalMs * 2 > brokerSessionTimeoutMs) {
error(s"${KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG} ($brokerHeartbeatIntervalMs ms) must be less than or equal to half of the ${KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG} ($brokerSessionTimeoutMs ms). " +
s"Please increase ${KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG} or decrease ${KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG}.")
}

val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(l => ListenerName.normalised(l.listener)).toSet

// validate KRaft-related configs
Expand Down
16 changes: 16 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.{CompressionType, Records}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.utils.LogCaptureAppender
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
Expand All @@ -40,11 +41,13 @@ import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfi
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.storage.internals.log.CleanerConfig
import org.apache.logging.log4j.Level
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable

import scala.jdk.CollectionConverters._
import scala.util.Using

class KafkaConfigTest {

Expand Down Expand Up @@ -1886,4 +1889,17 @@ class KafkaConfigTest {
val message = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage
assertEquals("requirement failed: controller.listener.names must contain at least one value appearing in the 'listeners' configuration when running the KRaft controller role", message)
}

@Test
def testLogBrokerHeartbeatIntervalMsShouldBeLowerThanHalfOfBrokerSessionTimeoutMs(): Unit = {
val props = createDefaultConfig()
Using.resource(LogCaptureAppender.createAndRegister) { appender =>
appender.setClassLogger(KafkaConfig.getClass, Level.ERROR)
props.setProperty(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, "4500")
props.setProperty(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG, "8999")
KafkaConfig.fromProps(props)
assertTrue(appender.getMessages.contains("broker.heartbeat.interval.ms (4500 ms) must be less than or equal to half of the broker.session.timeout.ms (8999 ms). " +
"Please increase broker.session.timeout.ms or decrease broker.heartbeat.interval.ms."))
}
}
}