diff --git a/docker-compose.yml b/docker-compose.yml
index 7c462c978..eec716d7e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -10,7 +10,13 @@ services:
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
-
+ KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/zookeeper_server_jaas.conf
+ -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
+ -Dzookeeper.allowSaslFailedClients=false
+ -Dzookeeper.requireClientAuthScheme=sasl
+ volumes:
+ - ./zookeeper_server_jaas.conf:/etc/kafka/zookeeper_server_jaas.conf
+
broker:
image: confluentinc/cp-server:7.2.1
hostname: broker
@@ -18,13 +24,14 @@ services:
depends_on:
- zookeeper
ports:
- - "9092:9092"
- - "9101:9101"
+ - "9093:9093" # SASL_PLAINTEXT
+ - "9092:9092" # PLAINTEXT
+ - "9101:9101" # JMX
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SASL_PLAINTEXT:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092,SASL_PLAINTEXT://localhost:9093
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
@@ -39,6 +46,10 @@ services:
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
+ KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
+ KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf"
+ volumes:
+ - ./kafka_server_jaas.conf:/etc/kafka/secrets/kafka_server_jaas.conf
schema-registry:
image: confluentinc/cp-schema-registry:7.2.1
diff --git a/kafka_server_jaas.conf b/kafka_server_jaas.conf
new file mode 100644
index 000000000..ba70ffcb0
--- /dev/null
+++ b/kafka_server_jaas.conf
@@ -0,0 +1,12 @@
+KafkaServer {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="admin"
+ password="admin-secret"
+ user_admin="admin-secret";
+};
+
+Client {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="admin"
+ password="admin-secret";
+};
\ No newline at end of file
diff --git a/samples/KafkaFlow.Sample.Auth/KafkaFlow.Sample.Auth.csproj b/samples/KafkaFlow.Sample.Auth/KafkaFlow.Sample.Auth.csproj
new file mode 100644
index 000000000..871ffdbe3
--- /dev/null
+++ b/samples/KafkaFlow.Sample.Auth/KafkaFlow.Sample.Auth.csproj
@@ -0,0 +1,24 @@
+
+
+
+ Exe
+ net6.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/samples/KafkaFlow.Sample.Auth/PrintConsoleHandler.cs b/samples/KafkaFlow.Sample.Auth/PrintConsoleHandler.cs
new file mode 100644
index 000000000..5c8962213
--- /dev/null
+++ b/samples/KafkaFlow.Sample.Auth/PrintConsoleHandler.cs
@@ -0,0 +1,19 @@
+namespace KafkaFlow.Sample.Auth;
+
+using System;
+using System.Threading.Tasks;
+using KafkaFlow.TypedHandler;
+
+public class PrintConsoleHandler : IMessageHandler
+{
+ public Task Handle(IMessageContext context, TestMessage message)
+ {
+ Console.WriteLine(
+ "Partition: {0} | Offset: {1} | Message: {2}",
+ context.ConsumerContext.Partition,
+ context.ConsumerContext.Offset,
+ message.Text);
+
+ return Task.CompletedTask;
+ }
+}
\ No newline at end of file
diff --git a/samples/KafkaFlow.Sample.Auth/Program.cs b/samples/KafkaFlow.Sample.Auth/Program.cs
new file mode 100644
index 000000000..1b523adcf
--- /dev/null
+++ b/samples/KafkaFlow.Sample.Auth/Program.cs
@@ -0,0 +1,83 @@
+using KafkaFlow;
+using KafkaFlow.Configuration;
+using KafkaFlow.Producers;
+using KafkaFlow.Sample.Auth;
+using KafkaFlow.Serializer;
+using KafkaFlow.TypedHandler;
+using Microsoft.Extensions.DependencyInjection;
+
+var services = new ServiceCollection();
+
+const string producerName = "PrintConsole";
+const string topicName = "sample-authenticated-topic";
+
+services.AddKafka(
+ kafka => kafka
+ .UseConsoleLog()
+ .AddCluster(
+ cluster => cluster
+ .WithBrokers(new[] { "localhost:9093" })
+ .CreateTopicIfNotExists(topicName, 6, 1)
+ .AddProducer(
+ producerName,
+ producer => producer
+ .DefaultTopic(topicName)
+ .AddMiddlewares(m => m.AddSerializer())
+ )
+ .AddConsumer(
+ consumer => consumer
+ .Topic(topicName)
+ .WithGroupId("print-console-handler")
+ .WithBufferSize(100)
+ .WithWorkersCount(3)
+ .AddMiddlewares(
+ middlewares => middlewares
+ .AddSerializer()
+ .AddTypedHandlers(h => h.AddHandler())
+ )
+ )
+ .WithSecurityInformation(information =>
+ {
+ information.SaslUsername = "admin";
+ information.SaslPassword = "admin-secret";
+ information.SaslMechanism = SaslMechanism.Plain;
+ information.SecurityProtocol = SecurityProtocol.SaslPlaintext;
+ })
+ )
+);
+
+var provider = services.BuildServiceProvider();
+
+var bus = provider.CreateKafkaBus();
+
+await bus.StartAsync();
+
+var producer = provider
+ .GetRequiredService()
+ .GetProducer(producerName);
+
+Console.WriteLine("Type the number of messages to produce or 'exit' to quit:");
+
+while (true)
+{
+ var input = Console.ReadLine();
+
+ if (int.TryParse(input, out var count))
+ {
+ for (var i = 0; i < count; i++)
+ {
+ await producer.ProduceAsync(
+ topicName,
+ Guid.NewGuid().ToString(),
+ new TestMessage { Text = $"Message: {Guid.NewGuid()}"});
+ }
+ }
+
+ if (input!.Equals("exit", StringComparison.OrdinalIgnoreCase))
+ {
+ await bus.StopAsync();
+ break;
+ }
+}
+
+await Task.Delay(3000);
diff --git a/samples/KafkaFlow.Sample.Auth/TestMessage.cs b/samples/KafkaFlow.Sample.Auth/TestMessage.cs
new file mode 100644
index 000000000..e14ac912e
--- /dev/null
+++ b/samples/KafkaFlow.Sample.Auth/TestMessage.cs
@@ -0,0 +1,10 @@
+namespace KafkaFlow.Sample.Auth;
+
+using System.Runtime.Serialization;
+
+[DataContract]
+public class TestMessage
+{
+ [DataMember(Order = 1)]
+ public string? Text { get; set; }
+}
\ No newline at end of file
diff --git a/src/KafkaFlow.sln b/src/KafkaFlow.sln
index a3012d169..1387bafdc 100644
--- a/src/KafkaFlow.sln
+++ b/src/KafkaFlow.sln
@@ -91,6 +91,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.PauseConsu
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.ConsumerThrottling", "..\samples\KafkaFlow.Sample.ConsumerThrottling\KafkaFlow.Sample.ConsumerThrottling.csproj", "{4A16F519-FAF8-432C-AD0A-CC44F7BD392D}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.Auth", "..\samples\KafkaFlow.Sample.Auth\KafkaFlow.Sample.Auth.csproj", "{C22E1AFB-0491-4F90-BF6C-8EADBC5B89B4}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -225,6 +227,10 @@ Global
{4A16F519-FAF8-432C-AD0A-CC44F7BD392D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4A16F519-FAF8-432C-AD0A-CC44F7BD392D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4A16F519-FAF8-432C-AD0A-CC44F7BD392D}.Release|Any CPU.Build.0 = Release|Any CPU
+ {C22E1AFB-0491-4F90-BF6C-8EADBC5B89B4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {C22E1AFB-0491-4F90-BF6C-8EADBC5B89B4}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {C22E1AFB-0491-4F90-BF6C-8EADBC5B89B4}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {C22E1AFB-0491-4F90-BF6C-8EADBC5B89B4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -264,6 +270,7 @@ Global
{8EAF0D96-F760-4FEF-9237-92779F66482D} = {EF626895-FDAE-4B28-9110-BA85671CBBF2}
{B4A9E7CE-7A37-411E-967E-D9B5FD1A3992} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
{4A16F519-FAF8-432C-AD0A-CC44F7BD392D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
+ {C22E1AFB-0491-4F90-BF6C-8EADBC5B89B4} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB}
diff --git a/zookeeper_server_jaas.conf b/zookeeper_server_jaas.conf
new file mode 100644
index 000000000..2b1754fba
--- /dev/null
+++ b/zookeeper_server_jaas.conf
@@ -0,0 +1,4 @@
+Server {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_admin="admin-secret";
+};
\ No newline at end of file