
5 sec lag when consuming from multiple topics in multi-broker cluster #996

Closed
sparcs360 opened this issue Jan 1, 2021 · 2 comments

sparcs360 commented Jan 1, 2021

Describe the bug
I've observed the behaviour described in #370 when subscribing to multiple topics from a single consumer

To Reproduce

  • A 3-broker cluster
  • Create topic1 and topic2 - 1 partition, 3 replicas
  • Run a consumer that subscribes to both topics
    • Note the Fetching from 2 partitions for 2 out of 2 topics log line every 5 seconds (requires logLevel.DEBUG)
  • Publish a message to one of these topics every 1 second (I used kafkacat)
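
The steps above can be sketched as a minimal KafkaJS consumer. Broker addresses, the group id, and the fromBeginning setting are placeholders/assumptions, not taken from the original report:

```javascript
// Minimal reproduction sketch (broker list and groupId are placeholders).
const { Kafka, logLevel } = require('kafkajs')

const kafka = new Kafka({
  brokers: ['<host>:9092', '<host>:9093', '<host>:9094'],
  logLevel: logLevel.DEBUG, // needed to see the "Fetching from ..." lines
})

const consumer = kafka.consumer({ groupId: 'repro-group' })

const run = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic: 'topic1', fromBeginning: true })
  await consumer.subscribe({ topic: 'topic2', fromBeginning: true })
  await consumer.run({
    // Log the arrival time so the ~5 s delivery lag is visible.
    eachMessage: async ({ topic, message }) => {
      console.log(`${new Date().toISOString()} ${topic}: ${message.value.toString()}`)
    },
  })
}

run().catch(console.error)
```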

Expected behavior
The consumer's eachMessage callback is executed every second (milliseconds after a message is published)

Observed behavior
The consumer's eachMessage callback is executed every 5 seconds (the default maxWaitTimeInMs value)

Environment:

  • OS: Ubuntu 18.04.5 LTS (GNU/Linux 4.15.0-128-generic x86_64)
  • KafkaJS version: 1.15.0
  • Kafka version: confluentinc/cp-kafka:5.0.1 (docker)
  • NodeJS version: v10.19.0

Additional Info
If we subscribe only to the topic we're publishing to, then it works as expected.
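
A possible mitigation (not a fix) is lowering maxWaitTimeInMs in the consumer config, which caps how long a fetch to an idle broker can hold up the next cycle, at the cost of more frequent (mostly empty) fetch requests:

```javascript
// Mitigation sketch, assuming a `kafka` client instance as above;
// the groupId is a placeholder.
const consumer = kafka.consumer({
  groupId: 'repro-group',
  maxWaitTimeInMs: 500, // default is 5000, i.e. the 5 s lag observed here
})
```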


sparcs360 commented Jan 2, 2021

Here are some relevant logs from one of my test runs. The consumer is connected to 2 of the 3 brokers in the cluster (<host>:9092 and <host>:9093)

Here's a 5 second polling cycle where nothing is going on (i.e., no messages published)...

{"t":"2021-01-02T14:45:14.264Z","m":"Request Heartbeat(key: 12, version: 2)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":72,"expectResponse":true,"size":98}}
{"t":"2021-01-02T14:45:14.265Z","m":"Response Heartbeat(key: 12, version: 2)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":72,"size":10,"data":{"throttleTime":0,"errorCode":0,"clientSideThrottleTime":0}}}
{"t":"2021-01-02T14:45:14.265Z","m":"Fetching from 2 partitions for 2 out of 2 topics","args":{"namespace":"ConsumerGroup","topics":["topic2","topic1"],"activeTopicPartitions":[{"topic":"topic2","partitions":[0]},{"topic":"topic1","partitions":[0]}],"pausedTopicPartitions":[]}}
{"t":"2021-01-02T14:45:14.266Z","m":"Request Fetch(key: 1, version: 8)","args":{"namespace":"Connection","broker":"<host>:9092","correlationId":37,"expectResponse":true,"size":104}}
{"t":"2021-01-02T14:45:14.266Z","m":"Request Fetch(key: 1, version: 8)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":73,"expectResponse":true,"size":102}}
{"t":"2021-01-02T14:45:19.267Z","m":"Response Fetch(key: 1, version: 8)","args":{"namespace":"Connection","broker":"<host>:9092","correlationId":37,"size":80,"data":"[filtered]"}}
{"t":"2021-01-02T14:45:19.268Z","m":"Response Fetch(key: 1, version: 8)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":73,"size":78,"data":"[filtered]"}}

About 1 second after the next cycle started (at 14:45:20.269) I published 3 messages using kafkacat. I also had a kafkacat consumer running; it displayed the 3 messages immediately.

Note that the first message is received almost immediately from <host>:9093, but the next fetch request isn't sent until <host>:9092's request times out.

{"t":"2021-01-02T14:45:19.269Z","m":"Request Heartbeat(key: 12, version: 2)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":74,"expectResponse":true,"size":98}}
{"t":"2021-01-02T14:45:19.270Z","m":"Response Heartbeat(key: 12, version: 2)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":74,"size":10,"data":{"throttleTime":0,"errorCode":0,"clientSideThrottleTime":0}}}
{"t":"2021-01-02T14:45:19.270Z","m":"Fetching from 2 partitions for 2 out of 2 topics","args":{"namespace":"ConsumerGroup","topics":["topic2","topic1"],"activeTopicPartitions":[{"topic":"topic2","partitions":[0]},{"topic":"topic1","partitions":[0]}],"pausedTopicPartitions":[]}}
{"t":"2021-01-02T14:45:19.271Z","m":"Request Fetch(key: 1, version: 8)","args":{"namespace":"Connection","broker":"<host>:9092","correlationId":38,"expectResponse":true,"size":104}}
{"t":"2021-01-02T14:45:19.272Z","m":"Request Fetch(key: 1, version: 8)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":75,"expectResponse":true,"size":102}}
{"t":"2021-01-02T14:45:20.269Z","m":"Response Fetch(key: 1, version: 8)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":75,"size":656,"data":"[filtered]"}}
{"t":"2021-01-02T14:45:20.269Z","m":"Consumed","args":{"messageId":"a9a2600f-6366-4c2c-88d0-18f1f109fa7b","messageType":"MESSAGE","topics":["topic1","topic2"]}}
{"t":"2021-01-02T14:45:20.270Z","m":"Request OffsetCommit(key: 8, version: 4)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":76,"expectResponse":true,"size":146}}
{"t":"2021-01-02T14:45:20.273Z","m":"Response OffsetCommit(key:8, version: 4)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":76,"size":40,"data":{"throttleTime":0,"responses":[{"topic":"topic1","partitions":[{"partition":0,"errorCode":0}]}],"clientSideThrottleTime":0}}}
{"t":"2021-01-02T14:45:24.273Z","m":"Response Fetch(key: 1, version: 8)","args":{"namespace":"Connection","broker":"<host>:9092","correlationId":38,"size":80,"data":"[filtered]"}}

The remaining two messages are received in the next cycle.

{"t":"2021-01-02T14:45:24.274Z","m":"Request Heartbeat(key: 12, version: 2)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":77,"expectResponse":true,"size":98}}
{"t":"2021-01-02T14:45:24.275Z","m":"Response Heartbeat(key: 12, version: 2)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":77,"size":10,"data":{"throttleTime":0,"errorCode":0,"clientSideThrottleTime":0}}}
{"t":"2021-01-02T14:45:24.275Z","m":"Fetching from 2 partitions for 2 out of 2 topics","args":{"namespace":"ConsumerGroup","topics":["topic2","topic1"],"activeTopicPartitions":[{"topic":"topic2","partitions":[0]},{"topic":"topic1","partitions":[0]}],"pausedTopicPartitions":[]}}
{"t":"2021-01-02T14:45:24.276Z","m":"Request Fetch(key: 1, version: 8)","args":{"namespace":"Connection","broker":"<host>:9092","correlationId":39,"expectResponse":true,"size":104}}
{"t":"2021-01-02T14:45:24.276Z","m":"Request Fetch(key: 1, version: 8)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":78,"expectResponse":true,"size":102}}
{"t":"2021-01-02T14:45:24.278Z","m":"Response Fetch(key: 1, version: 8)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":78,"size":1234,"data":"[filtered]"}}
{"t":"2021-01-02T14:45:24.278Z","m":"Consumed","args":{"messageId":"39425c99-4976-45a8-a2b8-8a4abb5fed69","messageType":"MESSAGE","topics":["topic1","topic2"]}}
{"t":"2021-01-02T14:45:24.279Z","m":"Consumed","args":{"messageId":"3f3a5ad4-3ceb-4d47-8342-21fccf11982c","messageType":"MESSAGE","topics":["topic1","topic2"]}}
{"t":"2021-01-02T14:45:29.277Z","m":"Response Fetch(key: 1, version: 8)","args":{"namespace":"Connection","broker":"<host>:9092","correlationId":39,"size":80,"data":"[filtered]"}}
{"t":"2021-01-02T14:45:29.277Z","m":"Request OffsetCommit(key: 8, version: 4)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":79,"expectResponse":true,"size":146}}
{"t":"2021-01-02T14:45:29.280Z","m":"Response OffsetCommit(key:8, version: 4)","args":{"namespace":"Connection","broker":"<host>:9093","correlationId":79,"size":40,"data":{"throttleTime":0,"responses":[{"topic":"topic1","partitions":[{"partition":0,"errorCode":0}]}],"clientSideThrottleTime":0}}}
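
Assuming the fetch loop awaits every in-flight per-broker fetch before starting the next cycle (which is what these logs suggest), the gating can be modelled with plain promises. This is a toy model, not KafkaJS internals, and the timings are scaled-down stand-ins for the real 5-second long poll:

```javascript
// Toy model: two simulated per-broker fetches. The "busy" broker returns
// data quickly; the "idle" broker holds its long-poll open for the full
// (scaled-down) maxWaitTimeInMs.
const fetchFrom = (broker, delayMs) =>
  new Promise(resolve => setTimeout(() => resolve(broker), delayMs))

const cycle = async () => {
  const start = Date.now()
  // Awaiting both fetches means the cycle lasts as long as the slowest one,
  // so messages from the busy broker wait for the idle broker's timeout.
  await Promise.all([fetchFrom('busy:9093', 50), fetchFrom('idle:9092', 500)])
  return Date.now() - start
}

cycle().then(ms => console.log(`cycle completed after ~${ms} ms`))
```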

@sparcs360

I've just discovered #683, so I'm closing this issue as a duplicate.
