buffer: Avoid processing discarded chunks in write_step_by_step (for v1.16) #4363

Merged 3 commits on Dec 14, 2023
36 changes: 19 additions & 17 deletions lib/fluent/plugin/buffer.rb
@@ -728,7 +728,6 @@ def write_once(metadata, data, format: nil, size: nil, &block)
 
       def write_step_by_step(metadata, data, format, splits_count, &block)
         splits = []
-        errors = []
         if splits_count > data.size
           splits_count = data.size
         end
@@ -749,23 +748,23 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
         modified_chunks = []
         modified_metadata = metadata
         get_next_chunk = ->(){
-          c = if staged_chunk_used
-                # Staging new chunk here is bad idea:
-                # Recovering whole state including newly staged chunks is much harder than current implementation.
-                modified_metadata = modified_metadata.dup_next
-                generate_chunk(modified_metadata)
-              else
-                synchronize { @stage[modified_metadata] ||= generate_chunk(modified_metadata).staged! }
-              end
-          modified_chunks << c
-          c
+          if staged_chunk_used
+            # Staging new chunk here is bad idea:
+            # Recovering whole state including newly staged chunks is much harder than current implementation.
+            modified_metadata = modified_metadata.dup_next
+            generate_chunk(modified_metadata)
+          else
+            synchronize { @stage[modified_metadata] ||= generate_chunk(modified_metadata).staged! }
+          end
         }
 
         writing_splits_index = 0
         enqueue_chunk_before_retry = false
 
         while writing_splits_index < splits.size
           chunk = get_next_chunk.call
+          errors = []
+          modified_chunks << {chunk: chunk, adding_bytesize: 0, errors: errors}
           chunk.synchronize do
             raise ShouldRetry unless chunk.writable?
             staged_chunk_used = true if chunk.staged?
@@ -851,15 +850,18 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
               raise
             end
 
-            block.call(chunk, chunk.bytesize - original_bytesize, errors)
-            errors = []
+            modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize
           end
         end
+        modified_chunks.each do |data|
+          block.call(data[:chunk], data[:adding_bytesize], data[:errors])
+        end
       rescue ShouldRetry
-        modified_chunks.each do |mc|
-          mc.rollback rescue nil
-          if mc.unstaged?
-            mc.purge rescue nil
+        modified_chunks.each do |data|
+          chunk = data[:chunk]
+          chunk.rollback rescue nil
+          if chunk.unstaged?
+            chunk.purge rescue nil
           end
         end
         enqueue_chunk(metadata) if enqueue_chunk_before_retry
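
In short, the buffer change defers the commit callback until every split has been written: each chunk touched by `write_step_by_step` is recorded together with its added bytesize and per-chunk errors, and `block.call` runs only after the write loop has finished, so a `ShouldRetry` raised midway can roll back and purge uncommitted chunks instead of leaving already-committed chunks to be discarded. The Ruby sketch below illustrates only that ordering; `FakeChunk`, `write_all_then_commit`, and the locally defined `ShouldRetry` are stand-ins for illustration, not Fluentd's real classes.

```ruby
# Illustrative only: stand-ins for Fluentd's chunk and retry classes.
class ShouldRetry < StandardError; end

class FakeChunk
  attr_reader :bytesize

  def initialize(unstaged: false)
    @bytesize = 0
    @unstaged = unstaged
  end

  def unstaged?
    @unstaged
  end

  def append(row)
    @bytesize += row.bytesize
  end

  def rollback
    @bytesize = 0
  end

  def purge
    puts "purged unstaged chunk"
  end
end

# Write every split first, remember what was added to which chunk, and only
# then hand each chunk to the caller's block -- mirroring the ordering this
# PR introduces in write_step_by_step.
def write_all_then_commit(splits, get_next_chunk, &block)
  modified_chunks = []
  splits.each do |split|
    chunk = get_next_chunk.call
    modified_chunks << {chunk: chunk, adding_bytesize: 0, errors: []}
    original_bytesize = chunk.bytesize
    # In the real buffer, size/writability checks in this loop can raise
    # ShouldRetry; the rescue below then discards everything uncommitted.
    split.each { |row| chunk.append(row) }
    modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize
  end
  # The commit callback runs only after the whole loop succeeded.
  modified_chunks.each do |data|
    block.call(data[:chunk], data[:adding_bytesize], data[:errors])
  end
rescue ShouldRetry
  modified_chunks.each do |data|
    chunk = data[:chunk]
    chunk.rollback rescue nil
    if chunk.unstaged?
      chunk.purge rescue nil
    end
  end
  raise
end

# Usage: two splits go into two unstaged chunks, then both are committed.
committed = 0
write_all_then_commit([["a" * 10], ["b" * 20]], -> { FakeChunk.new(unstaged: true) }) do |chunk, added, _errors|
  committed += 1
  puts "committed #{added} bytes"
end
puts committed # => 2
```

With the pre-PR code, `block.call` ran inside the loop, so by the time `rescue ShouldRetry` purged a chunk the caller might already have committed it; the new test below checks exactly that via `purge_count` and the `assert_false(chunk.closed?)` guards on `commit`/`rollback`.
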
51 changes: 51 additions & 0 deletions test/plugin/test_buffer.rb
@@ -850,6 +850,57 @@ def create_chunk_es(metadata, es)
     test '#compress returns :text' do
       assert_equal :text, @p.compress
     end
+
+    # https://github.com/fluent/fluentd/issues/3089
+    test "closed chunk should not be committed" do
+      assert_equal 8 * 1024 * 1024, @p.chunk_limit_size
+      assert_equal 0.95, @p.chunk_full_threshold
+
+      purge_count = 0
+
+      stub.proxy(@p).generate_chunk(anything) do |chunk|
+        stub.proxy(chunk).purge do |result|
+          purge_count += 1
+          result
+        end
+        stub.proxy(chunk).commit do |result|
+          assert_false(chunk.closed?)
+          result
+        end
+        stub.proxy(chunk).rollback do |result|
+          assert_false(chunk.closed?)
+          result
+        end
+        chunk
+      end
+
+      m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
+      small_row = "x" * 1024 * 400
+      big_row = "x" * 1024 * 1024 * 8 # exactly `chunk_limit_size`; it doesn't cause BufferOverflowError
+
+      # Write 42 events in 1 event stream; the last one is for triggering `ShouldRetry`.
+      @p.write({m => [small_row] * 40 + [big_row] + ["x"]})
+
+      # The event stream above will be split twice by `Buffer#write_step_by_step`:
+      #
+      # 1. `write_once`: 42 [events] * 1 [stream]
+      # 2. `write_step_by_step`: 4 [events] * 10 [streams] + 2 [events] * 1 [stream]
+      # 3. `write_step_by_step` (by `ShouldRetry`): 1 [event] * 42 [streams]
+      #
+      # The problematic data is built in the 2nd stage.
+      # In the 2nd stage, 5 streams are packed into a chunk
+      # ((1024 * 400) [bytes] * 4 [events] * 5 [streams] = 8192000 [bytes] < `chunk_limit_size` (8MB)),
+      # so 3 chunks are used to store all the data.
+      # The 1st chunk is already staged by `write_once`.
+      # The 2nd and 3rd chunks are newly created as unstaged.
+      # The 3rd chunk is purged before `ShouldRetry`, which is not a problem:
+      # https://github.com/fluent/fluentd/blob/7e9eba736ff40ad985341be800ddc46558be75f2/lib/fluent/plugin/buffer.rb#L850
+      # The 2nd chunk is purged in `rescue ShouldRetry`:
+      # https://github.com/fluent/fluentd/blob/7e9eba736ff40ad985341be800ddc46558be75f2/lib/fluent/plugin/buffer.rb#L862
+      # This causes the issue described in https://github.com/fluent/fluentd/issues/3089#issuecomment-1811839198
+
+      assert_equal 2, purge_count
+    end
   end
 
   sub_test_case 'standard format with configuration for test with lower chunk limit size'
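
For reference, the chunk-packing arithmetic from the in-test comment can be checked by hand. The snippet below simply restates it with the constants the test uses; it is a standalone sketch, not part of the test suite, and the variable names are ours.

```ruby
# Constants copied from the test above (see the in-test comment for the full
# walkthrough of the three split stages).
chunk_limit_size = 8 * 1024 * 1024   # 8 MiB
small_row_size   = 1024 * 400        # each small event is 400 KiB
events_per_split = 4                 # 2nd stage: 4 events per split stream
splits_per_chunk = 5                 # split streams packed into one chunk

packed_bytes = small_row_size * events_per_split * splits_per_chunk
puts packed_bytes                     # => 8192000
puts packed_bytes < chunk_limit_size  # => true: 5 splits still fit under 8 MiB
# 10 four-event splits at 5 per chunk fill 2 chunks; the remaining split
# (big_row plus "x") needs a 3rd, so 1 staged + 2 unstaged chunks are touched,
# which is why the test expects purge_count == 2.
```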