Kafka::MessageSizeTooLarge #323

Open
pcantea opened this issue Feb 27, 2020 · 10 comments

@pcantea

pcantea commented Feb 27, 2020

config:

<match containers.** systemd.log auth.log kube-apiserver-audit.log>
      @type kafka2

      BROKERS

      username 
      password 
      scram_mechanism sha256
      sasl_over_ssl true

      ssl_ca_cert [CERT]

      <format>
        @type json
      </format>

      <buffer topic>
        @type file
        path /mnt/pos/buffers
        chunk_limit_size 64MB
        total_limit_size 1024MB
        flush_mode interval
        flush_interval 3s
        flush_thread_count 4
        retry_type exponential_backoff
        retry_max_interval 30
        retry_forever
      </buffer>

      max_send_limit_bytes 900000

      default_topic messages
      compression_codec snappy

      max_send_retries 4
      required_acks -1

      exclude_topic_key true
      exclude_partition true
      exclude_partition_key true
      exclude_message_key true
    </match>

Fluentd works fine for a while, then gets stuck on a large chunk. This usually seems to happen when one of the logs it's matching has a spike in throughput.

2020-02-27 21:13:23 +0000 [warn]: #0 Send exception occurred: Kafka::MessageSizeTooLarge
2020-02-27 21:13:23 +0000 [warn]: #0 Exception Backtrace : /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.7.10/lib/kafka/protocol.rb:160:in `handle_error'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.7.10/lib/kafka/produce_operation.rb:153:in `block in handle_response'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.7.10/lib/kafka/protocol/produce_response.rb:36:in `block (2 levels) in each_partition'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.7.10/lib/kafka/protocol/produce_response.rb:35:in `each'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.7.10/lib/kafka/protocol/produce_response.rb:35:in `block in each_partition'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.7.10/lib/kafka/protocol/produce_response.rb:34:in `each'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.7.10/lib/kafka/protocol/produce_response.rb:34:in `each_partition'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.7.10/lib/kafka/produce_operation.rb:144:in `handle_response'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.7.10/lib/kafka/produce_operation.rb:133:in `block in send_buffered_messages'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.7.10/lib/kafka/produce_operation.rb:105:in `each'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.7.10/lib/kafka/produce_operation.rb:105:in `send_buffered_messages'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.7.10/lib/kafka/produce_operation.rb:62:in `block in execute'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.7.10/lib/kafka/instrumenter.rb:23:in `instrument'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.7.10/lib/kafka/produce_operation.rb:53:in `execute'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-kafka-0.12.3/lib/fluent/plugin/kafka_producer_ext.rb:210:in `block in deliver_messages_with_retries'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-kafka-0.12.3/lib/fluent/plugin/kafka_producer_ext.rb:200:in `loop'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-kafka-0.12.3/lib/fluent/plugin/kafka_producer_ext.rb:200:in `deliver_messages_with_retries'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-kafka-0.12.3/lib/fluent/plugin/kafka_producer_ext.rb:126:in `deliver_messages'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-kafka-0.12.3/lib/fluent/plugin/out_kafka2.rb:265:in `write'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.9.2/lib/fluent/plugin/output.rb:1133:in `try_flush'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.9.2/lib/fluent/plugin/output.rb:1439:in `flush_thread_run'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.9.2/lib/fluent/plugin/output.rb:461:in `block (2 levels) in start'
/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.9.2/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2020-02-27 21:13:23 +0000 [info]: #0 initialized kafka producer: fluentd
2020-02-27 21:13:23 +0000 [warn]: #0 failed to flush the buffer. retry_time=0 next_retry_seconds=2020-02-27 21:13:24 +0000 chunk="59f92c5b59f6035ea8236814fb465867" error_class=Kafka::MessageSizeTooLarge error="Kafka::MessageSizeTooLarge"
2020-02-27 21:13:23 +0000 [warn]: #0 suppressed same stacktrace
2020-02-27 21:13:24 +0000 [warn]: #0 retry succeeded. chunk_id="59f9530bf1269c3daa29759ad3f94b15"

The max message bytes limit on the Kafka brokers is 2000000.

@juliantaylor

We are seeing the same thing with version 0.12.3 and max_send_limit_bytes set far below Kafka's limit.
The kafka_buffered output type works fine.

@simonasr

simonasr commented Mar 2, 2020

Same here: version 0.12.3 and Kafka::MessageSizeTooLarge errors. I also mentioned this problem in issue #319.

@repeatedly
Member

repeatedly commented Mar 2, 2020

max_send_limit_bytes

The problem is the misleading parameter name. This parameter removes records larger than the limit from the record batch; it does not limit the size of the record batch itself.
So if you want to send a smaller record batch to avoid "message size too large", you need to change the chunk_limit_size parameter of the buffer. kafka2 uses one buffer chunk for one record batch.

For example, if you set message.max.bytes=5242880 (5 MB) in the Kafka server configuration, the chunk_limit_size parameter should be a smaller value, e.g. chunk_limit_size 2m.
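As an illustration only (the path and sizes below are placeholders, assuming the 5 MB message.max.bytes from the example above), the buffer section of the kafka2 match might look like:

      <buffer topic>
        @type file
        path /var/log/td-agent/buffer/kafka
        # keep the chunk well below the broker's message.max.bytes (5 MB assumed here),
        # since kafka2 turns one buffer chunk into one record batch
        chunk_limit_size 2m
      </buffer>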

@juliantaylor

juliantaylor commented Mar 5, 2020

Thanks, setting chunk_limit_size does work. This should probably be added to the README documentation; MessageSizeTooLarge is currently only mentioned together with max_send_limit_bytes.

but with:

max_send_limit_bytes 100000
<buffer>
chunk_limit_size 1000000

we get lots of warnings of this type:

[warn]: #0 chunk bytes limit exceeds for an emitted event stream: 1044975bytes

That size is larger than the Kafka max message size, but there are no errors.
What exactly is happening here? What size of message is actually sent to Kafka?
Should the chunk size be kept significantly below the Kafka max message size to accommodate chunks that end up larger than configured?

@snowjunkie

From looking at the code, what I am seeing is that there's some balancing to do across these numbers.
The chunk is held as MessagePack, so chunk_limit_size is measured against the size of the MessagePack buffer file.
However, as the records are read from the chunk, the tag and time are added and each record is formatted into the output plugin's configured format, such as JSON. This inflates the total size of the data, so even though you have set a max chunk size, by the time the chunk is inflated with the extra fields and formatting it can still exceed the configured limit.
And just to clarify, max_send_limit_bytes is evaluated AFTER the record has been inflated.

As @repeatedly suggested, with a Kafka max message size of 5 MB you really need a chunk limit quite a bit below that (2 MB in his example). We went through this ourselves recently and are finding stability with a Kafka max of 1 MB and a max chunk size of 600K.
Because there's variability in record size, I think you have to be quite careful, and there won't necessarily be a fixed inflation ratio that works for everyone.

I totally agree that this whole area of sizing across the chunk, the record, and the Kafka message really needs a specific section in the kafka plugin documentation.
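To make that inflation concrete, here is a rough, standalone Ruby sketch (not the plugin's actual code; the record contents and field names are made up) comparing a record's size as MessagePack, i.e. roughly what is counted against chunk_limit_size per the description above, with its size once tag, time, and JSON formatting are added:

    require 'msgpack'  # bundled with td-agent; otherwise install the msgpack gem
    require 'json'

    record = { "log" => "sample payload " * 30, "stream" => "stdout", "container" => "app" }
    tag    = "containers.app"
    time   = Time.now.to_i

    packed    = record.to_msgpack                                   # roughly what sits in the buffer chunk
    formatted = record.merge("tag" => tag, "time" => time).to_json  # roughly what goes into the Kafka message

    puts "msgpack size:   #{packed.bytesize} bytes"
    puts "formatted size: #{formatted.bytesize} bytes"              # larger: JSON quoting plus the added fields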

@snowjunkie

@juliantaylor this may provide some context fluent/fluentd#2084

@snowjunkie

I took another look into the buffer code here:
[screenshot of the buffer code]

If I'm right, the number at the end of the "chunk bytes limit exceeds for an emitted event stream..." warning is the number of bytes added during the formatting step, calculated by subtracting the original chunk.bytesize from the resulting chunk.bytesize. So it gives you some indication of the expansion factor your formatting is introducing.
I originally thought this figure was the amount (in bytes) by which the chunk actually exceeded chunk_limit_size, but that would be calculated as chunk.bytesize - chunk_limit_size, and that's not what's used in the warning message.

@repeatedly in this case, where people are seeing many of these warnings due to the formatting expansion, would it be advisable to use chunk_full_threshold as a way to always give the chunk headroom for that expansion in size?

That may help reduce the chunk.rollback occurrences?
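Purely to illustrate the parameter being asked about (the path and sizes are placeholders, assuming the 1 MB broker message.max.bytes from our setup above), that would look something like:

      <buffer topic>
        @type file
        path /var/log/td-agent/buffer/kafka
        chunk_limit_size 600k      # already well below the assumed 1 MB broker limit
        chunk_full_threshold 0.8   # consider the chunk full at ~480k, leaving headroom for format inflation
      </buffer>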

@xidiandb

Setting chunk_limit_size to 10M works for me as well. Why?
I didn't set message.max.bytes, so it should be the default size.
Is it the compression strategy that makes this work?
@repeatedly could you please give me some answers.

@github-actions

github-actions bot commented Jul 6, 2021

This issue has been automatically marked as stale because it has been open 90 days with no activity. Remove stale label or comment or this issue will be closed in 30 days

@kenhys
Contributor

kenhys commented Jul 7, 2021

TODO: #323 (comment): need to update the explanation.
