Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add sasl sample #420

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,28 @@ services:
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/zookeeper_server_jaas.conf
-Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
-Dzookeeper.allowSaslFailedClients=false
-Dzookeeper.requireClientAuthScheme=sasl
volumes:
- ./zookeeper_server_jaas.conf:/etc/kafka/zookeeper_server_jaas.conf

broker:
image: confluentinc/cp-server:7.2.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
- "9093:9093" # SASL_PLAINTEXT
- "9092:9092" # PLAINTEXT
- "9101:9101" # JMX
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SASL_PLAINTEXT:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092,SASL_PLAINTEXT://localhost:9093
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
Expand All @@ -39,6 +46,10 @@ services:
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf"
volumes:
- ./kafka_server_jaas.conf:/etc/kafka/secrets/kafka_server_jaas.conf

schema-registry:
image: confluentinc/cp-schema-registry:7.2.1
Expand Down
12 changes: 12 additions & 0 deletions kafka_server_jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};

Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
24 changes: 24 additions & 0 deletions samples/KafkaFlow.Sample.Auth/KafkaFlow.Sample.Auth.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\KafkaFlow.LogHandler.Console\KafkaFlow.LogHandler.Console.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.JsonCore\KafkaFlow.Serializer.JsonCore.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.ProtobufNet\KafkaFlow.Serializer.ProtobufNet.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer\KafkaFlow.Serializer.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.TypedHandler\KafkaFlow.TypedHandler.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow\KafkaFlow.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.32" />
</ItemGroup>

</Project>
19 changes: 19 additions & 0 deletions samples/KafkaFlow.Sample.Auth/PrintConsoleHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace KafkaFlow.Sample.Auth;

using System;
using System.Threading.Tasks;
using KafkaFlow.TypedHandler;

public class PrintConsoleHandler : IMessageHandler<TestMessage>
{
public Task Handle(IMessageContext context, TestMessage message)
{
Console.WriteLine(
"Partition: {0} | Offset: {1} | Message: {2}",
context.ConsumerContext.Partition,
context.ConsumerContext.Offset,
message.Text);

return Task.CompletedTask;
}
}
83 changes: 83 additions & 0 deletions samples/KafkaFlow.Sample.Auth/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using KafkaFlow;
using KafkaFlow.Configuration;
using KafkaFlow.Producers;
using KafkaFlow.Sample.Auth;
using KafkaFlow.Serializer;
using KafkaFlow.TypedHandler;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

const string producerName = "PrintConsole";
const string topicName = "sample-authenticated-topic";

services.AddKafka(
kafka => kafka
.UseConsoleLog()
.AddCluster(
cluster => cluster
.WithBrokers(new[] { "localhost:9093" })
.CreateTopicIfNotExists(topicName, 6, 1)
.AddProducer(
producerName,
producer => producer
.DefaultTopic(topicName)
.AddMiddlewares(m => m.AddSerializer<ProtobufNetSerializer>())
)
.AddConsumer(
consumer => consumer
.Topic(topicName)
.WithGroupId("print-console-handler")
.WithBufferSize(100)
.WithWorkersCount(3)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ProtobufNetSerializer>()
.AddTypedHandlers(h => h.AddHandler<PrintConsoleHandler>())
)
)
.WithSecurityInformation(information =>
{
information.SaslUsername = "admin";
information.SaslPassword = "admin-secret";
information.SaslMechanism = SaslMechanism.Plain;
information.SecurityProtocol = SecurityProtocol.SaslPlaintext;
})
)
);

var provider = services.BuildServiceProvider();

var bus = provider.CreateKafkaBus();

await bus.StartAsync();

var producer = provider
.GetRequiredService<IProducerAccessor>()
.GetProducer(producerName);

Console.WriteLine("Type the number of messages to produce or 'exit' to quit:");

while (true)
{
var input = Console.ReadLine();

if (int.TryParse(input, out var count))
{
for (var i = 0; i < count; i++)
{
await producer.ProduceAsync(
topicName,
Guid.NewGuid().ToString(),
new TestMessage { Text = $"Message: {Guid.NewGuid()}"});
}
}

if (input!.Equals("exit", StringComparison.OrdinalIgnoreCase))
{
await bus.StopAsync();
break;
}
}

await Task.Delay(3000);
10 changes: 10 additions & 0 deletions samples/KafkaFlow.Sample.Auth/TestMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace KafkaFlow.Sample.Auth;

using System.Runtime.Serialization;

[DataContract]
public class TestMessage
{
[DataMember(Order = 1)]
public string? Text { get; set; }
}
7 changes: 7 additions & 0 deletions src/KafkaFlow.sln
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.PauseConsu
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.ConsumerThrottling", "..\samples\KafkaFlow.Sample.ConsumerThrottling\KafkaFlow.Sample.ConsumerThrottling.csproj", "{4A16F519-FAF8-432C-AD0A-CC44F7BD392D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.Auth", "..\samples\KafkaFlow.Sample.Auth\KafkaFlow.Sample.Auth.csproj", "{C22E1AFB-0491-4F90-BF6C-8EADBC5B89B4}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -225,6 +227,10 @@ Global
{4A16F519-FAF8-432C-AD0A-CC44F7BD392D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4A16F519-FAF8-432C-AD0A-CC44F7BD392D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4A16F519-FAF8-432C-AD0A-CC44F7BD392D}.Release|Any CPU.Build.0 = Release|Any CPU
{C22E1AFB-0491-4F90-BF6C-8EADBC5B89B4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C22E1AFB-0491-4F90-BF6C-8EADBC5B89B4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C22E1AFB-0491-4F90-BF6C-8EADBC5B89B4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C22E1AFB-0491-4F90-BF6C-8EADBC5B89B4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -264,6 +270,7 @@ Global
{8EAF0D96-F760-4FEF-9237-92779F66482D} = {EF626895-FDAE-4B28-9110-BA85671CBBF2}
{B4A9E7CE-7A37-411E-967E-D9B5FD1A3992} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
{4A16F519-FAF8-432C-AD0A-CC44F7BD392D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
{C22E1AFB-0491-4F90-BF6C-8EADBC5B89B4} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB}
Expand Down
4 changes: 4 additions & 0 deletions zookeeper_server_jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin="admin-secret";
};