Fetch partitions in parallel #683

Closed
hmagarotto opened this issue Mar 30, 2020 · 19 comments

Comments

@hmagarotto

When the processing of one partition is slow, all partitions are affected.
This is because the fetch for the other partitions is only performed after processing has completed for all partitions.
The fetch for all topics and partitions is synchronous.
So it's not possible to use partitions as the unit of parallelism.

Can we deal with this using some setting or using any different strategy?

Test with a slow process (exaggerated) on partition 0:
https://github.com/hmagarotto/kafkajs-test-partition-parallelism
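
A rough way to see the effect described above is to model the fetch loop as a sequence of rounds, where each round lasts as long as the slowest partition in it. This is a simplified sketch, not KafkaJS code: `barrierSchedule`, `independentSchedule`, and the millisecond figures are made up for illustration.

```javascript
// Simplified model (not KafkaJS internals). processingMs maps a hypothetical
// partition id to the time it takes to process one batch of that partition.

// Barrier model: the next fetch starts only when every partition is done.
function barrierSchedule(processingMs, rounds) {
  const slowest = Math.max(...Object.values(processingMs));
  const finishTimes = {};
  for (const [partition, ms] of Object.entries(processingMs)) {
    finishTimes[partition] = [];
    for (let round = 0; round < rounds; round++) {
      // Each round begins after the slowest partition of the previous round.
      finishTimes[partition].push(round * slowest + ms);
    }
  }
  return finishTimes;
}

// Independent model: each partition fetches again as soon as it is done.
function independentSchedule(processingMs, rounds) {
  const finishTimes = {};
  for (const [partition, ms] of Object.entries(processingMs)) {
    finishTimes[partition] = [];
    for (let round = 0; round < rounds; round++) {
      finishTimes[partition].push((round + 1) * ms);
    }
  }
  return finishTimes;
}

// Partition 0 is slow (8 s per batch), the others take 10 ms.
const processingMs = { 0: 8000, 1: 10, 2: 10, 3: 10 };
console.log(barrierSchedule(processingMs, 3)[1]);     // [ 10, 8010, 16010 ]
console.log(independentSchedule(processingMs, 3)[1]); // [ 10, 20, 30 ]
```

In the barrier model, the fast partition 1 finishes its third batch after about 16 seconds even though its own work takes 30 ms in total.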

@ankon
Contributor

ankon commented Mar 31, 2020

This sounds a bit related to #370 (#570) to me. If so, maybe you could try the beta releases with the test?

@hmagarotto
Author

Hi, thanks for your response.

What I understood about this refactoring is that:

The consumer will process batches immediately after a response for each fetch request.
But a new fetch for this consumer group will only start after all batches have been processed. Before a new fetch can start, we need to wait for "enqueuedTasks" in "Runner::fetch".

So, if I have understood correctly, a slow task for some partition will block new fetches for all other partitions.

I tried the test with the new beta version, and all partitions are still blocked by the slower partition's task.

Test code updated with beta KafkaJS: https://github.com/hmagarotto/kafkajs-test-partition-parallelism

Similar needs for another library/language: akka/alpakka-kafka#110

@Nevon
Collaborator

Nevon commented Mar 31, 2020

Your understanding is correct, @hmagarotto. Currently the fetch loop will wait until all partitions have been processed before issuing new fetch requests. I've been having the same discussion on our Slack today, and I think it makes a lot of sense to start the next fetch for a broker as soon as we have processed the last response from it, to avoid having to wait for the slowest one.

@tristan-redox

tristan-redox commented May 28, 2020

@Nevon, we recently tried a BETA version of KafkaJS and noticed that another fetch started before the current processing fetch completed. The problem we encountered was that the second fetch received the same message/offset from the same topic/partition that was currently being processed in the yet to finish fetch. This resulted in the same message being processed twice "concurrently" within <50 ms of each other. This seems like a bug, should I raise a bug report against the BETA version?

There are a number of concerns around starting the next fetch before the previous finishes. For example, preventing the behavior above (duplicate concurrent processing of the same message) or guaranteeing FIFO processing for each topic/partition a consumer is assigned. If this feature is added, I believe you would need to ensure the second fetch does not fetch from topic/partitions that are still being processed by the current unfinished fetch. This seems like it could be difficult to ensure.

@Nevon
Collaborator

Nevon commented May 28, 2020

we recently tried a BETA version of KafkaJS and noticed that another fetch started before the current processing fetch completed. The problem we encountered was that the second fetch received the same message/offset from the same topic/partition that was currently being processed in the yet to finish fetch. This resulted in the same message being processed twice "concurrently" within <50 ms of each other. This seems like a bug, should I raise a bug report against the BETA version?

Please do. Especially if you can produce a failing test or at least a repro case for this.

To be clear, we would never start a fetch for a partition for which we haven't processed all messages in the current batch yet, as partitions are the unit of concurrency in Kafka. However, there's theoretically nothing stopping us from issuing a new fetch request for partition 1 just because we haven't finished processing the messages for partition 2 yet.
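
The constraint described here could be sketched as a small piece of bookkeeping: keep a set of partitions whose batch is still being processed, and leave those out of the next fetch request. The names below (`InFlightTracker`, `fetchable`, the `orders` topic) are hypothetical and are not part of KafkaJS.

```javascript
// Hypothetical bookkeeping, not KafkaJS code: track which topic-partitions
// still have an unprocessed batch so they are excluded from the next fetch.
class InFlightTracker {
  constructor() {
    this.inFlight = new Set();
  }

  static key(topic, partition) {
    return `${topic}:${partition}`;
  }

  // Call when a batch for this partition has been handed to the processor.
  start(topic, partition) {
    this.inFlight.add(InFlightTracker.key(topic, partition));
  }

  // Call when every message of that batch has been processed.
  finish(topic, partition) {
    this.inFlight.delete(InFlightTracker.key(topic, partition));
  }

  // Only partitions with no batch in progress may be fetched again.
  fetchable(assigned) {
    return assigned.filter(
      ({ topic, partition }) =>
        !this.inFlight.has(InFlightTracker.key(topic, partition))
    );
  }
}

const tracker = new InFlightTracker();
const assigned = [
  { topic: "orders", partition: 1 },
  { topic: "orders", partition: 2 },
];
tracker.start("orders", 1);
tracker.start("orders", 2);
tracker.finish("orders", 1); // partition 2 is still being processed

console.log(tracker.fetchable(assigned)); // only partition 1 is eligible
```

Because a partition is never fetched while its previous batch is in progress, per-partition ordering is preserved and the same offset cannot be handed out twice by this bookkeeping alone.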

@sparcs360

Hi all - are there any plans to work on this?

Regarding workarounds:

  1. Lower maxWaitTimeInMs.
  2. Create slow consumers on separate clients (would that work? or do they end up being pooled?)
  3. Any more?
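
Workaround 1 would look roughly like the consumer options below. `maxWaitTimeInMs` is the option named above; the group id and the value of 100 ms are placeholders, not recommendations.

```javascript
// Options object intended for kafka.consumer(...). Only maxWaitTimeInMs
// matters here; the groupId and the 100 ms value are placeholders.
const consumerOptions = {
  groupId: "example-group",
  // Upper bound on how long the broker holds a fetch request open while
  // waiting for data, so a fetch against an idle broker returns sooner.
  maxWaitTimeInMs: 100,
};

// With a KafkaJS client instance this would be passed as:
//   const consumer = kafka.consumer(consumerOptions);
console.log(consumerOptions);
```

The trade-off is more frequent fetch requests while topics are idle, so the value is worth tuning against broker load.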

@sparcs360

workaround 2 gives me No active topic partitions, sleeping for 5000ms - I can't make sense of that...

If I put each consumer in its own consumer group then it works (although I prefer workaround 1)

@ankon
Contributor

ankon commented Jan 5, 2021

workaround 2 gives me No active topic partitions, sleeping for 5000ms - I can't make sense of that...

The 5000 is simply the maxWaitTime you configured, and the message is logged when the consumer is not subscribed to any topics, or didn't get any partitions assigned to fetch from. In that case KafkaJS just waits that long before issuing another fetch attempt, in the hope that the assignment has changed by then.

Check whether you subscribed to the correct topics in all your consumers.

@KristjanTammekivi
Contributor

I ran into this bug. With 1 Kafka instance I get messages pretty much instantly (minBytes is 1); with 2 it hits the maxWaitTimeInMs.

@analytik

I'm also hitting this bug. Does this need repro steps, or anything else I can do to prioritize fixing this?

@t-d-d
Contributor

t-d-d commented Aug 30, 2021 via email

@analytik

analytik commented Sep 1, 2021

I'll try to work on a demo video and source code, but just to confirm - in my case, there is one producer (one Kafka Connect task/job) that sends to any number of partitions. The problem only happens if there are 2+ Kafka brokers and works correctly with 1 Kafka broker.

@hmagarotto
Author

The example I added when opening the issue may still be valid. It is a repository with a simple example that reproduces the issue (with docker-compose).

https://github.com/hmagarotto/kafkajs-test-partition-parallelism

This example shows that the fetch of the partitions is synchronous. So, when the processing of some partition is slow, it impacts the consumption of all other partitions. It happens even on a single broker.

@analytik

analytik commented Sep 1, 2021

Actually, I'm now testing with 1.15.0 and 1.16.0-beta.21, and I can reproduce the bug (or maybe a different bug?) with a single-node Kafka cluster.

The code is pretty much the same as the one in the OP's repro repo - for clarity, pasting here. I will upload videos shortly.

// last tested on 2021-09-01 with kafkajs@1.15.0
// last tested on 2021-09-01 with kafkajs@1.16.0-beta.21
const { Kafka } = require("kafkajs");
const { delay } = require("bluebird");

const topic = 'TEST_bug_683_repro_single_node_beta_test2';
const numPartitions = 4;
const groupId = 'test_bug_683_repro_cg_single_node_beta_test2';
let startShuttingDown = false;
let totalProduced = 0;
let totalConsumed = 0;

const client = new Kafka({
  brokers: [
    'kafka00.default.svc.cluster.local:9092',
  ],
});

async function createTopic() {
  const admin = client.admin();
  await admin.connect();
  await admin.createTopics({
    topics: [{
      topic,
      numPartitions,
    }],
  });
  await admin.disconnect();
}

async function produce() {
  const producer = client.producer();
  await producer.connect();
  for (let id = 0; id < numPartitions*10; id++) {
    const message = { id, created_timestamp: Date.now() };
    const messageStr = JSON.stringify(message);
    const partition = id % numPartitions;
    const envelope = {
      topic,
      messages: [{
        key: JSON.stringify({ id }),
        value: messageStr,
        partition,
      }],
    };
    // console.dir(envelope);
    await producer.send(envelope);
    totalProduced++;
  }
  await producer.disconnect();
  if (!startShuttingDown) {
    console.log(`Produced ${totalProduced} messages total.`);
    setTimeout(produce, 1500);
  }
}

async function consume() {
  const consumer = client.consumer({
    groupId,
    maxBytesPerPartition: 1024,
    readUncommitted: false,
  });
  await consumer.connect();
  await consumer.subscribe({ topic });
  await consumer.run({
    autoCommitInterval: 500,
    partitionsConsumedConcurrently: 4,
    eachMessage: async ({ topic, partition, message }) => {
      const msg = JSON.parse(message.value.toString());
      if (partition === 0) {
        console.log(`Consuming a message id ${msg.id} on the delayed partition 0`)
        // 8 seconds delay on partition 0 to demonstrate the delay is not related to the 5s maxWaitTimeInMs
        await delay(8 * 1000);
      }
      console.log(`Message processed from [${topic}:${partition}] with id [${msg.id}] - total consumed: ${++totalConsumed}`);
    },
  });
}

async function main() {
  await createTopic();
  await consume();
  produce();
  setTimeout(() => { startShuttingDown = true; }, 40_000);
}

main();

@analytik

analytik commented Sep 1, 2021

Uploaded a repro video, in case there's any issue with getting the same results elsewhere: https://www.youtube.com/watch?v=4qr64l3Fg-8 (Please note it might take a few hours until youtube processes the higher resolution versions, until then it's in unreadable 360p.)

To sum up the video:

  • using the code above
  • first run with stable kafkajs, 4 partitions, 2 partitionsConsumedConcurrently
  • another test run without the sleep to demo the code/cluster works correctly
  • beta version run with more logging to clearly show when the "poison" consumer starts, and that it blocks any parallel consumers from any partition - now running partitionsConsumedConcurrently = 4 just to be clear
  • another beta version run with kafka-console-consumer running in parallel to demo the cluster works correctly

@t-d-d
Contributor

t-d-d commented Sep 18, 2021

This isn't a bug - it is the expected behaviour with the current implementation.

The partitionsConsumedConcurrently feature does concurrent partition processing for the batches that are retrieved in each fetch loop. All processing still needs to complete before the next fetch request is issued.
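
The behaviour described here can be approximated with a small model: handlers within one fetch run concurrently up to a limit, but the loop awaits all of them before it fetches again. This is a sketch of the idea only, not the KafkaJS runner; `processRound`, `runLoop`, and the delays are invented for the example.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Run `handler` over all batches with at most `limit` running at once,
// and resolve only when every batch has been handled (the barrier).
async function processRound(batches, limit, handler) {
  const queue = [...batches];
  const worker = async () => {
    while (queue.length > 0) {
      await handler(queue.shift());
    }
  };
  const workerCount = Math.min(limit, batches.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
}

// Model of the fetch loop: one batch per partition per round, and the next
// round starts only after processRound has resolved.
async function runLoop(rounds, partitions, limit, delayFor) {
  const events = [];
  for (let round = 0; round < rounds; round++) {
    events.push(`fetch ${round}`);
    const batches = partitions.map((partition) => ({ round, partition }));
    await processRound(batches, limit, async ({ partition }) => {
      await sleep(delayFor(partition));
      events.push(`done r${round} p${partition}`);
    });
  }
  return events;
}

// Partition 0 is slow (50 ms); partitions 1 to 3 are fast (5 ms).
const result = runLoop(2, [0, 1, 2, 3], 4, (p) => (p === 0 ? 50 : 5));
result.then((events) => console.log(events.join("\n")));
```

In the printed log, the fast partitions finish round 0 early, yet `fetch 1` only appears after `done r0 p0`, which is the blocking reported in this thread.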

A feature request for a 'more concurrent' consumer makes complete sense. But it's a fairly substantial piece of work.

@analytik

It baffles me how you can claim this is as designed. It's clearly called partitionsConsumedConcurrently; if you're waiting for another message to be processed, then you're not really consuming concurrently, are you?

Furthermore, this clearly goes against the Kafka architecture, where individual consumers process concurrently, regardless of how many clients they are running in. If I have 20 partitions and one Node.js process, I expect it to process 20 partitions concurrently by default.

priitkaard added a commit to priitkaard/kafkajs that referenced this issue Nov 16, 2021
* split runner into partitionsConsumedConcurrently promises
* fetch batches independently in each concurrent task
* clean up old barrier and concurrency logic
priitkaard added a commit to priitkaard/kafkajs that referenced this issue Dec 20, 2021
Nevon added a commit that referenced this issue Mar 15, 2022
@dhessler

Per #1258 (comment), is this resolved?

@Nevon
Collaborator

Nevon commented Aug 18, 2022

There's always more that could be done, but I would consider it resolved by #1258.

The current state is that essentially each broker gets its own fetch loop, so there is no longer any dependency between the processing of partitions fetched from different brokers. If you are fetching partitions 1 and 2 from the same broker, you'll still wait for both partitions to finish processing before fetching the next batch for those partitions, but that won't prevent fetching new data from partitions on other brokers.
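
As a rough illustration of this state, the model below gives each broker its own loop whose round time is set by the slowest partition fetched from that broker. The broker names, partition layout, and timings are made up, and the function is not KafkaJS code.

```javascript
// Hypothetical layout: which broker leads which partition, and how long one
// batch of each partition takes to process (in ms).
const leaders = { 0: "broker-a", 1: "broker-a", 2: "broker-b", 3: "broker-b" };
const batchMs = { 0: 8000, 1: 10, 2: 10, 3: 20 };

// Each broker has its own fetch loop; one round of a loop lasts as long as
// the slowest partition fetched from that broker.
function roundTimePerBroker(leaders, batchMs) {
  const roundMs = {};
  for (const [partition, broker] of Object.entries(leaders)) {
    roundMs[broker] = Math.max(roundMs[broker] || 0, batchMs[partition]);
  }
  return roundMs;
}

console.log(roundTimePerBroker(leaders, batchMs));
// { 'broker-a': 8000, 'broker-b': 20 }
```

Partition 1 still waits behind the slow partition 0 because both are led by the same broker, while partitions 2 and 3 are unaffected.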

Increasing parallelism further would increase the chattiness towards the brokers. For example, say you finished processing partition 1 but are still working on partition 2, then you could make a new fetch request for just partition 1. But that would mean multiplying the number of fetch requests by the number of subscribed partitions, which is not how the API is meant to be used. It might be possible to come up with a design where you use some kind of buffer of fetched-but-unprocessed batches and try to cleverly refill the buffers in a way that doesn't require making 1 fetch request per partition, but it seems wildly complex and I'm not sure it would actually yield that much better results in the real world.
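
The request-volume trade-off can be put in numbers. The sketch below only counts fetch requests per round under the two strategies discussed; the assignment is invented for illustration.

```javascript
// Hypothetical assignment: partition id -> leader broker.
const assignment = { 0: "broker-a", 1: "broker-a", 2: "broker-a", 3: "broker-b" };

// One fetch request per broker covers every partition led by that broker.
const perBroker = new Set(Object.values(assignment)).size;

// One fetch request per partition, the fully independent alternative.
const perPartition = Object.keys(assignment).length;

console.log({ perBroker, perPartition }); // { perBroker: 2, perPartition: 4 }
```

With hundreds of subscribed partitions spread over a handful of brokers, the gap between the two counts grows accordingly.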

@Nevon Nevon closed this as completed Aug 18, 2022