Skip to content

Commit

Permalink
add avro deserializer (#13)
Browse files Browse the repository at this point in the history
Co-authored-by: Sebastian Rabiej <[email protected]>
  • Loading branch information
SebastianRabiej and Sebastian Rabiej authored Jan 25, 2025
1 parent e38b57b commit 04e6dae
Show file tree
Hide file tree
Showing 14 changed files with 287 additions and 7 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ This project is particularly useful for developers who need to monitor traffic d

The following properties need to be set in the `application.properties` file:

- `read.mode`: The mode to use for deserialization (either `proto` or `plaintext`).
- `read.mode`: The mode to use for deserialization (one of `proto`, `plaintext`, or `avro`).
- `read.subject`: The subject to read messages from.
- `read.proto.pathToDescriptor`: The path to the protobuf descriptor file (only required if `read.mode` is set to `proto`).
- `read.avro.pathToSchema`: The path to the Avro schema file (only required if `read.mode` is set to `avro`).
- `read.store.limit`: The maximum number of messages to store in memory. Defaults to `10000`.
- `read.startDate`: Optional date from which to start reading messages.

Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@
<version>${json.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.12.0</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
34 changes: 34 additions & 0 deletions src/main/java/otter/jet/avro/AvroMessageDeserializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package otter.jet.avro;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import otter.jet.reader.DeserializedMessage;
import otter.jet.reader.MessageDeserializer;

/**
 * {@link MessageDeserializer} that decodes binary-encoded Avro payloads using a fixed schema
 * supplied at construction time.
 */
public class AvroMessageDeserializer implements MessageDeserializer {

  private final Schema schema;

  /**
   * @param schema the Avro schema every incoming payload is decoded against
   */
  public AvroMessageDeserializer(Schema schema) {
    this.schema = schema;
  }

  /**
   * Decodes the readable bytes of {@code buffer} (from its position up to its limit) as a single
   * Avro record.
   *
   * <p>The buffer's position and limit are left untouched.
   *
   * @param buffer the binary Avro payload
   * @return a message built from the decoded record's schema name and the record's
   *     {@code toString()} rendering
   * @throws RuntimeException wrapping the {@link IOException} if the payload cannot be read
   */
  @Override
  public DeserializedMessage deserializeMessage(ByteBuffer buffer) {
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
    try {
      GenericRecord decoded = reader.read(null, decoderFor(buffer));
      return new DeserializedMessage(decoded.getSchema().getName(), decoded.toString());
    } catch (IOException e) {
      throw new RuntimeException("Failed to deserialize Avro message", e);
    }
  }

  /**
   * Builds a decoder over exactly the readable region of {@code buffer}.
   *
   * <p>Calling {@code buffer.array()} directly would ignore the position, limit and array offset
   * (decoding stale or foreign bytes for sliced buffers) and would throw for read-only or direct
   * buffers, so both cases are handled explicitly here.
   */
  private static BinaryDecoder decoderFor(ByteBuffer buffer) {
    if (buffer.hasArray()) {
      // Heap buffer with an accessible backing array: decode in place, no copy needed.
      return DecoderFactory.get()
          .binaryDecoder(
              buffer.array(),
              buffer.arrayOffset() + buffer.position(),
              buffer.remaining(),
              null);
    }
    // Direct or read-only buffer: copy through a duplicate so the caller's position is preserved.
    ByteBuffer view = buffer.duplicate();
    byte[] payload = new byte[view.remaining()];
    view.get(payload);
    return DecoderFactory.get().binaryDecoder(payload, null);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package otter.jet.avro;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParser;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import otter.jet.reader.MessageDeserializer;

/**
 * Spring configuration that registers the Avro {@link MessageDeserializer}. It is only active
 * when the {@code read.mode} property is set to {@code avro}.
 */
@Configuration
@ConditionalOnProperty(value = "read.mode", havingValue = "avro")
public class AvroMessageDeserializerConfiguration {

  /**
   * Creates the deserializer from the schema file referenced by {@code read.avro.pathToSchema}.
   *
   * @param pathToSchema path to the Avro schema file
   * @return a deserializer bound to the main schema parsed from that file
   * @throws IOException if the file does not exist or cannot be read by the parser
   */
  @Bean
  public MessageDeserializer simpleAvroMessageDeserializer(
      @Value("${read.avro.pathToSchema}") String pathToSchema) throws IOException {
    // Resolve the file once; the original resolved it twice and discarded the first result.
    File schemaFile = readSchemaFile(pathToSchema);
    Schema schema = new SchemaParser().parse(schemaFile).mainSchema();
    return new AvroMessageDeserializer(schema);
  }

  /**
   * Resolves {@code pathToDesc} to a {@link File}, failing fast when nothing exists at that path.
   *
   * @throws FileNotFoundException if no file exists at the given path
   */
  private File readSchemaFile(String pathToDesc) throws FileNotFoundException {
    File file = new File(pathToDesc);
    if (!file.exists()) {
      // Include the offending path so a misconfigured property is easy to diagnose.
      throw new FileNotFoundException("Avro schema file not found: " + pathToDesc);
    }
    return file;
  }

}
31 changes: 31 additions & 0 deletions src/test/java/otter/jet/avro/RandomPersonAvro.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package otter.jet.avro;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

/**
 * Test fixture describing a person together with the Avro schema used to encode it.
 */
public record RandomPersonAvro(int id, String name, String email, String phoneNumber,
    Schema schema) {

  /**
   * Encodes this person as a binary Avro record using {@link #schema()}.
   *
   * <p>The phone number is written as a single-element list under the {@code numbers} field.
   *
   * @return the binary-encoded record
   * @throws IOException if the encoder fails to write or flush
   */
  public byte[] toByteArray() throws IOException {
    GenericRecord person = new GenericData.Record(schema);
    person.put("id", id);
    person.put("name", name);
    person.put("email", email);
    person.put("numbers", List.of(phoneNumber));

    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(sink, null);
    new GenericDatumWriter<GenericRecord>(schema).write(person, binaryEncoder);
    // The encoder buffers internally; flush before reading the bytes back.
    binaryEncoder.flush();

    return sink.toByteArray();
  }
}
77 changes: 77 additions & 0 deletions src/test/java/otter/jet/avro/SimpleAvroMessageReaderTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package otter.jet.avro;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import org.apache.avro.Schema;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.TestPropertySource;
import otter.jet.AbstractIntegrationTest;
import otter.jet.JetStreamContainerInitializer;
import otter.jet.JetStreamUtils;
import otter.jet.assertions.ComparisonConfiguration;
import otter.jet.examples.avro.RandomAvroPersonGenerator;
import otter.jet.reader.ReadMessage;
import otter.jet.reader.ReaderConfigurationProperties;
import otter.jet.store.Filters;
import otter.jet.store.MessageStore;

/**
 * Integration test checking that a binary Avro message published to the configured subject is
 * read, decoded with the configured schema and placed in the {@link MessageStore}.
 */
@TestPropertySource(
    properties = {
      "read.mode=avro",
      "read.subject=avro_person",
      "read.avro.pathToSchema=src/test/resources/person.avsc"
    })
class SimpleAvroMessageReaderTest extends AbstractIntegrationTest {

  // Placeholder timestamp for the expected message; the comparison configuration used below is
  // expected to disregard it (see ComparisonConfiguration).
  private static final LocalDateTime ignoredMessageTimestamp =
      LocalDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC);
  @Autowired
  private MessageStore messageStore;
  @Autowired
  private ReaderConfigurationProperties readerConfigurationProperties;

  // Renamed from "shouldReadProtoMessage...": the old name was copied from the protobuf test and
  // misdescribed what this test exercises.
  @Test
  public void shouldReadAvroMessageSentAsSpecificType() throws IOException {
    // given
    JetStreamUtils.createSubjectStream(
        readerConfigurationProperties.getSubject(),
        JetStreamContainerInitializer.getNatsServerUrl());
    var schema = new Schema.Parser().parse(new File("src/test/resources/person.avsc"));
    var person = RandomAvroPersonGenerator.randomPerson(schema);
    byte[] data = person.toByteArray();

    // when
    JetStreamUtils.tryToSendMessage(
        data,
        readerConfigurationProperties.getSubject(),
        JetStreamContainerInitializer.getNatsServerUrl());

    // then
    await()
        .untilAsserted(
            () ->
                assertThat(messageStore.filter(Filters.empty(), 0, 10))
                    .usingRecursiveFieldByFieldElementComparator(
                        ComparisonConfiguration.configureReadMessageComparisonWithJSONBody())
                    .contains(
                        new ReadMessage(
                            readerConfigurationProperties.getSubject(),
                            "Person",
                            new JSONObject()
                                .put("id", person.id())
                                .put("name", person.name())
                                .put("email", person.email())
                                .put("numbers", new JSONArray().put(person.phoneNumber()))
                                .toString(),
                            ignoredMessageTimestamp)));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package otter.jet.examples.avro;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParser;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import otter.jet.JetStreamUtils;
import otter.jet.avro.RandomPersonAvro;

/**
 * Example configuration that periodically publishes randomly generated Avro-encoded persons.
 * The publisher bean is only created outside the {@code test} profile and when
 * {@code read.mode} is {@code avro}.
 */
@Configuration
class AvroMessagePublisherConfiguration {

  /**
   * Creates a runner that sets up the {@code avro} subject stream and then publishes one random
   * person every two seconds, starting immediately.
   *
   * @param serverUrl value of the {@code nats.server.url} property
   */
  @Bean
  @Profile("!test")
  @ConditionalOnProperty(value = "read.mode", havingValue = "avro")
  CommandLineRunner simplePublisher(@Value("${nats.server.url}") String serverUrl) {
    return args -> {
      final String subject = "avro";
      JetStreamUtils.createSubjectStream(subject, serverUrl);

      ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

      File schemaFile = new File("src/test/resources/person.avsc");
      Schema schema = new SchemaParser().parse(schemaFile).mainSchema();

      Runnable publishTask = () -> publishRandomPerson(schema, subject, serverUrl);
      scheduler.scheduleAtFixedRate(publishTask, 0, 2, TimeUnit.SECONDS);
    };
  }

  /** Generates one random person and sends its Avro encoding to {@code subject}. */
  private static void publishRandomPerson(Schema schema, String subject, String serverUrl) {
    RandomPersonAvro person = RandomAvroPersonGenerator.randomPerson(schema);
    try {
      JetStreamUtils.tryToSendMessage(person.toByteArray(), subject, serverUrl);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package otter.jet.examples.avro;

import org.apache.avro.Schema;
import otter.jet.avro.RandomPersonAvro;

/** Factory for {@link RandomPersonAvro} instances filled with fake data. */
public class RandomAvroPersonGenerator {

  /**
   * Builds a person with a random positive id, first name, gmail-style address and phone number.
   *
   * @param schema the Avro schema to attach to the generated person
   * @return a freshly generated person carrying {@code schema}
   */
  public static RandomPersonAvro randomPerson(Schema schema) {
    com.github.javafaker.Faker faker = new com.github.javafaker.Faker();

    int id = faker.number().numberBetween(1, Integer.MAX_VALUE);
    String firstName = faker.name().firstName();
    String email = faker.bothify("????##@gmail.com");
    String phoneNumber = faker.phoneNumber().phoneNumber();

    return new RandomPersonAvro(id, firstName, email, phoneNumber, schema);
  }

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package otter.jet.examples;
package otter.jet.examples.protobuf;

import com.github.javafaker.Faker;
import org.jetbrains.annotations.NotNull;
import otter.jet.examples.protobuf.PersonProtos;

public class RandomProtoPersonGenerator {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.util.concurrent.TimeUnit;

import otter.jet.JetStreamUtils;
import otter.jet.examples.RandomProtoPersonGenerator;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import otter.jet.reader.ReadMessage;
import otter.jet.reader.ReaderConfigurationProperties;
import otter.jet.assertions.ComparisonConfiguration;
import otter.jet.examples.RandomProtoPersonGenerator;
import otter.jet.examples.protobuf.RandomProtoPersonGenerator;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import otter.jet.JetStreamContainerInitializer;
import otter.jet.JetStreamUtils;
import otter.jet.assertions.ComparisonConfiguration;
import otter.jet.examples.RandomProtoPersonGenerator;
import otter.jet.examples.protobuf.RandomProtoPersonGenerator;
import otter.jet.examples.protobuf.PersonProtos.Person;
import otter.jet.reader.ReadMessage;
import otter.jet.reader.ReaderConfigurationProperties;
Expand Down
4 changes: 3 additions & 1 deletion src/test/resources/application-local.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
read:
mode: "proto"
mode: "avro"
proto:
pathToDescriptor: "src/test/resources/person.desc"
avro:
pathToSchema: "src/test/resources/person.avsc"
subject: "*"
28 changes: 28 additions & 0 deletions src/test/resources/person.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"type": "record",
"name": "Person",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "numbers",
"type": {
"type": "array",
"items": "string"
}
},
{
"name": "email",
"type": [
"null",
"string"
]
}
]
}

0 comments on commit 04e6dae

Please sign in to comment.