Remove a poc component that crept in, use filewatch 0.8.0 and ML codec 2.0.7. Describe close_older option better and add max_open_files.

Fix spec to work on Linux as well.

Closes logstash-plugins#97

Fixes logstash-plugins#98
Guy Boertje committed Jan 29, 2016
1 parent e5ed49f commit 60a6268
Showing 5 changed files with 119 additions and 66 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
## 2.2.0
 - Use ruby-filewatch 0.8.0, a major rework of filewatch. See [Pull Request 74](https://github.com/jordansissel/ruby-filewatch/pull/74)
 - Add the max_open_files config option, defaulting to 4095. The input can process many more files than this, but only this number are held open at any one time; files are closed based on the close_older setting, which allows others to be opened.
 - Change the close_older logic to measure the time since the file was last read internally, rather than using the file stat modified time.
 - Use logstash-codec-multiline 2.0.7, which fixes a bug where auto_flush deadlocked when multiple file inputs are defined in the LS config.

## 2.1.3
- Use ruby-filewatch 0.7.1, re-enable close after file is modified again

20 changes: 17 additions & 3 deletions lib/logstash/inputs/file.rb
@@ -145,11 +145,24 @@ class LogStash::Inputs::File < LogStash::Inputs::Base
# longer ignored and any new data is read. The default is 24 hours.
config :ignore_older, :validate => :number, :default => 24 * 60 * 60

# If this option is specified, the file input closes any files that remain
# unmodified for longer than the specified timespan in seconds.
# If this option is specified, the file input closes any files that were last
# read longer than the specified timespan (in seconds) ago.
# This has different implications depending on whether a file is being tailed
# or read. If tailing, and there is a large time gap in incoming data, the
# file can be closed (allowing other files to be opened) but will be queued
# for reopening when new data is detected. If reading, the file will be
# closed after close_older seconds from when the last bytes were read.
# The default is 1 hour.
config :close_older, :validate => :number, :default => 1 * 60 * 60

# The maximum number of file handles that this input consumes
# at any one time. Use close_older to close some files if you need to
# process more files than this number. This should not be set to the
# maximum the OS can support, because file handles are needed for other
# LS plugins and OS processes.
# The default of 4095 is set in filewatch.
config :max_open_files, :validate => :number

public
def register
require "addressable/uri"
@@ -165,7 +178,8 @@ def register
:sincedb_write_interval => @sincedb_write_interval,
:delimiter => @delimiter,
:ignore_older => @ignore_older,
:close_older => @close_older
:close_older => @close_older,
:max_open_files => @max_open_files
}

@path.each do |path|
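Taken together, the options documented in this hunk could appear in a pipeline config like the following sketch. The path and values are illustrative only, not defaults:

```
input {
  file {
    path => "/var/log/app/*.log"
    start_position => "beginning"
    close_older => 1800       # close a file not read for 30 minutes
    max_open_files => 1024    # cap concurrently open file handles
  }
}
```

With this config, files idle past close_older free up handles so the input stays under the max_open_files cap.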
56 changes: 0 additions & 56 deletions lib/logstash/inputs/identity_map_codec_component.rb

This file was deleted.

6 changes: 3 additions & 3 deletions logstash-input-file.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-file'
s.version = '2.1.3'
s.version = '2.2.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Stream events from files."
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
@@ -24,8 +24,8 @@ Gem::Specification.new do |s|

s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'addressable'
s.add_runtime_dependency 'filewatch', ['>= 0.7.1', '~> 0.7']
s.add_runtime_dependency 'logstash-codec-multiline', ['~> 2.0.5']
s.add_runtime_dependency 'filewatch', ['>= 0.8.0', '~> 0.8']
s.add_runtime_dependency 'logstash-codec-multiline', ['~> 2.0.7']

s.add_development_dependency 'stud', ['~> 0.0.19']
s.add_development_dependency 'logstash-devutils'
97 changes: 93 additions & 4 deletions spec/inputs/file_spec.rb
@@ -152,6 +152,10 @@
let(:sincedb_path) { Stud::Temporary.pathname }
let(:tmpdir_path) { Stud::Temporary.directory }

after :each do
FileUtils.rm_rf(sincedb_path)
end

context "when data exists and then more data is appended" do
subject { described_class.new(conf) }

@@ -369,6 +373,7 @@
conf.update(
"path" => tmpdir_path + "/*.log",
"start_position" => "beginning",
"stat_interval" => 0.1,
"sincedb_path" => sincedb_path)

File.open(file_path, "w") do |fd|
@@ -382,14 +387,98 @@
subject.register
expect(lsof_proc.call).to eq("")
run_thread_proc.call
sleep 0.1
sleep 0.25
first_lsof = lsof_proc.call
expect(first_lsof).not_to eq("")
expect(first_lsof.scan(file_path).size).to eq(1)
run_thread_proc.call
sleep 0.1
sleep 0.25
second_lsof = lsof_proc.call
expect(second_lsof).to eq(first_lsof)
expect(second_lsof.scan(file_path).size).to eq(1)
end
end

describe "specifying max_open_files" do
subject { described_class.new(conf) }
before do
File.open("#{tmpdir_path}/a.log", "w") do |fd|
fd.puts("line1-of-a")
fd.puts("line2-of-a")
fd.fsync
end
File.open("#{tmpdir_path}/z.log", "w") do |fd|
fd.puts("line1-of-z")
fd.puts("line2-of-z")
fd.fsync
end
end

context "when close_older is NOT specified" do
before do
conf.clear
conf.update(
"type" => "blah",
"path" => "#{tmpdir_path}/*.log",
"sincedb_path" => sincedb_path,
"stat_interval" => 0.1,
"max_open_files" => 1,
"start_position" => "beginning",
"delimiter" => FILE_DELIMITER)
subject.register
Thread.new { subject.run(events) }
sleep 0.1
end
it "collects line events from only one file" do
# wait for one path to be mapped as identity
expect(pause_until{ subject.codec.identity_count == 1 }).to be_truthy
subject.stop
# stop flushes last event
expect(pause_until{ events.size == 2 }).to be_truthy

e1, e2 = events
if Dir.glob("#{tmpdir_path}/*.log").first =~ %r{a\.log}
# Linux and OS X return glob results in different orders
expect(e1["message"]).to eq("line1-of-a")
expect(e2["message"]).to eq("line2-of-a")
else
expect(e1["message"]).to eq("line1-of-z")
expect(e2["message"]).to eq("line2-of-z")
end
end
end

context "when close_older IS specified" do
before do
conf.update(
"type" => "blah",
"path" => "#{tmpdir_path}/*.log",
"sincedb_path" => sincedb_path,
"stat_interval" => 0.1,
"max_open_files" => 1,
"close_older" => 1,
"start_position" => "beginning",
"delimiter" => FILE_DELIMITER)
subject.register
Thread.new { subject.run(events) }
sleep 0.1
end

it "collects line events from both files" do
# close flushes last event of each identity
expect(pause_until{ events.size == 4 }).to be_truthy
subject.stop
if Dir.glob("#{tmpdir_path}/*.log").first =~ %r{a\.log}
# Linux and OS X return glob results in different orders
e1, e2, e3, e4 = events
else
e3, e4, e1, e2 = events
end
expect(e1["message"]).to eq("line1-of-a")
expect(e2["message"]).to eq("line2-of-a")
expect(e3["message"]).to eq("line1-of-z")
expect(e4["message"]).to eq("line2-of-z")
end
end

end
end
end
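The close_older behavior exercised by the specs above (measuring time since the input last read from a file, not the stat mtime) can be modeled in plain Ruby. This is an illustrative sketch only, not filewatch's actual implementation; the `WatchedFile` class and its methods are hypothetical:

```ruby
# Minimal model of close_older bookkeeping: each watched file records
# when the input last read bytes from it, and a sweep closes handles
# that have been idle longer than close_older seconds.
class WatchedFile
  attr_reader :path

  def initialize(path)
    @path = path
    @last_read_at = Time.now  # set at open, updated on every read
    @open = true
  end

  # Called whenever the input consumes data from this file.
  def read_bytes!
    @last_read_at = Time.now
  end

  def open?
    @open
  end

  # Close the handle if no bytes were read for close_older seconds;
  # a closed file frees a handle and is reopened when new data appears.
  def close_if_older(close_older, now = Time.now)
    @open = false if @open && (now - @last_read_at) > close_older
    @open
  end
end

a = WatchedFile.new("a.log")
b = WatchedFile.new("b.log")
a.close_if_older(3600, Time.now + 7200)  # idle ~2h > 1h limit => closed
b.close_if_older(3600)                   # just read => stays open
```

Under this model, `a` is closed while `b` remains open, mirroring how the input keeps at most max_open_files handles live while close_older reclaims idle ones.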
