Skip to content

Commit

Permalink
KAFKA-16021: Eagerly look up StringSerializer encoding during configu…
Browse files Browse the repository at this point in the history
…re (apache#15024)

This commit moves validation of the encoding configured in properties from (de)serialization time to initialization time, so an unsupported or malformed encoding name is reported as soon as the (de)serializer is configured.

Reviewers: Divij Vaidya <[email protected]>
  • Loading branch information
srdo authored Dec 24, 2023
1 parent c0967aa commit ee65ca2
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,42 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Utils;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.StandardCharsets;
import java.nio.charset.UnsupportedCharsetException;
import java.util.Map;

/**
* String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding,
* value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
* String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding,
* value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
*/
public class StringDeserializer implements Deserializer<String> {
private String encoding = StandardCharsets.UTF_8.name();
private Charset encoding = StandardCharsets.UTF_8;

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("deserializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
if (encodingValue instanceof String) {
String encodingName = (String) encodingValue;
try {
encoding = Charset.forName(encodingName);
} catch (UnsupportedCharsetException | IllegalCharsetNameException e) {
throw new SerializationException("Unsupported encoding " + encodingName, e);
}
}
}

@Override
public String deserialize(String topic, byte[] data) {
try {
if (data == null)
return null;
else
return new String(data, encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
if (data == null)
return null;
else
return new String(data, encoding);
}

@Override
Expand All @@ -60,13 +64,9 @@ public String deserialize(String topic, Headers headers, ByteBuffer data) {
return null;
}

try {
if (data.hasArray()) {
return new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), encoding);
}
return new String(Utils.toArray(data), encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing ByteBuffer to string due to unsupported encoding " + encoding);
if (data.hasArray()) {
return new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), encoding);
}
return new String(Utils.toArray(data), encoding);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,40 @@

import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.StandardCharsets;
import java.nio.charset.UnsupportedCharsetException;
import java.util.Map;

/**
* String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding,
* value.serializer.encoding or serializer.encoding. The first two take precedence over the last.
*/
public class StringSerializer implements Serializer<String> {
private String encoding = StandardCharsets.UTF_8.name();
private Charset encoding = StandardCharsets.UTF_8;

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
if (encodingValue instanceof String) {
String encodingName = (String) encodingValue;
try {
encoding = Charset.forName(encodingName);
} catch (UnsupportedCharsetException | IllegalCharsetNameException e) {
throw new SerializationException("Unsupported encoding " + encodingName, e);
}
}
}

@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
if (data == null)
return null;
else
return data.getBytes(encoding);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,21 @@ public void stringSerdeShouldSupportDifferentEncodings() {
}
}


/**
 * Configuring the String serializer or deserializer with an encoding name that cannot be
 * resolved must fail with a {@link SerializationException} from {@code configure} itself.
 */
@Test
public void stringSerdeConfigureThrowsOnUnknownEncoding() {
    final String unknownCharsetName = "encoding-does-not-exist";
    try (Serde<String> stringSerde = Serdes.String()) {
        // Serializer side: the key-specific property carries the bogus charset name.
        Map<String, Object> serializerProps = new HashMap<>();
        serializerProps.put("key.serializer.encoding", unknownCharsetName);
        assertThrows(
            SerializationException.class,
            () -> stringSerde.serializer().configure(serializerProps, true));

        // Deserializer side: same check through the key-specific deserializer property.
        Map<String, Object> deserializerProps = new HashMap<>();
        deserializerProps.put("key.deserializer.encoding", unknownCharsetName);
        assertThrows(
            SerializationException.class,
            () -> stringSerde.deserializer().configure(deserializerProps, true));
    }
}

@SuppressWarnings("unchecked")
@Test
public void listSerdeShouldReturnEmptyCollection() {
Expand Down

0 comments on commit ee65ca2

Please sign in to comment.