Skip to content

Commit c4f93df

Browse files
committed
Merge pull request logstash-plugins#21 from colinsurprenant/fix/specs_hang
refactor input specs and shutdown sequence
2 parents 1aabfeb + 0345249 commit c4f93df

File tree

3 files changed

+56
-57
lines changed

3 files changed

+56
-57
lines changed

lib/logstash/inputs/file.rb

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,10 @@ def run(queue)
147147

148148
public
149149
def teardown
150-
@tail.sincedb_write
151-
@tail.quit
150+
if @tail
151+
@tail.sincedb_write
152+
@tail.quit
153+
@tail = nil
154+
end
152155
end # def teardown
153156
end # class LogStash::Inputs::File

logstash-input-file.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-file'
4-
s.version = '0.1.6'
4+
s.version = '0.1.7'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Stream events from files."
77
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"

spec/inputs/file_spec.rb

Lines changed: 50 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88

99
delimiter = (LogStash::Environment.windows? ? "\r\n" : "\n")
1010

11-
describe "starts at the end of an existing file" do
11+
it "should starts at the end of an existing file" do
1212
tmpfile_path = Stud::Temporary.pathname
1313
sincedb_path = Stud::Temporary.pathname
1414

15-
config <<-CONFIG
15+
conf = <<-CONFIG
1616
input {
1717
file {
1818
type => "blah"
@@ -23,48 +23,49 @@
2323
}
2424
CONFIG
2525

26-
input do |pipeline, queue|
27-
File.open(tmpfile_path, "w") do |fd|
28-
fd.puts("ignore me 1")
29-
fd.puts("ignore me 2")
30-
end
26+
File.open(tmpfile_path, "w") do |fd|
27+
fd.puts("ignore me 1")
28+
fd.puts("ignore me 2")
29+
end
3130

32-
Thread.new { pipeline.run }
33-
sleep 0.1 while !pipeline.ready?
31+
events = input(conf) do |pipeline, queue|
3432

35-
# at this point even if pipeline.ready? == true the plugins
33+
# at this point the plugins
3634
# threads might still be initializing so we cannot know when the
3735
# file plugin will have seen the original file, it could see it
3836
# after the first(s) hello world appends below, hence the
3937
# retry logic.
4038

41-
retries = 0
42-
loop do
43-
insist { retries } < 20 # 2 secs should be plenty?
39+
events = []
4440

41+
retries = 0
42+
while retries < 20
4543
File.open(tmpfile_path, "a") do |fd|
4644
fd.puts("hello")
4745
fd.puts("world")
4846
end
4947

5048
if queue.size >= 2
5149
events = 2.times.collect { queue.pop }
52-
insist { events[0]["message"] } == "hello"
53-
insist { events[1]["message"] } == "world"
5450
break
5551
end
5652

5753
sleep(0.1)
5854
retries += 1
5955
end
56+
57+
events
6058
end
59+
60+
insist { events[0]["message"] } == "hello"
61+
insist { events[1]["message"] } == "world"
6162
end
6263

63-
describe "can start at the beginning of an existing file" do
64+
it "should start at the beginning of an existing file" do
6465
tmpfile_path = Stud::Temporary.pathname
6566
sincedb_path = Stud::Temporary.pathname
6667

67-
config <<-CONFIG
68+
conf = <<-CONFIG
6869
input {
6970
file {
7071
type => "blah"
@@ -76,64 +77,59 @@
7677
}
7778
CONFIG
7879

79-
input do |pipeline, queue|
80-
File.open(tmpfile_path, "a") do |fd|
81-
fd.puts("hello")
82-
fd.puts("world")
83-
end
84-
85-
Thread.new { pipeline.run }
86-
sleep 0.1 while !pipeline.ready?
80+
File.open(tmpfile_path, "a") do |fd|
81+
fd.puts("hello")
82+
fd.puts("world")
83+
end
8784

88-
events = 2.times.collect { queue.pop }
89-
insist { events[0]["message"] } == "hello"
90-
insist { events[1]["message"] } == "world"
85+
events = input(conf) do |pipeline, queue|
86+
2.times.collect { queue.pop }
9187
end
88+
89+
insist { events[0]["message"] } == "hello"
90+
insist { events[1]["message"] } == "world"
9291
end
9392

94-
describe "restarts at the sincedb value" do
93+
it "should restarts at the sincedb value" do
9594
tmpfile_path = Stud::Temporary.pathname
9695
sincedb_path = Stud::Temporary.pathname
9796

98-
config <<-CONFIG
97+
conf = <<-CONFIG
9998
input {
10099
file {
101100
type => "blah"
102101
path => "#{tmpfile_path}"
103-
start_position => "beginning"
102+
start_position => "beginning"
104103
sincedb_path => "#{sincedb_path}"
105104
delimiter => "#{delimiter}"
106105
}
107106
}
108107
CONFIG
109108

110-
input do |pipeline, queue|
111-
File.open(tmpfile_path, "w") do |fd|
112-
fd.puts("hello")
113-
fd.puts("world")
114-
end
115-
116-
t = Thread.new { pipeline.run }
117-
sleep 0.1 while !pipeline.ready?
118-
119-
events = 2.times.collect { queue.pop }
120-
pipeline.shutdown
121-
t.join
109+
File.open(tmpfile_path, "w") do |fd|
110+
fd.puts("hello3")
111+
fd.puts("world3")
112+
end
122113

123-
File.open(tmpfile_path, "a") do |fd|
124-
fd.puts("foo")
125-
fd.puts("bar")
126-
fd.puts("baz")
127-
end
114+
events = input(conf) do |pipeline, queue|
115+
2.times.collect { queue.pop }
116+
end
128117

129-
Thread.new { pipeline.run }
130-
sleep 0.1 while !pipeline.ready?
118+
insist { events[0]["message"] } == "hello3"
119+
insist { events[1]["message"] } == "world3"
131120

132-
events = 3.times.collect { queue.pop }
121+
File.open(tmpfile_path, "a") do |fd|
122+
fd.puts("foo")
123+
fd.puts("bar")
124+
fd.puts("baz")
125+
end
133126

134-
insist { events[0]["message"] } == "foo"
135-
insist { events[1]["message"] } == "bar"
136-
insist { events[2]["message"] } == "baz"
127+
events = input(conf) do |pipeline, queue|
128+
3.times.collect { queue.pop }
137129
end
130+
131+
insist { events[0]["message"] } == "foo"
132+
insist { events[1]["message"] } == "bar"
133+
insist { events[2]["message"] } == "baz"
138134
end
139135
end

0 commit comments

Comments
 (0)