diff --git a/lib/logstash/inputs/file.rb b/lib/logstash/inputs/file.rb index 44baf53..5881e84 100644 --- a/lib/logstash/inputs/file.rb +++ b/lib/logstash/inputs/file.rb @@ -147,7 +147,10 @@ def run(queue) public def teardown - @tail.sincedb_write - @tail.quit + if @tail + @tail.sincedb_write + @tail.quit + @tail = nil + end end # def teardown end # class LogStash::Inputs::File diff --git a/logstash-input-file.gemspec b/logstash-input-file.gemspec index cdb62a6..00d4c39 100644 --- a/logstash-input-file.gemspec +++ b/logstash-input-file.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-file' - s.version = '0.1.6' + s.version = '0.1.7' s.licenses = ['Apache License (2.0)'] s.summary = "Stream events from files." s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/inputs/file_spec.rb b/spec/inputs/file_spec.rb index 7c42dde..962d5d4 100644 --- a/spec/inputs/file_spec.rb +++ b/spec/inputs/file_spec.rb @@ -8,11 +8,11 @@ delimiter = (LogStash::Environment.windows? ? "\r\n" : "\n") - describe "starts at the end of an existing file" do + it "should starts at the end of an existing file" do tmpfile_path = Stud::Temporary.pathname sincedb_path = Stud::Temporary.pathname - config <<-CONFIG + conf = <<-CONFIG input { file { type => "blah" @@ -23,25 +23,23 @@ } CONFIG - input do |pipeline, queue| - File.open(tmpfile_path, "w") do |fd| - fd.puts("ignore me 1") - fd.puts("ignore me 2") - end + File.open(tmpfile_path, "w") do |fd| + fd.puts("ignore me 1") + fd.puts("ignore me 2") + end - Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? + events = input(conf) do |pipeline, queue| - # at this point even if pipeline.ready? == true the plugins + # at this point the plugins # threads might still be initializing so we cannot know when the # file plugin will have seen the original file, it could see it # after the first(s) hello world appends below, hence the # retry logic. - retries = 0 - loop do - insist { retries } < 20 # 2 secs should be plenty? + events = [] + retries = 0 + while retries < 20 File.open(tmpfile_path, "a") do |fd| fd.puts("hello") fd.puts("world") @@ -49,22 +47,25 @@ if queue.size >= 2 events = 2.times.collect { queue.pop } - insist { events[0]["message"] } == "hello" - insist { events[1]["message"] } == "world" break end sleep(0.1) retries += 1 end + + events end + + insist { events[0]["message"] } == "hello" + insist { events[1]["message"] } == "world" end - describe "can start at the beginning of an existing file" do + it "should start at the beginning of an existing file" do tmpfile_path = Stud::Temporary.pathname sincedb_path = Stud::Temporary.pathname - config <<-CONFIG + conf = <<-CONFIG input { file { type => "blah" @@ -76,64 +77,59 @@ } CONFIG - input do |pipeline, queue| - File.open(tmpfile_path, "a") do |fd| - fd.puts("hello") - fd.puts("world") - end - - Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? + File.open(tmpfile_path, "a") do |fd| + fd.puts("hello") + fd.puts("world") + end - events = 2.times.collect { queue.pop } - insist { events[0]["message"] } == "hello" - insist { events[1]["message"] } == "world" + events = input(conf) do |pipeline, queue| + 2.times.collect { queue.pop } end + + insist { events[0]["message"] } == "hello" + insist { events[1]["message"] } == "world" end - describe "restarts at the sincedb value" do + it "should restarts at the sincedb value" do tmpfile_path = Stud::Temporary.pathname sincedb_path = Stud::Temporary.pathname - config <<-CONFIG + conf = <<-CONFIG input { file { type => "blah" path => "#{tmpfile_path}" - start_position => "beginning" + start_position => "beginning" sincedb_path => "#{sincedb_path}" delimiter => "#{delimiter}" } } CONFIG - input do |pipeline, queue| - File.open(tmpfile_path, "w") do |fd| - fd.puts("hello") - fd.puts("world") - end - - t = Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? - - events = 2.times.collect { queue.pop } - pipeline.shutdown - t.join + File.open(tmpfile_path, "w") do |fd| + fd.puts("hello3") + fd.puts("world3") + end - File.open(tmpfile_path, "a") do |fd| - fd.puts("foo") - fd.puts("bar") - fd.puts("baz") - end + events = input(conf) do |pipeline, queue| + 2.times.collect { queue.pop } + end - Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? + insist { events[0]["message"] } == "hello3" + insist { events[1]["message"] } == "world3" - events = 3.times.collect { queue.pop } + File.open(tmpfile_path, "a") do |fd| + fd.puts("foo") + fd.puts("bar") + fd.puts("baz") + end - insist { events[0]["message"] } == "foo" - insist { events[1]["message"] } == "bar" - insist { events[2]["message"] } == "baz" + events = input(conf) do |pipeline, queue| + 3.times.collect { queue.pop } end + + insist { events[0]["message"] } == "foo" + insist { events[1]["message"] } == "bar" + insist { events[2]["message"] } == "baz" end end