Commit e50c4b6

This came from elasticsearch/logstash at cf2242170011fbf2d264ae87660192754697671a
Richard Pijnenburg committed Oct 16, 2014 · 0 parents · commit e50c4b6
Showing 8 changed files with 503 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -0,0 +1,4 @@
*.gem
Gemfile.lock
.bundle
vendor
4 changes: 4 additions & 0 deletions Gemfile
@@ -0,0 +1,4 @@
source 'https://rubygems.org'
gem 'rake'
gem 'gem_publisher'
gem 'archive-tar-minitar'
6 changes: 6 additions & 0 deletions Rakefile
@@ -0,0 +1,6 @@
@files = []

task :default do
system("rake -T")
end

150 changes: 150 additions & 0 deletions lib/logstash/inputs/file.rb
@@ -0,0 +1,150 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"

require "pathname"
require "socket" # for Socket.gethostname

# Stream events from files.
#
# By default, each event is assumed to be one line. If you would like
# to join multiple log lines into one event, you'll want to use the
# multiline codec.
#
# Files are followed in a manner similar to "tail -0F". File rotation
# is detected and handled by this input.
class LogStash::Inputs::File < LogStash::Inputs::Base
config_name "file"
milestone 2

# TODO(sissel): This should switch to use the 'line' codec by default
# once file following
default :codec, "plain"

# The path(s) to the file(s) to use as an input.
# You can use globs here, such as `/var/log/*.log`
# Paths must be absolute and cannot be relative.
#
# You may also configure multiple paths. See an example
# on the [Logstash configuration page](configuration#array).
config :path, :validate => :array, :required => true

# Exclusions (matched against the filename, not full path). Globs
# are valid here, too. For example, if you have
#
# path => "/var/log/*"
#
# You might want to exclude gzipped files:
#
# exclude => "*.gz"
config :exclude, :validate => :array

# How often we stat files to see if they have been modified. Increasing
# this interval will decrease the number of system calls we make, but
# increase the time to detect new log lines.
config :stat_interval, :validate => :number, :default => 1

# How often we expand globs to discover new files to watch.
config :discover_interval, :validate => :number, :default => 15

# Where to write the sincedb database (keeps track of the current
# position of monitored log files). The default will write
# sincedb files to some path matching "$HOME/.sincedb*"
config :sincedb_path, :validate => :string

# How often (in seconds) to write a since database with the current position of
# monitored log files.
config :sincedb_write_interval, :validate => :number, :default => 15

# Choose where Logstash starts initially reading files: at the beginning or
# at the end. The default behavior treats files like live streams and thus
# starts at the end. If you have old data you want to import, set this
# to 'beginning'.
#
# This option only modifies "first contact" situations where a file is new
# and not seen before. If a file has already been seen before, this option
# has no effect.
config :start_position, :validate => [ "beginning", "end"], :default => "end"

public
def register
require "addressable/uri"
require "filewatch/tail"
require "digest/md5"
@logger.info("Registering file input", :path => @path)

@tail_config = {
:exclude => @exclude,
:stat_interval => @stat_interval,
:discover_interval => @discover_interval,
:sincedb_write_interval => @sincedb_write_interval,
:logger => @logger,
}

@path.each do |path|
if Pathname.new(path).relative?
raise ArgumentError.new("File paths must be absolute, relative path specified: #{path}")
end
end

if @sincedb_path.nil?
if ENV["SINCEDB_DIR"].nil? && ENV["HOME"].nil?
@logger.error("No SINCEDB_DIR or HOME environment variable set, I don't know where " \
"to keep track of the files I'm watching. Either set " \
"HOME or SINCEDB_DIR in your environment, or set sincedb_path " \
"in your Logstash config for the file input with " \
"path '#{@path.inspect}'")
raise # TODO(sissel): HOW DO I FAIL PROPERLY YO
end

# Pick SINCEDB_DIR if available, otherwise use HOME.
sincedb_dir = ENV["SINCEDB_DIR"] || ENV["HOME"]

# Join by ',' to make it easy for folks to know their own sincedb
# generated path (vs, say, inspecting the @path array)
@sincedb_path = File.join(sincedb_dir, ".sincedb_" + Digest::MD5.hexdigest(@path.join(",")))

# Migrate any old .sincedb to the new file (this is for version <=1.1.1 compatibility)
old_sincedb = File.join(sincedb_dir, ".sincedb")
if File.exists?(old_sincedb)
@logger.info("Renaming old ~/.sincedb to new one", :old => old_sincedb,
:new => @sincedb_path)
File.rename(old_sincedb, @sincedb_path)
end

@logger.info("No sincedb_path set, generating one based on the file path",
:sincedb_path => @sincedb_path, :path => @path)
end

@tail_config[:sincedb_path] = @sincedb_path

if @start_position == "beginning"
@tail_config[:start_new_files_at] = :beginning
end
end # def register

public
def run(queue)
@tail = FileWatch::Tail.new(@tail_config)
@tail.logger = @logger
@path.each { |path| @tail.tail(path) }
hostname = Socket.gethostname

@tail.subscribe do |path, line|
@logger.debug? && @logger.debug("Received line", :path => path, :text => line)
@codec.decode(line) do |event|
decorate(event)
event["host"] = hostname if !event.include?("host")
event["path"] = path
queue << event
end
end
finished
end # def run

public
def teardown
@tail.sincedb_write
@tail.quit
end # def teardown
end # class LogStash::Inputs::File
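
Usage note (not part of this commit): a minimal pipeline configuration exercising the options defined above might look like the following, in the same style as the inline config examples; the paths are hypothetical.

    input {
      file {
        path => [ "/var/log/messages", "/var/log/*.log" ]
        exclude => "*.gz"
        start_position => "beginning"
      }
    }

    output {
      stdout { }
    }

With sincedb_path unset, register derives a default of the form $HOME/.sincedb_<MD5 of the comma-joined path array>, as computed above.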
29 changes: 29 additions & 0 deletions logstash-input-file.gemspec
@@ -0,0 +1,29 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-file'
s.version = '0.1.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Stream events from files."
s.description = "Stream events from files."
s.authors = ["Elasticsearch"]
s.email = '[email protected]'
s.homepage = "http://logstash.net/"
s.require_paths = ["lib"]

# Files
s.files = `git ls-files`.split($/) + ::Dir.glob('vendor/*')

# Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/})

# Special flag to let us know this is actually a logstash plugin
s.metadata = { "logstash_plugin" => "true", "group" => "input" }

# Gem dependencies
s.add_runtime_dependency 'logstash', '>= 1.4.0', '< 2.0.0'

s.add_runtime_dependency 'addressable'
s.add_runtime_dependency 'filewatch', ['0.5.1']

end

9 changes: 9 additions & 0 deletions rakelib/publish.rake
@@ -0,0 +1,9 @@
require "gem_publisher"

desc "Publish gem to RubyGems.org"
task :publish_gem do |t|
gem_file = Dir.glob(File.expand_path('../*.gemspec', File.dirname(__FILE__))).first
gem = GemPublisher.publish_if_updated(gem_file, :rubygems)
puts "Published #{gem}" if gem
end
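
Usage note: `rake publish_gem` invokes this task from the plugin root; as the name publish_if_updated suggests, gem_publisher builds and pushes the gem only if the gemspec version is not already on RubyGems.org.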

169 changes: 169 additions & 0 deletions rakelib/vendor.rake
@@ -0,0 +1,169 @@
require "net/http"
require "uri"
require "digest/sha1"

def vendor(*args)
return File.join("vendor", *args)
end

directory "vendor/" => ["vendor"] do |task, args|
mkdir task.name
end

def fetch(url, sha1, output)

puts "Downloading #{url}"
actual_sha1 = download(url, output)

if actual_sha1 != sha1
fail "SHA1 does not match (expected '#{sha1}' but got '#{actual_sha1}')"
end
end # def fetch

def file_fetch(url, sha1)
filename = File.basename(URI(url).path)
output = "vendor/#{filename}"
task output => [ "vendor/" ] do
begin
actual_sha1 = file_sha1(output)
if actual_sha1 != sha1
fetch(url, sha1, output)
end
rescue Errno::ENOENT
fetch(url, sha1, output)
end
end.invoke

return output
end

def file_sha1(path)
digest = Digest::SHA1.new
fd = File.new(path, "r")
while true
begin
digest << fd.sysread(16384)
rescue EOFError
break
end
end
return digest.hexdigest
ensure
fd.close if fd
end

def download(url, output)
uri = URI(url)
digest = Digest::SHA1.new
tmp = "#{output}.tmp"
Net::HTTP.start(uri.host, uri.port, :use_ssl => (uri.scheme == "https")) do |http|
request = Net::HTTP::Get.new(uri.path)
http.request(request) do |response|
# Net::HTTP returns the status code as a String.
fail "HTTP fetch failed for #{url}. #{response}" unless ["200", "301"].include?(response.code)
size = (response["content-length"] || -1).to_i.to_f
count = 0
File.open(tmp, "w") do |fd|
response.read_body do |chunk|
fd.write(chunk)
digest << chunk
if size > 0 && $stdout.tty?
count += chunk.bytesize
$stdout.write(sprintf("\r%0.2f%%", count/size * 100))
end
end
end
$stdout.write("\r \r") if $stdout.tty?
end
end

File.rename(tmp, output)

return digest.hexdigest
rescue SocketError => e
puts "Failure while downloading #{url}: #{e}"
raise
ensure
File.unlink(tmp) if File.exist?(tmp)
end # def download

def untar(tarball, &block)
require "archive/tar/minitar"
tgz = Zlib::GzipReader.new(File.open(tarball))
# Unpack the tarball; the block maps each entry to an output path (nil skips the entry).
tar = Archive::Tar::Minitar::Input.open(tgz)
tar.each do |entry|
path = block.call(entry)
next if path.nil?
parent = File.dirname(path)

mkdir_p parent unless File.directory?(parent)

# Skip this file if the output file is the same size
if entry.directory?
mkdir path unless File.directory?(path)
else
entry_mode = entry.instance_eval { @mode } & 0777
if File.exists?(path)
stat = File.stat(path)
# TODO(sissel): Submit a patch to archive-tar-minitar upstream to
# expose headers in the entry.
entry_size = entry.instance_eval { @size }
# If file sizes are same, skip writing.
next if stat.size == entry_size && (stat.mode & 0777) == entry_mode
end
puts "Extracting #{entry.full_name} from #{tarball} #{entry_mode.to_s(8)}"
File.open(path, "w") do |fd|
# eof? check lets us skip empty files. Necessary because the API provided by
# Archive::Tar::Minitar::Reader::EntryStream only mostly acts like an
# IO object. Something about empty files in this EntryStream causes
# IO.copy_stream to throw "can't convert nil into String" on JRuby
# TODO(sissel): File a bug about this.
while !entry.eof?
chunk = entry.read(16384)
fd.write(chunk)
end
#IO.copy_stream(entry, fd)
end
File.chmod(entry_mode, path)
end
end
tar.close
File.unlink(tarball) if File.file?(tarball)
end # def untar

def ungz(file)

outpath = file.sub(/\.gz$/, '')
tgz = Zlib::GzipReader.new(File.open(file))
begin
File.open(outpath, "w") do |out|
IO::copy_stream(tgz, out)
end
File.unlink(file)
rescue
File.unlink(outpath) if File.file?(outpath)
raise
end
tgz.close
end

desc "Process any vendor files required for this plugin"
task "vendor" do |task, args|

@files.each do |file|
download = file_fetch(file['url'], file['sha1'])
if download =~ /\.tar\.gz$/
prefix = download.gsub('.tar.gz', '').gsub('vendor/', '')
untar(download) do |entry|
out = entry.full_name.split("/").last
# When a 'files' whitelist is given, skip entries that are not listed;
# without a whitelist, extract everything (flattened to its basename).
next if !file['files'].nil? && !file['files'].include?(entry.full_name.gsub(prefix, ''))
File.join('vendor', out)
end
elsif download =~ /\.gz$/
ungz(download)
end
end

end
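
The vendor task reads entries from the @files array initialized in the Rakefile; in this commit the array is empty. As a sketch of the shape the task expects (the URL, checksum, and file list below are placeholders, not values from this commit), an entry would look like:

    @files = [
      {
        # Placeholder URL and SHA1 -- substitute real values per plugin.
        'url'   => 'https://example.com/somelib-1.0.tar.gz',
        'sha1'  => '0123456789abcdef0123456789abcdef01234567',
        # Optional whitelist: extract only these entries, named relative to the
        # tarball prefix ("somelib-1.0" here); omit it to extract everything.
        'files' => [ '/lib/somelib.rb' ],
      }
    ]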