Skip to content

Commit

Permalink
cleanup, also fixing a bug that was causing creation of a large number…
Browse files Browse the repository at this point in the history
… of unused files/directories

Fixes #102
  • Loading branch information
Nevins Bartolomeo authored and ph committed Dec 15, 2016
1 parent d8c1d98 commit 9b16097
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 37 deletions.
8 changes: 4 additions & 4 deletions lib/logstash/outputs/s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
# If you define file_size you will have a number of files based on the section and the current tag.
# 0 means the file stays on the listener indefinitely; beware that if you specify 0 here and size_file 0, the file will never be put on the bucket,
# since for now the only thing this plugin can do is to upload the file when logstash restarts.
config :time_file, :validate => :number, :default => 15 * 60
config :time_file, :validate => :number, :default => 15

## IMPORTANT: if you use multiple instances of s3, you should specify "restore => true" on one of them and "restore => false" on the others.
## This is a hack to avoid destroying the new files after restoring the initial files.
Expand All @@ -136,10 +136,10 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
config :prefix, :validate => :string, :default => ''

# Specify how many workers to use to upload the files to S3
config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).round
config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).ceil

# Number of items we can keep in the local queue before uploading them
config :upload_queue_size, :validate => :number, :default => 2 * (Concurrent.processor_count * 0.25).round
config :upload_queue_size, :validate => :number, :default => 2 * (Concurrent.processor_count * 0.25).ceil

# The version of the S3 signature hash to use. Normally uses the internal client default, can be explicitly
# specified here
Expand Down Expand Up @@ -349,7 +349,7 @@ def restore_from_crash
Dir.glob(::File.join(@temporary_directory, "**/*")) do |file|
if ::File.file?(file)
key_parts = Pathname.new(file).relative_path_from(temp_folder_path).to_s.split(::File::SEPARATOR)
temp_file = TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"), ::File.open(file, "r"))
temp_file = TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"), ::File.open(file, "r"), key_parts.slice(0, 1))

@logger.debug("Recovering from crash and uploading", :file => temp_file.path)
@crash_uploader.upload_async(temp_file, :on_complete => method(:clean_temporary_file))
Expand Down
34 changes: 23 additions & 11 deletions lib/logstash/outputs/s3/file_repository.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
require "concurrent/timer_task"
require "logstash/util"

java_import "java.util.concurrent.ConcurrentHashMap"
ConcurrentHashMap = java.util.concurrent.ConcurrentHashMap

module LogStash
module Outputs
Expand Down Expand Up @@ -34,6 +34,23 @@ def stale?
# Identity mapping: a PrefixedValue maps any prefix to itself.
# Retained so an existing PrefixedValue can satisfy the same
# single-method `apply` interface that ConcurrentHashMap#computeIfAbsent
# expects of its mapping function (the pre-fix code passed a
# PrefixedValue directly as that function) — TODO confirm callers still rely on this.
def apply(prefix)
return self
end

# Delete the factory's current temporary file, holding this value's
# lock so the deletion cannot race with concurrent writes or rotation.
def delete!
with_lock{ |factory| factory.current.delete! }
end
end

# Lazily builds a PrefixedValue (wrapping a TemporaryFileFactory) for a
# prefix key. A single shared instance is handed to
# ConcurrentHashMap#computeIfAbsent as the mapping function, so the
# factory — and its on-disk temporary directory — is only created when a
# prefix is actually missing, instead of being allocated eagerly on
# every lookup (the bug this commit fixes).
class FactoryInitializer
# @param tags [Array] tags forwarded to each TemporaryFileFactory
# @param encoding [String] file encoding setting — presumably "gzip"/"none"; TODO confirm
# @param temporary_directory [String] root directory for temporary files
# @param stale_time [Numeric] staleness threshold forwarded to PrefixedValue
def initialize(tags, encoding, temporary_directory, stale_time)
@tags = tags
@encoding = encoding
@temporary_directory = temporary_directory
@stale_time = stale_time
end

# Invoked by ConcurrentHashMap#computeIfAbsent with the missing key;
# returns the fresh PrefixedValue to store under that key.
def apply(prefix_key)
PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)
end
end

