From 61142abee46a273c4544ed14261573da4b7bc359 Mon Sep 17 00:00:00 2001 From: guyboertje Date: Sat, 5 Dec 2015 07:34:15 +0000 Subject: [PATCH] Introduce close_older and ignore_older config options begin to add ignore_after config option depends on filewatch 0.6.8 add timed_out method to TailListener and specs revert Gemfile changes for PR split and rename ignore_after option, update gem deps to published ver. bump ver, make suggested review changes and improve spec reliability update changelog Fixes #81, Fixes #89, Fixes #90 Fixes #87 --- CHANGELOG.md | 6 + Gemfile | 2 +- lib/logstash/inputs/file.rb | 112 +++++-- logstash-input-file.gemspec | 8 +- spec/inputs/file_spec.rb | 562 +++++++++++++++++++++--------------- spec/spec_helper.rb | 79 +++++ 6 files changed, 505 insertions(+), 264 deletions(-) create mode 100644 spec/spec_helper.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c69f52..bbbb2b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 2.1.0 + - Implement new config options: ignore_older and close_older. When close_older is set, any buffered data will be flushed. 
+ - Fixes [#81](https://github.com/logstash-plugins/logstash-input-file/issues/81) + - Fixes [#89](https://github.com/logstash-plugins/logstash-input-file/issues/89) + - Fixes [#90](https://github.com/logstash-plugins/logstash-input-file/issues/90) + ## 2.0.3 - Implement Stream Identity mapping of codecs: distinct codecs will collect input per stream identity (filename) diff --git a/Gemfile b/Gemfile index d926697..851fabc 100644 --- a/Gemfile +++ b/Gemfile @@ -1,2 +1,2 @@ source 'https://rubygems.org' -gemspec \ No newline at end of file +gemspec diff --git a/lib/logstash/inputs/file.rb b/lib/logstash/inputs/file.rb index 1498176..bc34d14 100644 --- a/lib/logstash/inputs/file.rb +++ b/lib/logstash/inputs/file.rb @@ -62,6 +62,22 @@ # to the rotation and its reopening under the new name (an interval # determined by the `stat_interval` and `discover_interval` options) # will not get picked up. + +class LogStash::Codecs::Base + # TODO - move this to core + if !method_defined?(:accept) + def accept(listener) + decode(listener.data) do |event| + listener.process_event(event) + end + end + end + if !method_defined?(:auto_flush) + def auto_flush + end + end +end + class LogStash::Inputs::File < LogStash::Inputs::Base config_name "file" @@ -119,6 +135,17 @@ class LogStash::Inputs::File < LogStash::Inputs::Base # set the new line delimiter, defaults to "\n" config :delimiter, :validate => :string, :default => "\n" + # If this option is specified, when the file input discovers a file that + # was last modified before the specified timespan in seconds, the file is + # ignored. After its discovery, if an ignored file is modified it is no + # longer ignored and any new data is read. The default is 24 hours. + config :ignore_older, :validate => :number, :default => 24 * 60 * 60 + + # If this option is specified, the file input closes any files that remain + # unmodified for longer than the specified timespan in seconds. 
+ # The default is 1 hour + config :close_older, :validate => :number, :default => 1 * 60 * 60 + public def register require "addressable/uri" @@ -133,7 +160,8 @@ def register :discover_interval => @discover_interval, :sincedb_write_interval => @sincedb_write_interval, :delimiter => @delimiter, - :logger => @logger, + :ignore_older => @ignore_older, + :close_older => @close_older } @path.each do |path| @@ -184,44 +212,84 @@ def register @codec = LogStash::Codecs::IdentityMapCodec.new(@codec) end # def register + class ListenerTail + # use attr_reader to define noop methods + attr_reader :input, :path, :data + attr_reader :deleted, :created, :error, :eof + + # construct with upstream state + def initialize(path, input) + @path, @input = path, input + end + + def timed_out + input.codec.evict(path) + end + + def accept(data) + # and push transient data filled dup listener downstream + input.log_line_received(path, data) + input.codec.accept(dup_adding_state(data)) + end + + def process_event(event) + event["[@metadata][path]"] = path + event["path"] = path if !event.include?("path") + input.post_process_this(event) + end + + def add_state(data) + @data = data + self + end + + private + + # duplicate and add state for downstream + def dup_adding_state(line) + self.class.new(path, input).add_state(line) + end + end + + def listener_for(path) + # path is the identity + ListenerTail.new(path, self) + end + def begin_tailing - stop # if the pipeline restarts this input. 
- @tail = FileWatch::Tail.new(@tail_config) + # if the pipeline restarts this input, + # make sure previous files are closed + stop + # use observer listener api + @tail = FileWatch::Tail.new_observing(@tail_config) @tail.logger = @logger @path.each { |path| @tail.tail(path) } end def run(queue) begin_tailing - @tail.subscribe do |path, line| - log_line_received(path, line) - @codec.decode(line, path) do |event| - # path is the identity - # Note: this block is cached in the - # identity_map_codec for use when - # buffered lines are flushed. - queue << add_path_meta(event, path) - end - end + @queue = queue + @tail.subscribe(self) end # def run + def post_process_this(event) + event["host"] = @host if !event.include?("host") + decorate(event) + @queue << event + end + def log_line_received(path, line) return if !@logger.debug? @logger.debug("Received line", :path => path, :text => line) end - def add_path_meta(event, path) - event["[@metadata][path]"] = path - event["host"] = @host if !event.include?("host") - event["path"] = path if !event.include?("path") - decorate(event) - event - end - def stop # in filewatch >= 0.6.7, quit will closes and forget all files # but it will write their last read positions to since_db # beforehand - @tail.quit if @tail + if @tail + @codec.close + @tail.quit + end end end # class LogStash::Inputs::File diff --git a/logstash-input-file.gemspec b/logstash-input-file.gemspec index a7cb459..8eb9f02 100644 --- a/logstash-input-file.gemspec +++ b/logstash-input-file.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-file' - s.version = '2.0.3' + s.version = '2.1.0' s.licenses = ['Apache License (2.0)'] s.summary = "Stream events from files." s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. 
This gem is not a stand-alone program" @@ -20,12 +20,12 @@ Gem::Specification.new do |s| s.metadata = { "logstash_plugin" => "true", "logstash_group" => "input" } # Gem dependencies - s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0" + s.add_runtime_dependency "logstash-core", ">= 2.0.0", "< 3.0.0.alpha0" s.add_runtime_dependency 'logstash-codec-plain' s.add_runtime_dependency 'addressable' - s.add_runtime_dependency 'filewatch', ['>= 0.6.7', '~> 0.6'] - s.add_runtime_dependency 'logstash-codec-multiline', ['~> 2.0.3'] + s.add_runtime_dependency 'filewatch', ['>= 0.7.0', '~> 0.7'] + s.add_runtime_dependency 'logstash-codec-multiline', ['~> 2.0.5'] s.add_development_dependency 'stud', ['~> 0.0.19'] s.add_development_dependency 'logstash-devutils' diff --git a/spec/inputs/file_spec.rb b/spec/inputs/file_spec.rb index 79467b6..cfe86b7 100644 --- a/spec/inputs/file_spec.rb +++ b/spec/inputs/file_spec.rb @@ -1,306 +1,394 @@ # encoding: utf-8 -require "logstash/devutils/rspec/spec_helper" +require "logstash/inputs/file" +require_relative "../spec_helper" require "tempfile" require "stud/temporary" -require "logstash/inputs/file" +require "logstash/codecs/multiline" FILE_DELIMITER = LogStash::Environment.windows? ? 
"\r\n" : "\n" describe LogStash::Inputs::File do + describe "testing with input(conf) do |pipeline, queue|" do + it_behaves_like "an interruptible input plugin" do + let(:config) do + { + "path" => Stud::Temporary.pathname, + "sincedb_path" => Stud::Temporary.pathname + } + end + end - before(:all) do - @abort_on_exception = Thread.abort_on_exception - Thread.abort_on_exception = true - end + it "should start at the beginning of an existing file" do + tmpfile_path = Stud::Temporary.pathname + sincedb_path = Stud::Temporary.pathname - after(:all) do - Thread.abort_on_exception = @abort_on_exception - end + conf = <<-CONFIG + input { + file { + type => "blah" + path => "#{tmpfile_path}" + start_position => "beginning" + sincedb_path => "#{sincedb_path}" + delimiter => "#{FILE_DELIMITER}" + } + } + CONFIG - it_behaves_like "an interruptible input plugin" do - let(:config) do - { - "path" => Stud::Temporary.pathname, - "sincedb_path" => Stud::Temporary.pathname - } - end - end + File.open(tmpfile_path, "a") do |fd| + fd.puts("hello") + fd.puts("world") + fd.fsync + end - it "should starts at the end of an existing file" do - tmpfile_path = Stud::Temporary.pathname - sincedb_path = Stud::Temporary.pathname - - conf = <<-CONFIG - input { - file { - type => "blah" - path => "#{tmpfile_path}" - sincedb_path => "#{sincedb_path}" - delimiter => "#{FILE_DELIMITER}" - } - } - CONFIG + events = input(conf) do |pipeline, queue| + 2.times.collect { queue.pop } + end - File.open(tmpfile_path, "w") do |fd| - fd.puts("ignore me 1") - fd.puts("ignore me 2") + insist { events[0]["message"] } == "hello" + insist { events[1]["message"] } == "world" end - events = input(conf) do |pipeline, queue| + it "should restarts at the sincedb value" do + tmpfile_path = Stud::Temporary.pathname + sincedb_path = Stud::Temporary.pathname + + conf = <<-CONFIG + input { + file { + type => "blah" + path => "#{tmpfile_path}" + start_position => "beginning" + sincedb_path => "#{sincedb_path}" + delimiter 
=> "#{FILE_DELIMITER}" + } + } + CONFIG - # at this point the plugins - # threads might still be initializing so we cannot know when the - # file plugin will have seen the original file, it could see it - # after the first(s) hello world appends below, hence the - # retry logic. + File.open(tmpfile_path, "w") do |fd| + fd.puts("hello3") + fd.puts("world3") + end - events = [] + events = input(conf) do |pipeline, queue| + 2.times.collect { queue.pop } + end - retries = 0 - while retries < 20 - File.open(tmpfile_path, "a") do |fd| - fd.puts("hello") - fd.puts("world") - end + insist { events[0]["message"] } == "hello3" + insist { events[1]["message"] } == "world3" - if queue.size >= 2 - events = 2.times.collect { queue.pop } - break - end + File.open(tmpfile_path, "a") do |fd| + fd.puts("foo") + fd.puts("bar") + fd.puts("baz") + fd.fsync + end - sleep(0.1) - retries += 1 + events = input(conf) do |pipeline, queue| + 3.times.collect { queue.pop } end - events + insist { events[0]["message"] } == "foo" + insist { events[1]["message"] } == "bar" + insist { events[2]["message"] } == "baz" end - insist { events[0]["message"] } == "hello" - insist { events[1]["message"] } == "world" - end + it "should not overwrite existing path and host fields" do + tmpfile_path = Stud::Temporary.pathname + sincedb_path = Stud::Temporary.pathname - it "should start at the beginning of an existing file" do - tmpfile_path = Stud::Temporary.pathname - sincedb_path = Stud::Temporary.pathname - - conf = <<-CONFIG - input { - file { - type => "blah" - path => "#{tmpfile_path}" - start_position => "beginning" - sincedb_path => "#{sincedb_path}" - delimiter => "#{FILE_DELIMITER}" + conf = <<-CONFIG + input { + file { + type => "blah" + path => "#{tmpfile_path}" + start_position => "beginning" + sincedb_path => "#{sincedb_path}" + delimiter => "#{FILE_DELIMITER}" + codec => "json" + } } - } - CONFIG - - File.open(tmpfile_path, "a") do |fd| - fd.puts("hello") - fd.puts("world") - end + CONFIG - 
events = input(conf) do |pipeline, queue| - 2.times.collect { queue.pop } - end + File.open(tmpfile_path, "w") do |fd| + fd.puts('{"path": "my_path", "host": "my_host"}') + fd.puts('{"my_field": "my_val"}') + fd.fsync + end - insist { events[0]["message"] } == "hello" - insist { events[1]["message"] } == "world" - end + events = input(conf) do |pipeline, queue| + 2.times.collect { queue.pop } + end - it "should restarts at the sincedb value" do - tmpfile_path = Stud::Temporary.pathname - sincedb_path = Stud::Temporary.pathname - - conf = <<-CONFIG - input { - file { - type => "blah" - path => "#{tmpfile_path}" - start_position => "beginning" - sincedb_path => "#{sincedb_path}" - delimiter => "#{FILE_DELIMITER}" - } - } - CONFIG + insist { events[0]["path"] } == "my_path" + insist { events[0]["host"] } == "my_host" - File.open(tmpfile_path, "w") do |fd| - fd.puts("hello3") - fd.puts("world3") + insist { events[1]["path"] } == "#{tmpfile_path}" + insist { events[1]["host"] } == "#{Socket.gethostname.force_encoding(Encoding::UTF_8)}" end - events = input(conf) do |pipeline, queue| - 2.times.collect { queue.pop } - end + context "when sincedb_path is an existing directory" do + let(:tmpfile_path) { Stud::Temporary.pathname } + let(:sincedb_path) { Stud::Temporary.directory } + subject { LogStash::Inputs::File.new("path" => tmpfile_path, "sincedb_path" => sincedb_path) } - insist { events[0]["message"] } == "hello3" - insist { events[1]["message"] } == "world3" + after :each do + FileUtils.rm_rf(sincedb_path) + end - File.open(tmpfile_path, "a") do |fd| - fd.puts("foo") - fd.puts("bar") - fd.puts("baz") + it "should raise exception" do + expect { subject.register }.to raise_error(ArgumentError) + end end + end - events = input(conf) do |pipeline, queue| - 3.times.collect { queue.pop } - end + describe "testing with new, register, run and stop" do + let(:conf) { Hash.new } + let(:mlconf) { Hash.new } + let(:events) { Array.new } + let(:mlcodec) { 
LogStash::Codecs::Multiline.new(mlconf) } + let(:codec) { CodecTracer.new } + let(:tmpfile_path) { Stud::Temporary.pathname } + let(:sincedb_path) { Stud::Temporary.pathname } + let(:tmpdir_path) { Stud::Temporary.directory } - insist { events[0]["message"] } == "foo" - insist { events[1]["message"] } == "bar" - insist { events[2]["message"] } == "baz" - end + context "when data exists and then more data is appended" do + subject { described_class.new(conf) } - it "should not overwrite existing path and host fields" do - tmpfile_path = Stud::Temporary.pathname - sincedb_path = Stud::Temporary.pathname - - conf = <<-CONFIG - input { - file { - type => "blah" - path => "#{tmpfile_path}" - start_position => "beginning" - sincedb_path => "#{sincedb_path}" - delimiter => "#{FILE_DELIMITER}" - codec => "json" - } - } - CONFIG + before do + File.open(tmpfile_path, "w") do |fd| + fd.puts("ignore me 1") + fd.puts("ignore me 2") + fd.fsync + end + mlconf.update("pattern" => "^\s", "what" => "previous") + conf.update("type" => "blah", + "path" => tmpfile_path, + "sincedb_path" => sincedb_path, + "stat_interval" => 0.1, + "codec" => mlcodec, + "delimiter" => FILE_DELIMITER) + subject.register + Thread.new { subject.run(events) } + end - File.open(tmpfile_path, "w") do |fd| - fd.puts('{"path": "my_path", "host": "my_host"}') - fd.puts('{"my_field": "my_val"}') + it "reads the appended data only" do + sleep 0.1 + File.open(tmpfile_path, "a") do |fd| + fd.puts("hello") + fd.puts("world") + fd.fsync + end + # wait for one event, the last line is buffered + expect(pause_until{ events.size == 1 }).to be_truthy + subject.stop + # stop flushes the second event + expect(pause_until{ events.size == 2 }).to be_truthy + + event1 = events[0] + expect(event1).not_to be_nil + expect(event1["path"]).to eq tmpfile_path + expect(event1["@metadata"]["path"]).to eq tmpfile_path + expect(event1["message"]).to eq "hello" + + event2 = events[1] + expect(event2).not_to be_nil + 
expect(event2["path"]).to eq tmpfile_path + expect(event2["@metadata"]["path"]).to eq tmpfile_path + expect(event2["message"]).to eq "world" + end end - events = input(conf) do |pipeline, queue| - 2.times.collect { queue.pop } - end + context "when close_older config is specified" do + let(:line) { "line1.1-of-a" } - insist { events[0]["path"] } == "my_path" - insist { events[0]["host"] } == "my_host" + subject { described_class.new(conf) } - insist { events[1]["path"] } == "#{tmpfile_path}" - insist { events[1]["host"] } == "#{Socket.gethostname.force_encoding(Encoding::UTF_8)}" - end + before do + conf.update( + "type" => "blah", + "path" => "#{tmpdir_path}/*.log", + "sincedb_path" => sincedb_path, + "stat_interval" => 0.02, + "codec" => codec, + "close_older" => 1, + "delimiter" => FILE_DELIMITER) - context "when sincedb_path is an existing directory" do - let(:tmpfile_path) { Stud::Temporary.pathname } - let(:sincedb_path) { Stud::Temporary.directory } - subject { LogStash::Inputs::File.new("path" => tmpfile_path, "sincedb_path" => sincedb_path) } + subject.register + Thread.new { subject.run(events) } + end - after :each do - FileUtils.rm_rf(sincedb_path) + it "having timed_out, the identity is evicted" do + sleep 0.1 + File.open("#{tmpdir_path}/a.log", "a") do |fd| + fd.puts(line) + fd.fsync + end + expect(pause_until{ subject.codec.identity_count == 1 }).to be_truthy + expect(codec).to receive_call_and_args(:accept, [true]) + # wait for expiry to kick in and close files. + expect(pause_until{ subject.codec.identity_count.zero? 
}).to be_truthy + expect(codec).to receive_call_and_args(:auto_flush, [true]) + subject.stop + end end - it "should raise exception" do - expect { subject.register }.to raise_error(ArgumentError) - end - end + context "when ignore_older config is specified" do + let(:line) { "line1.1-of-a" } - context "when #run is called multiple times", :unix => true do - let(:tmpdir_path) { Stud::Temporary.directory } - let(:sincedb_path) { Stud::Temporary.pathname } - let(:file_path) { "#{tmpdir_path}/a.log" } - let(:buffer) { [] } - let(:lsof) { [] } - let(:stop_proc) do - lambda do |input, arr| - Thread.new(input, arr) do |i, a| - sleep 0.5 - a << `lsof -p #{Process.pid} | grep "a.log"` - i.stop + subject { described_class.new(conf) } + + before do + File.open("#{tmpdir_path}/a.log", "a") do |fd| + fd.puts(line) + fd.fsync end + sleep 1.1 # wait for file to age + conf.update( + "type" => "blah", + "path" => "#{tmpdir_path}/*.log", + "sincedb_path" => sincedb_path, + "stat_interval" => 0.02, + "codec" => codec, + "ignore_older" => 1, + "delimiter" => FILE_DELIMITER) + + subject.register + Thread.new { subject.run(events) } end - end - subject { LogStash::Inputs::File.new("path" => tmpdir_path + "/*.log", "start_position" => "beginning", "sincedb_path" => sincedb_path) } - - after :each do - FileUtils.rm_rf(tmpdir_path) - FileUtils.rm_rf(sincedb_path) - end - before do - File.open(file_path, "w") do |fd| - fd.puts('foo') - fd.puts('bar') + it "the file is not read" do + sleep 0.5 + subject.stop + expect(codec).to receive_call_and_args(:accept, false) + expect(codec).to receive_call_and_args(:auto_flush, false) + expect(subject.codec.identity_count).to eq(0) end end - it "should only have one set of files open" do - subject.register - lsof_before = `lsof -p #{Process.pid} | grep #{file_path}` - expect(lsof_before).to eq("") - stop_proc.call(subject, lsof) - subject.run(buffer) - expect(lsof.first).not_to eq("") - stop_proc.call(subject, lsof) - subject.run(buffer) - 
expect(lsof.last).to eq(lsof.first) - end - end - context "when wildcard path and a multiline codec is specified" do - let(:tmpdir_path) { Stud::Temporary.directory } - let(:sincedb_path) { Stud::Temporary.pathname } - let(:conf) do - <<-CONFIG - input { - file { - type => "blah" - path => "#{tmpdir_path}/*.log" - start_position => "beginning" - sincedb_path => "#{sincedb_path}" - delimiter => "#{FILE_DELIMITER}" - codec => multiline { pattern => "^\s" what => previous } - } - } - CONFIG - end + context "when wildcard path and a multiline codec is specified" do + subject { described_class.new(conf) } + let(:writer_proc) do + -> do + File.open("#{tmpdir_path}/a.log", "a") do |fd| + fd.puts("line1.1-of-a") + fd.puts(" line1.2-of-a") + fd.puts(" line1.3-of-a") + fd.fsync + end + File.open("#{tmpdir_path}/z.log", "a") do |fd| + fd.puts("line1.1-of-z") + fd.puts(" line1.2-of-z") + fd.puts(" line1.3-of-z") + fd.fsync + end + end + end - let(:writer_proc) do - -> do - File.open("#{tmpdir_path}/a.log", "a") do |fd| - fd.puts("line1.1-of-a") - fd.puts(" line1.2-of-a") - fd.puts(" line1.3-of-a") - fd.puts("line2.1-of-a") + before do + mlconf.update("pattern" => "^\s", "what" => "previous") + conf.update( + "type" => "blah", + "path" => "#{tmpdir_path}/*.log", + "sincedb_path" => sincedb_path, + "stat_interval" => 0.05, + "codec" => mlcodec, + "delimiter" => FILE_DELIMITER) + + subject.register + Thread.new { subject.run(events) } + sleep 0.1 + writer_proc.call + end + + it "collects separate multiple line events from each file" do + # wait for both paths to be mapped as identities + expect(pause_until{ subject.codec.identity_count == 2 }).to be_truthy + subject.stop + # stop flushes both events + expect(pause_until{ events.size == 2 }).to be_truthy + + e1, e2 = events + e1_message = e1["message"] + e2_message = e2["message"] + + # can't assume File A will be read first + if e1_message.start_with?('line1.1-of-z') + expect(e1["path"]).to match(/z.log/) + expect(e2["path"]).to 
match(/a.log/) + expect(e1_message).to eq("line1.1-of-z#{FILE_DELIMITER} line1.2-of-z#{FILE_DELIMITER} line1.3-of-z") + expect(e2_message).to eq("line1.1-of-a#{FILE_DELIMITER} line1.2-of-a#{FILE_DELIMITER} line1.3-of-a") + else + expect(e1["path"]).to match(/a.log/) + expect(e2["path"]).to match(/z.log/) + expect(e1_message).to eq("line1.1-of-a#{FILE_DELIMITER} line1.2-of-a#{FILE_DELIMITER} line1.3-of-a") + expect(e2_message).to eq("line1.1-of-z#{FILE_DELIMITER} line1.2-of-z#{FILE_DELIMITER} line1.3-of-z") end + end - File.open("#{tmpdir_path}/z.log", "a") do |fd| - fd.puts("line1.1-of-z") - fd.puts(" line1.2-of-z") - fd.puts(" line1.3-of-z") - fd.puts("line2.1-of-z") + context "if auto_flush is enabled on the multiline codec" do + let(:writer_proc) do + -> do + File.open("#{tmpdir_path}/a.log", "a") do |fd| + fd.puts("line1.1-of-a") + fd.puts(" line1.2-of-a") + fd.puts(" line1.3-of-a") + end + end + end + let(:mlconf) { { "auto_flush_interval" => 1 } } + + it "an event is generated via auto_flush" do + # wait for auto_flush + # without it lines are buffered and pause_until would time out i.e false + expect(pause_until{ events.size == 1 }).to be_truthy + subject.stop + + e1 = events.first + e1_message = e1["message"] + expect(e1["path"]).to match(/a.log/) + expect(e1_message).to eq("line1.1-of-a#{FILE_DELIMITER} line1.2-of-a#{FILE_DELIMITER} line1.3-of-a") end end end - after do - FileUtils.rm_rf(tmpdir_path) - end + context "when #run is called multiple times", :unix => true do + let(:file_path) { "#{tmpdir_path}/a.log" } + let(:buffer) { [] } + let(:lsof) { [] } + let(:run_thread_proc) do + lambda { Thread.new { subject.run(buffer) } } + end + let(:lsof_proc) do + lambda { `lsof -p #{Process.pid} | grep #{file_path}` } + end - let(:event_count) { 2 } + subject { described_class.new(conf) } - it "collects separate multiple line events from each file" do - writer_proc.call + before do + conf.update( + "path" => tmpdir_path + "/*.log", + "start_position" => 
"beginning", + "sincedb_path" => sincedb_path) - events = input(conf) do |pipeline, queue| - queue.size.times.collect { queue.pop } + File.open(file_path, "w") do |fd| + fd.puts('foo') + fd.puts('bar') + fd.fsync + end end - expect(events.size).to eq(event_count) - - e1_message = events[0]["message"] - e2_message = events[1]["message"] - - # can't assume File A will be read first - if e1_message.start_with?('line1.1-of-z') - expect(e1_message).to eq("line1.1-of-z#{FILE_DELIMITER} line1.2-of-z#{FILE_DELIMITER} line1.3-of-z") - expect(e2_message).to eq("line1.1-of-a#{FILE_DELIMITER} line1.2-of-a#{FILE_DELIMITER} line1.3-of-a") - else - expect(e1_message).to eq("line1.1-of-a#{FILE_DELIMITER} line1.2-of-a#{FILE_DELIMITER} line1.3-of-a") - expect(e2_message).to eq("line1.1-of-z#{FILE_DELIMITER} line1.2-of-z#{FILE_DELIMITER} line1.3-of-z") + it "should only have one set of files open" do + subject.register + expect(lsof_proc.call).to eq("") + run_thread_proc.call + sleep 0.1 + first_lsof = lsof_proc.call + expect(first_lsof).not_to eq("") + run_thread_proc.call + sleep 0.1 + second_lsof = lsof_proc.call + expect(second_lsof).to eq(first_lsof) end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..322678f --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,79 @@ +# encoding: utf-8 + +require "logstash/devutils/rspec/spec_helper" + +class TracerBase + def initialize() @tracer = []; end + + def trace_for(symbol) + params = @tracer.map {|k,v| k == symbol ? v : nil}.compact + params.empty? ? 
false : params + end + + def clear() + @tracer.clear() + end +end + +class FileLogTracer < TracerBase + def warn(*args) @tracer.push [:warn, args]; end + def error(*args) @tracer.push [:error, args]; end + def debug(*args) @tracer.push [:debug, args]; end + def info(*args) @tracer.push [:info, args]; end + + def info?() true; end + def debug?() true; end + def warn?() true; end + def error?() true; end +end + +class ComponentTracer < TracerBase + def accept(*args) @tracer.push [:accept, args]; end + def deliver(*args) @tracer.push [:deliver, args]; end +end + +class CodecTracer < TracerBase + def decode_accept(ctx, data, listener) + @tracer.push [:decode_accept, [ctx, data]] + listener.process(ctx, {"message" => data}) + end + def accept(listener) + @tracer.push [:accept, true] + end + def auto_flush() + @tracer.push [:auto_flush, true] + end + def close + @tracer.push [:close, true] + end + def clone + self.class.new + end +end + +module Kernel + def pause_until(nap = 5, &block) + sq = SizedQueue.new(1) + th1 = Thread.new(sq) {|q| sleep nap; q.push(false) } + th2 = Thread.new(sq) do |q| + success = false + iters = nap * 5 + 1 + iters.times do + break if !!(success = block.call) + sleep(0.2) + end + q.push(success) + end + sq.pop + end +end + +RSpec::Matchers.define(:receive_call_and_args) do |m, args| + match do |actual| + actual.trace_for(m) == args + end + + failure_message do + "Expecting method #{m} to receive: #{args} but got: #{actual.trace_for(m)}" + end +end