Skip to content

Commit

Permalink
feat: add avro serializer and schema registry support
Browse files Browse the repository at this point in the history
  • Loading branch information
Douglas Lima authored and filipeesch committed Feb 19, 2021
1 parent da7d5d2 commit 2fc7985
Show file tree
Hide file tree
Showing 28 changed files with 730 additions and 20 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ init_broker:
@echo command | date
@echo Initializing Kafka broker
docker-compose -f docker-compose.yml up -d
docker exec kafka bash -c "kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-avro;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-gzip;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-json;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-json-gzip;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-protobuf;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-protobuf-gzip;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-protobuf-gzip-2"

shutdown_broker:
@echo command | date
Expand Down
58 changes: 46 additions & 12 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,21 +1,55 @@
version: '3'

services:
zookeeper:
image: wurstmeister/zookeeper
image: confluentinc/cp-zookeeper:6.1.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
hostname: zookeeper
kafka:
image: wurstmeister/kafka
command: [start-kafka.sh]
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

broker:
image: confluentinc/cp-server:6.1.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
hostname: kafka
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_PORT: 9092
volumes:
- /var/run/docker.sock:/var/run/docker.sock
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: "true"
CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"

schema-registry:
image: confluentinc/cp-schema-registry:6.1.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- "zookeeper"
- zookeeper
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"

kafka-tools:
image: confluentinc/cp-kafka:6.1.0
hostname: kafka
container_name: kafka
command: ["tail", "-f", "/dev/null"]
network_mode: "host"
21 changes: 21 additions & 0 deletions samples/KafkaFlow.Sample.Avro/Handlers/AvroMessageHandler1.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
namespace KafkaFlow.Sample.Avro.Handlers
{
using System;
using System.Threading.Tasks;
using KafkaFlow.TypedHandler;
using MessageTypes;

public class AvroMessageHandler1 : IMessageHandler<LogMessages1>
{
public Task Handle(IMessageContext context, LogMessages1 message)
{
Console.WriteLine(
"Partition: {0} | Offset: {1} | Message: {2}",
context.Partition,
context.Offset,
message.Severity.ToString());

return Task.CompletedTask;
}
}
}
21 changes: 21 additions & 0 deletions samples/KafkaFlow.Sample.Avro/Handlers/AvroMessageHandler2.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
namespace KafkaFlow.Sample.Avro.Handlers
{
using System;
using System.Threading.Tasks;
using KafkaFlow.TypedHandler;
using MessageTypes;

public class AvroMessageHandler2 : IMessageHandler<LogMessages2>
{
public Task Handle(IMessageContext context, LogMessages2 message)
{
Console.WriteLine(
"Partition: {0} | Offset: {1} | Message: {2}",
context.Partition,
context.Offset,
message.Message);

return Task.CompletedTask;
}
}
}
28 changes: 28 additions & 0 deletions samples/KafkaFlow.Sample.Avro/KafkaFlow.Sample.Avro.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<IsPackable>false</IsPackable>
<GenerateDocumentationFile>false</GenerateDocumentationFile>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\KafkaFlow.Admin\KafkaFlow.Admin.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Compressor.Gzip\KafkaFlow.Compressor.Gzip.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Compressor\KafkaFlow.Compressor.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.LogHandler.Console\KafkaFlow.LogHandler.Console.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.ApacheAvro\KafkaFlow.Serializer.ApacheAvro.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.Json\KafkaFlow.Serializer.Json.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.ProtoBuf\KafkaFlow.Serializer.ProtoBuf.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer\KafkaFlow.Serializer.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.TypedHandler\KafkaFlow.TypedHandler.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow\KafkaFlow.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.5" />
</ItemGroup>

</Project>
27 changes: 27 additions & 0 deletions samples/KafkaFlow.Sample.Avro/MessageTypes/LogLevel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// ------------------------------------------------------------------------------
// <auto-generated>
// Generated by avrogen, version 1.10.0.0
// Changes to this file may cause incorrect behavior and will be lost if code
// is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace MessageTypes
{
using System;
using System.Collections.Generic;
using System.Text;
using Avro;
using Avro.Specific;

/// <summary>
/// Enumerates the set of allowable log levels.
/// </summary>
public enum LogLevel
{
None,
Verbose,
Info,
Warning,
Error,
}
}
60 changes: 60 additions & 0 deletions samples/KafkaFlow.Sample.Avro/MessageTypes/LogMessages1.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// ------------------------------------------------------------------------------
// <auto-generated>
// Generated by avrogen, version 1.10.0.0
// Changes to this file may cause incorrect behavior and will be lost if code
// is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace MessageTypes
{
using System;
using System.Collections.Generic;
using System.Text;
using Avro;
using Avro.Specific;

/// <summary>
/// A simple log message type as used by this blog post.
/// </summary>
public partial class LogMessages1 : ISpecificRecord
{
public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"LogMessages1\",\"namespace\":\"MessageTypes\",\"fields\":[{\"nam" +
"e\":\"Severity\",\"type\":{\"type\":\"enum\",\"name\":\"LogLevel\",\"namespace\":\"MessageTypes\"" +
",\"symbols\":[\"None\",\"Verbose\",\"Info\",\"Warning\",\"Error\"]}}]}");
private MessageTypes.LogLevel _Severity;
public virtual Schema Schema
{
get
{
return LogMessages1._SCHEMA;
}
}
public MessageTypes.LogLevel Severity
{
get
{
return this._Severity;
}
set
{
this._Severity = value;
}
}
public virtual object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return this.Severity;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
};
}
public virtual void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: this.Severity = (MessageTypes.LogLevel)fieldValue; break;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
};
}
}
}
59 changes: 59 additions & 0 deletions samples/KafkaFlow.Sample.Avro/MessageTypes/LogMessages2.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// ------------------------------------------------------------------------------
// <auto-generated>
// Generated by avrogen, version 1.10.0.0
// Changes to this file may cause incorrect behavior and will be lost if code
// is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace MessageTypes
{
using System;
using System.Collections.Generic;
using System.Text;
using Avro;
using Avro.Specific;

/// <summary>
/// A simple log message type as used by this blog post.
/// </summary>
public partial class LogMessages2 : ISpecificRecord
{
public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"LogMessages2\",\"namespace\":\"MessageTypes\",\"fields\":[{\"nam" +
"e\":\"Message\",\"type\":\"string\"}]}");
private string _Message;
public virtual Schema Schema
{
get
{
return LogMessages2._SCHEMA;
}
}
public string Message
{
get
{
return this._Message;
}
set
{
this._Message = value;
}
}
public virtual object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return this.Message;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
};
}
public virtual void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: this.Message = (System.String)fieldValue; break;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
};
}
}
}
23 changes: 23 additions & 0 deletions samples/KafkaFlow.Sample.Avro/MessageTypes/logmessages1.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"type": "record",
"name": "LogMessages1",
"namespace": "MessageTypes",
"doc": "A simple log message type as used by this blog post.",
"fields": [
{
"name": "Severity",
"type": {
"type": "enum",
"name": "LogLevel",
"doc": "Enumerates the set of allowable log levels.",
"symbols": [
"None",
"Verbose",
"Info",
"Warning",
"Error"
]
}
}
]
}
12 changes: 12 additions & 0 deletions samples/KafkaFlow.Sample.Avro/MessageTypes/logmessages2.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"type": "record",
"name": "LogMessages2",
"namespace": "MessageTypes",
"doc": "A simple log message type as used by this blog post.",
"fields": [
{
"name": "Message",
"type": "string"
}
]
}
Loading

0 comments on commit 2fc7985

Please sign in to comment.