Refactoring, Dynamic prefix and AWS v2 #102
Conversation
Great work, guys.
@tkampour Since this is a big change, in fact a complete rewrite, it needs to go through the review process like any other PR. Since you have commented on this issue you will get notified when that happens. As this is one of the most requested features, I am also eager to get this merged in.
Awesome work! Can't wait to try this!
I too am very excited for this change, so excited that I am attempting to create a local build of it, but I keep running into the following: `Couldn't find any output plugin named 's3'. Are you sure this is correct? Trying to load the s3 output plugin resulted in this error: no such file to load -- concurrent/map", :level=>:error, :file=>"logstash/agent.rb", :line=>"448", :method=>"create_pipeline"}`. I built the gem and am using that with `--no-verify` to install. Logstash version 2.4; I am doing this inside a Docker container, if that makes a difference.
@vistorve How did you build this gem? Are you following the steps here: https://github.com/logstash-plugins/logstash-output-s3#2-running-your-unpublished-plugin-in-logstash
@suyograo I did the steps in 2.2 under "or you can build the gem and install it using". That seemed like the easier approach since it was in a Docker container.
@vistorve Nice find, this is a bug in this PR. I am using concurrent-ruby's map implementation, which is only available in >= 1.0.0. Logstash 5.0/master ships with it, but 2.4 ships with 0.9.2. The dependency is not strict enough in the gemspec, which is why you were able to install it but it failed to run. logstash-core depends on a strict version of it in https://github.com/elastic/logstash/blob/2.4/Gemfile.jruby-1.9.lock#L162, so if the plugin requires a newer version the core won't permit it. @suyograo If we want to make this change work in both 2.4 and 5.0, we have the following solutions:
The other features I used from concurrent-ruby (`ScheduledTask` and the thread pool) are available in 0.9.2.
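As an aside, a stricter gemspec constraint would have surfaced this mismatch at install time instead of at pipeline creation. A minimal sketch (the version number and summary here are placeholders, not the PR's actual gemspec):

```ruby
# logstash-output-s3.gemspec -- hypothetical excerpt
Gem::Specification.new do |s|
  s.name    = "logstash-output-s3"
  s.version = "4.0.0"                               # placeholder version
  s.summary = "Sends Logstash events to Amazon S3"
  s.authors = ["logstash-plugins"]

  # Concurrent::Map needs concurrent-ruby >= 1.0.0; an explicit floor
  # makes the install fail fast on Logstash 2.4 (which pins 0.9.2).
  s.add_runtime_dependency "concurrent-ruby", ">= 1.0.0"
end
```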
@ph I meant point 2:
/facepalm ☕ ☕ |
FWIW I generally prefer the java.util.concurrent stuff to the concurrent-ruby objects. IMHO the main point of concurrent-ruby is to be portable across Ruby implementations' concurrency models, which is not a concern for Logstash.
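For illustration, a minimal JRuby sketch of that idea, assuming JRuby's usual Java integration (snake_case method aliases and block-to-functional-interface conversion); the keys and values are placeholders:

```ruby
# Runs under JRuby only: java.util.concurrent is available directly,
# with no dependency on whichever concurrent-ruby version Logstash ships.
require 'java'

map = java.util.concurrent.ConcurrentHashMap.new
map.put('prefix/a', 'file-factory-for-a')  # placeholder values
map.get('prefix/a')                        # => "file-factory-for-a"

# Atomic get-or-create, the same role Concurrent::Map#compute_if_absent
# plays in this PR; the block is converted to a java.util.function.Function.
map.compute_if_absent('prefix/b') { |key| "file-factory-for-#{key}" }
```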
Absolutely amazing work here @ph!
This is an initial pass at the review; I have not yet covered the test suite. I'll do another round once the changes here have been discussed and made.
  # S3 bucket
- config :bucket, :validate => :string
+ config :bucket, :validate => :string, :required => true
  # Set the size of file in bytes, this means that files on bucket when have dimension > file_size, they are stored in two or more file.
This needs some grammatical fixes; I don't completely understand what these comments are trying to say.
I believe it is something like:
"Set the target size of uploaded files in bytes. This will result in multiple files being created once the data exceeds that size threshold."
I don't understand the note regarding the local file.
  # Set the size of file in bytes, this means that files on bucket when have dimension > file_size, they are stored in two or more file.
  # If you have tags then it will generate a specific size file for every tags
  ##NOTE: define size of file is the better thing, because generate a local temporary file on disk and then put it in bucket.
- config :size_file, :validate => :number, :default => 0
+ config :size_file, :validate => :number, :default => 1024 * 1024 * 5
Maybe this should be named `target_file_size` since it is a goal but not a guarantee. Alternatively, `max_file_size` would make sense.
+1, I've wanted to change these names for quite a while. I will keep the old ones working and mark them as deprecated.
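A hedged sketch of what that deprecation could look like in the plugin config DSL, using `target_file_size` (the name proposed above, not one from the PR); the DSL's `:deprecated` flag logs a warning when the old option is used:

```ruby
# Hypothetical rename sketch: the new option takes over the current default.
config :target_file_size, :validate => :number, :default => 1024 * 1024 * 5

# Old option kept for compatibility; :deprecated makes Logstash warn users.
config :size_file, :validate => :number,
       :deprecated => "Use 'target_file_size' instead"
```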
  # Set the time, in MINUTES, to close the current sub_time_section of bucket.
This also needs to be edited for clarity
  # Set the time, in MINUTES, to close the current sub_time_section of bucket.
  # If you define file_size you have a number of files in consideration of the section and the current tag.
  # 0 stay all time on listerner, beware if you specific 0 and size_file 0, because you will not put the file on bucket,
  # for now the only thing this plugin can do is to put the file when logstash restart.
- config :time_file, :validate => :number, :default => 0
+ config :time_file, :validate => :number, :default => 15 * 60
Maybe this should be renamed `max_upload_delay` or something. From the comments above, I'm not sure whether this setting controls the max time from file creation or the max time since the last write.
It is the max time from file creation in this case.
@@ -102,7 +121,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
  config :restore, :validate => :boolean, :default => false
As we discussed, this will be changed to default to true.
  # Ensure that all access or work done
  # on a factory is threadsafe
  class PrefixedValue
    def initialize(factory, stale_time)
`factory` lacks context. Have you considered renaming this `file_factory` or similar?
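To make the discussion concrete, a minimal sketch of the threadsafety contract that comment promises, assuming a mutex-per-prefix design and the `file_factory` rename suggested above (not the PR's actual implementation):

```ruby
require 'thread'

# Sketch: one lock per prefix; every access to the wrapped factory goes
# through with_lock, so no two threads touch the factory concurrently.
class PrefixedValue
  def initialize(file_factory, stale_time)
    @file_factory = file_factory
    @stale_time   = stale_time
    @lock         = Mutex.new
  end

  # Callers never hold the factory directly; they borrow it under the lock.
  def with_lock
    @lock.synchronize { yield @file_factory }
  end
end
```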
  #
  # Since the UUID should be fairly unique I can destroy the whole path when an upload is complete.
  # I do not have to mess around to check if the other directory have file in it before destroying them.
  class TemporaryFileFactory
Is this really a factory? It seems more like a proxy. I would expect a factory to make and return new objects; this abstracts over an underlying set of files. Consider renaming to `RotatingTemporaryFile` or something else that's more descriptive.
I agree, it feels more like a proxy in this case.
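A sketch of the proxy reading of this class, using the `RotatingTemporaryFile` name proposed above; `Tempfile` stands in for the real UUID-per-upload path logic:

```ruby
require 'tempfile'

# Proxy-style behaviour: callers keep one handle while the file behind
# it is swapped out on rotation, rather than receiving new objects.
class RotatingTemporaryFile
  def initialize(prefix)
    @prefix  = prefix
    @current = new_file
  end

  attr_reader :current

  def rotate!
    @current.close
    @current = new_file  # same wrapper, fresh file underneath
  end

  private

  def new_file
    Tempfile.new(@prefix)  # stand-in for the per-prefix path logic
  end
end
```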
    @size_strategy.rotate?(file) || @time_strategy.rotate?(file)
  end

  def need_periodic?
s/need_periodic/needs_periodic/
  extend Forwardable
  DELEGATES_METHODS = [:path, :write, :close, :size, :fsync]

  def_delegators :@fd, *DELEGATES_METHODS
AFAICT we only use a constant here to test that these methods are defined in a later spec. I would just define them inline and remove the spec. I don't think there's a ton of value in specs that check that methods merely exist, since at that point we're testing that `Forwardable` isn't broken. I don't think we need to do any testing here FWIW, and we can simplify this.
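The inlined form would be something like this (no constant, no methods-exist spec, just the delegation):

```ruby
require 'forwardable'

# Delegate the file-like interface straight to the underlying descriptor.
class TemporaryFile
  extend Forwardable
  def_delegators :@fd, :path, :write, :close, :size, :fsync
end
```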
  end

  def generate_name
    filename = "ls.s3.#{Socket.gethostname}.#{current_time}"
We need to document that there is a (remote) possibility of collisions between Logstash instances if this scheme is used. If users run multiple Logstash instances they should probably give them unique prefixes. Alternatively, we could inject some randomness in here as well.
What are you suggesting for randomness? We have:
- the host
- a date

`SecureRandom.uuid`?
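Concretely, appending a UUID to the pattern from the diff would make cross-instance collisions practically impossible even when host and timestamp coincide; a sketch:

```ruby
require 'socket'
require 'securerandom'

# Host + timestamp alone can collide across Logstash instances;
# host + timestamp + UUID practically cannot.
def generate_name(current_time = Time.now.to_i)
  "ls.s3.#{Socket.gethostname}.#{current_time}.#{SecureRandom.uuid}"
end
```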
Great work! I've been waiting for something like this for at least a year!
I tried to run this branch on a test dataset and there are a couple of things:
- `encoding => "gzip"` is not working
- `time_file` says it is in minutes, but it is implemented in seconds
  end

  def rotate?(file)
    file.size > 0 && Time.now - file.ctime >= time_file
Docs say `time_file` is in MINUTES, but it's not treated that way here.
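One possible fix is to convert the documented minutes into the seconds that `Time` arithmetic actually uses, for example:

```ruby
# time_file is documented in minutes; Time subtraction yields seconds,
# so convert at the comparison (or once, at register time).
def rotate?(file)
  file.size > 0 && Time.now - file.ctime >= time_file * 60
end
```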
There doesn't seem to have been any activity on this pull request in a while, so I made a fork and modified the code to address most of the issues raised in this review. I haven't tested this code in production yet, but all the tests pass.
@nevins Thanks! Can you make a PR with your changes? I will help move it forward :)
I seem to run into a weird JRuby thread/autoload issue when running the tests. To be clear, the bug was there before, but the new logger infra exposes it.
  # if the file is close we will use the File::size
  begin
    @fd.size
  rescue
Is there a better way we can do this rescue? Is there a specific error we can catch instead?
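One narrower option: a closed IO raises `IOError` ("closed stream") on `#size`, so that can be rescued specifically with a fallback to `File.size`, for example:

```ruby
# Rescue only the error a closed descriptor actually raises, instead of
# a bare rescue that would also swallow unrelated failures.
def size
  @fd.size
rescue IOError
  ::File.size(path)
end
```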
Left a few more comments, it's almost good to go!
@andrewvc I have addressed your latest comment.
Re-kicking the Travis job.
@ph approved! Great work :)
**Motivation**

One of the most requested features was adding a way to add dynamic prefixes, using the fieldref syntax, for the files on the bucket, along with the changes in the pipeline to support a shared delegator. The S3 output was by nature always single-threaded for writes but had multiple workers to process the upload; the code was threadsafe when used in the concurrency `:single` mode.

This PR addresses a few problems and provides shorter and more structured code:

- The plugin now uses the V2 version of the SDK; this makes sure we receive the latest updates and changes.
- We now use S3's `upload_file` instead of reading chunks; this method is more efficient and will use multipart uploads with threads if the file is too big.
- You can now use the `fieldref` syntax in the prefix to dynamically change the target with the events it receives.
- The upload queue is now a bounded list; this is necessary to allow back pressure to be communicated back to the pipeline, and it is configurable by the user.
- If the queue is full, the plugin will start the upload in the current thread.
- The plugin is now threadsafe and supports the concurrency model `shared`.
- The rotation strategy can be selected; the recommended one is `size_and_time`, which checks both of the configured limits (`size` and `time` are also available).
- The `restore` option will now use a separate threadpool with an unbounded queue.
- The `restore` option will not block the launch of Logstash and will use fewer resources than the real-time path.
- The plugin now uses `multi_receive_encode`; this optimizes the writes to the files.
- Rotate operations are now batched to reduce the number of IO calls.
- Empty files will not be uploaded by any rotation strategy.
- We now use Concurrent-Ruby for the implementation of the Java executor.
- If you have finer-grained permissions on prefixes or want a faster boot, you can disable the credentials check with `validate_credentials_on_root_bucket`.
- The credentials check will no longer fail if we can't delete the file.
- We now have a full suite of integration tests for all the defined rotation strategies.

Fixes: logstash-plugins#4 logstash-plugins#81 logstash-plugins#44 logstash-plugins#59 logstash-plugins#50
Force-pushed from 176ad7c to 458de28 ("… of unused files/directories", Fixes #102).