From 4372698b2e1c870a528156984d73e3c71bafc414 Mon Sep 17 00:00:00 2001
From: Takuro Ashie <ashie@clear-code.com>
Date: Fri, 8 Mar 2024 18:53:35 +0900
Subject: [PATCH] in_tail: Manage tail watchers that are `rorate_wait` state
 too (#4334)

After a tail watcher transitions to `rotate_wait` state, the
`rotate_wait` timer is no longer managed by in_tail, it might cause
unexpected behaviour. e.g.)

* It's never unwatched when shutdown occurs before `rotate_wait` passed.
* Needless `rotate_wait` timers are executed when it detects more
  rotations.

This patch fixes such unexpected behaviour.

Note: The comment about `detach_watcher` was added in 76f246ae6a5a543c2b302b1a1f61a4223be177eb.
At that time, closing was done by event-loop.
Now, the situation is completely different, so it should be removed.

---------

Signed-off-by: Takuro Ashie <ashie@clear-code.com>
Co-authored-by: Daijiro Fukuda <fukuda@clear-code.com>
---
 lib/fluent/plugin/in_tail.rb |  24 +++--
 test/plugin/test_in_tail.rb  | 169 ++++++++++++++++++++++++++++++++++-
 2 files changed, 184 insertions(+), 9 deletions(-)

diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb
index 44761b4061..7e0289f65b 100644
--- a/lib/fluent/plugin/in_tail.rb
+++ b/lib/fluent/plugin/in_tail.rb
@@ -52,6 +52,7 @@ def initialize
       super
       @paths = []
       @tails = {}
+      @tails_rotate_wait = {}
       @pf_file = nil
       @pf = nil
       @ignore_list = []
@@ -267,6 +268,9 @@ def shutdown
       @shutdown_start_time = Fluent::Clock.now
       # during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close.
       stop_watchers(existence_path, immediate: true, remove_watcher: false)
+      @tails_rotate_wait.keys.each do |tw|
+        detach_watcher(tw, @tails_rotate_wait[tw][:ino], false)
+      end
       @pf_file.close if @pf_file
 
       super
@@ -275,6 +279,7 @@ def shutdown
     def close
       super
       # close file handles after all threads stopped (in #close of thread plugin helper)
+      # It may be because we need to wait IOHanlder.ready_to_shutdown()
       close_watcher_handles
     end
 
@@ -516,6 +521,9 @@ def close_watcher_handles
           tw.close
         end
       end
+      @tails_rotate_wait.keys.each do |tw|
+        tw.close
+      end
     end
 
     # refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
@@ -570,10 +578,6 @@ def update_watcher(tail_watcher, pe, new_inode)
       detach_watcher_after_rotate_wait(tail_watcher, pe.read_inode)
     end
 
-    # TailWatcher#close is called by another thread at shutdown phase.
-    # It causes 'can't modify string; temporarily locked' error in IOHandler
-    # so adding close_io argument to avoid this problem.
-    # At shutdown, IOHandler's io will be released automatically after detached the event loop
     def detach_watcher(tw, ino, close_io = true)
       if @follow_inodes && tw.ino != ino
         log.warn("detach_watcher could be detaching an unexpected tail_watcher with a different ino.",
@@ -604,7 +608,11 @@ def detach_watcher_after_rotate_wait(tw, ino)
       if @open_on_every_update
         # Detach now because it's already closed, waiting it doesn't make sense.
         detach_watcher(tw, ino)
-      elsif throttling_is_enabled?(tw)
+      end
+
+      return if @tails_rotate_wait[tw]
+
+      if throttling_is_enabled?(tw)
         # When the throttling feature is enabled, it might not reach EOF yet.
         # Should ensure to read all contents before closing it, with keeping throttling.
         start_time_to_wait = Fluent::Clock.now
@@ -612,14 +620,18 @@ def detach_watcher_after_rotate_wait(tw, ino)
           elapsed = Fluent::Clock.now - start_time_to_wait
           if tw.eof? && elapsed >= @rotate_wait
             timer.detach
+            @tails_rotate_wait.delete(tw)
             detach_watcher(tw, ino)
           end
         end
+        @tails_rotate_wait[tw] = { ino: ino, timer: timer }
       else
         # when the throttling feature isn't enabled, just wait @rotate_wait
-        timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
+        timer = timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
+          @tails_rotate_wait.delete(tw)
           detach_watcher(tw, ino)
         end
+        @tails_rotate_wait[tw] = { ino: ino, timer: timer }
       end
     end
 
diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb
index 8919866683..58006c0b98 100644
--- a/test/plugin/test_in_tail.rb
+++ b/test/plugin/test_in_tail.rb
@@ -3016,6 +3016,92 @@ def test_path_resurrection
         },
       )
     end