def initialize(tags, encoding, temporary_directory,
Expand All @@ -43,20 +60,15 @@ def initialize(tags, encoding, temporary_directory,
# logstash after a crash we keep the remote structure
@prefixed_factories = ConcurrentHashMap.new

@tags = tags
@encoding = encoding
@temporary_directory = temporary_directory

@stale_time = stale_time
@sweeper_interval = sweeper_interval

@factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time)

start_stale_sweeper
end

def keys
arr = []
@prefixed_factories.keys.each {|k| arr << k}
arr
@prefixed_factories.keySet
end

def each_files
Expand All @@ -67,7 +79,7 @@ def each_files

# Return the file factory
def get_factory(prefix_key)
@prefixed_factories.computeIfAbsent(prefix_key, PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)).with_lock { |factory| yield factory }
@prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory }
end

def get_file(prefix_key)
Expand All @@ -85,7 +97,7 @@ def size
def remove_stale(k, v)
if v.stale?
@prefixed_factories.remove(k, v)
v.with_lock{ |factor| factor.current.delete!}
v.delete!
end
end

Expand Down
10 changes: 8 additions & 2 deletions lib/logstash/outputs/s3/temporary_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,21 @@ class TemporaryFile

def_delegators :@fd, *DELEGATES_METHODS

def initialize(key, fd)
def initialize(key, fd, temp_path)
@fd = fd
@key = key
@temp_path = temp_path
@created_at = Time.now
end

# Returns the Time captured when this TemporaryFile was instantiated
# (@created_at is set to Time.now in the constructor).
def ctime
@created_at
end

# Root temporary directory for this file — presumably the per-file UUID
# directory created by TemporaryFileFactory#new_file. This is the path
# that delete! removes recursively.
def temp_path
@temp_path
end

# Returns the object key with a leading "/" stripped, so the generated
# S3 key never starts with a slash.
def key
@key.gsub(/^\//, "")
end
Expand All @@ -33,7 +38,8 @@ def key
# we delete the root of the UUID, using a UUID also remove the risk of deleting unwanted file, it acts as
# a sandbox.
def delete!
::FileUtils.rm_rf(path.gsub(/#{Regexp.escape(key)}$/, ""))
@fd.close
::FileUtils.rm_rf(@temp_path, :secure => true)
end

def empty?
Expand Down
19 changes: 11 additions & 8 deletions lib/logstash/outputs/s3/temporary_file_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ def initialize(prefix, tags, encoding, temporary_directory)
@tags = tags
@encoding = encoding
@temporary_directory = temporary_directory
@lock = Mutex.new

rotate!
end

def rotate!
@current = new_file
increment_counter
@current
@lock.synchronize {
@current = new_file
increment_counter
@current
}
end

private
Expand Down Expand Up @@ -71,18 +74,18 @@ def generate_name
def new_file
uuid = SecureRandom.uuid
name = generate_name
path = ::File.join(temporary_directory, uuid, prefix)
path = ::File.join(temporary_directory, uuid)
key = ::File.join(prefix, name)

FileUtils.mkdir_p(path)
FileUtils.mkdir_p(::File.join(path, prefix))

io = if gzip?
Zlib::GzipWriter.open(::File.join(path, name))
Zlib::GzipWriter.open(::File.join(path, key))
else
::File.open(::File.join(path, name), FILE_MODE)
::File.open(::File.join(path, key), FILE_MODE)
end

TemporaryFile.new(key, io)
TemporaryFile.new(key, io, path)
end
end
end
Expand Down
36 changes: 29 additions & 7 deletions spec/outputs/s3/file_repository_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@
end
end

it "returns the same file for the same dynamic prefix key" do
prefix = "%{type}/%{+YYYY}/%{+MM}/%{+dd}/"
event = LogStash::Event.new({ "type" => "syslog"})
key = event.sprintf(prefix)
file_path = nil


subject.get_file(key) do |file|
file_path = file.path
end

subject.get_file(key) do |file|
expect(file.path).to eq(file_path)
end
end

it "returns different file for different prefix keys" do
file_path = nil

Expand Down Expand Up @@ -72,21 +88,27 @@

it "returns all available keys" do
subject.get_file(prefix_key) { |file| file.write("something") }
expect(subject.keys).to eq([prefix_key])
expect(subject.keys.toArray).to eq([prefix_key])
end

it "clean stale factories" do
file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1)
expect(file_repository.size).to eq(0)
file_repository.get_factory(prefix_key) do |factory|
@file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1)
expect(@file_repository.size).to eq(0)
path = ""
@file_repository.get_factory(prefix_key) do |factory|
factory.current.write("hello")
# force a rotation so we get an empty file that will get stale.
factory.rotate!
path = factory.current.temp_path
end

file_repository.get_file("another-prefix") { |file| file.write("hello") }
expect(file_repository.size).to eq(2)
try(10) { expect(file_repository.size).to eq(1) }
@file_repository.get_file("another-prefix") { |file| file.write("hello") }
expect(@file_repository.size).to eq(2)
@file_repository.keys.each do |k|
puts k
end
try(10) { expect(@file_repository.size).to eq(1) }
expect(File.directory?(path)).to be_falsey
end
end

Expand Down
3 changes: 2 additions & 1 deletion spec/outputs/s3/size_and_time_rotation_policy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
let(:time_file) { 1 }
subject { described_class.new(file_size, time_file) }

let(:temporary_directory) { Stud::Temporary.pathname }
let(:temporary_file) { Stud::Temporary.file }
let(:name) { "foobar" }
let(:content) { "hello" * 1000 }
let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file) }
let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file, temporary_directory) }

