diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 5e1eaff1162ba..daef31bd26347 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -874,16 +874,6 @@ public class StreamsConfig extends AbstractConfig { public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms"; private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day"; - private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS = - new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.GROUP_PROTOCOL_CONFIG}; - private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS = - new String[] {ConsumerConfig.ISOLATION_LEVEL_CONFIG}; - private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS = - new String[] { - ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, - ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, - ProducerConfig.TRANSACTIONAL_ID_CONFIG - }; static { CONFIG = new ConfigDef() @@ -1282,33 +1272,59 @@ public class StreamsConfig extends AbstractConfig { // this is the list of configs for underlying clients // that streams prefer different default values - private static final Map PRODUCER_DEFAULT_OVERRIDES = Map.of(ProducerConfig.LINGER_MS_CONFIG, "100"); + private static final Map KS_DEFAULT_PRODUCER_CONFIGS; + static { + final Map tempProducerDefaultOverrides = new HashMap<>(); + tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100"); + + KS_DEFAULT_PRODUCER_CONFIGS = Collections.unmodifiableMap(tempProducerDefaultOverrides); + } + + private static final Map KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED; + static { + final Map tempProducerDefaultOverrides = new HashMap<>(KS_DEFAULT_PRODUCER_CONFIGS); + tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); + // Reduce the transaction timeout for quicker pending offset expiration on broker side. + tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT); + + KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED = Collections.unmodifiableMap(tempProducerDefaultOverrides); + } - private static final Map PRODUCER_EOS_OVERRIDES; + private static final Map KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED; static { - final Map tempProducerDefaultOverrides = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES); - tempProducerDefaultOverrides.putAll(Map.of( - ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE, - ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true, - // Reduce the transaction timeout for quicker pending offset expiration on broker side. - ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT - )); - PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides); + final Map tempProducerDefaultOverrides = new HashMap<>(); + tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null); + + KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED = Collections.unmodifiableMap(tempProducerDefaultOverrides); } - private static final Map CONSUMER_DEFAULT_OVERRIDES = Map.of( - ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000", - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest", - ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", - "internal.leave.group.on.close", false, - ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic" - ); + private static final Map KS_DEFAULT_CONSUMER_CONFIGS; + static { + final Map tempConsumerDefaultOverrides = new HashMap<>(); + tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); + tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false); + tempConsumerDefaultOverrides.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic"); + + KS_DEFAULT_CONSUMER_CONFIGS = Collections.unmodifiableMap(tempConsumerDefaultOverrides); + } - private static final Map CONSUMER_EOS_OVERRIDES; + private static final Map KS_CONTROLLED_CONSUMER_CONFIGS; static { - final Map tempConsumerDefaultOverrides = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES); + final Map tempConsumerDefaultOverrides = new HashMap<>(); + tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + tempConsumerDefaultOverrides.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); + + KS_CONTROLLED_CONSUMER_CONFIGS = Collections.unmodifiableMap(tempConsumerDefaultOverrides); + } + + private static final Map KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED; + static { + final Map tempConsumerDefaultOverrides = new HashMap<>(KS_CONTROLLED_CONSUMER_CONFIGS); tempConsumerDefaultOverrides.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_COMMITTED.toString()); - CONSUMER_EOS_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); + + KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED = Collections.unmodifiableMap(tempConsumerDefaultOverrides); } private static final Map ADMIN_CLIENT_OVERRIDES = @@ -1676,10 +1692,7 @@ private Map getCommonConsumerConfigs() { clientProvidedProps.remove(GROUP_PROTOCOL_CONFIG); - checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); - checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS); - - final Map consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES); + final Map consumerProps = new HashMap<>(KS_DEFAULT_CONSUMER_CONFIGS); if (StreamsConfigUtils.eosEnabled(this)) { consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", true); } @@ -1692,74 +1705,46 @@ private Map getCommonConsumerConfigs() { return consumerProps; } - private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map clientProvidedProps, - final String[] nonConfigurableConfigs) { - // Streams does not allow users to configure certain consumer/producer configurations, for example, - // enable.auto.commit. In cases where user tries to override such non-configurable - // consumer/producer configurations, log a warning and remove the user defined value from the Map. - // Thus, the default values for these consumer/producer configurations that are suitable for - // Streams will be used instead. - - final String nonConfigurableConfigMessage = "Unexpected user-specified {} config '{}' found. {} setting ({}) will be ignored and the Streams default setting ({}) will be used."; - final String eosMessage = "'" + PROCESSING_GUARANTEE_CONFIG + "' is set to \"" + getString(PROCESSING_GUARANTEE_CONFIG) + "\". Hence, user"; - - for (final String config: nonConfigurableConfigs) { - if (clientProvidedProps.containsKey(config)) { - - if (CONSUMER_DEFAULT_OVERRIDES.containsKey(config)) { - if (!clientProvidedProps.get(config).equals(CONSUMER_DEFAULT_OVERRIDES.get(config))) { - log.error( - nonConfigurableConfigMessage, - "consumer", - config, - "User", - clientProvidedProps.get(config), - CONSUMER_DEFAULT_OVERRIDES.get(config) - ); - clientProvidedProps.remove(config); - } - } else if (eosEnabled) { - if (CONSUMER_EOS_OVERRIDES.containsKey(config)) { - if (!clientProvidedProps.get(config).equals(CONSUMER_EOS_OVERRIDES.get(config))) { - log.warn( - nonConfigurableConfigMessage, - "consumer", - config, - eosMessage, - clientProvidedProps.get(config), - CONSUMER_EOS_OVERRIDES.get(config) - ); - clientProvidedProps.remove(config); - } - } else if (PRODUCER_EOS_OVERRIDES.containsKey(config)) { - if (!clientProvidedProps.get(config).equals(PRODUCER_EOS_OVERRIDES.get(config))) { - log.warn( - nonConfigurableConfigMessage, - "producer", - config, - eosMessage, - clientProvidedProps.get(config), - PRODUCER_EOS_OVERRIDES.get(config) - ); - clientProvidedProps.remove(config); - } - } else if (ProducerConfig.TRANSACTIONAL_ID_CONFIG.equals(config)) { - log.warn( - nonConfigurableConfigMessage, - "producer", - config, - eosMessage, - clientProvidedProps.get(config), - "-" - ); - clientProvidedProps.remove(config); - } - } + private void overwritePropertyMap(final Map props, final String configName, final Object unmodifiableValue, final Object unmodifiableLogValue, final String clientType) { + final String overwritePropertyLogMessage = "Unexpected {} config `{}` found. User setting ({}) will be ignored and the Kafka Streams setting ({}) will be used"; + + if (props.containsKey(configName) && (!Objects.equals(props.get(configName), unmodifiableValue))) { + log.warn(overwritePropertyLogMessage, clientType, configName, props.get(configName), unmodifiableLogValue); + } + + props.put(configName, unmodifiableValue); + } + + private void overwritePropertyMap(final Map props, final String configName, final Object unmodifiableValue, final String clientType) { + overwritePropertyMap(props, configName, unmodifiableValue, unmodifiableValue, clientType); + } + + private void validateConsumerPropertyMap(final Map props) { + if (eosEnabled) { + for (final Map.Entry entry : KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED.entrySet()) { + overwritePropertyMap(props, entry.getKey(), entry.getValue(), "consumer"); + } + } else { + for (final Map.Entry entry : KS_CONTROLLED_CONSUMER_CONFIGS.entrySet()) { + overwritePropertyMap(props, entry.getKey(), entry.getValue(), "consumer"); } } + } + private void validateProducerPropertyMap(final Map props) { if (eosEnabled) { - verifyMaxInFlightRequestPerConnection(clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)); + for (final Map.Entry entry : KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED.entrySet()) { + if (ProducerConfig.TRANSACTIONAL_ID_CONFIG.equals(entry.getKey())) { + // Special handling for transactional.id logging + final String transactionalIdExpression = getString(PROCESSING_GUARANTEE_CONFIG).equals(EXACTLY_ONCE_V2) ? + "--" : "-"; + overwritePropertyMap(props, entry.getKey(), entry.getValue(), transactionalIdExpression, "producer"); + } else { + overwritePropertyMap(props, entry.getKey(), entry.getValue(), "producer"); + } + } + // Verify max.in.flight.requests.per.connection + verifyMaxInFlightRequestPerConnection(props.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)); } } @@ -1807,6 +1792,8 @@ public Map getMainConsumerConfigs(final String groupId, final St final Map mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX); consumerProps.putAll(mainConsumerProps); + validateConsumerPropertyMap(consumerProps); + // this is a hack to work around StreamsConfig constructor inside StreamsPartitionAssignor to avoid casting consumerProps.put(APPLICATION_ID_CONFIG, groupId); @@ -1835,9 +1822,6 @@ public Map getMainConsumerConfigs(final String groupId, final St consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)); consumerProps.put(TASK_ASSIGNOR_CLASS_CONFIG, getString(TASK_ASSIGNOR_CLASS_CONFIG)); - // disable auto topic creation - consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); - // verify that producer batch config is no larger than segment size, then add topic configs required for creating topics final Map topicProps = originalsWithPrefix(TOPIC_PREFIX, false); final Map producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()); @@ -1882,6 +1866,8 @@ public Map getRestoreConsumerConfigs(final String clientId) { final Map restoreConsumerProps = originalsWithPrefix(RESTORE_CONSUMER_PREFIX); baseConsumerProps.putAll(restoreConsumerProps); + validateConsumerPropertyMap(baseConsumerProps); + // no need to set group id for a restore consumer baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG); // no need to set instance id for a restore consumer @@ -1915,6 +1901,8 @@ public Map getGlobalConsumerConfigs(final String clientId) { final Map globalConsumerProps = originalsWithPrefix(GLOBAL_CONSUMER_PREFIX); baseConsumerProps.putAll(globalConsumerProps); + validateConsumerPropertyMap(baseConsumerProps); + // no need to set group id for a global consumer baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG); // no need to set instance id for a restore consumer @@ -1939,13 +1927,13 @@ public Map getGlobalConsumerConfigs(final String clientId) { public Map getProducerConfigs(final String clientId) { final Map clientProvidedProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()); - checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS); - // generate producer configs from original properties and overridden maps - final Map props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES); + final Map props = new HashMap<>(eosEnabled ? KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED : KS_DEFAULT_PRODUCER_CONFIGS); props.putAll(getClientCustomProps()); props.putAll(clientProvidedProps); + validateProducerPropertyMap(props); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG)); // add client id with stream client id prefix props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 6505cde08ed6e..b8aaac432bfd1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -286,6 +286,66 @@ public void testGetRestoreConsumerConfigs() { assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG)); } + @Test + public void shouldResetToDefaultIfMainConsumerAllowAutoCreateTopicsIsOverridden() { + props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + appender.setClassLogger(StreamsConfig.class, Level.WARN); + + final StreamsConfig streamsConfig = new StreamsConfig(props); + final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); + + // Verify the controlled value is enforced + assertEquals("false", consumerConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG)); + + // Verify warning is logged when user tries to override controlled config + assertTrue(appender.getMessages().stream() + .anyMatch(msg -> msg.contains("Unexpected consumer config `allow.auto.create.topics` found") && + msg.contains("User setting (true) will be ignored"))); + } + } + + @Test + public void shouldResetToDefaultIfRestoreConsumerAllowAutoCreateTopicsIsOverridden() { + props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + appender.setClassLogger(StreamsConfig.class, Level.WARN); + + final StreamsConfig streamsConfig = new StreamsConfig(props); + final Map consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId); + + // Verify the controlled value is enforced + assertEquals("false", consumerConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG)); + + // Verify warning is logged when user tries to override controlled config + assertTrue(appender.getMessages().stream() + .anyMatch(msg -> msg.contains("Unexpected consumer config `allow.auto.create.topics` found") && + msg.contains("User setting (true) will be ignored"))); + } + } + + @Test + public void shouldResetToDefaultIfGlobalConsumerAllowAutoCreateTopicsIsOverridden() { + props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + appender.setClassLogger(StreamsConfig.class, Level.WARN); + + final StreamsConfig streamsConfig = new StreamsConfig(props); + final Map consumerConfigs = streamsConfig.getGlobalConsumerConfigs(clientId); + + // Verify the controlled value is enforced + assertEquals("false", consumerConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG)); + + // Verify warning is logged when user tries to override controlled config + assertTrue(appender.getMessages().stream() + .anyMatch(msg -> msg.contains("Unexpected consumer config `allow.auto.create.topics` found") && + msg.contains("User setting (true) will be ignored"))); + } + } + @SuppressWarnings("resource") @Test public void defaultSerdeShouldBeConfigured() {