From 603d60550c791763621b0b797a20f51de9349ae9 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Sat, 25 Oct 2025 22:57:15 +0800 Subject: [PATCH] MINOR: Improve some requests/responses toString method (#20759) Improve some requests/responses toString method to log only the required info, including the request.Builder toString methods. 1. AlterConfigsRequest 2. AlterUserScramCredentialsRequest 3. ExpireDelegationTokenRequest 4. IncrementalAlterConfigsRequest 5. RenewDelegationTokenRequest 6. SaslAuthenticateRequest 7. createDelegationTokenResponse 8. describeDelegationTokenResponse 9. SaslAuthenticateResponse Reviewers: Chia-Ping Tsai , Manikumar Reddy --- .../common/requests/AlterConfigsRequest.java | 21 ++++++++ .../AlterUserScramCredentialsRequest.java | 23 ++++---- .../CreateDelegationTokenResponse.java | 9 ++++ .../DescribeDelegationTokenResponse.java | 11 ++++ .../ExpireDelegationTokenRequest.java | 14 ++++- .../IncrementalAlterConfigsRequest.java | 24 ++++----- .../requests/RenewDelegationTokenRequest.java | 14 ++++- .../requests/SaslAuthenticateRequest.java | 8 +++ .../requests/SaslAuthenticateResponse.java | 8 +++ .../common/requests/RequestResponseTest.java | 54 +++++++++++++++++-- .../kafka/network/RequestChannelTest.scala | 18 +++++-- 11 files changed, 170 insertions(+), 34 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java index b4d35d52ae35f..df3ce5ee2e700 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java @@ -88,6 +88,11 @@ public Builder(Map configs, boolean validateOnly) { public AlterConfigsRequest build(short version) { return new AlterConfigsRequest(data, version); } + + @Override + public String toString() { + return maskData(data); + } } private final AlterConfigsRequestData data; @@ -135,4 +140,20 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { public static AlterConfigsRequest parse(ByteBuffer buffer, short version) { return new AlterConfigsRequest(new AlterConfigsRequestData(new ByteBufferAccessor(buffer), version), version); } + + // It is not safe to print all config values + private static String maskData(AlterConfigsRequestData data) { + AlterConfigsRequestData tempData = data.duplicate(); + tempData.resources().forEach(resource -> { + resource.configs().forEach(config -> { + config.setValue("REDACTED"); + }); + }); + return tempData.toString(); + } + + @Override + public String toString() { + return maskData(data); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java index c779f6d9a84da..a51aa7dc40641 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java @@ -17,14 +17,10 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData; -import org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter; import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; - import java.nio.ByteBuffer; import java.util.List; import java.util.Set; @@ -48,7 +44,7 @@ public AlterUserScramCredentialsRequest build(short version) { @Override public String toString() { - return data.toString(); + return maskData(data); } } @@ -87,15 +83,18 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { return new AlterUserScramCredentialsResponse(new AlterUserScramCredentialsResponseData().setResults(results)); } + private static String maskData(AlterUserScramCredentialsRequestData data) { + AlterUserScramCredentialsRequestData tempData = data.duplicate(); + tempData.upsertions().forEach(upsertion -> { + upsertion.setSalt(new byte[0]); + upsertion.setSaltedPassword(new byte[0]); + }); + return tempData.toString(); + } + // Do not print salt or saltedPassword @Override public String toString() { - JsonNode json = AlterUserScramCredentialsRequestDataJsonConverter.write(data, version()).deepCopy(); - - for (JsonNode upsertion : json.get("upsertions")) { - ((ObjectNode) upsertion).put("salt", ""); - ((ObjectNode) upsertion).put("saltedPassword", ""); - } - return AlterUserScramCredentialsRequestDataJsonConverter.read(json, version()).toString(); + return maskData(data); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java index 0a9f9a8991bdc..287013d3aadb8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java @@ -103,4 +103,13 @@ public boolean hasError() { public boolean shouldClientThrottle(short version) { return version >= 1; } + + // Do not print tokenId and Hmac, overwrite a temp copy of the data with empty content + @Override + public String toString() { + CreateDelegationTokenResponseData tempData = data.duplicate(); + tempData.setTokenId("REDACTED"); + tempData.setHmac(new byte[0]); + return tempData.toString(); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java index a922f056a89aa..87b358249f5d7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java @@ -128,4 +128,15 @@ public boolean hasError() { public boolean shouldClientThrottle(short version) { return version >= 1; } + + // Do not print tokenId and Hmac, overwrite a temp copy of the data with empty content + @Override + public String toString() { + DescribeDelegationTokenResponseData tempData = data.duplicate(); + tempData.tokens().forEach(token -> { + token.setTokenId("REDACTED"); + token.setHmac(new byte[0]); + }); + return tempData.toString(); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java index 3660a45646059..ab4bcd9533e63 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java @@ -74,7 +74,19 @@ public ExpireDelegationTokenRequest build(short version) { @Override public String toString() { - return data.toString(); + return maskData(data); } } + + private static String maskData(ExpireDelegationTokenRequestData data) { + ExpireDelegationTokenRequestData tempData = data.duplicate(); + tempData.setHmac(new byte[0]); + return tempData.toString(); + } + + // Do not print Hmac, overwrite a temp copy of the data with empty content + @Override + public String toString() { + return maskData(data); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java index 54540837756c5..bfac0e4074d73 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java @@ -21,15 +21,11 @@ import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource; -import org.apache.kafka.common.message.IncrementalAlterConfigsRequestDataJsonConverter; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; - import java.nio.ByteBuffer; import java.util.Collection; import java.util.Map; @@ -77,7 +73,7 @@ public IncrementalAlterConfigsRequest build(short version) { @Override public String toString() { - return data.toString(); + return maskData(data); } } @@ -113,14 +109,18 @@ public AbstractResponse getErrorResponse(final int throttleTimeMs, final Throwab } // It is not safe to print all config values + private static String maskData(IncrementalAlterConfigsRequestData data) { + IncrementalAlterConfigsRequestData tempData = data.duplicate(); + tempData.resources().forEach(resource -> { + resource.configs().forEach(config -> { + config.setValue("REDACTED"); + }); + }); + return tempData.toString(); + } + @Override public String toString() { - JsonNode json = IncrementalAlterConfigsRequestDataJsonConverter.write(data, version()).deepCopy(); - for (JsonNode resource : json.get("resources")) { - for (JsonNode config : resource.get("configs")) { - ((ObjectNode) config).put("value", "REDACTED"); - } - } - return IncrementalAlterConfigsRequestDataJsonConverter.read(json, version()).toString(); + return maskData(data); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java index 963097093dc2d..deeb0dbf23aa0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java @@ -66,7 +66,19 @@ public RenewDelegationTokenRequest build(short version) { @Override public String toString() { - return data.toString(); + return maskData(data); } } + + private static String maskData(RenewDelegationTokenRequestData data) { + RenewDelegationTokenRequestData tempData = data.duplicate(); + tempData.setHmac(new byte[0]); + return tempData.toString(); + } + + // Do not print Hmac, overwrite a temp copy of the data with empty content + @Override + public String toString() { + return maskData(data); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java index 47dd5fd315769..0816744a97c3e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java @@ -78,4 +78,12 @@ public static SaslAuthenticateRequest parse(ByteBuffer buffer, short version) { return new SaslAuthenticateRequest(new SaslAuthenticateRequestData(new ByteBufferAccessor(buffer), version), version); } + + // Do not print authBytes, overwrite a temp copy of the data with empty bytes + @Override + public String toString() { + SaslAuthenticateRequestData tempData = data.duplicate(); + tempData.setAuthBytes(new byte[0]); + return tempData.toString(); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java index d6ca8c170dc45..810595279f07c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java @@ -80,4 +80,12 @@ public SaslAuthenticateResponseData data() { public static SaslAuthenticateResponse parse(ByteBuffer buffer, short version) { return new SaslAuthenticateResponse(new SaslAuthenticateResponseData(new ByteBufferAccessor(buffer), version)); } + + // Do not print authBytes, overwrite a temp copy of the data with empty bytes + @Override + public String toString() { + SaslAuthenticateResponseData tempData = data.duplicate(); + tempData.setAuthBytes(new byte[0]); + return tempData.toString(); + } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 5a9c8193d18ad..af1b46438c1b8 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -3317,7 +3317,7 @@ private DescribeConfigsResponse createDescribeConfigsResponse(short version) { } private AlterConfigsRequest createAlterConfigsRequest(short version) { - Map configs = new HashMap<>(); + Map configs = new LinkedHashMap<>(); List configEntries = asList( new AlterConfigsRequest.ConfigEntry("config_name", "config_value"), new AlterConfigsRequest.ConfigEntry("another_name", "another value") @@ -3325,7 +3325,19 @@ private AlterConfigsRequest createAlterConfigsRequest(short version) { configs.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), new AlterConfigsRequest.Config(configEntries)); configs.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"), new AlterConfigsRequest.Config(emptyList())); - return new AlterConfigsRequest.Builder(configs, false).build(version); + AlterConfigsRequest alterConfigsRequest = new AlterConfigsRequest.Builder(configs, false).build(version); + assertEquals( + "AlterConfigsRequestData(resources=[" + + "AlterConfigsResource(resourceType=" + ConfigResource.Type.BROKER.id() + ", " + + "resourceName='0', " + + "configs=[AlterableConfig(name='config_name', value='REDACTED'), " + + "AlterableConfig(name='another_name', value='REDACTED')]), " + + "AlterConfigsResource(resourceType=" + ConfigResource.Type.TOPIC.id() + ", " + + "resourceName='topic', configs=[])], " + + "validateOnly=false)", + alterConfigsRequest.toString() + ); + return alterConfigsRequest; } private AlterConfigsResponse createAlterConfigsResponse() { @@ -3428,7 +3440,12 @@ private CreateDelegationTokenResponse createCreateTokenResponse() { .setMaxTimestampMs(System.currentTimeMillis()) .setTokenId("token1") .setHmac("test".getBytes()); - return new CreateDelegationTokenResponse(data); + CreateDelegationTokenResponse response = new CreateDelegationTokenResponse(data); + + String responseStr = response.toString(); + assertTrue(responseStr.contains("tokenId='REDACTED'")); + assertTrue(responseStr.contains("hmac=[]")); + return response; } private RenewDelegationTokenRequest createRenewTokenRequest(short version) { @@ -3484,7 +3501,14 @@ private DescribeDelegationTokenResponse createDescribeTokenResponse(short versio tokenList.add(new DelegationToken(tokenInfo1, "test".getBytes())); tokenList.add(new DelegationToken(tokenInfo2, "test".getBytes())); - return new DescribeDelegationTokenResponse(version, 20, Errors.NONE, tokenList); + DescribeDelegationTokenResponse response = new DescribeDelegationTokenResponse(version, 20, Errors.NONE, tokenList); + + String responseStr = response.toString(); + String[] parts = responseStr.split(","); + // The 2 token info should both be redacted + assertEquals(2, Arrays.stream(parts).filter(s -> s.trim().contains("tokenId='REDACTED'")).count()); + assertEquals(2, Arrays.stream(parts).filter(s -> s.trim().contains("hmac=[]")).count()); + return response; } private ElectLeadersRequest createElectLeadersRequestNullPartitions() { @@ -4101,4 +4125,26 @@ public void testInvalidTaggedFieldsWithSaslAuthenticateRequest() { parseRequest(SASL_AUTHENTICATE, SASL_AUTHENTICATE.latestVersion(), accessor.buffer())).getMessage(); assertEquals("Error reading byte array of 32767 byte(s): only 3 byte(s) available", msg); } + + @Test + public void testSaslAuthenticateRequestResponseToStringMasksSensitiveData() { + byte[] sensitiveAuthBytes = "sensitive-auth-token-123".getBytes(StandardCharsets.UTF_8); + SaslAuthenticateRequestData requestData = new SaslAuthenticateRequestData().setAuthBytes(sensitiveAuthBytes); + SaslAuthenticateRequest request = new SaslAuthenticateRequest(requestData, (short) 2); + + String requestString = request.toString(); + + // Verify that the authBytes field is present but empty in the output + assertTrue(requestString.contains("authBytes=[]"), + "authBytes field should be empty in toString() output"); + + SaslAuthenticateResponseData responseData = new SaslAuthenticateResponseData().setAuthBytes(sensitiveAuthBytes); + SaslAuthenticateResponse response = new SaslAuthenticateResponse(responseData); + + String responseString = response.toString(); + + // Verify that the authBytes field is present but empty in the output + assertTrue(responseString.contains("authBytes=[]"), + "authBytes field should be empty in toString() output"); + } } diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala index 46bd1cf004a10..7d2073da66cba 100644 --- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala +++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala @@ -62,13 +62,23 @@ class RequestChannelTest { val sensitiveValue = "secret" def verifyConfig(resource: ConfigResource, entries: Seq[ConfigEntry], expectedValues: Map[String, String]): Unit = { - val alterConfigs = request(new AlterConfigsRequest.Builder( - Collections.singletonMap(resource, new Config(entries.asJavaCollection)), true).build()) + val alterConfigs = new AlterConfigsRequest.Builder( + Collections.singletonMap(resource, new Config(entries.asJavaCollection)), true).build() - val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest] + val alterConfigsString = alterConfigs.toString + entries.foreach { entry => + if (!alterConfigsString.contains(entry.name())) { + fail("Config names should be in the request string") + } + if (entry.value() != null && alterConfigsString.contains(entry.value())) { + fail("Config values should not be in the request string") + } + } + val alterConfigsReq = request(alterConfigs) + val loggableAlterConfigs = alterConfigsReq.loggableRequest.asInstanceOf[AlterConfigsRequest] val loggedConfig = loggableAlterConfigs.configs.get(resource) assertEquals(expectedValues, toMap(loggedConfig)) - val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog, alterConfigs.isForwarded).toString + val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigsReq.header, alterConfigsReq.requestLog, alterConfigsReq.isForwarded).toString assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc") }