Skip to content

Commit

Permalink
making changes sugguested in #102
Browse files Browse the repository at this point in the history
Fixes #102
  • Loading branch information
Nevins Bartolomeo authored and ph committed Dec 15, 2016
1 parent bcdb0df commit 586210b
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 20 deletions.
8 changes: 4 additions & 4 deletions lib/logstash/outputs/s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
## Both time_file and size_file settings can trigger a log "file rotation"
## A log rotation pushes the current log "part" to s3 and deleted from local temporary storage.
#
## If you specify BOTH size_file and time_file then it will create file for each tag (if specified).
## If you specify BOTH size_file and time_file then it will create file for each tag (if specified).
## When EITHER time_file minutes have elapsed OR log file size > size_file, a log rotation is triggered.
##
## If you ONLY specify time_file but NOT file_size, one file for each tag (if specified) will be created.
Expand Down Expand Up @@ -136,7 +136,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
config :prefix, :validate => :string, :default => ''

# Specify how many workers to use to upload the files to S3
config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.25).round
config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).round

# Number of items we can keep in the local queue before uploading them
config :upload_queue_size, :validate => :number, :default => 2 * (Concurrent.processor_count * 0.25).round
Expand Down Expand Up @@ -171,7 +171,7 @@ def register
# to prepare for the new config validation that will be part of the core so the core can
# be moved easily.
unless @prefix.empty?
if !PathValidator.valid?(prefix)
if !PathValidator.valid?(prefix)
raise LogStash::ConfigurationError, "Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}"
end
end
Expand All @@ -180,7 +180,7 @@ def register
raise LogStash::ConfigurationError, "Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}"
end

if @validate_credentials_on_root_bucket && !WriteBucketPermissionValidator.valid?(bucket_resource)
if @validate_credentials_on_root_bucket && !WriteBucketPermissionValidator.valid?(bucket_resource)
raise LogStash::ConfigurationError, "Logstash must have the privileges to write to root bucket `#{@bucket}`, check you credentials or your permissions."
end

Expand Down
25 changes: 17 additions & 8 deletions lib/logstash/outputs/s3/file_repository.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# encoding: utf-8
require "java"
require "concurrent"
require "concurrent/map"
require "concurrent/timer_task"
require "logstash/util"

java_import "java.util.concurrent.ConcurrentHashMap"

module LogStash
module Outputs
class S3
Expand All @@ -28,14 +30,18 @@ def with_lock
def stale?
with_lock { |factory| factory.current.size == 0 && (Time.now - factory.current.ctime > @stale_time) }
end

def apply(prefix)
return self
end
end

def initialize(tags, encoding, temporary_directory,
stale_time = DEFAULT_STALE_TIME_SECS,
sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
# The path need to contains the prefix so when we start
# logtash after a crash we keep the remote structure
@prefixed_factories = Concurrent::Map.new
@prefixed_factories = ConcurrentHashMap.new

@tags = tags
@encoding = encoding
Expand All @@ -48,19 +54,20 @@ def initialize(tags, encoding, temporary_directory,
end

def keys
@prefixed_factories.keys
arr = []
@prefixed_factories.keySet.each {|k| arr << k}
arr
end

def each_files
@prefixed_factories.each_value do |prefixed_file|
@prefixed_factories.values do |prefixed_file|
prefixed_file.with_lock { |factory| yield factory.current }
end
end

# Return the file factory
def get_factory(prefix_key)
@prefixed_factories.compute_if_absent(prefix_key) { PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time) }
.with_lock { |factory| yield factory }
@prefixed_factories.computeIfAbsent(prefix_key, PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)).with_lock { |factory| yield factory }
end

def get_file(prefix_key)
Expand All @@ -79,8 +86,10 @@ def start_stale_sweeper
@stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do
LogStash::Util.set_thread_name("S3, Stale factory sweeper")

@prefixed_factories.each_pair do |k, v|
@prefixed_factories.delete_pair(k, v) if v.stale?
@prefixed_factories.entrySet.each do |s|
if s.getValue.stale?
@prefixed_factories.remove(s.getKey, s.getValue)
end
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/logstash/outputs/s3/temporary_file_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class S3
#
# The local structure will look like this.
#
# <TEMPORARY_PATH>/<UUID>/<prefix>/ls.s3.localhost.%Y-%m-%dT%H.%m.tag_es_fb.part1.txt.gz
# <TEMPORARY_PATH>/<UUID>/<prefix>/ls.s3.localhost.%Y-%m-%dT%H.%m.tag_es_fb.part1.txt.gz
#
# Since the UUID should be fairly unique I can destroy the whole path when an upload is complete.
# I do not have to mess around to check if the other directory have file in it before destroying them.
Expand Down Expand Up @@ -59,7 +59,7 @@ def current_time
end

def generate_name
filename = "ls.s3.#{Socket.gethostname}.#{current_time}"
filename = "ls.s3.#{SecureRandom.uuid}.#{current_time}"

if tags.size > 0
"#{filename}.tag_#{tags.join('.')}.part#{counter}.#{extension}"
Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/outputs/s3/time_rotation_policy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def initialize(time_file)
end

def rotate?(file)
file.size > 0 && Time.now - file.ctime >= time_file
file.size > 0 && ((Time.now - file.ctime)/60).floor >= time_file
end

def need_periodic?
Expand Down
2 changes: 1 addition & 1 deletion spec/outputs/s3/size_and_time_rotation_policy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
end

it "returns true if the file old enough" do
allow(file).to receive(:ctime).and_return(Time.now - time_file * 2)
allow(file).to receive(:ctime).and_return(Time.now - (time_file * 2 * 60) )
expect(subject.rotate?(file)).to be_truthy
end

Expand Down
4 changes: 2 additions & 2 deletions spec/outputs/s3/temporary_file_factory_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

it "includes the date" do
n = Time.now
expect(subject.current.path).to match(/ls.s3.#{Socket.gethostname}.#{n.strftime("%Y-%m-%dT")}\d+\.\d+\./)
expect(subject.current.path).to include(n.strftime("%Y-%m-%dT"))
end

it "include the file key in the path" do
Expand All @@ -50,7 +50,7 @@

it "create a unique directory in the temporary directory for each file" do
uuid = "hola"
expect(SecureRandom).to receive(:uuid).and_return(uuid)
expect(SecureRandom).to receive(:uuid).and_return(uuid).twice
expect(subject.current.path).to include(uuid)
end

Expand Down
4 changes: 2 additions & 2 deletions spec/outputs/s3/time_rotation_policy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
end

it "returns true if the file old enough" do
allow(file).to receive(:ctime).and_return(Time.now - max_time * 2)
allow(file).to receive(:ctime).and_return(Time.now - (max_time * 2 * 60))
expect(subject.rotate?(file)).to be_truthy
end

Expand All @@ -38,7 +38,7 @@

context "When the size of the file is 0" do
it "returns false if the file old enough" do
allow(file).to receive(:ctime).and_return(Time.now - max_time * 2)
allow(file).to receive(:ctime).and_return(Time.now - (max_time * 2 * 60))
expect(subject.rotate?(file)).to be_falsey
end

Expand Down

0 comments on commit 586210b

Please sign in to comment.