Skip to content

Commit

Permalink
Avro serializer for Kafka #11
Browse files Browse the repository at this point in the history
  • Loading branch information
zarusz committed Feb 22, 2020
1 parent fcf9e8f commit c27fe65
Show file tree
Hide file tree
Showing 36 changed files with 139 additions and 276 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i
* [Azure EventHubs](docs/provider_azure_eventhubs.md)
* [Redis](docs/provider_redis.md)
* [Memory](docs/provider_memory.md)
* [Serialization Plugins](docs/serialization.md)

## Packages

Expand All @@ -46,10 +47,9 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i
| `SlimMessageBus.Host.Redis` | Transport provider for Redis | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Redis.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Redis) |
| `SlimMessageBus.Host.Memory` | Transport provider implementation for in-process (in memory) message passing (no messaging infrastructure required) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Memory.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Memory) |
| **Serialization** | | |
| `SlimMessageBus.Host.Serialization.Json` | Message serialization adapter for JSON (Json.NET) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Serialization.Json.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Serialization.Json) |
| `SlimMessageBus.Host.Serialization.Avro` | Message serialization adapter for Avro (Apache.Avro library) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Serialization.Avro.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Serialization.Avro) |
| `SlimMessageBus.Host.Serialization.AvroConvert` | Message serialization adapter for Avro (AvroConvert library) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Serialization.AvroConvert.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Serialization.AvroConvert) |
| `SlimMessageBus.Host.Serialization.Routing` | Serialization router that delegates serialization to other serializers based on message type | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Serialization.Routing.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Serialization.Routing) |
| `SlimMessageBus.Host.Serialization.Json` | Serialization plugin for JSON (Json.NET library) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Serialization.Json.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Serialization.Json) |
| `SlimMessageBus.Host.Serialization.Avro` | Serialization plugin for Avro (Apache.Avro library) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Serialization.Avro.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Serialization.Avro) |
| `SlimMessageBus.Host.Serialization.Hybrid` | Plugin that delegates serialization to other serializers based on message type | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Serialization.Hybrid.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Serialization.Hybrid) |
| **Container** | | |
| `SlimMessageBus.Host.AspNetCore` | Integration for ASP.NET Core 2.x (DI adapter, config helpers) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AspNetCore.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AspNetCore) |
| `SlimMessageBus.Host.Autofac` | DI adapter for Autofac container | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Autofac.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Autofac) |
Expand Down
3 changes: 1 addition & 2 deletions build/tasks.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ $projects = @(
"SlimMessageBus",
"SlimMessageBus.Host",
"SlimMessageBus.Host.Serialization",
"SlimMessageBus.Host.Serialization.Routing",
"SlimMessageBus.Host.Serialization.Hybrid",
"SlimMessageBus.Host.Serialization.Json",
"SlimMessageBus.Host.Serialization.Avro",
"SlimMessageBus.Host.Serialization.AvroConvert",
"SlimMessageBus.Host.DependencyResolver",
"SlimMessageBus.Host.ServiceLocator",
"SlimMessageBus.Host.Autofac",
Expand Down
24 changes: 24 additions & 0 deletions docs/serialization.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Serialization plugins for SlimMessageBus

## Introduction

Please read the [Introduction](intro.md) before reading this serialization documentation.

## Json

Nuget package: [SlimMessageBus.Host.Serialization.Json](https://www.nuget.org/packages/SlimMessageBus.Host.Serialization.Json)

> ToDo
## Avro

Nuget package: [SlimMessageBus.Host.Serialization.Avro](https://www.nuget.org/packages/SlimMessageBus.Host.Serialization.Avro)

> ToDo
## Hybrid

Nuget package: [SlimMessageBus.Host.Serialization.Hybrid](https://www.nuget.org/packages/SlimMessageBus.Host.Serialization.Hybrid)

> ToDo
11 changes: 0 additions & 11 deletions src/Samples/Sample.AvroSer.Messages.CodeFirst/DivideRequest.cs

This file was deleted.

This file was deleted.

5 changes: 0 additions & 5 deletions src/Samples/Sample.AvroSer.Messages.CodeFirst/README.md

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
using Microsoft.Extensions.Configuration;
using Sample.AvroSer.Messages.ContractFirst;
using Sample.AvroSer.Messages.CodeFirst;
using Sample.Serialization.MessagesAvro;
using SecretStore;
using SlimMessageBus;
using SlimMessageBus.Host.Config;
using SlimMessageBus.Host.DependencyResolver;
using SlimMessageBus.Host.Kafka;
using SlimMessageBus.Host.Kafka.Configs;
using SlimMessageBus.Host.Memory;
using SlimMessageBus.Host.Redis;
using SlimMessageBus.Host.Serialization;
using SlimMessageBus.Host.Serialization.Avro;
using SlimMessageBus.Host.Serialization.AvroConvert;
using SlimMessageBus.Host.Serialization.Routing;
using SlimMessageBus.Host.Serialization.Hybrid;
using SlimMessageBus.Host.Serialization.Json;
using System;
using System.Collections.Generic;
using System.Threading;
Expand All @@ -22,7 +19,7 @@ namespace Sample.Avro.ConsoleApp
{
enum Provider
{
Kafka,
//Kafka,
//AzureServiceBus,
//AzureEventHub,
Redis,
Expand All @@ -31,9 +28,8 @@ enum Provider

/// <summary>
/// This sample shows:
/// 1. How tu use the Avro serializer (for contract IDL first apprach)
/// 2. How to use the AvroConvert serializer (for C# code first approach)
/// 3. How to combine two serializer approaches in one app (using the Routing serializer).
/// 1. How tu use the Avro serializer (for contract Avro IDL first apprach to generate C# code)
/// 2. How to combine two serializer approaches in one app (using the Hybrid serializer).
/// </summary>
class Program
{
Expand All @@ -53,7 +49,7 @@ static async Task Main(string[] args)
private static IMessageBus CreateBus(IConfiguration configuration)
{
// Note: remember that Memory provider does not support req-resp yet.
var provider = Provider.Memory;
var provider = Provider.Redis;

/*
var sl = new DictionarySchemaLookupStrategy();
Expand All @@ -76,34 +72,27 @@ private static IMessageBus CreateBus(IConfiguration configuration)
var avroSerializer = new AvroMessageSerializer();

// Avro serialized using the AvroConvert library - no schema generation neeeded upfront.
var avroConvertSerializer = new AvroConvertMessageSerializer();
var jsonSerializer = new JsonMessageSerializer();

// Note: Certain messages will be serialized by one Avro serializer, other using the other Avro serializer
var routingSerializer = new RoutingMessageSerializer(new Dictionary<IMessageSerializer, Type[]>
// Note: Certain messages will be serialized by one Avro serializer, other using the Json serializer
var routingSerializer = new HybridMessageSerializer(new Dictionary<IMessageSerializer, Type[]>
{
[avroConvertSerializer] = new Type[] { typeof(SubtractCommand) }, // the first one will be the default serializer, no need to declare types here
[avroSerializer] = new Type[] { typeof(AddCommand), typeof(MultiplyRequest), typeof(MultiplyResponse) },
[jsonSerializer] = new[] { typeof(SubtractCommand) }, // the first one will be the default serializer, no need to declare types here
[avroSerializer] = new[] { typeof(AddCommand), typeof(MultiplyRequest), typeof(MultiplyResponse) },
});

return MessageBusBuilder.Create()
.Produce<AddCommand>(x => x.DefaultTopic("AddCommand"))
.Consume<AddCommand>(x => x.Topic("AddCommand").WithConsumer<AddCommandConsumer>()
.Group("ConsoleApp") // for Kafka only
)
.Consume<AddCommand>(x => x.Topic("AddCommand").WithConsumer<AddCommandConsumer>())

.Produce<SubtractCommand>(x => x.DefaultTopic("SubtractCommand"))
.Consume<SubtractCommand>(x => x.Topic("SubtractCommand").WithConsumer<SubtractCommandConsumer>()
.Group("ConsoleApp") // for Kafka only
)
.Consume<SubtractCommand>(x => x.Topic("SubtractCommand").WithConsumer<SubtractCommandConsumer>())

.Produce<MultiplyRequest>(x => x.DefaultTopic("MultiplyRequest"))
.Handle<MultiplyRequest, MultiplyResponse>(x => x.Topic("MultiplyRequest").WithHandler<MultiplyRequestHandler>()
.Group("ConsoleApp") // for Kafka only
)
.Handle<MultiplyRequest, MultiplyResponse>(x => x.Topic("MultiplyRequest").WithHandler<MultiplyRequestHandler>())

.ExpectRequestResponses(x => x.ReplyToTopic("ConsoleApp")
.Group("ConsoleApp") // for Kafka only
)

.ExpectRequestResponses(x => x.ReplyToTopic("ConsoleApp"))

.WithSerializer(routingSerializer) // Use Avro for message serialization
.WithDependencyResolver(new LookupDependencyResolver(type =>
Expand Down Expand Up @@ -139,14 +128,14 @@ private static IMessageBus CreateBus(IConfiguration configuration)
// builder.WithProviderEventHub(new EventHubMessageBusSettings(eventHubConnectionString, storageConnectionString, storageContainerName)); // Use Azure Event Hub as provider
// break;

case Provider.Kafka:
// Ensure your Kafka broker is running
var kafkaBrokers = configuration["Kafka:Brokers"];
var kafkaUsername = Secrets.Service.PopulateSecrets(configuration["Kafka:Username"]);
var kafkaPassword = Secrets.Service.PopulateSecrets(configuration["Kafka:Password"]);
//case Provider.Kafka:
// // Ensure your Kafka broker is running
// var kafkaBrokers = configuration["Kafka:Brokers"];
// var kafkaUsername = Secrets.Service.PopulateSecrets(configuration["Kafka:Username"]);
// var kafkaPassword = Secrets.Service.PopulateSecrets(configuration["Kafka:Password"]);

builder.WithProviderKafka(new KafkaMessageBusSettings(kafkaBrokers)); // Or use Apache Kafka as provider
break;
// builder.WithProviderKafka(new KafkaMessageBusSettings(kafkaBrokers)); // Or use Apache Kafka as provider
// break;

case Provider.Redis:
// Ensure your Redis broker is running
Expand Down Expand Up @@ -259,4 +248,14 @@ public async Task<MultiplyResponse> OnHandle(MultiplyRequest request, string nam
return new MultiplyResponse { Result = request.Left * request.Right, OperationId = request.OperationId };
}
}

/// <summary>
/// This will be serialized as JSON.
/// </summary>
public class SubtractCommand
{
public string OperationId { get; set; }
public int Left { get; set; }
public int Right { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AvroConvert" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.1" />
<PackageReference Include="System.Collections" Version="4.3.0" />
Expand All @@ -17,12 +16,11 @@
<ProjectReference Include="..\..\SlimMessageBus.Host.Kafka\SlimMessageBus.Host.Kafka.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.Memory\SlimMessageBus.Host.Memory.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.Redis\SlimMessageBus.Host.Redis.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.Serialization.AvroConvert\SlimMessageBus.Host.Serialization.AvroConvert.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.Serialization.Avro\SlimMessageBus.Host.Serialization.Avro.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.Serialization.Routing\SlimMessageBus.Host.Serialization.Routing.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.Serialization.Hybrid\SlimMessageBus.Host.Serialization.Hybrid.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.Serialization.Json\SlimMessageBus.Host.Serialization.Json.csproj" />
<ProjectReference Include="..\..\Tools\SecretStore\SecretStore.csproj" />
<ProjectReference Include="..\Sample.AvroSer.Messages.CodeFirst\Sample.AvroSer.Messages.CodeFirst.csproj" />
<ProjectReference Include="..\Sample.AvroSer.Messages.ContractFirst\Sample.AvroSer.Messages.ContractFirst.csproj" />
<ProjectReference Include="..\Sample.Serialization.MessagesAvro\Sample.Serialization.MessagesAvro.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
@namespace("Sample.AvroSer.Messages.ContractFirst")
@namespace("Sample.Serialization.MessagesAvro")

protocol SampleProtocol {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
@namespace("Sample.AvroSer.Messages.ContractFirst")
@namespace("Sample.Serialization.MessagesAvro")

protocol SampleProtocol {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
@namespace("Sample.AvroSer.Messages.ContractFirst")
@namespace("Sample.Serialization.MessagesAvro")

protocol SampleProtocol {
import idl "AddCommand.avdl";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"protocol" : "SampleProtocol",
"namespace" : "Sample.AvroSer.Messages.ContractFirst",
"namespace" : "Sample.Serialization.MessagesAvro",
"types" : [ {
"type" : "record",
"name" : "AddCommand",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using SlimMessageBus;

namespace Sample.AvroSer.Messages.ContractFirst
namespace Sample.Serialization.MessagesAvro
{
/// <summary>
/// Adds the marker interface to the generated class <see cref="MultiplyRequest"/>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Apache.Avro" Version="1.9.1" />
<PackageReference Include="Apache.Avro" Version="1.9.2" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace Sample.AvroSer.Messages.ContractFirst
namespace Sample.Serialization.MessagesAvro
{
using System;
using System.Collections.Generic;
Expand All @@ -15,9 +15,9 @@ namespace Sample.AvroSer.Messages.ContractFirst

public partial class AddCommand : ISpecificRecord
{
public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"AddCommand\",\"namespace\":\"Sample.AvroSer.Messages.Contrac" +
"tFirst\",\"fields\":[{\"name\":\"OperationId\",\"type\":\"string\"},{\"name\":\"Left\",\"type\":\"" +
"int\"},{\"name\":\"Right\",\"type\":\"int\"}]}");
public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"AddCommand\",\"namespace\":\"Sample.Serialization.MessagesAv" +
"ro\",\"fields\":[{\"name\":\"OperationId\",\"type\":\"string\"},{\"name\":\"Left\",\"type\":\"int\"" +
"},{\"name\":\"Right\",\"type\":\"int\"}]}");
private string _OperationId;
private int _Left;
private int _Right;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace Sample.AvroSer.Messages.ContractFirst
namespace Sample.Serialization.MessagesAvro
{
using System;
using System.Collections.Generic;
Expand All @@ -15,9 +15,9 @@ namespace Sample.AvroSer.Messages.ContractFirst

public partial class MultiplyRequest : ISpecificRecord
{
public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"MultiplyRequest\",\"namespace\":\"Sample.AvroSer.Messages.Co" +
"ntractFirst\",\"fields\":[{\"name\":\"OperationId\",\"type\":\"string\"},{\"name\":\"Left\",\"ty" +
"pe\":\"int\"},{\"name\":\"Right\",\"type\":\"int\"}]}");
public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"MultiplyRequest\",\"namespace\":\"Sample.Serialization.Messa" +
"gesAvro\",\"fields\":[{\"name\":\"OperationId\",\"type\":\"string\"},{\"name\":\"Left\",\"type\":" +
"\"int\"},{\"name\":\"Right\",\"type\":\"int\"}]}");
private string _OperationId;
private int _Left;
private int _Right;
Expand Down
Loading

0 comments on commit c27fe65

Please sign in to comment.