From f6808ea871c13b2958f4712ba4f7a1ab13541d7b Mon Sep 17 00:00:00 2001 From: Shizuo Fujita Date: Wed, 18 Jun 2025 16:19:22 +0900 Subject: [PATCH] in_forward: enable skip_invalid_event by default Signed-off-by: Shizuo Fujita --- lib/fluent/plugin/in_forward.rb | 2 +- test/plugin/test_in_forward.rb | 8 +------- test/plugin/test_out_forward.rb | 4 ++-- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 0c2216883c..489c474261 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -55,7 +55,7 @@ class ForwardInput < Input desc 'Received chunk is dropped if it is larger than this value.' config_param :chunk_size_limit, :size, default: nil desc 'Skip an event if incoming event is invalid.' - config_param :skip_invalid_event, :bool, default: false + config_param :skip_invalid_event, :bool, default: true desc "The field name of the client's source address." config_param :source_address_key, :string, default: nil diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index ab014ce049..48546d4e32 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -580,9 +580,6 @@ def create_driver(conf=base_config) end chunk = ["tag1", entries, { 'compressed' => 'gzip' }].to_msgpack - # check CompressedMessagePackEventStream is created - mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0, compress: :gzip) - d.run do Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj| option = d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK) @@ -604,13 +601,10 @@ def create_driver(conf=base_config) entries = '' events.each do |_tag, _time, record| v = [_time, record].to_msgpack - entries << compress(v) + entries << compress(v, type: :zstd) end chunk = ["tag1", entries, { 'compressed' => 'zstd' }].to_msgpack - # check CompressedMessagePackEventStream is created - mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0, compress: :zstd) - d.run do Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj| option = d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK) diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index f59e9372f3..3ec2f321f1 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -541,7 +541,7 @@ def try_write(chunk) end test 'send_compressed_message_pack_stream_if_compress_is_gzip' do - target_input_driver = create_target_input_driver + target_input_driver = create_target_input_driver(conf: target_config + "skip_invalid_event false") @d = d = create_driver(config + %[ flush_interval 1s @@ -571,7 +571,7 @@ def try_write(chunk) end test 'send_compressed_message_pack_stream_if_compress_is_zstd' do - target_input_driver = create_target_input_driver + target_input_driver = create_target_input_driver(conf: target_config + "skip_invalid_event false") @d = d = create_driver(config + %[ flush_interval 1s