it "raises an exception if the `time_file` is set to 0" do
expect { described_class.new(100, 0) }.to raise_error(LogStash::ConfigurationError, /time_file/)
Expand Down
3 changes: 2 additions & 1 deletion spec/outputs/s3/size_rotation_policy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
describe LogStash::Outputs::S3::SizeRotationPolicy do
subject { described_class.new(size_file) }

let(:temporary_directory) { Stud::Temporary.directory }
let(:temporary_file) { Stud::Temporary.file }
let(:name) { "foobar" }
let(:content) { "hello" * 1000 }
let(:size_file) { 10 } # in bytes
let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file) }
let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file, temporary_directory) }

it "returns true if the size on disk is higher than the `size_file`" do
file.write(content)
Expand Down
2 changes: 1 addition & 1 deletion spec/outputs/s3/temporary_file_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
FileUtils.mkdir_p(::File.join(temporary_directory, uuid))
end

subject { described_class.new(key, temporary_file) }
subject { described_class.new(key, temporary_file, temporary_directory) }

it "returns the key of the file" do
expect(subject.key).to eq(key)
Expand Down
3 changes: 2 additions & 1 deletion spec/outputs/s3/time_rotation_policy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
subject { described_class.new(max_time) }

let(:max_time) { 1 }
let(:temporary_directory) { Stud::Temporary.directory }
let(:temporary_file) { Stud::Temporary.file }
let(:name) { "foobar" }
let(:content) { "hello" * 1000 }
let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file) }
let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file, temporary_directory) }

it "raises an exception if the `file_time` is set to 0" do
expect { described_class.new(0) }.to raise_error(LogStash::ConfigurationError, /`time_file` need to be greather than 0/)
Expand Down
3 changes: 2 additions & 1 deletion spec/outputs/s3/uploader_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
let(:bucket_name) { "foobar-bucket" }
let(:client) { Aws::S3::Client.new(stub_responses: true) }
let(:bucket) { Aws::S3::Bucket.new(bucket_name, :client => client) }
let(:temporary_directory) { Stud::Temporary.pathname }
let(:temporary_file) { Stud::Temporary.file }
let(:key) { "foobar" }
let(:upload_options) { {} }
Expand All @@ -24,7 +25,7 @@
end

let(:file) do
f = LogStash::Outputs::S3::TemporaryFile.new(key, temporary_file)
f = LogStash::Outputs::S3::TemporaryFile.new(key, temporary_file, temporary_directory)
f.write("random content")
f.fsync
f
Expand Down

0 comments on commit 9b16097

Please sign in to comment.