From 398b08e0269843e957ec2189c63258b27a9362b5 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Wed, 15 Nov 2023 17:45:42 +0900 Subject: [PATCH 1/3] buffer: Avoid to process discarded chunks in `write_step_by_step` It fixes following error when many `chunk bytes limit exceeds` errors are occurred: ``` 2020-07-28 14:59:26 +0000 [warn]: #0 emit transaction failed: error_class=IOError error="closed stream" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `pos'" tag="cafiscode-eks-cluster.default" 2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `pos' 2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `rollback' 2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer.rb:339:in `rescue in block in write' 2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer.rb:332:in `block in write' 2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer.rb:331:in `each' ... ``` Fix #3089 Signed-off-by: Takuro Ashie --- lib/fluent/plugin/buffer.rb | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index d04ae08296..1f8ccec102 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -728,7 +728,6 @@ def write_once(metadata, data, format: nil, size: nil, &block) def write_step_by_step(metadata, data, format, splits_count, &block) splits = [] - errors = [] if splits_count > data.size splits_count = data.size end @@ -757,15 +756,15 @@ def write_step_by_step(metadata, data, format, splits_count, &block) else synchronize { @stage[modified_metadata] ||= generate_chunk(modified_metadata).staged! } end - modified_chunks << c - c + modified_chunks << {chunk: c, adding_bytesize: 0, errors: []} + return c, modified_chunks.last[:errors] } writing_splits_index = 0 enqueue_chunk_before_retry = false while writing_splits_index < splits.size - chunk = get_next_chunk.call + chunk, errors = get_next_chunk.call chunk.synchronize do raise ShouldRetry unless chunk.writable? staged_chunk_used = true if chunk.staged? @@ -851,15 +850,18 @@ def write_step_by_step(metadata, data, format, splits_count, &block) raise end - block.call(chunk, chunk.bytesize - original_bytesize, errors) - errors = [] + modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize end end + modified_chunks.each do |data| + block.call(data[:chunk], data[:adding_bytesize], data[:errors]) + end rescue ShouldRetry - modified_chunks.each do |mc| - mc.rollback rescue nil - if mc.unstaged? - mc.purge rescue nil + modified_chunks.each do |data| + chunk = data[:chunk] + chunk.rollback rescue nil + if chunk.unstaged? + chunk.purge rescue nil end end enqueue_chunk(metadata) if enqueue_chunk_before_retry From 856d164252c74cd39c8c8a664815408308a04dc1 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Tue, 12 Dec 2023 13:17:56 +0900 Subject: [PATCH 2/3] Add a test for issue #3089 https://github.com/fluent/fluentd/issues/3089 Signed-off-by: Takuro Ashie --- test/plugin/test_buffer.rb | 51 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb index d35d2d6ce7..451358a5b1 100644 --- a/test/plugin/test_buffer.rb +++ b/test/plugin/test_buffer.rb @@ -850,6 +850,57 @@ def create_chunk_es(metadata, es) test '#compress returns :text' do assert_equal :text, @p.compress end + + # https://github.com/fluent/fluentd/issues/3089 + test "closed chunk should not be committed" do + assert_equal 8 * 1024 * 1024, @p.chunk_limit_size + assert_equal 0.95, @p.chunk_full_threshold + + purge_count = 0 + + stub.proxy(@p).generate_chunk(anything) do |chunk| + stub.proxy(chunk).purge do |result| + purge_count += 1 + result + end + stub.proxy(chunk).commit do |result| + assert_false(chunk.closed?) + result + end + stub.proxy(chunk).rollback do |result| + assert_false(chunk.closed?) + result + end + chunk + end + + m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i) + small_row = "x" * 1024 * 400 + big_row = "x" * 1024 * 1024 * 8 # just `chunk_size_limit`, it does't cause BufferOverFlowError. + + # Write 42 events in 1 event stream, last one is for triggering `ShouldRetry` + @p.write({m => [small_row] * 40 + [big_row] + ["x"]}) + + # Above event strem will be splitted twice by `Buffer#write_step_by_step` + # + # 1. `write_once`: 42 [events] * 1 [stream] + # 2. `write_step_by_step`: 4 [events]* 10 [streams] + 2 [events] * 1 [stream] + # 3. `write_step_by_step` (by `ShouldRetry`): 1 [event] * 42 [streams] + # + # The problematic data is built in the 2nd stage. + # In the 2nd stage, 5 streams are packed in a chunk. + # ((1024 * 400) [bytes] * 4 [events] * 5 [streams] = 8192000 [bytes] < `chunk_limit_size` (8MB)). + # So 3 chunks are used to store all data. + # The 1st chunk is already staged by `write_once`. + # The 2nd & 3rd chunks are newly created as unstaged. + # The 3rd chunk is purged before `ShouldRetry`, it's no problem: + # https://github.com/fluent/fluentd/blob/7e9eba736ff40ad985341be800ddc46558be75f2/lib/fluent/plugin/buffer.rb#L850 + # The 2nd chunk is purged in `rescue ShouldRetry`: + # https://github.com/fluent/fluentd/blob/7e9eba736ff40ad985341be800ddc46558be75f2/lib/fluent/plugin/buffer.rb#L862 + # It causes the issue described in https://github.com/fluent/fluentd/issues/3089#issuecomment-1811839198 + + assert_equal 2, purge_count + end end sub_test_case 'standard format with configuration for test with lower chunk limit size' do From 8cf683f1ba01a86f41718952322fc179ff25533a Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Wed, 13 Dec 2023 18:28:12 +0900 Subject: [PATCH 3/3] buffer: Simplify get_next_chunk Signed-off-by: Takuro Ashie --- lib/fluent/plugin/buffer.rb | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 1f8ccec102..26cdefc8ae 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -748,23 +748,23 @@ def write_step_by_step(metadata, data, format, splits_count, &block) modified_chunks = [] modified_metadata = metadata get_next_chunk = ->(){ - c = if staged_chunk_used - # Staging new chunk here is bad idea: - # Recovering whole state including newly staged chunks is much harder than current implementation. - modified_metadata = modified_metadata.dup_next - generate_chunk(modified_metadata) - else - synchronize { @stage[modified_metadata] ||= generate_chunk(modified_metadata).staged! } - end - modified_chunks << {chunk: c, adding_bytesize: 0, errors: []} - return c, modified_chunks.last[:errors] + if staged_chunk_used + # Staging new chunk here is bad idea: + # Recovering whole state including newly staged chunks is much harder than current implementation. + modified_metadata = modified_metadata.dup_next + generate_chunk(modified_metadata) + else + synchronize { @stage[modified_metadata] ||= generate_chunk(modified_metadata).staged! } + end } writing_splits_index = 0 enqueue_chunk_before_retry = false while writing_splits_index < splits.size - chunk, errors = get_next_chunk.call + chunk = get_next_chunk.call + errors = [] + modified_chunks << {chunk: chunk, adding_bytesize: 0, errors: errors} chunk.synchronize do raise ShouldRetry unless chunk.writable? staged_chunk_used = true if chunk.staged?