diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..3300a23
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+*.gem
+Gemfile.lock
+.bundle
+vendor
diff --git a/Gemfile b/Gemfile
new file mode 100644
index 0000000..ccb9012
--- /dev/null
+++ b/Gemfile
@@ -0,0 +1,4 @@
+source 'https://rubygems.org'
+gem 'rake'
+gem 'gem_publisher'
+gem 'archive-tar-minitar'
diff --git a/Rakefile b/Rakefile
new file mode 100644
index 0000000..da6b220
--- /dev/null
+++ b/Rakefile
@@ -0,0 +1,6 @@
+@files = []
+
+task :default do
+  system("rake -T")
+end
+
diff --git a/lib/logstash/inputs/file.rb b/lib/logstash/inputs/file.rb
new file mode 100644
index 0000000..8d5ba28
--- /dev/null
+++ b/lib/logstash/inputs/file.rb
@@ -0,0 +1,150 @@
+# encoding: utf-8
+require "logstash/inputs/base"
+require "logstash/namespace"
+
+require "pathname"
+require "socket" # for Socket.gethostname
+
+# Stream events from files.
+#
+# By default, each event is assumed to be one line. If you would like
+# to join multiple log lines into one event, you'll want to use the
+# multiline codec.
+#
+# Files are followed in a manner similar to "tail -0F". File rotation
+# is detected and handled by this input.
+class LogStash::Inputs::File < LogStash::Inputs::Base
+  config_name "file"
+  milestone 2
+
+  # TODO(sissel): This should switch to use the 'line' codec by default
+  # once file following
+  default :codec, "plain"
+
+  # The path(s) to the file(s) to use as an input.
+  # You can use globs here, such as `/var/log/*.log`.
+  # Paths must be absolute and cannot be relative.
+  #
+  # You may also configure multiple paths. See an example
+  # on the [Logstash configuration page](configuration#array).
+  config :path, :validate => :array, :required => true
+
+  # Exclusions (matched against the filename, not the full path). Globs
+  # are valid here, too. For example, if you have
+  #
+  #     path => "/var/log/*"
+  #
+  # you might want to exclude gzipped files:
+  #
+  #     exclude => "*.gz"
+  config :exclude, :validate => :array
+
+  # How often we stat files to see if they have been modified. Increasing
+  # this interval will decrease the number of system calls we make, but
+  # increase the time to detect new log lines.
+  config :stat_interval, :validate => :number, :default => 1
+
+  # How often we expand globs to discover new files to watch.
+  config :discover_interval, :validate => :number, :default => 15
+
+  # Where to write the sincedb database (keeps track of the current
+  # position of monitored log files). The default will write
+  # sincedb files to some path matching "$HOME/.sincedb*".
+  config :sincedb_path, :validate => :string
+
+  # How often (in seconds) to write the sincedb database with the current
+  # position of monitored log files.
+  config :sincedb_write_interval, :validate => :number, :default => 15
+
+  # Choose where Logstash starts initially reading files: at the beginning or
+  # at the end. The default behavior treats files like live streams and thus
+  # starts at the end. If you have old data you want to import, set this
+  # to 'beginning'.
+  #
+  # This option only modifies "first contact" situations where a file is new
+  # and not seen before. If a file has already been seen before, this option
+  # has no effect.
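+  #
+  # For example, a minimal configuration that imports an existing file from
+  # its first line might look like this (the path shown is hypothetical):
+  #
+  #     input {
+  #       file {
+  #         path => "/var/log/app/my.log"
+  #         start_position => "beginning"
+  #       }
+  #     }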
+  config :start_position, :validate => ["beginning", "end"], :default => "end"
+
+  public
+  def register
+    require "addressable/uri"
+    require "filewatch/tail"
+    require "digest/md5"
+    @logger.info("Registering file input", :path => @path)
+
+    @tail_config = {
+      :exclude => @exclude,
+      :stat_interval => @stat_interval,
+      :discover_interval => @discover_interval,
+      :sincedb_write_interval => @sincedb_write_interval,
+      :logger => @logger,
+    }
+
+    @path.each do |path|
+      if Pathname.new(path).relative?
+        raise ArgumentError.new("File paths must be absolute, relative path specified: #{path}")
+      end
+    end
+
+    if @sincedb_path.nil?
+      if ENV["SINCEDB_DIR"].nil? && ENV["HOME"].nil?
+        @logger.error("No SINCEDB_DIR or HOME environment variable set, I don't know where " \
+                      "to keep track of the files I'm watching. Either set " \
+                      "HOME or SINCEDB_DIR in your environment, or set sincedb_path " \
+                      "in your Logstash config for the file input with " \
+                      "path '#{@path.inspect}'")
+        raise # TODO(sissel): HOW DO I FAIL PROPERLY YO
+      end
+
+      # Pick SINCEDB_DIR if available, otherwise use HOME.
+      sincedb_dir = ENV["SINCEDB_DIR"] || ENV["HOME"]
+
+      # Join by ',' to make it easy for folks to know their own sincedb
+      # generated path (vs, say, inspecting the @path array). The result is a
+      # file named like "$HOME/.sincedb_<md5 of the joined paths>".
+      @sincedb_path = File.join(sincedb_dir, ".sincedb_" + Digest::MD5.hexdigest(@path.join(",")))
+
+      # Migrate any old .sincedb to the new file (this is for version <= 1.1.1 compatibility).
+      old_sincedb = File.join(sincedb_dir, ".sincedb")
+      if File.exists?(old_sincedb)
+        @logger.info("Renaming old ~/.sincedb to new one", :old => old_sincedb,
+                     :new => @sincedb_path)
+        File.rename(old_sincedb, @sincedb_path)
+      end
+
+      @logger.info("No sincedb_path set, generating one based on the file path",
+                   :sincedb_path => @sincedb_path, :path => @path)
+    end
+
+    @tail_config[:sincedb_path] = @sincedb_path
+
+    if @start_position == "beginning"
+      @tail_config[:start_new_files_at] = :beginning
+    end
+  end # def register
+
+  public
+  def run(queue)
+    @tail = FileWatch::Tail.new(@tail_config)
+    @tail.logger = @logger
+    @path.each { |path| @tail.tail(path) }
+    hostname = Socket.gethostname
+
+    @tail.subscribe do |path, line|
+      @logger.debug? && @logger.debug("Received line", :path => path, :text => line)
+      @codec.decode(line) do |event|
+        decorate(event)
+        event["host"] = hostname if !event.include?("host")
+        event["path"] = path
+        queue << event
+      end
+    end
+    finished
+  end # def run
+
+  public
+  def teardown
+    @tail.sincedb_write
+    @tail.quit
+  end # def teardown
+end # class LogStash::Inputs::File
diff --git a/logstash-input-file.gemspec b/logstash-input-file.gemspec
new file mode 100644
index 0000000..c32b140
--- /dev/null
+++ b/logstash-input-file.gemspec
@@ -0,0 +1,29 @@
+Gem::Specification.new do |s|
+
+  s.name        = 'logstash-input-file'
+  s.version     = '0.1.0'
+  s.licenses    = ['Apache License (2.0)']
+  s.summary     = "Stream events from files."
+  s.description = "Stream events from files."
+  s.authors       = ["Elasticsearch"]
+  s.email         = 'richard.pijnenburg@elasticsearch.com'
+  s.homepage      = "http://logstash.net/"
+  s.require_paths = ["lib"]
+
+  # Files
+  s.files = `git ls-files`.split($/) + ::Dir.glob('vendor/*')
+
+  # Tests
+  s.test_files = s.files.grep(%r{^(test|spec|features)/})
+
+  # Special flag to let us know this is actually a logstash plugin
+  s.metadata = { "logstash_plugin" => "true", "group" => "input" }
+
+  # Gem dependencies
+  s.add_runtime_dependency 'logstash', '>= 1.4.0', '< 2.0.0'
+
+  s.add_runtime_dependency 'addressable'
+  s.add_runtime_dependency 'filewatch', ['0.5.1']
+
+end
diff --git a/rakelib/publish.rake b/rakelib/publish.rake
new file mode 100644
index 0000000..0ef58c0
--- /dev/null
+++ b/rakelib/publish.rake
@@ -0,0 +1,9 @@
+require "gem_publisher"
+
+desc "Publish gem to RubyGems.org"
+task :publish_gem do |t|
+  gem_file = Dir.glob(File.expand_path('../*.gemspec', File.dirname(__FILE__))).first
+  gem = GemPublisher.publish_if_updated(gem_file, :rubygems)
+  puts "Published #{gem}" if gem
+end
+
diff --git a/rakelib/vendor.rake b/rakelib/vendor.rake
new file mode 100644
index 0000000..2135119
--- /dev/null
+++ b/rakelib/vendor.rake
@@ -0,0 +1,169 @@
+require "net/http"
+require "uri"
+require "zlib" # used by untar and ungz below
+require "digest/sha1"
+
+def vendor(*args)
+  return File.join("vendor", *args)
+end
+
+directory "vendor/" => ["vendor"] do |task, args|
+  mkdir task.name
+end
+
+def fetch(url, sha1, output)
+  puts "Downloading #{url}"
+  actual_sha1 = download(url, output)
+
+  if actual_sha1 != sha1
+    fail "SHA1 does not match (expected '#{sha1}' but got '#{actual_sha1}')"
+  end
+end # def fetch
+
+def file_fetch(url, sha1)
+  filename = File.basename(URI(url).path)
+  output = "vendor/#{filename}"
+  task output => ["vendor/"] do
+    begin
+      actual_sha1 = file_sha1(output)
+      if actual_sha1 != sha1
+        fetch(url, sha1, output)
+      end
+    rescue Errno::ENOENT
+      fetch(url, sha1, output)
+    end
+  end.invoke
+
+  return output
+end
+
+def file_sha1(path)
+  digest = Digest::SHA1.new
+  fd = File.new(path, "r")
+  while true
+    begin
+      digest << fd.sysread(16384)
+    rescue EOFError
+      break
+    end
+  end
+  return digest.hexdigest
+ensure
+  fd.close if fd
+end
+
+def download(url, output)
+  uri = URI(url)
+  digest = Digest::SHA1.new
+  tmp = "#{output}.tmp"
+  Net::HTTP.start(uri.host, uri.port, :use_ssl => (uri.scheme == "https")) do |http|
+    request = Net::HTTP::Get.new(uri.path)
+    http.request(request) do |response|
+      # Note: response.code is a String; fail on anything but success or a
+      # permanent redirect.
+      fail "HTTP fetch failed for #{url}. #{response}" unless ["200", "301"].include?(response.code)
+      size = (response["content-length"] || -1).to_f
+      count = 0
+      File.open(tmp, "w") do |fd|
+        response.read_body do |chunk|
+          fd.write(chunk)
+          digest << chunk
+          if size > 0 && $stdout.tty?
+            count += chunk.bytesize
+            $stdout.write(sprintf("\r%0.2f%%", count / size * 100))
+          end
+        end
+      end
+      $stdout.write("\r      \r") if $stdout.tty?
+    end
+  end
+
+  File.rename(tmp, output)
+
+  return digest.hexdigest
+rescue SocketError => e
+  puts "Failure while downloading #{url}: #{e}"
+  raise
+ensure
+  File.unlink(tmp) if File.exist?(tmp)
+end # def download
+
+def untar(tarball, &block)
+  require "archive/tar/minitar"
+  tgz = Zlib::GzipReader.new(File.open(tarball))
+  tar = Archive::Tar::Minitar::Input.open(tgz)
+  tar.each do |entry|
+    path = block.call(entry)
+    next if path.nil?
+    parent = File.dirname(path)
+
+    mkdir_p parent unless File.directory?(parent)
+
+    if entry.directory?
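+      # Directory entries just need to exist on disk; their file contents
+      # arrive as separate entries in the archive.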
+      mkdir path unless File.directory?(path)
+    else
+      entry_mode = entry.instance_eval { @mode } & 0777
+      if File.exists?(path)
+        stat = File.stat(path)
+        # TODO(sissel): Submit a patch to archive-tar-minitar upstream to
+        # expose headers in the entry.
+        entry_size = entry.instance_eval { @size }
+        # If the size and mode already match, skip rewriting this file.
+        next if stat.size == entry_size && (stat.mode & 0777) == entry_mode
+      end
+      puts "Extracting #{entry.full_name} from #{tarball} #{entry_mode.to_s(8)}"
+      File.open(path, "w") do |fd|
+        # The eof? check lets us skip empty files. Necessary because the API
+        # provided by Archive::Tar::Minitar::Reader::EntryStream only mostly
+        # acts like an IO object. Something about empty files in this
+        # EntryStream causes IO.copy_stream to throw "can't convert nil into
+        # String" on JRuby.
+        # TODO(sissel): File a bug about this.
+        while !entry.eof?
+          chunk = entry.read(16384)
+          fd.write(chunk)
+        end
+        #IO.copy_stream(entry, fd)
+      end
+      File.chmod(entry_mode, path)
+    end
+  end
+  tar.close
+  File.unlink(tarball) if File.file?(tarball)
+end # def untar
+
+def ungz(file)
+  outpath = file.sub(/\.gz$/, '')
+  tgz = Zlib::GzipReader.new(File.open(file))
+  begin
+    File.open(outpath, "w") do |out|
+      IO::copy_stream(tgz, out)
+    end
+    File.unlink(file)
+  rescue
+    File.unlink(outpath) if File.file?(outpath)
+    raise
+  ensure
+    tgz.close
+  end
+end
+
+desc "Process any vendor files required for this plugin"
+task "vendor" do |task, args|
+  @files.each do |file|
+    download = file_fetch(file['url'], file['sha1'])
+    if download =~ /\.tar\.gz$/
+      prefix = download.sub(/\.tar\.gz$/, '').gsub('vendor/', '')
+      untar(download) do |entry|
+        if !file['files'].nil?
+          next unless file['files'].include?(entry.full_name.gsub(prefix, ''))
+        end
+        # Flatten every extracted entry into vendor/ under its basename.
+        File.join('vendor', entry.full_name.split("/").last)
+      end
+    elsif download =~ /\.gz$/
+      ungz(download)
+    end
+  end
+end
diff --git a/spec/inputs/file_spec.rb b/spec/inputs/file_spec.rb
new file mode 100644
index 0000000..462ade1
--- /dev/null
+++ b/spec/inputs/file_spec.rb
@@ -0,0 +1,132 @@
+# encoding: utf-8
+
+require "spec_helper"
+require "tempfile"
+
+describe "inputs/file" do
+
+  describe "starts at the end of an existing file" do
+    tmp_file = Tempfile.new('logstash-spec-input-file')
+
+    config <<-CONFIG
+      input {
+        file {
+          type => "blah"
+          path => "#{tmp_file.path}"
+          sincedb_path => "/dev/null"
+        }
+      }
+    CONFIG
+
+    input do |pipeline, queue|
+      File.open(tmp_file, "w") do |fd|
+        fd.puts("ignore me 1")
+        fd.puts("ignore me 2")
+      end
+
+      Thread.new { pipeline.run }
+      sleep 0.1 while !pipeline.ready?
+
+      # At this point, even if pipeline.ready? == true, the plugin's threads
+      # might still be initializing, so we cannot know when the file plugin
+      # will have seen the original file. It could first see it after the
+      # "hello"/"world" appends below, hence the retry logic.
+      retries = 0
+      loop do
+        insist { retries } < 20 # 2 secs should be plenty?
+
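+        # Re-append the lines on every retry: anything written before the
+        # watcher attached is not picked up when starting at the end.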
+        File.open(tmp_file, "a") do |fd|
+          fd.puts("hello")
+          fd.puts("world")
+        end
+
+        if queue.size >= 2
+          events = 2.times.collect { queue.pop }
+          insist { events[0]["message"] } == "hello"
+          insist { events[1]["message"] } == "world"
+          break
+        end
+
+        sleep(0.1)
+        retries += 1
+      end
+    end
+  end
+
+  describe "can start at the beginning of an existing file" do
+    tmp_file = Tempfile.new('logstash-spec-input-file')
+
+    config <<-CONFIG
+      input {
+        file {
+          type => "blah"
+          path => "#{tmp_file.path}"
+          start_position => "beginning"
+          sincedb_path => "/dev/null"
+        }
+      }
+    CONFIG
+
+    input do |pipeline, queue|
+      File.open(tmp_file, "a") do |fd|
+        fd.puts("hello")
+        fd.puts("world")
+      end
+
+      Thread.new { pipeline.run }
+      sleep 0.1 while !pipeline.ready?
+
+      events = 2.times.collect { queue.pop }
+      insist { events[0]["message"] } == "hello"
+      insist { events[1]["message"] } == "world"
+    end
+  end
+
+  describe "restarts at the sincedb value" do
+    tmp_file = Tempfile.new('logstash-spec-input-file')
+    tmp_sincedb = Tempfile.new('logstash-spec-input-file-sincedb')
+
+    config <<-CONFIG
+      input {
+        file {
+          type => "blah"
+          path => "#{tmp_file.path}"
+          start_position => "beginning"
+          sincedb_path => "#{tmp_sincedb.path}"
+        }
+      }
+    CONFIG
+
+    input do |pipeline, queue|
+      File.open(tmp_file, "w") do |fd|
+        fd.puts("hello")
+        fd.puts("world")
+      end
+
+      t = Thread.new { pipeline.run }
+      sleep 0.1 while !pipeline.ready?
+
+      events = 2.times.collect { queue.pop }
+      pipeline.shutdown
+      t.join
+
+      File.open(tmp_file, "a") do |fd|
+        fd.puts("foo")
+        fd.puts("bar")
+        fd.puts("baz")
+      end
+
+      Thread.new { pipeline.run }
+      sleep 0.1 while !pipeline.ready?
+
+      events = 3.times.collect { queue.pop }
+
+      insist { events[0]["message"] } == "foo"
+      insist { events[1]["message"] } == "bar"
+      insist { events[2]["message"] } == "baz"
+    end
+  end
+end
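A note on the vendor task above: the Rakefile initializes @files to an empty array, so `rake vendor` is a no-op until a plugin declares artifacts to vendor. The sketch below shows the shape each entry is expected to have, judging from the file['url']/file['sha1']/file['files'] lookups in rakelib/vendor.rake; the URL and SHA1 here are placeholders, not real values:

    # Hypothetical Rakefile declaration consumed by `rake vendor`.
    @files = [
      {
        'url'   => 'https://example.com/downloads/some-data.tar.gz',
        'sha1'  => '0000000000000000000000000000000000000000', # placeholder
        # Entry names are matched after stripping the tarball's directory
        # prefix, hence the leading slash. Omit 'files' to extract everything.
        'files' => ['/some-data.dat'],
      },
    ]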