Skip to content

Commit

Permalink
Introduce close_older and ignore_older config options
Browse files Browse the repository at this point in the history
begin to add ignore_after config option
depends on filewatch 0.6.8

add timed_out method to TailListener and specs

revert Gemfile changes for PR

split and rename ignore_after option, update gem deps to published ver.

bump ver, make suggested review changes and improve spec reliability

update changelog

Fixes logstash-plugins#81, Fixes logstash-plugins#89, Fixes logstash-plugins#90

Fixes logstash-plugins#87
  • Loading branch information
guyboertje authored and Guy Boertje committed Dec 29, 2015
1 parent 4ebc77b commit 61142ab
Show file tree
Hide file tree
Showing 6 changed files with 505 additions and 264 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 2.1.0
- Implement new config options: ignore_older and close_older. When close_older is set, any buffered data will be flushed.
- Fixes [#81](https://github.com/logstash-plugins/logstash-input-file/issues/81)
- Fixes [#81](https://github.com/logstash-plugins/logstash-input-file/issues/89)
- Fixes [#81](https://github.com/logstash-plugins/logstash-input-file/issues/90)

## 2.0.3
- Implement Stream Identity mapping of codecs: distinct codecs will collect input per stream identity (filename)

Expand Down
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
source 'https://rubygems.org'
gemspec
gemspec
112 changes: 90 additions & 22 deletions lib/logstash/inputs/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@
# to the rotation and its reopening under the new name (an interval
# determined by the `stat_interval` and `discover_interval` options)
# will not get picked up.

class LogStash::Codecs::Base
# TODO - move this to core
if !method_defined?(:accept)
def accept(listener)
decode(listener.data) do |event|
listener.process_event(event)
end
end
end
if !method_defined?(:auto_flush)
def auto_flush
end
end
end

class LogStash::Inputs::File < LogStash::Inputs::Base
config_name "file"

Expand Down Expand Up @@ -119,6 +135,17 @@ class LogStash::Inputs::File < LogStash::Inputs::Base
# set the new line delimiter, defaults to "\n"
config :delimiter, :validate => :string, :default => "\n"

# If this option is specified, when the file input discovers a file that
# was last modified before the specified timespan in seconds, the file is
# ignored. After it's discovery, if an ignored file is modified it is no
# longer ignored and any new data is read. The default is 24 hours.
config :ignore_older, :validate => :number, :default => 24 * 60 * 60

# If this option is specified, the file input closes any files that remain
# unmodified for longer than the specified timespan in seconds.
# The default is 1 hour
config :close_older, :validate => :number, :default => 1 * 60 * 60

public
def register
require "addressable/uri"
Expand All @@ -133,7 +160,8 @@ def register
:discover_interval => @discover_interval,
:sincedb_write_interval => @sincedb_write_interval,
:delimiter => @delimiter,
:logger => @logger,
:ignore_older => @ignore_older,
:close_older => @close_older
}

@path.each do |path|
Expand Down Expand Up @@ -184,44 +212,84 @@ def register
@codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
end # def register

class ListenerTail
# use attr_reader to define noop methods
attr_reader :input, :path, :data
attr_reader :deleted, :created, :error, :eof

# construct with upstream state
def initialize(path, input)
@path, @input = path, input
end

def timed_out
input.codec.evict(path)
end

def accept(data)
# and push transient data filled dup listener downstream
input.log_line_received(path, data)
input.codec.accept(dup_adding_state(data))
end

def process_event(event)
event["[@metadata][path]"] = path
event["path"] = path if !event.include?("path")
input.post_process_this(event)
end

def add_state(data)
@data = data
self
end

private

# duplicate and add state for downstream
def dup_adding_state(line)
self.class.new(path, input).add_state(line)
end
end

def listener_for(path)
# path is the identity
ListenerTail.new(path, self)
end

def begin_tailing
stop # if the pipeline restarts this input.
@tail = FileWatch::Tail.new(@tail_config)
# if the pipeline restarts this input,
# make sure previous files are closed
stop
# use observer listener api
@tail = FileWatch::Tail.new_observing(@tail_config)
@tail.logger = @logger
@path.each { |path| @tail.tail(path) }
end

def run(queue)
begin_tailing
@tail.subscribe do |path, line|
log_line_received(path, line)
@codec.decode(line, path) do |event|
# path is the identity
# Note: this block is cached in the
# identity_map_codec for use when
# buffered lines are flushed.
queue << add_path_meta(event, path)
end
end
@queue = queue
@tail.subscribe(self)
end # def run

def post_process_this(event)
event["host"] = @host if !event.include?("host")
decorate(event)
@queue << event
end

def log_line_received(path, line)
return if !@logger.debug?
@logger.debug("Received line", :path => path, :text => line)
end

def add_path_meta(event, path)
event["[@metadata][path]"] = path
event["host"] = @host if !event.include?("host")
event["path"] = path if !event.include?("path")
decorate(event)
event
end

def stop
# in filewatch >= 0.6.7, quit will closes and forget all files
# but it will write their last read positions to since_db
# beforehand
@tail.quit if @tail
if @tail
@codec.close
@tail.quit
end
end
end # class LogStash::Inputs::File
8 changes: 4 additions & 4 deletions logstash-input-file.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-file'
s.version = '2.0.3'
s.version = '2.1.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Stream events from files."
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -20,12 +20,12 @@ Gem::Specification.new do |s|
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "input" }

# Gem dependencies
s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0"
s.add_runtime_dependency "logstash-core", ">= 2.0.0", "< 3.0.0.alpha0"

s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'addressable'
s.add_runtime_dependency 'filewatch', ['>= 0.6.7', '~> 0.6']
s.add_runtime_dependency 'logstash-codec-multiline', ['~> 2.0.3']
s.add_runtime_dependency 'filewatch', ['>= 0.7.0', '~> 0.7']
s.add_runtime_dependency 'logstash-codec-multiline', ['~> 2.0.5']

s.add_development_dependency 'stud', ['~> 0.0.19']
s.add_development_dependency 'logstash-devutils'
Expand Down
Loading

0 comments on commit 61142ab

Please sign in to comment.