TLS support for bootstrap server configuration #41

Open

ashrivastavagit opened this issue Apr 14, 2022 · 24 comments
@ashrivastavagit

ashrivastavagit commented Apr 14, 2022

Currently it seems there is no support for using TLS with bootstrapServers. We have Kafka brokers with TLS and authentication enabled, which are required to connect to the broker servers.

Can we have this support, so that we can use this Helm chart to publish Kafka logs to Sumo Logic?

@vsinghal13
Collaborator

TLS support is already available.
The extra volumes example shows the truststore/keystore mounts needed, and the settings just need to be put into the conf referenced by endpointsSecret:
https://github.com/SumoLogic/sumologic-kafka-push/tree/main/helm#extra-volumes
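
For illustration, a minimal sketch of those mounts (the secret name kafka-tls is hypothetical and would hold the JKS files; the mount path matches the conf example below):

extraVolumes:
  - name: kafka-tls
    secret:
      secretName: kafka-tls        # hypothetical secret containing truststore.jks/keystore.jks
extraVolumeMounts:
  - name: kafka-tls
    mountPath: /opt/kafka/config   # where the consumer conf expects the stores
    readOnly: true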

@vsinghal13
Collaborator

you would include something like:

akka: {
  kafka.consumer: {
    security.protocol: SSL
    ssl.truststore.location: /opt/kafka/config/kafka.truststore.jks
  ssl.truststore.password: truststore_password
    ssl.keystore.location: /opt/kafka/config/client.keystore.jks
    ssl.keystore.password: keystore_password
    ssl.key.password: key_password
    ssl.enabled.protocols: TLSv1.2,TLSv1.1,TLSv1
    ssl.client.auth: required
  }
}

@ashrivastavagit
Author

ashrivastavagit commented Apr 25, 2022 via email

@vsinghal13
Collaborator

you can take a look at: https://github.com/SumoLogic/sumologic-kafka-push/blob/main/helm/README.md#configuration

Basically, the additional conf needs to be put in a file, and that file is specified via the endpointsSecret parameter in the Helm chart.
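
For example, assuming the conf above is saved as akka.conf, the secret could be created along these lines (a sketch; the chart expects the conf under an application.conf key):

kubectl create secret generic akka-conf \
  --from-file=application.conf=./akka.conf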

@ashrivastavagit
Author

ashrivastavagit commented Apr 25, 2022 via email

@ashrivastavagit
Author

ashrivastavagit commented Apr 27, 2022

@vsinghal13 I have added the following configuration.

Chart specification:

spec:
  chart: kafka-push
  version: "0.2.5"
  sourceRef:
    kind: HelmRepository
    name: sumologic-kafka-push

Below are the values we are adding in the override values file.

extraVolumes:
  - name: instaclustr-cert
    secret:
      defaultMode: 420
      secretName: insta-cert
extraVolumeMounts:
  - mountPath: /etc/ssl/certs/kafka
    name: instaclustr-cert
    readOnly: true
endpointsSecret: akka-conf

extraVolumes & extraVolumeMounts are being used to mount the Kafka TLS certificate files.

For endpointsSecret, I have the configuration below kept in one file, "akka.conf", which is created as a secret, since endpointsSecret is consumed as a secret by your Helm chart.

akka: {
  kafka.consumer: {
    security.protocol: SSL
    ssl.truststore.location: /etc/ssl/certs/kafka/truststore.jks
    ssl.truststore.password: instaclustr
    ssl.keystore.location: /etc/ssl/certs/kafka/keystore.jks
    ssl.keystore.password: instaclustr
    ssl.key.password: instaclustr
    ssl.enabled.protocols: TLSv1.2,TLSv1.1,TLSv1
    ssl.client.auth: required
    sasl.jaas.config: "org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";"
  }
}

While deploying these changes, I am getting the logs below, and I think the connection to the Kafka cluster has not been made; there are still issues.

2022-04-27 00:51:28 WARN [default-dispatcher-21] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.92:9093 (id: -4 rack: null) disconnected
2022-04-27 00:51:28 WARN [default-dispatcher-14] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.100:9093 (id: -3 rack: null) disconnected
2022-04-27 00:51:28 WARN [default-dispatcher-17] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.27:9093 (id: -1 rack: null) disconnected
2022-04-27 00:51:29 WARN [default-dispatcher-25] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.137:9093 (id: -5 rack: null) disconnected
2022-04-27 00:51:29 WARN [default-dispatcher-26] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.40:9093 (id: -2 rack: null) disconnected

Could you please verify and let me know if I am missing anything here or if an enhancement is needed.
PS: SSL and authentication are enabled on the Kafka server.

@vsinghal13
Collaborator

Were you able to run helm upgrade successfully after the changes that you made?

@ashrivastavagit
Author

Yes

@ddaughertysl
Collaborator

On pod startup you should see the Kafka consumer configuration in the logs. Does it match what you are setting in the endpointsSecret? The secret should also be formatted as follows:

{
  "application.conf": "{\"akka\":{\"kafka.consumer\": { ... } }}"
}
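
As a sketch of one way to avoid the quote-escaping entirely, the secret can be defined with stringData and a YAML block scalar so the HOCON is pasted verbatim:

apiVersion: v1
kind: Secret
metadata:
  name: akka-conf
type: Opaque
stringData:
  application.conf: |
    akka {
      kafka.consumer {
        security.protocol = SSL
        ssl.truststore.location = /etc/ssl/certs/kafka/truststore.jks
        ssl.truststore.password = <redacted>
      }
    }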

@ashrivastavagit
Author

@ddaughertysl Here are my findings.

This is the actual secret file configuration:

{
"application.conf":
"{
  "akka": {
    "kafka.consumer": {
      "security.protocol": "SSL"
      "ssl.truststore.location": "/etc/ssl/certs/kafka/truststore.jks"
      "ssl.truststore.password": "instaclustr"
      "ssl.keystore.location": "/etc/ssl/certs/kafka/truststore.jks"
      "ssl.keystore.password": ""
      "ssl.key.password": ""
      "ssl.enabled.protocols": "TLSv1.2,TLSv1.1,TLSv1"
      "ssl.client.auth": "required"
      "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";"
    }
  }
}"
}

The external secret is created with the name akka-conf.
The akka.conf file is created, and endpointsSecret is configured as:
endpointsSecret: akka-conf

But I am still getting this in the pod logs:

2022-04-27 16:25:09 INFO [default-dispatcher-10] o.a.k.c.c.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [10.220.5.27:9093, 10.220.5.40:9093, 10.220.5.100:9093, 10.220.5.92:9093, 10.220.5.137:9093, 10.220.5.178:9093]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-kafka-push-logs-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = kafka-push-logs
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class com.sumologic.sumopush.serde.KubernetesLogEventSerde$

2022-04-27 16:25:10 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka version: 2.7.0
2022-04-27 16:25:10 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka commitId: 448719dc99a19793
2022-04-27 16:25:10 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka startTimeMs: 1651076710027
2022-04-27 16:25:10 INFO [default-dispatcher-10] o.a.k.c.c.KafkaConsumer - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Subscribed to pattern: 'v2dev-instaclustr.*-logs'
2022-04-27 16:25:10 WARN [default-dispatcher-14] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.100:9093 (id: -3 rack: null) disconnected

As per the log, it seems the values are not being overridden.

@vsinghal13
Collaborator

Please have a look at #45

@aporwal3

@vsinghal13 I have applied the same configuration as what you mentioned in your PR, but the SSL configuration still doesn't work in our environment. We have other applications on our cluster that use the same SSL configuration, and it does work for them.

I also performed the troubleshooting steps, and everything looks correct on that front.

So can you please advise on what might be going wrong?

Logs:
2022-06-24 11:28:34 INFO [.default-dispatcher-3] a.e.s.Slf4jLogger - Slf4jLogger started
2022-06-24 11:28:35 INFO [.default-dispatcher-7] a.k.i.SingleSourceLogic - [066ce] Starting. StageActor Actor[akka://sumo-push-main/system/Materializers/StreamSupervisor-0/$$c#97301754]
2022-06-24 11:28:35 INFO [default-dispatcher-10] o.a.k.c.c.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [10.2x0.x.27:9093, 10.2x0.x.40:9093, 10.2x0.x.137:9093, 10.2x0.x.178:9093]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-kafka-push-logs-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = kafka-push-logs
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = /etc/ssl/certs/kafka/truststore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = /etc/ssl/certs/kafka/truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class com.sumologic.sumopush.serde.KubernetesLogEventSerde$

2022-06-24 11:28:35 WARN [default-dispatcher-10] o.a.k.c.c.ConsumerConfig - The configuration 'ssl.client.auth' was supplied but isn't a known config.
2022-06-24 11:28:35 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka version: 2.7.0
2022-06-24 11:28:35 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka commitId: 448719dc99a19793
2022-06-24 11:28:35 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka startTimeMs: 1656070115704
2022-06-24 11:28:35 INFO [default-dispatcher-10] o.a.k.c.c.KafkaConsumer - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Subscribed to pattern: 'v2dev-instaclustr.*-logs'
2022-06-24 11:28:36 WARN [default-dispatcher-24] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.27:9093 (id: -1 rack: null) disconnected
2022-06-24 11:28:37 WARN [default-dispatcher-25] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.100:9093 (id: -3 rack: null) disconnected
2022-06-24 11:28:37 WARN [default-dispatcher-10] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.220.5.92:9093 (id: -4 rack: null) disconnected

@vsinghal13
Collaborator

@ddaughertysl any suggestions on this?

@ddaughertysl
Collaborator

I think this shows progress in that the SSL settings are now being applied and the security protocol is SSL. The question is why they aren't working, which is harder for us to answer without comparing the settings that work in your other applications against the sumo-push settings. Also, I notice that your ssl.protocol is set to TLSv1.3 while ssl.enabled.protocols doesn't include that value, so you might also try including v1.3 in the list of enabled protocols.
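
For example, the relevant line in the endpointsSecret conf would become something like:

ssl.enabled.protocols: TLSv1.3,TLSv1.2,TLSv1.1,TLSv1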

@aporwal3

aporwal3 commented Jul 1, 2022

@ddaughertysl I made the requested change, but the issue still persists.

2022-07-01 09:27:03 INFO [.default-dispatcher-3] a.e.s.Slf4jLogger - Slf4jLogger started
2022-07-01 09:27:05 INFO [.default-dispatcher-5] a.k.i.SingleSourceLogic - [f1e6a] Starting. StageActor Actor[akka://sumo-push-main/system/Materializers/StreamSupervisor-0/$$c#-38628872]
2022-07-01 09:27:05 INFO [default-dispatcher-10] o.a.k.c.c.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [10.x.27:9093, 10.x.40:9093, 10.x.100:9093, 10.x.92:9093, 10.x.137:9093, 10.x.178:9093]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-kafka-push-logs-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = kafka-push-logs
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.3, TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = /etc/ssl/certs/kafka/truststore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = /etc/ssl/certs/kafka/truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class com.sumologic.sumopush.serde.KubernetesLogEventSerde$

2022-07-01 09:27:05 WARN [default-dispatcher-10] o.a.k.c.c.ConsumerConfig - The configuration 'ssl.client.auth' was supplied but isn't a known config.
2022-07-01 09:27:05 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka version: 2.7.0
2022-07-01 09:27:05 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka commitId: 448719dc99a19793
2022-07-01 09:27:05 INFO [default-dispatcher-10] o.a.k.c.u.AppInfoParser - Kafka startTimeMs: 1656667625750
2022-07-01 09:27:05 INFO [default-dispatcher-10] o.a.k.c.c.KafkaConsumer - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Subscribed to pattern: 'v2dev-instaclustr.*-logs'
2022-07-01 09:27:06 WARN [default-dispatcher-24] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.x.27:9093 (id: -1 rack: null) disconnected
2022-07-01 09:27:07 WARN [default-dispatcher-23] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.x.100:9093 (id: -3 rack: null) disconnected
2022-07-01 09:27:07 WARN [default-dispatcher-24] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.x.137:9093 (id: -5 rack: null) disconnected
2022-07-01 09:27:08 WARN [default-dispatcher-23] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker 10.x.40:9093 (id: -2 rack: null) disconnected

@ddaughertysl
Collaborator

Please send the broker logs, since they might have more information about what is happening. Also, I notice that you are using the same file for both the keystore and the truststore; is that intentional? Can you also send us the settings you are using in your working applications, with the passwords redacted?
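
As a quick sanity check (a sketch; substitute the real store passwords), keytool can show whether a file contains a PrivateKeyEntry (a keystore with a client key) or only trustedCertEntry entries (a truststore):

keytool -list -keystore /etc/ssl/certs/kafka/keystore.jks -storepass <redacted>
keytool -list -keystore /etc/ssl/certs/kafka/truststore.jks -storepass <redacted>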

@ddaughertysl
Collaborator

ddaughertysl commented Jul 1, 2022

The sasl.jaas.config line in the config also looks a little strange. Can you escape the quotes in that config so it looks like the following:

sasl.jaas.config: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";"

@aporwal3

aporwal3 commented Jul 6, 2022

Thanks @ddaughertysl for the hint. We have corrected the JAAS config now, but the tool is now failing with a SASL client authenticator error. Several articles online suggest that it is a temporary error which should get resolved once we restart the app, but that is not happening in our case.

2022-07-06 13:28:34 WARN [default-dispatcher-20] o.a.k.c.NetworkClient - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Bootstrap broker xxxxx:9093 (id: -4 rack: null) disconnected
2022-07-06 13:28:34 INFO [default-dispatcher-24] o.a.k.c.n.SaslChannelBuilder - [Consumer clientId=consumer-kafka-push-logs-1, groupId=kafka-push-logs] Failed to create channel due to
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.firstPrincipal(SaslClientAuthenticator.java:620)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:200)
at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:275)
at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:216)
at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:142)
at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:224)
at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329)
at org.apache.kafka.common.network.Selector.connect(Selector.java:256)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:1004)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:311)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:498)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:244)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
at akka.kafka.internal.KafkaConsumerActor.poll(KafkaConsumerActor.scala:529)
at akka.kafka.internal.KafkaConsumerActor.commitAndPoll(KafkaConsumerActor.scala:515)
at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:499)
at akka.kafka.internal.KafkaConsumerActor$$anonfun$regularReceive$1.applyOrElse(KafkaConsumerActor.scala:289)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.kafka.internal.KafkaConsumerActor.akka$actor$Timers$$super$aroundReceive(KafkaConsumerActor.scala:205)
at akka.actor.Timers.aroundReceive(Timers.scala:52)
at akka.actor.Timers.aroundReceive$(Timers.scala:41)
at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:205)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

Can you please advise on what the issue might be now?

@ddaughertysl
Collaborator

Have you configured your security.protocol and sasl.mechanism as follows? https://kafka.apache.org/documentation/#security_sasl_scram_clientconfig
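
Per that documentation, the client settings for SCRAM over TLS look roughly like this in the consumer conf (whether SCRAM-SHA-256 or SCRAM-SHA-512 applies depends on what the brokers enable):

security.protocol: SASL_SSL
sasl.mechanism: SCRAM-SHA-256
sasl.jaas.config: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";"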

@aporwal3

aporwal3 commented Jul 6, 2022

@ddaughertysl thanks again, it has worked now. But log parsing is now failing with the error below. Do we need to configure serdeClass?

Error Logs:

2022-07-06 18:19:16 ERROR [default-dispatcher-28] c.s.s.a.LogProcessor$ - unable to parse log message payload: Jul 01 13:42:06 8f844057-a8f5-4f0d-9b66-3347d9815750 [2022-07-01 13:42:06,957] INFO [SocketServer listenerType=ZK_BROKER, nodeId=2] Failed authentication with /10.x.x.8 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
java.lang.Exception: payload: Jul 01 13:42:06 8f844057-a8f5-4f0d-9b66-3347d9815750 [2022-07-01 13:42:06,957] INFO [SocketServer listenerType=ZK_BROKER, nodeId=2] Failed authentication with /10.x.x.8 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
at com.sumologic.sumopush.serde.KubernetesLogEventSerde$.deserialize(KubernetesLogEventSerde.scala:15)
at com.sumologic.sumopush.serde.KubernetesLogEventSerde$.deserialize(KubernetesLogEventSerde.scala:8)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at com.sumologic.sumopush.serde.LogEventSerde.deserialize(LogEventSerde.scala:8)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1387)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1618)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1454)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:687)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:638)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
at akka.kafka.internal.KafkaConsumerActor.poll(KafkaConsumerActor.scala:549)
at akka.kafka.internal.KafkaConsumerActor$$anonfun$regularReceive$1.applyOrElse(KafkaConsumerActor.scala:296)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.kafka.internal.KafkaConsumerActor.akka$actor$Timers$$super$aroundReceive(KafkaConsumerActor.scala:205)
at akka.actor.Timers.aroundReceive(Timers.scala:56)
at akka.actor.Timers.aroundReceive$(Timers.scala:41)
at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:205)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.json4s.ParserUtil$ParseException: unknown token J
Near: J
at org.json4s.native.JsonParser$Parser.fail(JsonParser.scala:236)
at org.json4s.native.JsonParser$Parser.nextToken(JsonParser.scala:324)
at org.json4s.native.JsonParser$.$anonfun$astParser$1(JsonParser.scala:188)
at org.json4s.native.JsonParser$.$anonfun$astParser$1$adapted(JsonParser.scala:145)
at org.json4s.native.JsonParser$.parse(JsonParser.scala:133)
at org.json4s.native.JsonParser$.parse(JsonParser.scala:71)
at org.json4s.native.JsonParser$.parse(JsonParser.scala:50)
at org.json4s.native.Serialization$.read(Serialization.scala:71)
at org.json4s.Serialization.read(Serialization.scala:25)
at org.json4s.Serialization.read$(Serialization.scala:25)
at org.json4s.native.Serialization$.read(Serialization.scala:32)
at com.sumologic.sumopush.model.KubernetesLogEventSerializer$.fromJson(KubernetesLogEvent.scala:47)
at com.sumologic.sumopush.serde.KubernetesLogEventSerde$.deserialize(KubernetesLogEventSerde.scala:13)
... 27 common frames omitted

@ddaughertysl
Collaborator

Yes, you should be using the JsonLogEventSerde, but it also looks like your messages aren't in JSON format. Can you fix that?
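
Assuming the chart's serdeClass value is how the serde is selected (the parameter name comes from the question above), it would be set along these lines, with the trailing $ matching how the Scala serde object appears in the consumer logs:

serdeClass: com.sumologic.sumopush.serde.JsonLogEventSerde$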

@aporwal3

aporwal3 commented Jul 7, 2022

@ddaughertysl Logs are in text/string format. Is it compulsory to move them to JSON format?

@ddaughertysl
Collaborator

Yes, per the docs https://github.com/SumoLogic/sumologic-kafka-push#supported-message-formats we only support JSON format at this time.

@ashrivastavagit
Author

ashrivastavagit commented Oct 11, 2022 via email
