Skip to content

Commit

Permalink
DEPENDS ON filewatch v0.6.7 allow run to be called multiple times
Browse files Browse the repository at this point in the history
  • Loading branch information
guyboertje authored and jordansissel committed Dec 3, 2015
1 parent 63eccf9 commit 011f80f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 2 deletions.
7 changes: 6 additions & 1 deletion lib/logstash/inputs/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,15 @@ def register
@codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
end # def register

def run(queue)
def begin_tailing
stop # if the pipeline restarts this input.
@tail = FileWatch::Tail.new(@tail_config)
@tail.logger = @logger
@path.each { |path| @tail.tail(path) }
end

def run(queue)
begin_tailing
@tail.subscribe do |path, line|
log_line_received(path, line)
@codec.decode(line, path) do |event|
Expand Down
2 changes: 1 addition & 1 deletion logstash-input-file.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Gem::Specification.new do |s|

s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'addressable'
s.add_runtime_dependency 'filewatch', ['>= 0.6.5', '~> 0.6']
s.add_runtime_dependency 'filewatch', ['>= 0.6.7', '~> 0.6']
s.add_runtime_dependency 'logstash-codec-multiline', ['~> 2.0.3']

s.add_development_dependency 'stud', ['~> 0.0.19']
Expand Down
43 changes: 43 additions & 0 deletions spec/inputs/file_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
require "stud/temporary"
require "logstash/inputs/file"

Thread.abort_on_exception = true

FILE_DELIMITER = LogStash::Environment.windows? ? "\r\n" : "\n"

describe LogStash::Inputs::File do
Expand Down Expand Up @@ -190,6 +192,47 @@
end
end

context "when #run is called multiple times" do
let(:tmpdir_path) { Stud::Temporary.directory }
let(:sincedb_path) { Stud::Temporary.pathname }
let(:file_path) { "#{tmpdir_path}/a.log" }
let(:buffer) { [] }
let(:lsof) { [] }
let(:stop_proc) do
lambda do |input, arr|
Thread.new(input, arr) do |i, a|
sleep 0.5
a << `lsof -p #{Process.pid} | grep "a.log"`
i.stop
end
end
end

subject { LogStash::Inputs::File.new("path" => tmpdir_path + "/*.log", "start_position" => "beginning", "sincedb_path" => sincedb_path) }

after :each do
FileUtils.rm_rf(tmpdir_path)
FileUtils.rm_rf(sincedb_path)
end
before do
File.open(file_path, "w") do |fd|
fd.puts('foo')
fd.puts('bar')
end
end
it "should only have one set of files open" do
subject.register
lsof_before = `lsof -p #{Process.pid} | grep #{file_path}`
expect(lsof_before).to eq("")
stop_proc.call(subject, lsof)
subject.run(buffer)
expect(lsof.first).not_to eq("")
stop_proc.call(subject, lsof)
subject.run(buffer)
expect(lsof.last).to eq(lsof.first)
end
end

context "when wildcard path and a multiline codec is specified" do
let(:tmpdir_path) { Stud::Temporary.directory }
let(:sincedb_path) { Stud::Temporary.pathname }
Expand Down

0 comments on commit 011f80f

Please sign in to comment.