Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
make explicit exit flushing, not to rely on close() side effects.

fix failing specs NOTE auto_flush takes an argument

More detail in the changelog

change log change and remove rescue nil

closes logstash-plugins#101
closes logstash-plugins#104

Fixes logstash-plugins#112
  • Loading branch information
Guy Boertje committed Mar 17, 2016
1 parent e2d37fb commit b111327
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 15 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 2.2.2
- Fix for: Filewatch library complains if HOME or SINCEDB_PATH variables are unset.
- [Issue #101](https://github.com/logstash-plugins/logstash-input-file/issues/101)
- [PR, filewatch 78](https://github.com/jordansissel/ruby-filewatch/pull/78) introduces the fix
- [Issue, filewatch 76](https://github.com/jordansissel/ruby-filewatch/issues/76)
- Improve documentation on ignore_older and close_older options [#104](https://github.com/logstash-plugins/logstash-input-file/issues/104) Documentation

## 2.2.1
- Fix spec failures on CI Linux builds (not seen on local OSX and Linux)

Expand Down
35 changes: 29 additions & 6 deletions lib/logstash/inputs/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def accept(listener)
end
end
if !method_defined?(:auto_flush)
def auto_flush
def auto_flush(*)
end
end
end
Expand Down Expand Up @@ -139,14 +139,14 @@ class LogStash::Inputs::File < LogStash::Inputs::Base
# set the new line delimiter, defaults to "\n"
config :delimiter, :validate => :string, :default => "\n"

# If this option is specified, when the file input discovers a file that
# was last modified before the specified timespan in seconds, the file is
# ignored. After it's discovery, if an ignored file is modified it is no
# When the file input discovers a file that was last modified
# before the specified timespan in seconds, the file is ignored.
# After it's discovery, if an ignored file is modified it is no
# longer ignored and any new data is read. The default is 24 hours.
config :ignore_older, :validate => :number, :default => 24 * 60 * 60

# If this option is specified, the file input closes any files that were last
# read the specified timespan in seconds ago.
# The file input closes any files that were last read the specified
# timespan in seconds ago.
# This has different implications depending on if a file is being tailed or
# read. If tailing, and there is a large time gap in incoming data the file
# can be closed (allowing other files to be opened) but will be queued for
Expand Down Expand Up @@ -269,6 +269,10 @@ def dup_adding_state(line)
end
end

class FlushableListener < ListenerTail
attr_writer :path
end

def listener_for(path)
# path is the identity
ListenerTail.new(path, self)
Expand All @@ -288,6 +292,7 @@ def run(queue)
begin_tailing
@queue = queue
@tail.subscribe(self)
exit_flush
end # def run

def post_process_this(event)
Expand All @@ -310,4 +315,22 @@ def stop
@tail.quit
end
end

private

def exit_flush
listener = FlushableListener.new("none", self)
if @codec.identity_count.zero?
# using the base codec without identity/path info
@codec.base_codec.flush do |event|
begin
listener.process_event(event)
rescue => e
@logger.error("File Input: flush on exit downstream error", :exception => e)
end
end
else
@codec.flush_mapped(listener)
end
end
end # class LogStash::Inputs::File
4 changes: 2 additions & 2 deletions logstash-input-file.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-file'
s.version = '2.2.1'
s.version = '2.2.2'
s.licenses = ['Apache License (2.0)']
s.summary = "Stream events from files."
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -24,7 +24,7 @@ Gem::Specification.new do |s|

s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'addressable'
s.add_runtime_dependency 'filewatch', ['>= 0.8.0', '~> 0.8']
s.add_runtime_dependency 'filewatch', ['>= 0.8.1', '~> 0.8']
s.add_runtime_dependency 'logstash-codec-multiline', ['~> 2.0.7']

s.add_development_dependency 'stud', ['~> 0.0.19']
Expand Down
8 changes: 2 additions & 6 deletions spec/inputs/file_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,11 @@
expect(events.size).to eq(0)
File.open(tmpfile_path, "a") { |fd| fd.puts("hello"); fd.puts("world") }
end
.then_after(0.1, "only one event is created, the last line is buffered") do
expect(events.size).to eq(1)
end
.then_after(0.1, "quit") do
.then_after(0.25, "quit") do
subject.stop
end

subject.run(events)
# stop flushes the second event
expect(events.size).to eq(2)

event1 = events[0]
expect(event1).not_to be_nil
Expand Down
5 changes: 4 additions & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ def decode_accept(ctx, data, listener)
def accept(listener)
@tracer.push [:accept, true]
end
def auto_flush()
def auto_flush(*)
@tracer.push [:auto_flush, true]
end
def flush(*)
@tracer.push [:flush, true]
end
def close
@tracer.push [:close, true]
end
Expand Down

0 comments on commit b111327

Please sign in to comment.