in_kafka Fluent-bit kafka consumer can't reach high throughput #7991

Open

anosulchik opened this issue Sep 29, 2023 · 10 comments

Comments

anosulchik commented Sep 29, 2023

I'm working on replacing the promtail kafka consumer with fluent-bit in a high-load setup. Overall, our fluent-bit kafka consumer configuration works and it consumes messages, but the problem is that it can't reach the consumption throughput that promtail handles successfully. For example, the promtail consumer is able to reach 140-150 MB/s, while the most we can squeeze out of the fluent-bit kafka consumer running on the same resources is 10-15 MB/s.

Configuration

Fluent-bit v2.1.8

Kafka Brokers and Topics:
6 brokers kafka.m5.large (2 vCPU, 8 GB)
Topic with 128 partitions, replication factor = 3

{
    "min.insync.replicas" = 2
    "local.retention.ms"  = "7200000"  // 2h
    "retention.ms"        = "21400000" // 6h
    "segment.ms"          = "21400000" // 6h
    "compression.type"    = "snappy"
  }

Fluent-bit Config

      [SERVICE]
          Daemon Off
          Parsers_File parsers.conf
          Parsers_File custom_parsers.conf
          HTTP_Server On
          HTTP_Listen 0.0.0.0
          HTTP_Port 2020
          Health_Check Off

      [INPUT]
          name kafka
          brokers <kafka brokers>
          topics <topic name>
          group_id <fluentbit consumer group name>
          poll_ms 100
          rdkafka.security.protocol ssl
          rdkafka.queued.min.messages 500000
          rdkafka.fetch.wait.max.ms 250
          rdkafka.socket.blocking.max.ms 250
          rdkafka.fetch.error.backoff.ms 1000
          rdkafka.partition.assignment.strategy roundrobin

      [INPUT]
          Name fluentbit_metrics
          Tag internal_metrics

      [FILTER]
          Name parser
          Match kafka*
          Key_Name payload
          Reserve_Data true
          Parser logs-kafka

      [PARSER]
          Name logs-kafka
          Format json

      [OUTPUT]
          name null
          alias devnull
          Match *

      [OUTPUT]
          Name prometheus_exporter
          Match internal_metrics
          Port 2020

Fluent-bit Resources:

128 pods running on c5n.large instances:

    Limits:
      cpu:     2
      memory:  4Gi
    Requests:
      cpu:      1
      memory:   2Gi

Promtail Config:

scrapeConfigs: |
        - job_name: kafka
          kafka:
            brokers:
            - <kafka brokers>
            - ...
            group_id: <group name>
            authentication:
              type: ssl
            topics:
              - mi-logs
            version: 2.8.1
            assignor: roundrobin
            use_incoming_timestamp: false
          relabel_configs:
          - action: replace
            source_labels:
              - __meta_kafka_topic
            target_label: kafka_topic
          - action: replace
            source_labels:
              - __meta_kafka_group_id
            target_label: kafka_group_id
          pipeline_stages:
            - json:
                expressions:
                  job: job
                  instance_id: instance_id
                  instance: instance
                  availability_zone: availability_zone
                  datacenter: datacenter
                  role: role
                  filename: filename
            - labels:
                job:
                instance_id:
                instance:
                availability_zone:
                datacenter:
                role:
                filename:
    clients:
      - url: <loki url>
        backoff_config:
          max_period: 600s
          min_period: 500ms
          max_retries: 144
        timeout: 60s
        batchsize: 95240
        batchwait: 2s

Promtail Resources:

128 pods running on c5n.large instances:

    Limits:
      cpu:     2
      memory:  4Gi
    Requests:
      cpu:      1
      memory:   2Gi

Our goal is to ensure fluent-bit can reach the throughput that promtail handles. During the POC run (fluent-bit kafka consumer), all kafka partitions in our consumer group are properly distributed between the fluent-bit consumers. We use the fluentbit_input_bytes_total metric to measure the fluent-bit kafka consumer's throughput rate. Fluent-bit doesn't restart or crash - it just consumes messages, but slowly.

We noticed that CPU usage of the fluent-bit consumers doesn't rise above 0.04-0.05 (in k8s CPU units) even when there are messages available to consume from kafka. It behaves as if it is waiting on some backoff timer and reads messages only occasionally.

[Screenshot: Loki / Fluent-bit Grafana dashboard showing consumer throughput]

@anosulchik (Author)

Anyone?

@1123183721

I ran into the same problem. I can't limit or improve consumer throughput through any means or configuration options; the only thing that helps is adding more fluent-bit instances. Maybe using fluent-bit is not a good idea?

@tianhao98

I have also encountered this: each Fluent-Bit instance's resource usage is low, but the consumption rate still doesn't keep up.

@aliSadegh

I am also experiencing the same problem. I have tried and tested many librdkafka properties, but there hasn't been any improvement.
Here is my configuration:

[INPUT]
    Name kafka
    brokers kafka:9092
    topics test-inter
    poll_ms 100
    group_id test004
    rdkafka.queued.min.messages 500000
    rdkafka.partition.assignment.strategy roundrobin

    rdkafka.auto.commit.interval.ms 1
    rdkafka.fetch.wait.max.ms 100
    rdkafka.enable.partition.eof false
    rdkafka.enable.auto.commit true
    rdkafka.enable.auto.offset.store true
    rdkafka.statistics.interval.ms 1000
    rdkafka.fetch.message.max.bytes 10000000
    rdkafka.max.partition.fetch.bytes 10000000
    rdkafka.socket.send.buffer.bytes 100
    rdkafka.fetch.error.backoff.ms 1000

[OUTPUT]
    Name null
    Match *


simontilmant commented Mar 5, 2024

The problem seems linked to the fact that a call to rd_kafka_commit is performed for every single message. In the consumer example at https://docs.huihoo.com/apache/kafka/confluent/3.1/clients/consumer.html, rd_kafka_commit is called only once every 1000 messages.

The following change, adapted from the site above, dramatically improves performance:

static void in_kafka_callback(int write_fd, void *data)
{
    struct flb_input_thread *it = data;
    struct flb_in_kafka_config *ctx = data - offsetof(struct flb_in_kafka_config, it);
    mpack_writer_t *writer = &ctx->it.writer;
    /* Commit offsets once per batch instead of once per message */
    static const int MIN_COMMIT_COUNT = 1000;

    int msg_count = 0;

    while (!flb_input_thread_exited(it)) {
        /* Block for up to 500 ms waiting for the next message */
        rd_kafka_message_t *rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 500);

        if (rkm) {
            process_message(writer, rkm);
            fflush(ctx->it.write_file);
            rd_kafka_message_destroy(rkm);

            /* Synchronously commit the latest offsets every 1000 messages */
            if ((++msg_count % MIN_COMMIT_COUNT) == 0)
                rd_kafka_commit(ctx->kafka.rk, NULL, 0);
        }
    }
}

In https://docs.huihoo.com/apache/kafka/confluent/3.1/clients/consumer.html it is mentioned:
"In this example, we trigger a synchronous commit every 1000 messages. The second argument to rd_kafka_commit is the list of offsets to be committed; if set to NULL, librdkafka will commit the latest offsets for the assigned positions. The third argument in rd_kafka_commit is a flag which controls whether this call is asynchronous. We could also trigger the commit on expiration of a timeout to ensure the committed position is updated regularly."
I don't have enough knowledge of Kafka to figure out what the best fix would be (time-based vs message-count commit, asynchronous vs synchronous); hope this helps anyway.
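
For illustration, here is a minimal sketch of the time-based variant that quote suggests, combined with the message-count trigger from the patch above. This is not the plugin's actual code: the struct fields and process_message helper are carried over from the snippet above, while the 5-second interval and the asynchronous commit (third argument of rd_kafka_commit set to 1) are arbitrary choices for the example.

#include <time.h>

/* Sketch: commit when either MIN_COMMIT_COUNT messages have been
 * processed or COMMIT_INTERVAL_SEC seconds have elapsed since the
 * last commit, whichever comes first. */
static void in_kafka_callback(int write_fd, void *data)
{
    struct flb_input_thread *it = data;
    struct flb_in_kafka_config *ctx = data - offsetof(struct flb_in_kafka_config, it);
    mpack_writer_t *writer = &ctx->it.writer;
    static const int MIN_COMMIT_COUNT = 1000;
    static const time_t COMMIT_INTERVAL_SEC = 5;

    int msg_count = 0;
    time_t last_commit = time(NULL);

    while (!flb_input_thread_exited(it)) {
        rd_kafka_message_t *rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 500);

        if (rkm) {
            process_message(writer, rkm);
            fflush(ctx->it.write_file);
            rd_kafka_message_destroy(rkm);
            msg_count++;
        }

        /* Commit the latest stored offsets asynchronously (third arg = 1)
         * so the poll loop is never blocked waiting on the broker. */
        if (msg_count >= MIN_COMMIT_COUNT ||
            (msg_count > 0 && time(NULL) - last_commit >= COMMIT_INTERVAL_SEC)) {
            rd_kafka_commit(ctx->kafka.rk, NULL, 1 /* async */);
            msg_count = 0;
            last_commit = time(NULL);
        }
    }
}

Either way the semantics stay at-least-once: after a crash, anything processed since the last commit is redelivered, which is the usual trade-off for batching commits instead of committing per message.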

It seems related to #8400


github-actions bot commented Jun 8, 2024

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the exempt-stale label.

@github-actions github-actions bot added the Stale label Jun 8, 2024

This issue was closed because it has been stalled for 5 days with no activity.

@github-actions github-actions bot closed this as not planned Jun 13, 2024
@edsiper edsiper reopened this Oct 11, 2024
edsiper (Member) commented Oct 11, 2024

cc: @lecaros

@github-actions github-actions bot removed the Stale label Oct 12, 2024

weily2 commented Oct 23, 2024

Any update?


lyhgo commented Dec 19, 2024

I have created a PR for this: #9726. It adds an option to enable auto-commit in order to reach high throughput.
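
For context, here is a minimal sketch of what enabling auto-commit means at the librdkafka level. The property names (enable.auto.commit, auto.commit.interval.ms) are standard librdkafka configuration; the 5000 ms interval and the helper function are illustrative assumptions, not values taken from the PR.

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: with auto-commit on, librdkafka commits stored offsets in a
 * background thread every auto.commit.interval.ms, so the consumer
 * never issues a blocking per-message commit from its poll loop. */
static rd_kafka_conf_t *make_autocommit_conf(void)
{
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    if (rd_kafka_conf_set(conf, "enable.auto.commit", "true",
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
        rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000",
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "kafka conf error: %s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return NULL;
    }
    return conf;
}

Note that aliSadegh's config above already passes rdkafka.enable.auto.commit true without improvement, which suggests the plugin-level option in the PR does more than simply forward these properties; this sketch only illustrates the librdkafka mechanism involved.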
