refactoring for dynamic prefixes and new concurrency model :shared
**Motivation**
One of the most requested features was a way to set dynamic prefixes on the files uploaded to the bucket using the fieldref syntax, together with the pipeline changes needed to support the shared delegator. The S3 output has always done single-threaded writes while multiple workers processed the uploads; the code was only thread-safe in the `:single` concurrency mode.

This PR addresses a few problems and provides shorter, more structured code:
- The plugin now uses the V2 version of the SDK, which ensures we receive the latest updates and changes.
- We now use S3's `upload_file` instead of reading chunks ourselves; this method is more efficient and uses multipart uploads with threads when the file is big.
- You can now use the `fieldref` syntax in the prefix to dynamically change the target based on the events it receives (see the example configuration after this list).
- The upload queue is now a bounded list; the bound is necessary so that back pressure can be communicated back to the pipeline, and it is configurable by the user.
- If the queue is full, the plugin starts the upload in the current thread.
- The plugin is now thread-safe and supports the `shared` concurrency model.
- The rotation strategy can be selected; the recommended setting is `size_and_time`, which checks both configured limits (`size` and `time` are also available).
- The `restore` option now uses a separate thread pool with an unbounded queue.
- The `restore` option no longer blocks the launch of Logstash and uses fewer resources than the real-time path.
- The plugin now uses `multi_receive_encode`, which optimizes the writes to the files.
- Rotate operations are now batched to reduce the number of IO calls.
- Empty files are not uploaded by any rotation strategy.
- We now use Concurrent-Ruby for the implementation of the Java executor.
- If you have finer-grained permissions on prefixes or want a faster boot, you can disable the credentials check with `validate_credentials_on_root_bucket`.
- The credentials check no longer fails if we can't delete the file.
- We now have a full suite of integration tests for all the defined rotation strategies.
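
A minimal configuration sketch of the options described above. The bucket name and the `%{service}` field are hypothetical, and `rotation_strategy`, `size_file`, and `time_file` are assumed to be the option names backing the rotation settings mentioned in this list:

output {
  s3 {
    bucket => "my-bucket"                          # hypothetical bucket name
    prefix => "logs/%{service}/"                   # fieldref syntax: a per-event dynamic prefix
    rotation_strategy => "size_and_time"           # "size" and "time" are also available
    size_file => 5242880                           # rotate once the file reaches 5MB
    time_file => 15                                # rotate after 15 minutes
    validate_credentials_on_root_bucket => false   # skip the boot-time credentials check
  }
}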

Fixes: #4 #81 #44 #59 #50

Fixes #102
ph committed Dec 15, 2016
1 parent f0365ed commit bcdb0df
Showing 32 changed files with 1,693 additions and 751 deletions.
493 changes: 185 additions & 308 deletions lib/logstash/outputs/s3.rb

Large diffs are not rendered by default.

96 changes: 96 additions & 0 deletions lib/logstash/outputs/s3/file_repository.rb
@@ -0,0 +1,96 @@
# encoding: utf-8
require "concurrent"
require "concurrent/map"
require "concurrent/timer_task"
require "logstash/util"

module LogStash
module Outputs
class S3
class FileRepository
DEFAULT_STATE_SWEEPER_INTERVAL_SECS = 60
DEFAULT_STALE_TIME_SECS = 15 * 60
# Ensure that all access or work done
# on a factory is threadsafe
class PrefixedValue
def initialize(factory, stale_time)
@factory = factory
@lock = Mutex.new
@stale_time = stale_time
end

def with_lock
@lock.synchronize {
yield @factory
}
end

def stale?
with_lock { |factory| factory.current.size == 0 && (Time.now - factory.current.ctime > @stale_time) }
end
end

def initialize(tags, encoding, temporary_directory,
stale_time = DEFAULT_STALE_TIME_SECS,
sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
# The path needs to contain the prefix so that when we restart
# Logstash after a crash we keep the remote structure
@prefixed_factories = Concurrent::Map.new

@tags = tags
@encoding = encoding
@temporary_directory = temporary_directory

@stale_time = stale_time
@sweeper_interval = sweeper_interval

start_stale_sweeper
end

def keys
@prefixed_factories.keys
end

def each_files
@prefixed_factories.each_value do |prefixed_file|
prefixed_file.with_lock { |factory| yield factory.current }
end
end

# Yield the file factory for the given prefix, under that factory's lock
def get_factory(prefix_key)
@prefixed_factories.compute_if_absent(prefix_key) { PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time) }
.with_lock { |factory| yield factory }
end

def get_file(prefix_key)
get_factory(prefix_key) { |factory| yield factory.current }
end

def shutdown
stop_stale_sweeper
end

def size
@prefixed_factories.size
end

def start_stale_sweeper
@stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do
LogStash::Util.set_thread_name("S3, Stale factory sweeper")

@prefixed_factories.each_pair do |k, v|
@prefixed_factories.delete_pair(k, v) if v.stale?
end
end

@stale_sweeper.execute
end

def stop_stale_sweeper
@stale_sweeper.shutdown
end
end
end
end
end
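
A minimal usage sketch of the repository above; the prefixes are hypothetical, and `TemporaryFileFactory` is assumed to be required by the main plugin file:

repository = LogStash::Outputs::S3::FileRepository.new(["fb"], "none", "/tmp/logstash-s3")

# Each distinct prefix gets its own factory, and every access to a factory's
# current file happens under that factory's lock, so concurrent pipeline
# workers can write safely.
repository.get_file("logs/service-a/") { |file| file.write("hello\n") }
repository.get_file("logs/service-b/") { |file| file.write("world\n") }

repository.keys     # => ["logs/service-a/", "logs/service-b/"]
repository.shutdown # stops the stale factory sweeper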
18 changes: 18 additions & 0 deletions lib/logstash/outputs/s3/path_validator.rb
@@ -0,0 +1,18 @@
# encoding: utf-8
module LogStash
module Outputs
class S3
class PathValidator
INVALID_CHARACTERS = "\^`><"

def self.valid?(name)
name.match(matches_re).nil?
end

def self.matches_re
/[#{Regexp.escape(INVALID_CHARACTERS)}]/
end
end
end
end
end
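
A quick illustration of the check above against the characters in INVALID_CHARACTERS:

LogStash::Outputs::S3::PathValidator.valid?("logs/2016/12/15") # => true
LogStash::Outputs::S3::PathValidator.valid?("logs/a>b")        # => false, ">" is rejected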
24 changes: 24 additions & 0 deletions lib/logstash/outputs/s3/size_and_time_rotation_policy.rb
@@ -0,0 +1,24 @@
# encoding: utf-8
require "logstash/outputs/s3/size_rotation_policy"
require "logstash/outputs/s3/time_rotation_policy"

module LogStash
module Outputs
class S3
class SizeAndTimeRotationPolicy
def initialize(file_size, time_file)
@size_strategy = SizeRotationPolicy.new(file_size)
@time_strategy = TimeRotationPolicy.new(time_file)
end

def rotate?(file)
@size_strategy.rotate?(file) || @time_strategy.rotate?(file)
end

def need_periodic?
true
end
end
end
end
end
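
A short sketch of the composite policy; the file stand-ins are hypothetical objects that only need to respond to `size` and `ctime`:

require "ostruct"

policy = LogStash::Outputs::S3::SizeAndTimeRotationPolicy.new(1024, 60) # 1KB or 60s

fresh_and_small = OpenStruct.new(:size => 10, :ctime => Time.now)
over_the_limit  = OpenStruct.new(:size => 2048, :ctime => Time.now)

policy.rotate?(fresh_and_small) # => false, neither limit is reached
policy.rotate?(over_the_limit)  # => true, the size limit is exceeded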
26 changes: 26 additions & 0 deletions lib/logstash/outputs/s3/size_rotation_policy.rb
@@ -0,0 +1,26 @@
# encoding: utf-8
module LogStash
module Outputs
class S3
class SizeRotationPolicy
attr_reader :size_file

def initialize(size_file)
if size_file <= 0
raise LogStash::ConfigurationError, "`size_file` needs to be greater than 0"
end

@size_file = size_file
end

def rotate?(file)
file.size >= size_file
end

def need_periodic?
false
end
end
end
end
end
45 changes: 45 additions & 0 deletions lib/logstash/outputs/s3/temporary_file.rb
@@ -0,0 +1,45 @@
# encoding: utf-8
require "thread"
require "forwardable"
require "fileutils"

module LogStash
module Outputs
class S3
# Wrap the actual file descriptor in a utility class.
# It makes the code more OOP and easier to reason about the paths.
class TemporaryFile
extend Forwardable
DELEGATES_METHODS = [:path, :write, :close, :size, :fsync]

def_delegators :@fd, *DELEGATES_METHODS

def initialize(key, fd)
@fd = fd
@key = key
@created_at = Time.now
end

def ctime
@created_at
end

def key
@key.gsub(/^\//, "")
end

# Each temporary file is created inside a directory named with a UUID.
# Instead of deleting the file directly, with the risk of deleting other files,
# we delete the root of the UUID directory; the UUID also removes the risk of
# deleting unwanted files and acts as a sandbox.
def delete!
::FileUtils.rm_rf(path.gsub(/#{Regexp.escape(key)}$/, ""))
end

def empty?
size == 0
end
end
end
end
end
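
A sketch of the wrapper in isolation; in the plugin both the key and the fd come from TemporaryFileFactory, and the UUID directory below is hypothetical:

fd = ::File.open("/tmp/logstash-s3/a-uuid/logs/example.txt", "a") # assumes the directory exists
file = LogStash::Outputs::S3::TemporaryFile.new("logs/example.txt", fd)

file.write("one event\n")
file.fsync
file.empty?  # => false
file.delete! # removes the whole "/tmp/logstash-s3/a-uuid/" sandbox, not just the file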
90 changes: 90 additions & 0 deletions lib/logstash/outputs/s3/temporary_file_factory.rb
@@ -0,0 +1,90 @@
# encoding: utf-8
require "socket"
require "securerandom"
require "fileutils"

module LogStash
module Outputs
class S3
# Since the file can contain dynamic parts, we have to maintain a more local structure to
# allow a clean recovery after a crash.
#
# The local structure will look like this:
#
# <TEMPORARY_PATH>/<UUID>/<prefix>/ls.s3.localhost.%Y-%m-%dT%H.%M.tag_es_fb.part1.txt.gz
#
# Since the UUID is essentially unique, we can destroy the whole path when an upload is complete,
# without having to check whether the other directories still contain files before destroying them.
class TemporaryFileFactory
FILE_MODE = "a"
GZIP_ENCODING = "gzip"
GZIP_EXTENSION = "txt.gz"
TXT_EXTENSION = "txt"
STRFTIME = "%Y-%m-%dT%H.%M"

attr_accessor :counter, :tags, :prefix, :encoding, :temporary_directory, :current

def initialize(prefix, tags, encoding, temporary_directory)
@counter = 0
@prefix = prefix

@tags = tags
@encoding = encoding
@temporary_directory = temporary_directory

rotate!
end

def rotate!
@current = new_file
increment_counter
@current
end

private
def extension
gzip? ? GZIP_EXTENSION : TXT_EXTENSION
end

def gzip?
encoding == GZIP_ENCODING
end

def increment_counter
@counter += 1
end

def current_time
Time.now.strftime(STRFTIME)
end

def generate_name
filename = "ls.s3.#{Socket.gethostname}.#{current_time}"

if tags.size > 0
"#{filename}.tag_#{tags.join('.')}.part#{counter}.#{extension}"
else
"#{filename}.part#{counter}.#{extension}"
end
end

def new_file
uuid = SecureRandom.uuid
name = generate_name
path = ::File.join(temporary_directory, uuid, prefix)
key = ::File.join(prefix, name)

FileUtils.mkdir_p(path)

io = if gzip?
Zlib::GzipWriter.open(::File.join(path, name))
else
::File.open(::File.join(path, name), FILE_MODE)
end

TemporaryFile.new(key, io)
end
end
end
end
end
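
A sketch of the factory's naming and rotation behavior; the prefix, tags, and hostname are hypothetical:

factory = LogStash::Outputs::S3::TemporaryFileFactory.new("logs/app/", ["fb"], "none", "/tmp/logstash-s3")

# First file, created by the constructor:
factory.current.path # e.g. "/tmp/logstash-s3/<uuid>/logs/app/ls.s3.myhost.2016-12-15T10.30.tag_fb.part0.txt"

# rotate! opens a fresh file under a new UUID and bumps the part counter;
# it does not close the previous file.
file = factory.rotate!
file.key # => "logs/app/ls.s3.myhost.2016-12-15T10.30.tag_fb.part1.txt"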
26 changes: 26 additions & 0 deletions lib/logstash/outputs/s3/time_rotation_policy.rb
@@ -0,0 +1,26 @@
# encoding: utf-8
module LogStash
module Outputs
class S3
class TimeRotationPolicy
attr_reader :time_file

def initialize(time_file)
if time_file <= 0
raise LogStash::ConfigurationError, "`time_file` needs to be greater than 0"
end

@time_file = time_file
end

def rotate?(file)
file.size > 0 && Time.now - file.ctime >= time_file
end

def need_periodic?
true
end
end
end
end
end
60 changes: 60 additions & 0 deletions lib/logstash/outputs/s3/uploader.rb
@@ -0,0 +1,60 @@
# encoding: utf-8
require "logstash/util"
require "aws-sdk-resources"

module LogStash
module Outputs
class S3
class Uploader
TIME_BEFORE_RETRYING_SECONDS = 1
DEFAULT_THREADPOOL = Concurrent::ThreadPoolExecutor.new({
:min_threads => 1,
:max_threads => 8,
:max_queue => 1,
:fallback_policy => :caller_runs
})


attr_reader :bucket, :upload_options, :logger

def initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL)
@bucket = bucket
@workers_pool = threadpool
@logger = logger
end

def upload_async(file, options = {})
@workers_pool.post do
LogStash::Util.set_thread_name("S3 output uploader, file: #{file.path}")
upload(file, options)
end
end

def upload(file, options = {})
upload_options = options.fetch(:upload_options, {})

begin
obj = bucket.object(file.key)
obj.upload_file(file.path, upload_options)
rescue => e
# When we get here it usually means that S3 already retried on its own (the default is 3 attempts).
# When the retry limit is reached or another error happens, we wait and retry.
#
# The thread might be stuck here, but that is better than losing data:
# either the error is transient or something really bad happened.
sleep(TIME_BEFORE_RETRYING_SECONDS)
logger.error("Uploading failed, retrying", :exception => e, :path => file.path)
retry
end

options[:on_complete].call(file) unless options[:on_complete].nil?
end

def stop
@workers_pool.shutdown
@workers_pool.wait_for_termination(nil) # block until it's done
end
end
end
end
end
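
A sketch of how the bounded pool above delivers the back pressure described in the commit message; `bucket` is assumed to be an `Aws::S3::Bucket` resource, and `logger` and `file` stand in for the plugin's logger and a TemporaryFile:

bucket = Aws::S3::Resource.new(:region => "us-east-1").bucket("my-bucket") # hypothetical bucket
uploader = LogStash::Outputs::S3::Uploader.new(bucket, logger)

# With :max_queue => 1 and :fallback_policy => :caller_runs, a full queue makes
# this call run the upload on the calling (pipeline) thread, slowing the
# pipeline down instead of buffering files without bound.
uploader.upload_async(file, :on_complete => ->(f) { f.delete! })

uploader.stop # drains the pool when the plugin closes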