Skip to content

Commit

Permalink
Incorporating the review comments for the dynamic prefix based s3 out…
Browse files Browse the repository at this point in the history
…puts
  • Loading branch information
nigoel authored and elyscape committed Nov 30, 2015
1 parent b6d430d commit b127992
Showing 1 changed file with 13 additions and 19 deletions.
32 changes: 13 additions & 19 deletions lib/logstash/outputs/s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
require "thread"
require "tmpdir"
require "fileutils"
require 'pathname'
require "pathname"


# INFORMATION:
Expand Down Expand Up @@ -61,7 +61,7 @@
# time_file => 5 (optional)
# format => "plain" (optional)
# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" )
# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no eventns for the prefix, before cleaning up the watch on that)
# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no events for the prefix, before cleaning up the watch on that)
# }
#
class LogStash::Outputs::S3 < LogStash::Outputs::Base
Expand Down Expand Up @@ -155,8 +155,8 @@ def write_on_bucket(file)
# find and use the bucket
bucket = @s3.buckets[@bucket]

first = Pathname.new @temporary_directory
second = Pathname.new file
first = Pathname.new(@temporary_directory)
second = Pathname.new(file)

remote_filename_path = second.relative_path_from first

Expand Down Expand Up @@ -187,11 +187,9 @@ def create_temporary_file(prefix)
@tempfile[prefix].close
end

if @prefixes.include? prefix
if @prefixes.include?(prefix)
dirname = File.dirname(filename)
unless File.directory?(dirname)
FileUtils.mkdir_p(dirname)
end
FileUtils.mkdir_p(dirname) unless File.directory?(dirname)
@logger.debug("S3: Creating a new temporary file", :filename => filename)
@tempfile[prefix] = File.open(filename, "a")
end
Expand Down Expand Up @@ -226,8 +224,6 @@ def register

test_s3_write
restore_from_crashes if @restore == true
#reset_page_counter
#create_temporary_file
configure_periodic_rotation if time_file != 0
configure_upload_workers

Expand Down Expand Up @@ -270,7 +266,7 @@ def restore_from_crashes
end

public
def shouldcleanup(prefix)
def need_cleanup?(prefix)
return @empty_uploads[prefix] > @no_event_wait
end

Expand All @@ -281,8 +277,7 @@ def move_file_to_bucket(file)

basepath = Pathname.new @temporary_directory
dirname = Pathname.new File.dirname(file)
prefixpath = dirname.relative_path_from basepath
prefix = prefixpath.to_s
prefix = dirname.relative_path_from(basepath).to_s
@logger.debug("S3: moving the file for prefix", :prefix => prefix)

if !File.zero?(file)
Expand All @@ -308,9 +303,8 @@ def move_file_to_bucket(file)
@logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory)
end

if shouldcleanup(prefix)
cleanprefix(prefix)
end
clean_prefix(prefix) if need_cleanup?(prefix)

end

public
Expand Down Expand Up @@ -369,7 +363,7 @@ def close
shutdown_upload_workers
@periodic_rotation_thread.stop! if @periodic_rotation_thread

for prefix in @prefixes
@prefixes.each do |prefix|
@file_rotation_lock[prefix].synchronize do
@tempfile[prefix].close unless @tempfile[prefix].nil? && @tempfile[prefix].closed?
end
Expand All @@ -385,7 +379,7 @@ def shutdown_upload_workers
private
def handle_event(encoded_event, event)
actualprefix = event.sprintf(@prefix)
if not @prefixes.to_a().include? actualprefix
if !@prefixes.include? actualprefix
@file_rotation_lock[actualprefix] = Mutex.new
@prefixes.add(actualprefix)
reset_page_counter(actualprefix)
Expand Down Expand Up @@ -425,7 +419,7 @@ def configure_periodic_rotation
end

private
def cleanprefix(prefix)
def clean_prefix(prefix)
path = File.join(@temporary_directory, prefix)
@logger.debug("cleaning the directory and prefix ", :dir => path, :prefix => prefix)
@file_rotation_lock[prefix].synchronize do
Expand Down

0 comments on commit b127992

Please sign in to comment.