Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_forward: Handle multiply concatenated gzip payloads #8665

Merged

Conversation

cosmo0920
Copy link
Contributor

@cosmo0920 cosmo0920 commented Apr 3, 2024

On out_forward in Fluentd payloads, sometimes gzip compressed payloads are concatenated because Fluentd has a capability to compress its buffers with gzip compression.
With this circumstance, Fluentd sometimes concatenates its payloads on forward protocol.
This causes that Fluent Bit gives up to uncompress for the concatenated ones.
In this PR, I implemented for decoding way to distinguish whether the ingested forwarded payloads is concatenated or not.

This is not mandatory specification and not well-documented behavior on Ruby's zlib gem.
But, in fact, Fluentd is heavily relying on this behavior for saving the CPU usage to avoid unnecessary compressions/decompressions.

This is why I found that the reason Fluentd is actually relying on the concatenated gzipped payloads:
https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin/compressable.rb#L57-L93


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
service:
    flush: 1
    log_level: debug
pipeline:
    inputs:
        - listen: 0.0.0.0
          port: "24224"
          buffer_max_size: "300mb"
          buffer_chunk_size: "32mb"
          Name: forward
    outputs:
        - Name: stdout
          match: "*"
  • Debug log output from testing the change
Fluent Bit v3.0.1
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

___________.__                        __    __________.__  __          ________  
\_   _____/|  |  __ __   ____   _____/  |_  \______   \__|/  |_  ___  _\_____  \ 
 |    __)  |  | |  |  \_/ __ \ /    \   __\  |    |  _/  \   __\ \  \/ / _(__  < 
 |     \   |  |_|  |  /\  ___/|   |  \  |    |    |   \  ||  |    \   / /       \
 \___  /   |____/____/  \___  >___|  /__|    |______  /__||__|     \_/ /______  /
     \/                     \/     \/               \/                        \/ 

[2024/04/04 11:18:31] [ info] Configuration:
[2024/04/04 11:18:31] [ info]  flush time     | 1.000000 seconds
[2024/04/04 11:18:31] [ info]  grace          | 5 seconds
[2024/04/04 11:18:31] [ info]  daemon         | 0
[2024/04/04 11:18:31] [ info] ___________
[2024/04/04 11:18:31] [ info]  inputs:
[2024/04/04 11:18:31] [ info]      forward
[2024/04/04 11:18:31] [ info] ___________
[2024/04/04 11:18:31] [ info]  filters:
[2024/04/04 11:18:31] [ info] ___________
[2024/04/04 11:18:31] [ info]  outputs:
[2024/04/04 11:18:31] [ info]      stdout.0
[2024/04/04 11:18:31] [ info] ___________
[2024/04/04 11:18:31] [ info]  collectors:
[2024/04/04 11:18:31] [ info] [fluent bit] version=3.0.1, commit=dc5089bf8f, pid=1608824
[2024/04/04 11:18:31] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2024/04/04 11:18:31] [ info] [storage] ver=1.1.6, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/04/04 11:18:31] [ info] [cmetrics] version=0.7.1
[2024/04/04 11:18:31] [ info] [ctraces ] version=0.4.0
[2024/04/04 11:18:31] [ info] [input:forward:forward.0] initializing
[2024/04/04 11:18:31] [ info] [input:forward:forward.0] storage_strategy='memory' (memory only)
[2024/04/04 11:18:31] [debug] [forward:forward.0] created event channels: read=21 write=22
[2024/04/04 11:18:31] [ info] [output:stdout:stdout.0] worker #0 started
[2024/04/04 11:18:31] [debug] [in_fw] Listen='0.0.0.0' TCP_Port=24224
[2024/04/04 11:18:31] [debug] [downstream] listening on 0.0.0.0:24224
[2024/04/04 11:18:31] [ info] [input:forward:forward.0] listening on 0.0.0.0:24224
[2024/04/04 11:18:31] [debug] [stdout:stdout.0] created event channels: read=24 write=25
[2024/04/04 11:18:31] [ info] [sp] stream processor started
[2024/04/04 11:18:44] [debug] [input:forward:forward.0] concatenated gzip payload count is 2
[2024/04/04 11:18:44] [debug] [input chunk] update output instances with new chunk size diff=201922, records=1166, input=forward.0
[2024/04/04 11:18:44] [debug] [input:forward:forward.0] left unconsumed 21861 byte(s)
[2024/04/04 11:18:44] [debug] [input chunk] update output instances with new chunk size diff=185247, records=1229, input=forward.0
[2024/04/04 11:18:44] [debug] [input:forward:forward.0] left unconsumed 6368 byte(s)
[2024/04/04 11:18:44] [debug] [input chunk] update output instances with new chunk size diff=66720, records=472, input=forward.0
[2024/04/04 11:18:44] [debug] [input:forward:forward.0] left unconsumed 0 byte(s)
[2024/04/04 11:18:45] [debug] [task] created task=0x60df810 id=0 OK
[2024/04/04 11:18:45] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
<snip>
[2024/04/04 11:18:45] [debug] [out flush] cb_destroy coro_id=0
[2024/04/04 11:18:45] [debug] [task] destroy task=0x60df810 (task_id=0)
^C[2024/04/04 11:18:48] [engine] caught signal (SIGINT)
[2024/04/04 11:18:48] [ warn] [engine] service will shutdown in max 5 seconds
[2024/04/04 11:18:48] [ info] [input] pausing forward.0
[2024/04/04 11:18:48] [ info] [engine] service has stopped (0 pending tasks)
[2024/04/04 11:18:48] [ info] [input] pausing forward.0
[2024/04/04 11:18:48] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2024/04/04 11:18:48] [ info] [output:stdout:stdout.0] thread worker #0 stopped

  • Attached Valgrind output that shows no leaks or memory corruption was found
==1608824== 
==1608824== HEAP SUMMARY:
==1608824==     in use at exit: 0 bytes in 0 blocks
==1608824==   total heap usage: 8,924 allocs, 8,924 frees, 58,435,290 bytes allocated
==1608824== 
==1608824== All heap blocks were freed -- no leaks are possible
==1608824== 
==1608824== For lists of detected and suppressed errors, rerun with: -s
==1608824== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@cosmo0920 cosmo0920 marked this pull request as ready for review April 4, 2024 02:22
@edsiper edsiper merged commit 2bc526b into master Apr 8, 2024
50 checks passed
@edsiper edsiper deleted the cosmo0920-multi-concatenated-gzip-payloads-on-in_forward branch April 8, 2024 16:31
@lecaros lecaros added this to the Fluent Bit v3.0.2 milestone Apr 17, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants