
Commit 52dfcab
Merge pull request #388 from TikhomirovSergey/0.24.7-ALPHA
0.24.7-ALPHA: improved serialization of ConsumerRecord
TikhomirovSergey authored Feb 7, 2023
2 parents f92f413 + 090a89a commit 52dfcab
Showing 10 changed files with 376 additions and 33 deletions.
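In summary, this change replaces Gson with Jackson for rendering ConsumerRecord instances as JSON and introduces a KafkaJacksonModule with custom serializers for ConsumerRecord and Headers. A minimal sketch of the new serialization path, assuming kafka-clients and jackson-databind on the classpath (the SerializationSketch class and the topic/key/value literals are illustrative, and the 5-argument ConsumerRecord constructor is assumed available in the project's kafka-clients version):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import ru.tinkoff.qa.neptune.kafka.jackson.desrializer.KafkaJacksonModule;

    public class SerializationSketch {

        public static void main(String[] args) throws Exception {
            // An illustrative record; real records come from KafkaConsumer#poll
            var record = new ConsumerRecord<>("someTopic", 0, 0L, "someKey", "someValue");

            // Registering KafkaJacksonModule routes ConsumerRecord and Headers
            // through the serializers introduced by this commit
            var mapper = new ObjectMapper().registerModule(new KafkaJacksonModule());

            // Prints the record as pretty-printed JSON
            System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(record));
        }
    }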
build.gradle (2 changes: 1 addition & 1 deletion)

@@ -6,7 +6,7 @@ plugins {
 }
 
 ext {
-    globalVersion = '0.24.6-ALPHA'
+    globalVersion = '0.24.7-ALPHA'
 }
 
 repositories {
KafkaConsumerRecordsCaptor.java

@@ -1,9 +1,10 @@
 package ru.tinkoff.qa.neptune.kafka.captors;
 
-import com.google.gson.GsonBuilder;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import ru.tinkoff.qa.neptune.core.api.event.firing.captors.StringCaptor;
 import ru.tinkoff.qa.neptune.core.api.steps.annotations.Description;
+import ru.tinkoff.qa.neptune.kafka.jackson.desrializer.KafkaJacksonModule;
 
 import java.util.Collection;
 import java.util.List;

@@ -31,8 +32,15 @@ public final class KafkaConsumerRecordsCaptor extends StringCaptor<List<? extends ConsumerRecord<?, ?>>> {
     @Override
     public StringBuilder getData(List<? extends ConsumerRecord<?, ?>> caught) {
         var result = new StringBuilder();
+        var mapper = new ObjectMapper();
+        mapper.registerModule(new KafkaJacksonModule());
         caught.forEach(r -> {
-            var stringToAppend = new GsonBuilder().setPrettyPrinting().create().toJson(r);
+            String stringToAppend;
+            try {
+                stringToAppend = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(r);
+            } catch (Exception e) {
+                stringToAppend = "Was not serialized because of: " + e.getClass() + " " + e.getMessage();
+            }
             result.append("Consumer Record: ").append(stringToAppend).append("\r\n");
         });
         return result;
KafkaValueCaptor.java

@@ -1,6 +1,7 @@
 package ru.tinkoff.qa.neptune.kafka.captors;
 
-import com.google.gson.GsonBuilder;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 interface KafkaValueCaptor {
 
@@ -9,7 +10,12 @@ default StringBuilder getData(Object caught) {
         if (caught instanceof String) {
             value = (String) caught;
         } else {
-            value = new GsonBuilder().setPrettyPrinting().create().toJson(caught);
+            try {
+                value = new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(caught);
+            } catch (JsonProcessingException e) {
+                e.printStackTrace();
+                return null;
+            }
         }
         return new StringBuilder(value);
     }
(Changed file in package ru.tinkoff.qa.neptune.kafka.functions.poll; the file name is not shown in this view)

@@ -1,13 +1,14 @@
 package ru.tinkoff.qa.neptune.kafka.functions.poll;
 
-import com.google.gson.Gson;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.Deserializer;
 import ru.tinkoff.qa.neptune.core.api.steps.annotations.StepParameter;
 import ru.tinkoff.qa.neptune.core.api.steps.parameters.ParameterValueGetter;
 import ru.tinkoff.qa.neptune.core.api.steps.parameters.StepParameterPojo;
 import ru.tinkoff.qa.neptune.kafka.KafkaStepContext;
+import ru.tinkoff.qa.neptune.kafka.jackson.desrializer.KafkaJacksonModule;
 
 import java.util.*;
 import java.util.function.Function;

@@ -144,13 +145,19 @@ public String getParameterValue(String[] fieldValue) {
 
     static final class KafkaRecordWrapper<K, V> {
 
+        private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new KafkaJacksonModule());
+
         private final ConsumerRecord<K, V> consumerRecord;
         private final String recordAsString;
 
         KafkaRecordWrapper(ConsumerRecord<K, V> consumerRecord) {
             checkNotNull(consumerRecord);
             this.consumerRecord = consumerRecord;
-            recordAsString = new Gson().toJson(consumerRecord);
+            try {
+                recordAsString = MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(consumerRecord);
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
         }
 
         ConsumerRecord<K, V> getConsumerRecord() {
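Design note: unlike the captors above, which fall back to an error string or a null attachment when serialization fails, KafkaRecordWrapper fails fast and wraps any serialization error in an IllegalStateException.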
ConsumerRecordSerializer.java (new file)

@@ -0,0 +1,48 @@
package ru.tinkoff.qa.neptune.kafka.jackson.desrializer;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;

import java.io.IOException;
import java.util.function.Function;

import static java.util.Objects.nonNull;

@SuppressWarnings("rawtypes")
public class ConsumerRecordSerializer extends JsonSerializer<ConsumerRecord> {

    private void writeSerializedProperty(String property,
                                         JsonGenerator gen,
                                         ConsumerRecord<?, ?> consumerRecord,
                                         Function<ConsumerRecord<?, ?>, Object> getPropertyValue) throws IOException {

        var propertyValue = getPropertyValue.apply(consumerRecord);
        if (nonNull(propertyValue)) {
            gen.writeStringField(property, new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(propertyValue));
        } else {
            gen.writeObjectField(property, null);
        }
    }

    @Override
    public void serialize(ConsumerRecord value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
        gen.writeStartObject();
        gen.writeStringField("topic", value.topic());
        gen.writeNumberField("partition", value.partition());
        gen.writeObjectField("leaderEpoch", value.leaderEpoch().orElse(null));
        gen.writeNumberField("offset", value.offset());
        if (nonNull(value.timestampType())) {
            gen.writeObjectField(value.timestampType().toString(), value.timestamp());
        }
        serializers.findValueSerializer(Headers.class).serialize(value.headers(), gen, serializers);

        writeSerializedProperty("key", gen, value, ConsumerRecord::key);
        writeSerializedProperty("value", gen, value, ConsumerRecord::value);

        gen.writeEndObject();
    }
}
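Note how writeSerializedProperty nests one serialization inside another: the key or value is first rendered to JSON text by a separate ObjectMapper, and that text is then written as a plain string field. This is why the expected values in the test further below contain escaped quotes. A hypothetical stand-alone illustration (the NestedJsonSketch class and the "someKey" literal are mine):

    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.Map;

    public class NestedJsonSketch {
        public static void main(String[] args) throws Exception {
            // The nested mapper call renders the key to JSON text,
            // i.e. "someKey" including the quotes...
            var nested = new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString("someKey");

            // ...and that text is then written as an ordinary string field, so the
            // quotes are escaped in the surrounding document: {"key":"\"someKey\""}
            System.out.println(new ObjectMapper().writeValueAsString(Map.of("key", nested)));
        }
    }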
HeaderJsonSerializer.java (new file)

@@ -0,0 +1,31 @@
package ru.tinkoff.qa.neptune.kafka.jackson.desrializer;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.kafka.common.header.Headers;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Set;

import static java.util.Objects.isNull;

public class HeaderJsonSerializer extends JsonSerializer<Headers> {

    @Override
    public void serialize(Headers value, JsonGenerator gen, SerializerProvider serializers) throws IOException {

        if (isNull(value)) {
            return;
        }

        var headerMap = new LinkedHashMap<String, Set<String>>();
        value.forEach(header -> {
            var set = headerMap.computeIfAbsent(header.key(), s -> new LinkedHashSet<>());
            set.add(new String(header.value()));
        });
        gen.writeObjectField("headers", headerMap);
    }
}
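The serializer collapses repeated header keys into a single entry with a set of values; the first case of the test below relies on this when it adds header1 twice. A small self-contained sketch of the same grouping (the HeaderGroupingSketch class and the header values are illustrative):

    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.header.internals.RecordHeaders;

    import java.util.LinkedHashMap;
    import java.util.LinkedHashSet;
    import java.util.Set;

    public class HeaderGroupingSketch {
        public static void main(String[] args) {
            // Two headers with the same key...
            var headers = new RecordHeaders()
                    .add(new RecordHeader("header1", "value1".getBytes()))
                    .add(new RecordHeader("header1", "value2".getBytes()));

            // ...are collapsed into one map entry holding a set of values
            var headerMap = new LinkedHashMap<String, Set<String>>();
            headers.forEach(h -> headerMap
                    .computeIfAbsent(h.key(), k -> new LinkedHashSet<>())
                    .add(new String(h.value())));

            System.out.println(headerMap); // {header1=[value1, value2]}
        }
    }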
KafkaJacksonModule.java (new file)

@@ -0,0 +1,13 @@
package ru.tinkoff.qa.neptune.kafka.jackson.desrializer;

import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;

public class KafkaJacksonModule extends SimpleModule {

    public KafkaJacksonModule() {
        addSerializer(Headers.class, new HeaderJsonSerializer());
        addSerializer(ConsumerRecord.class, new ConsumerRecordSerializer());
    }
}
ConsumerRecordDeserializationTest.java (new file)

@@ -0,0 +1,105 @@
package ru.tinkoff.qa.neptune.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import ru.tinkoff.qa.neptune.kafka.jackson.desrializer.KafkaJacksonModule;

import java.util.Date;
import java.util.Optional;

import static java.nio.ByteBuffer.allocateDirect;
import static org.apache.kafka.common.record.TimestampType.LOG_APPEND_TIME;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

public class ConsumerRecordDeserializationTest {

    @DataProvider
    public static Object[][] data() {
        var date = new Date();
        return new Object[][]{
                {new ConsumerRecord<>("testTopic", 1,
                        1L,
                        date.getTime(),
                        LOG_APPEND_TIME,
                        0,
                        0,
                        new DraftDto().setName("Some Key"),
                        new DraftDto().setName("Some Value"),
                        new RecordHeaders()
                                .add(new RecordHeader("header1", "value1".getBytes()))
                                .add(new RecordHeader("header1", "value2".getBytes()))
                                .add(new RecordHeader("header2", "value1".getBytes())),
                        Optional.of(5)),
                        "{\n" +
                                "  \"topic\" : \"testTopic\",\n" +
                                "  \"partition\" : 1,\n" +
                                "  \"leaderEpoch\" : 5,\n" +
                                "  \"offset\" : 1,\n" +
                                "  \"LogAppendTime\" : " + date.getTime() + ",\n" +
                                "  \"headers\" : {\n" +
                                "    \"header1\" : [ \"value1\", \"value2\" ],\n" +
                                "    \"header2\" : [ \"value1\" ]\n" +
                                "  },\n" +
                                "  \"key\" : \"{\\n  \\\"name\\\" : \\\"Some Key\\\"\\n}\",\n" +
                                "  \"value\" : \"{\\n  \\\"name\\\" : \\\"Some Value\\\"\\n}\"\n" +
                                "}"},

                {new ConsumerRecord<>("testTopic", 1,
                        1L,
                        new Date().getTime(),
                        null,
                        0,
                        0,
                        null,
                        null,
                        new RecordHeaders(),
                        Optional.empty()),
                        "{\n" +
                                "  \"topic\" : \"testTopic\",\n" +
                                "  \"partition\" : 1,\n" +
                                "  \"leaderEpoch\" : null,\n" +
                                "  \"offset\" : 1,\n" +
                                "  \"headers\" : { },\n" +
                                "  \"key\" : null,\n" +
                                "  \"value\" : null\n" +
                                "}"
                },

                {new ConsumerRecord<>("testTopic", 1,
                        1L,
                        new Date().getTime(),
                        null,
                        0,
                        0,
                        allocateDirect(5),
                        allocateDirect(5),
                        new RecordHeaders(),
                        Optional.empty()),
                        "{\n" +
                                "  \"topic\" : \"testTopic\",\n" +
                                "  \"partition\" : 1,\n" +
                                "  \"leaderEpoch\" : null,\n" +
                                "  \"offset\" : 1,\n" +
                                "  \"headers\" : { },\n" +
                                "  \"key\" : \"\\\"AAAAAAA=\\\"\",\n" +
                                "  \"value\" : \"\\\"AAAAAAA=\\\"\"\n" +
                                "}"
                },
        };
    }

    @Test(dataProvider = "data")
    public void deserializationTest(ConsumerRecord<?, ?> consumerRecord, String expected) throws Exception {
        var serialized = new ObjectMapper()
                .registerModule(new KafkaJacksonModule())
                .writerWithDefaultPrettyPrinter()
                .writeValueAsString(consumerRecord);

        assertThat(serialized, is(expected));
    }
}
(The remaining changed files did not load and are not shown in this view.)
