Skip to content

Commit

Permalink
multiline using identity: spec and code
Browse files Browse the repository at this point in the history
changes as requested in review.

fix gemspec

try to fix specs for Jenkins

better attempt to fix specs for Jenkins

update changelog, contribs and version

Closes logstash-plugins#44
  • Loading branch information
guyboertje committed Nov 17, 2015
1 parent 0d49aad commit 63eccf9
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 19 deletions.
12 changes: 11 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
## 2.0.3
- Implement Stream Identity mapping of codecs: distinct codecs will collect input per stream identity (filename)

## 2.0.2
- Change LS core dependency version
- Add CI badge

## 2.0.1
- Change LS core dependency version

## 2.0.0
- Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully,
- Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully,
instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895
- Dependency on logstash-core update to 2.0

Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Contributors:
* Tejay Cardon (tejaycar)
* elliot moore (em295)
* yjpa7145
* Guy Boertje (guyboertje)

Note: If you've sent us patches, bug reports, or otherwise contributed to
Logstash, and you aren't on the list above and want to be, please let us know
Expand Down
35 changes: 24 additions & 11 deletions lib/logstash/inputs/file.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/inputs/base"
require "logstash/codecs/identity_map_codec"

require "pathname"
require "socket" # for Socket.gethostname
Expand Down Expand Up @@ -181,27 +182,39 @@ def register
if @start_position == "beginning"
@tail_config[:start_new_files_at] = :beginning
end

@codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
end # def register

public
def run(queue)
@tail = FileWatch::Tail.new(@tail_config)
@tail.logger = @logger
@path.each { |path| @tail.tail(path) }

@tail.subscribe do |path, line|
@logger.debug? && @logger.debug("Received line", :path => path, :text => line)
@codec.decode(line) do |event|
event["[@metadata][path]"] = path
event["host"] = @host if !event.include?("host")
event["path"] = path if !event.include?("path")
decorate(event)
queue << event
log_line_received(path, line)
@codec.decode(line, path) do |event|
# path is the identity
# Note: this block is cached in the
# identity_map_codec for use when
# buffered lines are flushed.
queue << add_path_meta(event, path)
end
end
end # def run

public
def log_line_received(path, line)
return if !@logger.debug?
@logger.debug("Received line", :path => path, :text => line)
end

def add_path_meta(event, path)
event["[@metadata][path]"] = path
event["host"] = @host if !event.include?("host")
event["path"] = path if !event.include?("path")
decorate(event)
event
end

def stop
@tail.quit if @tail # _sincedb_write is called implicitly
end
Expand Down
3 changes: 2 additions & 1 deletion logstash-input-file.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-file'
s.version = '2.0.2'
s.version = '2.0.3'
s.licenses = ['Apache License (2.0)']
s.summary = "Stream events from files."
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -25,6 +25,7 @@ Gem::Specification.new do |s|
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'addressable'
s.add_runtime_dependency 'filewatch', ['>= 0.6.5', '~> 0.6']
s.add_runtime_dependency 'logstash-codec-multiline', ['~> 2.0.3']

s.add_development_dependency 'stud', ['~> 0.0.19']
s.add_development_dependency 'logstash-devutils'
Expand Down
77 changes: 71 additions & 6 deletions spec/inputs/file_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
require "stud/temporary"
require "logstash/inputs/file"

describe LogStash::Inputs::File do
FILE_DELIMITER = LogStash::Environment.windows? ? "\r\n" : "\n"

delimiter = (LogStash::Environment.windows? ? "\r\n" : "\n")
describe LogStash::Inputs::File do

it_behaves_like "an interruptible input plugin" do
let(:config) do
Expand All @@ -28,7 +28,7 @@
type => "blah"
path => "#{tmpfile_path}"
sincedb_path => "#{sincedb_path}"
delimiter => "#{delimiter}"
delimiter => "#{FILE_DELIMITER}"
}
}
CONFIG
Expand Down Expand Up @@ -82,7 +82,7 @@
path => "#{tmpfile_path}"
start_position => "beginning"
sincedb_path => "#{sincedb_path}"
delimiter => "#{delimiter}"
delimiter => "#{FILE_DELIMITER}"
}
}
CONFIG
Expand Down Expand Up @@ -111,7 +111,7 @@
path => "#{tmpfile_path}"
start_position => "beginning"
sincedb_path => "#{sincedb_path}"
delimiter => "#{delimiter}"
delimiter => "#{FILE_DELIMITER}"
}
}
CONFIG
Expand Down Expand Up @@ -154,7 +154,7 @@
path => "#{tmpfile_path}"
start_position => "beginning"
sincedb_path => "#{sincedb_path}"
delimiter => "#{delimiter}"
delimiter => "#{FILE_DELIMITER}"
codec => "json"
}
}
Expand Down Expand Up @@ -189,4 +189,69 @@
expect { subject.register }.to raise_error(ArgumentError)
end
end

context "when wildcard path and a multiline codec is specified" do
let(:tmpdir_path) { Stud::Temporary.directory }
let(:sincedb_path) { Stud::Temporary.pathname }
let(:conf) do
<<-CONFIG
input {
file {
type => "blah"
path => "#{tmpdir_path}/*.log"
start_position => "beginning"
sincedb_path => "#{sincedb_path}"
delimiter => "#{FILE_DELIMITER}"
codec => multiline { pattern => "^\s" what => previous }
}
}
CONFIG
end

let(:writer_proc) do
-> do
File.open("#{tmpdir_path}/a.log", "a") do |fd|
fd.puts("line1.1-of-a")
fd.puts(" line1.2-of-a")
fd.puts(" line1.3-of-a")
fd.puts("line2.1-of-a")
end

File.open("#{tmpdir_path}/z.log", "a") do |fd|
fd.puts("line1.1-of-z")
fd.puts(" line1.2-of-z")
fd.puts(" line1.3-of-z")
fd.puts("line2.1-of-z")
end
end
end

after do
FileUtils.rm_rf(tmpdir_path)
end

let(:event_count) { 2 }

it "collects separate multiple line events from each file" do
writer_proc.call

events = input(conf) do |pipeline, queue|
queue.size.times.collect { queue.pop }
end

expect(events.size).to eq(event_count)

e1_message = events[0]["message"]
e2_message = events[1]["message"]

# can't assume File A will be read first
if e1_message.start_with?('line1.1-of-z')
expect(e1_message).to eq("line1.1-of-z#{FILE_DELIMITER} line1.2-of-z#{FILE_DELIMITER} line1.3-of-z")
expect(e2_message).to eq("line1.1-of-a#{FILE_DELIMITER} line1.2-of-a#{FILE_DELIMITER} line1.3-of-a")
else
expect(e1_message).to eq("line1.1-of-a#{FILE_DELIMITER} line1.2-of-a#{FILE_DELIMITER} line1.3-of-a")
expect(e2_message).to eq("line1.1-of-z#{FILE_DELIMITER} line1.2-of-z#{FILE_DELIMITER} line1.3-of-z")
end
end
end
end

0 comments on commit 63eccf9

Please sign in to comment.