diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index 5eafedf1..fb4a5df0 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -112,7 +112,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base # If you define file_size you have a number of files in consideration of the section and the current tag. # 0 stay all time on listerner, beware if you specific 0 and size_file 0, because you will not put the file on bucket, # for now the only thing this plugin can do is to put the file when logstash restart. - config :time_file, :validate => :number, :default => 15 * 60 + config :time_file, :validate => :number, :default => 15 ## IMPORTANT: if you use multiple instance of s3, you should specify on one of them the "restore=> true" and on the others "restore => false". ## This is hack for not destroy the new files after restoring the initial files. @@ -136,10 +136,10 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base config :prefix, :validate => :string, :default => '' # Specify how many workers to use to upload the files to S3 - config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).round + config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).ceil # Number of items we can keep in the local queue before uploading them - config :upload_queue_size, :validate => :number, :default => 2 * (Concurrent.processor_count * 0.25).round + config :upload_queue_size, :validate => :number, :default => 2 * (Concurrent.processor_count * 0.25).ceil # The version of the S3 signature hash to use. Normally uses the internal client default, can be explicitly # specified here @@ -349,7 +349,7 @@ def restore_from_crash Dir.glob(::File.join(@temporary_directory, "**/*")) do |file| if ::File.file?(file) key_parts = Pathname.new(file).relative_path_from(temp_folder_path).to_s.split(::File::SEPARATOR) - temp_file = TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"), ::File.open(file, "r")) + temp_file = TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"), ::File.open(file, "r"), key_parts.slice(0, 1)) @logger.debug("Recovering from crash and uploading", :file => temp_file.path) @crash_uploader.upload_async(temp_file, :on_complete => method(:clean_temporary_file)) diff --git a/lib/logstash/outputs/s3/file_repository.rb b/lib/logstash/outputs/s3/file_repository.rb index b49e13f1..1b1cd5c0 100644 --- a/lib/logstash/outputs/s3/file_repository.rb +++ b/lib/logstash/outputs/s3/file_repository.rb @@ -4,7 +4,7 @@ require "concurrent/timer_task" require "logstash/util" -java_import "java.util.concurrent.ConcurrentHashMap" +ConcurrentHashMap = java.util.concurrent.ConcurrentHashMap module LogStash module Outputs @@ -34,6 +34,23 @@ def stale? def apply(prefix) return self end + + def delete! + with_lock{ |factory| factory.current.delete! } + end + end + + class FactoryInitializer + def initialize(tags, encoding, temporary_directory, stale_time) + @tags = tags + @encoding = encoding + @temporary_directory = temporary_directory + @stale_time = stale_time + end + + def apply(prefix_key) + PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time) + end end def initialize(tags, encoding, temporary_directory, @@ -43,20 +60,15 @@ def initialize(tags, encoding, temporary_directory, # logtash after a crash we keep the remote structure @prefixed_factories = ConcurrentHashMap.new - @tags = tags - @encoding = encoding - @temporary_directory = temporary_directory - - @stale_time = stale_time @sweeper_interval = sweeper_interval + @factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time) + start_stale_sweeper end def keys - arr = [] - @prefixed_factories.keys.each {|k| arr << k} - arr + @prefixed_factories.keySet end def each_files @@ -67,7 +79,7 @@ def each_files # Return the file factory def get_factory(prefix_key) - @prefixed_factories.computeIfAbsent(prefix_key, PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)).with_lock { |factory| yield factory } + @prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory } end def get_file(prefix_key) @@ -85,7 +97,7 @@ def size def remove_stale(k, v) if v.stale? @prefixed_factories.remove(k, v) - v.with_lock{ |factor| factor.current.delete!} + v.delete! end end diff --git a/lib/logstash/outputs/s3/temporary_file.rb b/lib/logstash/outputs/s3/temporary_file.rb index a604e084..176db6b3 100644 --- a/lib/logstash/outputs/s3/temporary_file.rb +++ b/lib/logstash/outputs/s3/temporary_file.rb @@ -14,9 +14,10 @@ class TemporaryFile def_delegators :@fd, *DELEGATES_METHODS - def initialize(key, fd) + def initialize(key, fd, temp_path) @fd = fd @key = key + @temp_path = temp_path @created_at = Time.now end @@ -24,6 +25,10 @@ def ctime @created_at end + def temp_path + @temp_path + end + def key @key.gsub(/^\//, "") end @@ -33,7 +38,8 @@ def key # we delete the root of the UUID, using a UUID also remove the risk of deleting unwanted file, it acts as # a sandbox. def delete! - ::FileUtils.rm_rf(path.gsub(/#{Regexp.escape(key)}$/, "")) + @fd.close + ::FileUtils.rm_rf(@temp_path, :secure => true) end def empty? diff --git a/lib/logstash/outputs/s3/temporary_file_factory.rb b/lib/logstash/outputs/s3/temporary_file_factory.rb index a7d8205b..3b34eb9e 100644 --- a/lib/logstash/outputs/s3/temporary_file_factory.rb +++ b/lib/logstash/outputs/s3/temporary_file_factory.rb @@ -31,14 +31,17 @@ def initialize(prefix, tags, encoding, temporary_directory) @tags = tags @encoding = encoding @temporary_directory = temporary_directory + @lock = Mutex.new rotate! end def rotate! - @current = new_file - increment_counter - @current + @lock.synchronize { + @current = new_file + increment_counter + @current + } end private @@ -71,18 +74,18 @@ def generate_name def new_file uuid = SecureRandom.uuid name = generate_name - path = ::File.join(temporary_directory, uuid, prefix) + path = ::File.join(temporary_directory, uuid) key = ::File.join(prefix, name) - FileUtils.mkdir_p(path) + FileUtils.mkdir_p(::File.join(path, prefix)) io = if gzip? - Zlib::GzipWriter.open(::File.join(path, name)) + Zlib::GzipWriter.open(::File.join(path, key)) else - ::File.open(::File.join(path, name), FILE_MODE) + ::File.open(::File.join(path, key), FILE_MODE) end - TemporaryFile.new(key, io) + TemporaryFile.new(key, io, path) end end end diff --git a/spec/outputs/s3/file_repository_spec.rb b/spec/outputs/s3/file_repository_spec.rb index 9b9614fc..49d7c096 100644 --- a/spec/outputs/s3/file_repository_spec.rb +++ b/spec/outputs/s3/file_repository_spec.rb @@ -34,6 +34,22 @@ end end + it "returns the same file for the same dynamic prefix key" do + prefix = "%{type}/%{+YYYY}/%{+MM}/%{+dd}/" + event = LogStash::Event.new({ "type" => "syslog"}) + key = event.sprintf(prefix) + file_path = nil + + + subject.get_file(key) do |file| + file_path = file.path + end + + subject.get_file(key) do |file| + expect(file.path).to eq(file_path) + end + end + it "returns different file for different prefix keys" do file_path = nil @@ -72,21 +88,27 @@ it "returns all available keys" do subject.get_file(prefix_key) { |file| file.write("something") } - expect(subject.keys).to eq([prefix_key]) + expect(subject.keys.toArray).to eq([prefix_key]) end it "clean stale factories" do - file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1) - expect(file_repository.size).to eq(0) - file_repository.get_factory(prefix_key) do |factory| + @file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1) + expect(@file_repository.size).to eq(0) + path = "" + @file_repository.get_factory(prefix_key) do |factory| factory.current.write("hello") # force a rotation so we get an empty file that will get stale. factory.rotate! + path = factory.current.temp_path end - file_repository.get_file("another-prefix") { |file| file.write("hello") } - expect(file_repository.size).to eq(2) - try(10) { expect(file_repository.size).to eq(1) } + @file_repository.get_file("another-prefix") { |file| file.write("hello") } + expect(@file_repository.size).to eq(2) + @file_repository.keys.each do |k| + puts k + end + try(10) { expect(@file_repository.size).to eq(1) } + expect(File.directory?(path)).to be_falsey end end diff --git a/spec/outputs/s3/size_and_time_rotation_policy_spec.rb b/spec/outputs/s3/size_and_time_rotation_policy_spec.rb index e91f562d..3e822783 100644 --- a/spec/outputs/s3/size_and_time_rotation_policy_spec.rb +++ b/spec/outputs/s3/size_and_time_rotation_policy_spec.rb @@ -8,10 +8,11 @@ let(:time_file) { 1 } subject { described_class.new(file_size, time_file) } + let(:temporary_directory) { Stud::Temporary.pathname } let(:temporary_file) { Stud::Temporary.file } let(:name) { "foobar" } let(:content) { "hello" * 1000 } - let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file) } + let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file, temporary_directory) } it "raises an exception if the `time_file` is set to 0" do expect { described_class.new(100, 0) }.to raise_error(LogStash::ConfigurationError, /time_file/) diff --git a/spec/outputs/s3/size_rotation_policy_spec.rb b/spec/outputs/s3/size_rotation_policy_spec.rb index 7700bbf9..e8ca74a3 100644 --- a/spec/outputs/s3/size_rotation_policy_spec.rb +++ b/spec/outputs/s3/size_rotation_policy_spec.rb @@ -7,11 +7,12 @@ describe LogStash::Outputs::S3::SizeRotationPolicy do subject { described_class.new(size_file) } + let(:temporary_directory) { Stud::Temporary.directory } let(:temporary_file) { Stud::Temporary.file } let(:name) { "foobar" } let(:content) { "hello" * 1000 } let(:size_file) { 10 } # in bytes - let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file) } + let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file, temporary_directory) } it "returns true if the size on disk is higher than the `size_file`" do file.write(content) diff --git a/spec/outputs/s3/temporary_file_spec.rb b/spec/outputs/s3/temporary_file_spec.rb index 46f4fe9f..fd88e1e0 100644 --- a/spec/outputs/s3/temporary_file_spec.rb +++ b/spec/outputs/s3/temporary_file_spec.rb @@ -16,7 +16,7 @@ FileUtils.mkdir_p(::File.join(temporary_directory, uuid)) end - subject { described_class.new(key, temporary_file) } + subject { described_class.new(key, temporary_file, temporary_directory) } it "returns the key of the file" do expect(subject.key).to eq(key) diff --git a/spec/outputs/s3/time_rotation_policy_spec.rb b/spec/outputs/s3/time_rotation_policy_spec.rb index a6a69c35..d6f3407a 100644 --- a/spec/outputs/s3/time_rotation_policy_spec.rb +++ b/spec/outputs/s3/time_rotation_policy_spec.rb @@ -7,10 +7,11 @@ subject { described_class.new(max_time) } let(:max_time) { 1 } + let(:temporary_directory) { Stud::Temporary.directory } let(:temporary_file) { Stud::Temporary.file } let(:name) { "foobar" } let(:content) { "hello" * 1000 } - let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file) } + let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file, temporary_directory) } it "raises an exception if the `file_time` is set to 0" do expect { described_class.new(0) }.to raise_error(LogStash::ConfigurationError, /`time_file` need to be greather than 0/) diff --git a/spec/outputs/s3/uploader_spec.rb b/spec/outputs/s3/uploader_spec.rb index 5613289a..16da5427 100644 --- a/spec/outputs/s3/uploader_spec.rb +++ b/spec/outputs/s3/uploader_spec.rb @@ -11,6 +11,7 @@ let(:bucket_name) { "foobar-bucket" } let(:client) { Aws::S3::Client.new(stub_responses: true) } let(:bucket) { Aws::S3::Bucket.new(bucket_name, :client => client) } + let(:temporary_directory) { Stud::Temporary.pathname } let(:temporary_file) { Stud::Temporary.file } let(:key) { "foobar" } let(:upload_options) { {} } @@ -24,7 +25,7 @@ end let(:file) do - f = LogStash::Outputs::S3::TemporaryFile.new(key, temporary_file) + f = LogStash::Outputs::S3::TemporaryFile.new(key, temporary_file, temporary_directory) f.write("random content") f.fsync f