From ee65ca22b5cd15f54a852848626dc6fb3d9473d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20D=C3=B8ssing?= Date: Sun, 24 Dec 2023 21:27:59 +0100 Subject: [PATCH] KAFKA-16021: Eagerly look up StringSerializer encoding during configure (#15024) This commit changes the validation for the correctness of encoding defined in properties from (de)seralization time to initialization time. Reviewers: Divij Vaidya --- .../serialization/StringDeserializer.java | 42 +++++++++---------- .../serialization/StringSerializer.java | 28 +++++++------ .../serialization/SerializationTest.java | 15 +++++++ 3 files changed, 52 insertions(+), 33 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java index 935c58889c591..ae3cad199082c 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java @@ -20,17 +20,19 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.utils.Utils; -import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.IllegalCharsetNameException; import java.nio.charset.StandardCharsets; +import java.nio.charset.UnsupportedCharsetException; import java.util.Map; /** - * String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding, - * value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last. + * String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding, + * value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last. */ public class StringDeserializer implements Deserializer { - private String encoding = StandardCharsets.UTF_8.name(); + private Charset encoding = StandardCharsets.UTF_8; @Override public void configure(Map configs, boolean isKey) { @@ -38,20 +40,22 @@ public void configure(Map configs, boolean isKey) { Object encodingValue = configs.get(propertyName); if (encodingValue == null) encodingValue = configs.get("deserializer.encoding"); - if (encodingValue instanceof String) - encoding = (String) encodingValue; + if (encodingValue instanceof String) { + String encodingName = (String) encodingValue; + try { + encoding = Charset.forName(encodingName); + } catch (UnsupportedCharsetException | IllegalCharsetNameException e) { + throw new SerializationException("Unsupported encoding " + encodingName, e); + } + } } @Override public String deserialize(String topic, byte[] data) { - try { - if (data == null) - return null; - else - return new String(data, encoding); - } catch (UnsupportedEncodingException e) { - throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding); - } + if (data == null) + return null; + else + return new String(data, encoding); } @Override @@ -60,13 +64,9 @@ public String deserialize(String topic, Headers headers, ByteBuffer data) { return null; } - try { - if (data.hasArray()) { - return new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), encoding); - } - return new String(Utils.toArray(data), encoding); - } catch (UnsupportedEncodingException e) { - throw new SerializationException("Error when deserializing ByteBuffer to string due to unsupported encoding " + encoding); + if (data.hasArray()) { + return new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), encoding); } + return new String(Utils.toArray(data), encoding); } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java index ee01f1a590a41..af7b04be231ef 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java @@ -18,8 +18,10 @@ import org.apache.kafka.common.errors.SerializationException; -import java.io.UnsupportedEncodingException; +import java.nio.charset.Charset; +import java.nio.charset.IllegalCharsetNameException; import java.nio.charset.StandardCharsets; +import java.nio.charset.UnsupportedCharsetException; import java.util.Map; /** @@ -27,7 +29,7 @@ * value.serializer.encoding or serializer.encoding. The first two take precedence over the last. */ public class StringSerializer implements Serializer { - private String encoding = StandardCharsets.UTF_8.name(); + private Charset encoding = StandardCharsets.UTF_8; @Override public void configure(Map configs, boolean isKey) { @@ -35,19 +37,21 @@ public void configure(Map configs, boolean isKey) { Object encodingValue = configs.get(propertyName); if (encodingValue == null) encodingValue = configs.get("serializer.encoding"); - if (encodingValue instanceof String) - encoding = (String) encodingValue; + if (encodingValue instanceof String) { + String encodingName = (String) encodingValue; + try { + encoding = Charset.forName(encodingName); + } catch (UnsupportedCharsetException | IllegalCharsetNameException e) { + throw new SerializationException("Unsupported encoding " + encodingName, e); + } + } } @Override public byte[] serialize(String topic, String data) { - try { - if (data == null) - return null; - else - return data.getBytes(encoding); - } catch (UnsupportedEncodingException e) { - throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding); - } + if (data == null) + return null; + else + return data.getBytes(encoding); } } \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java index 6f45e30089b5e..c27637ec48714 100644 --- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java @@ -129,6 +129,21 @@ public void stringSerdeShouldSupportDifferentEncodings() { } } + + @Test + public void stringSerdeConfigureThrowsOnUnknownEncoding() { + String encoding = "encoding-does-not-exist"; + try (Serde serDeser = Serdes.String()) { + Map serializerConfigs = new HashMap<>(); + serializerConfigs.put("key.serializer.encoding", encoding); + assertThrows(SerializationException.class, () -> serDeser.serializer().configure(serializerConfigs, true)); + + Map deserializerConfigs = new HashMap<>(); + deserializerConfigs.put("key.deserializer.encoding", encoding); + assertThrows(SerializationException.class, () -> serDeser.deserializer().configure(deserializerConfigs, true)); + } + } + @SuppressWarnings("unchecked") @Test public void listSerdeShouldReturnEmptyCollection() {