From 63eccf963abe47ec61db06935705b43259c372cc Mon Sep 17 00:00:00 2001 From: guyboertje Date: Wed, 11 Nov 2015 17:40:50 +0000 Subject: [PATCH] multiline using identity: spec and code changes as requested in review. fix gemspec try to fix specs for Jenkins better attempt to fix specs for Jenkins update changelog, contribs and version Closes #44 --- CHANGELOG.md | 12 +++++- CONTRIBUTORS | 1 + lib/logstash/inputs/file.rb | 35 +++++++++++------ logstash-input-file.gemspec | 3 +- spec/inputs/file_spec.rb | 77 ++++++++++++++++++++++++++++++++++--- 5 files changed, 109 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2fea2fd..0c69f52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ +## 2.0.3 + - Implement Stream Identity mapping of codecs: distinct codecs will collect input per stream identity (filename) + +## 2.0.2 + - Change LS core dependency version + - Add CI badge + +## 2.0.1 + - Change LS core dependency version + ## 2.0.0 - - Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully, + - Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully, instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895 - Dependency on logstash-core update to 2.0 diff --git a/CONTRIBUTORS b/CONTRIBUTORS index 5624d08..ea05590 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -17,6 +17,7 @@ Contributors: * Tejay Cardon (tejaycar) * elliot moore (em295) * yjpa7145 +* Guy Boertje (guyboertje) Note: If you've sent us patches, bug reports, or otherwise contributed to Logstash, and you aren't on the list above and want to be, please let us know diff --git a/lib/logstash/inputs/file.rb b/lib/logstash/inputs/file.rb index 5f0948e..24eff49 100644 --- a/lib/logstash/inputs/file.rb +++ b/lib/logstash/inputs/file.rb @@ -1,6 +1,7 @@ # encoding: utf-8 -require "logstash/inputs/base" require "logstash/namespace" +require "logstash/inputs/base" +require "logstash/codecs/identity_map_codec" require "pathname" require "socket" # for Socket.gethostname @@ -181,27 +182,39 @@ def register if @start_position == "beginning" @tail_config[:start_new_files_at] = :beginning end + + @codec = LogStash::Codecs::IdentityMapCodec.new(@codec) end # def register - public def run(queue) @tail = FileWatch::Tail.new(@tail_config) @tail.logger = @logger @path.each { |path| @tail.tail(path) } - @tail.subscribe do |path, line| - @logger.debug? && @logger.debug("Received line", :path => path, :text => line) - @codec.decode(line) do |event| - event["[@metadata][path]"] = path - event["host"] = @host if !event.include?("host") - event["path"] = path if !event.include?("path") - decorate(event) - queue << event + log_line_received(path, line) + @codec.decode(line, path) do |event| + # path is the identity + # Note: this block is cached in the + # identity_map_codec for use when + # buffered lines are flushed. + queue << add_path_meta(event, path) end end end # def run - public + def log_line_received(path, line) + return if !@logger.debug? + @logger.debug("Received line", :path => path, :text => line) + end + + def add_path_meta(event, path) + event["[@metadata][path]"] = path + event["host"] = @host if !event.include?("host") + event["path"] = path if !event.include?("path") + decorate(event) + event + end + def stop @tail.quit if @tail # _sincedb_write is called implicitly end diff --git a/logstash-input-file.gemspec b/logstash-input-file.gemspec index ccf3f52..239cc9a 100644 --- a/logstash-input-file.gemspec +++ b/logstash-input-file.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-file' - s.version = '2.0.2' + s.version = '2.0.3' s.licenses = ['Apache License (2.0)'] s.summary = "Stream events from files." s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program" @@ -25,6 +25,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'logstash-codec-plain' s.add_runtime_dependency 'addressable' s.add_runtime_dependency 'filewatch', ['>= 0.6.5', '~> 0.6'] + s.add_runtime_dependency 'logstash-codec-multiline', ['~> 2.0.3'] s.add_development_dependency 'stud', ['~> 0.0.19'] s.add_development_dependency 'logstash-devutils' diff --git a/spec/inputs/file_spec.rb b/spec/inputs/file_spec.rb index 186f9b7..3124e00 100644 --- a/spec/inputs/file_spec.rb +++ b/spec/inputs/file_spec.rb @@ -5,9 +5,9 @@ require "stud/temporary" require "logstash/inputs/file" -describe LogStash::Inputs::File do +FILE_DELIMITER = LogStash::Environment.windows? ? "\r\n" : "\n" - delimiter = (LogStash::Environment.windows? ? "\r\n" : "\n") +describe LogStash::Inputs::File do it_behaves_like "an interruptible input plugin" do let(:config) do @@ -28,7 +28,7 @@ type => "blah" path => "#{tmpfile_path}" sincedb_path => "#{sincedb_path}" - delimiter => "#{delimiter}" + delimiter => "#{FILE_DELIMITER}" } } CONFIG @@ -82,7 +82,7 @@ path => "#{tmpfile_path}" start_position => "beginning" sincedb_path => "#{sincedb_path}" - delimiter => "#{delimiter}" + delimiter => "#{FILE_DELIMITER}" } } CONFIG @@ -111,7 +111,7 @@ path => "#{tmpfile_path}" start_position => "beginning" sincedb_path => "#{sincedb_path}" - delimiter => "#{delimiter}" + delimiter => "#{FILE_DELIMITER}" } } CONFIG @@ -154,7 +154,7 @@ path => "#{tmpfile_path}" start_position => "beginning" sincedb_path => "#{sincedb_path}" - delimiter => "#{delimiter}" + delimiter => "#{FILE_DELIMITER}" codec => "json" } } @@ -189,4 +189,69 @@ expect { subject.register }.to raise_error(ArgumentError) end end + + context "when wildcard path and a multiline codec is specified" do + let(:tmpdir_path) { Stud::Temporary.directory } + let(:sincedb_path) { Stud::Temporary.pathname } + let(:conf) do + <<-CONFIG + input { + file { + type => "blah" + path => "#{tmpdir_path}/*.log" + start_position => "beginning" + sincedb_path => "#{sincedb_path}" + delimiter => "#{FILE_DELIMITER}" + codec => multiline { pattern => "^\s" what => previous } + } + } + CONFIG + end + + let(:writer_proc) do + -> do + File.open("#{tmpdir_path}/a.log", "a") do |fd| + fd.puts("line1.1-of-a") + fd.puts(" line1.2-of-a") + fd.puts(" line1.3-of-a") + fd.puts("line2.1-of-a") + end + + File.open("#{tmpdir_path}/z.log", "a") do |fd| + fd.puts("line1.1-of-z") + fd.puts(" line1.2-of-z") + fd.puts(" line1.3-of-z") + fd.puts("line2.1-of-z") + end + end + end + + after do + FileUtils.rm_rf(tmpdir_path) + end + + let(:event_count) { 2 } + + it "collects separate multiple line events from each file" do + writer_proc.call + + events = input(conf) do |pipeline, queue| + queue.size.times.collect { queue.pop } + end + + expect(events.size).to eq(event_count) + + e1_message = events[0]["message"] + e2_message = events[1]["message"] + + # can't assume File A will be read first + if e1_message.start_with?('line1.1-of-z') + expect(e1_message).to eq("line1.1-of-z#{FILE_DELIMITER} line1.2-of-z#{FILE_DELIMITER} line1.3-of-z") + expect(e2_message).to eq("line1.1-of-a#{FILE_DELIMITER} line1.2-of-a#{FILE_DELIMITER} line1.3-of-a") + else + expect(e1_message).to eq("line1.1-of-a#{FILE_DELIMITER} line1.2-of-a#{FILE_DELIMITER} line1.3-of-a") + expect(e2_message).to eq("line1.1-of-z#{FILE_DELIMITER} line1.2-of-z#{FILE_DELIMITER} line1.3-of-z") + end + end + end end