+
+    def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait
+      config = config_element(
+        "ROOT",
+        "",
+        {
+          "path" => "#{@tmp_dir}/tail.txt*",
+          "pos_file" => "#{@tmp_dir}/tail.pos",
+          "tag" => "t1",
+          "format" => "none",
+          "read_from_head" => "true",
+          "follow_inodes" => "true",
+          "rotate_wait" => "3s",
+          "refresh_interval" => "1h",
+          # stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file,
+          # so disable it in order to reproduce the same condition stably.
+          "enable_stat_watcher" => "false",
+        }
+      )
+      d = create_driver(config, false)
+
+      tail_watchers = []
+      stub.proxy(d.instance).setup_watcher do |tw|
+        tail_watchers.append(tw)
+        mock.proxy(tw).close.once # Note: Currently, there is no harm in duplicate calls.
+        tw
+      end
+
+      Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file1 log1"}
+
+      d.run(expect_records: 6, timeout: 15) do
+        Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file1 log2"}
+
+        sleep 1.5 # Need to be larger than 1s (the interval of watch_timer)
+
+        FileUtils.move("#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt" + "1")
+        Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file2 log1"}
+        Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file2 log2"}
+
+        sleep 1.5 # Need to be larger than 1s (the interval of watch_timer)
+
+        # Rotate again (Old TailWatcher waiting rotate_wait also calls update_watcher)
+        [1, 0].each do |i|
+          FileUtils.move("#{@tmp_dir}/tail.txt#{i}", "#{@tmp_dir}/tail.txt#{i + 1}")
+        end
+        Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file3 log1"}
+        Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"}
+
+        # Wait rotate_wait to confirm that TailWatcher.close is not called in duplicate.
+        # (Note: Currently, there is no harm in duplicate calls)
+        sleep 4
+      end
+
+      inode_0 = tail_watchers[0]&.ino
+      inode_1 = tail_watchers[1]&.ino
+      inode_2 = tail_watchers[2]&.ino
+      record_values = d.events.collect { |event| event[2]["message"] }.sort
+      position_entries = []
+      Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
+        f.readlines(chomp: true).each do |line|
+          values = line.split("\t")
+          position_entries.append([values[0], values[1], values[2].to_i(16)])
+        end
+      end
+
+      assert_equal(
+        {
+          record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2", "file3 log1", "file3 log2"],
+          tail_watcher_paths: ["#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0"],
+          tail_watcher_inodes: [inode_0, inode_1, inode_2],
+          tail_watcher_io_handler_opened_statuses: [false, false, false],
+          position_entries: [
+            ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_0],
+            ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_1],
+            ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_2],
+          ],
+        },
+        {
+          record_values: record_values,
+          tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
+          tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
+          tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
+          position_entries: position_entries
+        },
+      )
+    end
   end
 
   sub_test_case "Update watchers for rotation without follow_inodes" do
@@ -3084,9 +3170,6 @@ def test_refreshTW_during_rotation
         sleep 3
 
         Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"}
-
-        # Wait `rotate_wait` for file2 to make sure to close all IO handlers
-        sleep 3
       end
 
       inode_0 = tail_watchers[0]&.ino
@@ -3121,5 +3204,85 @@ def test_refreshTW_during_rotation
         },
       )
     end
+
+    def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait
+      config = config_element(
+        "ROOT",
+        "",
+        {
+          "path" => "#{@tmp_dir}/tail.txt0",
+          "pos_file" => "#{@tmp_dir}/tail.pos",
+          "tag" => "t1",
+          "format" => "none",
+          "read_from_head" => "true",
+          "rotate_wait" => "3s",
+          "refresh_interval" => "1h",
+        }
+      )
+      d = create_driver(config, false)
+
+      tail_watchers = []
+      stub.proxy(d.instance).setup_watcher do |tw|
+        tail_watchers.append(tw)
+        mock.proxy(tw).close.once # Note: Currently, there is no harm in duplicate calls.
+        tw
+      end
+
+      Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file1 log1"}
+
+      d.run(expect_records: 6, timeout: 15) do
+        Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file1 log2"}
+
+        sleep 1.5 # Need to be larger than 1s (the interval of watch_timer)
+
+        FileUtils.move("#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt" + "1")
+        Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file2 log1"}
+        Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file2 log2"}
+
+        sleep 1.5 # Need to be larger than 1s (the interval of watch_timer)
+
+        # Rotate again (Old TailWatcher waiting rotate_wait also calls update_watcher)
+        [1, 0].each do |i|
+          FileUtils.move("#{@tmp_dir}/tail.txt#{i}", "#{@tmp_dir}/tail.txt#{i + 1}")
+        end
+        Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file3 log1"}
+        Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"}
+
+        # Wait rotate_wait to confirm that TailWatcher.close is not called in duplicate.
+        # (Note: Currently, there is no harm in duplicate calls)
+        sleep 4
+      end
+
+      inode_0 = tail_watchers[0]&.ino
+      inode_1 = tail_watchers[1]&.ino
+      inode_2 = tail_watchers[2]&.ino
+      record_values = d.events.collect { |event| event[2]["message"] }.sort
+      position_entries = []
+      Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
+        f.readlines(chomp: true).each do |line|
+          values = line.split("\t")
+          position_entries.append([values[0], values[1], values[2].to_i(16)])
+        end
+      end
+
+      assert_equal(
+        {
+          record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2", "file3 log1", "file3 log2"],
+          tail_watcher_paths: ["#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0"],
+          tail_watcher_inodes: [inode_0, inode_1, inode_2],
+          tail_watcher_io_handler_opened_statuses: [false, false, false],
+          position_entries: [
+            ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_2],
+          ],
+        },
+        {
+          record_values: record_values,
+          tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
+          tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
+          tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
+          position_entries: position_entries
+        },
+      )
+    end
   end
 end