Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert buffered output #11

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ message: `metrics { "f1":"100", "f2":"200", "f3":"300" }`
- Setting for rewriting the tag.
- For more information: https://github.com/y-ken/fluent-mixin-rewrite-tag-name

##### buffer

- Inherits from buffer parameters.
- Please refer to the article: http://docs.fluentd.org/v0.12/articles/buffer-plugin-overview

## Contributing

1. Fork it ( http://github.com/studio3104/fluent-plugin-graphite/fork )
Expand Down
17 changes: 12 additions & 5 deletions lib/fluent/plugin/out_graphite.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require 'fluent/mixin/rewrite_tag_name'

class Fluent::GraphiteOutput < Fluent::Output
class Fluent::GraphiteOutput < Fluent::BufferedOutput
Fluent::Plugin.register_output('graphite', self)

include Fluent::HandleTagNameMixin
Expand Down Expand Up @@ -49,19 +49,26 @@ def configure(conf)
end
# How many times to retry the call if timeout raised
@max_retries ||= 3

if @flush_interval < 10
log.info("flush_interval less than 10s is not allowed and overwritten to 10s")
@flush_interval = 10
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10 seconds is just a default value for the highest-resolution retention period, and it's configurable, right?
Don't you have any other solution to solve the problem...?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10 seconds is just a default value for the highest-resolution retention period, and it's configurable, right?

Yes. Users can set arbitrary value greater than 10 seconds with flush_interval parameter in configuration.

Don't you have any other solution to solve the problem...?

An another approach is using this plugin fluent-plugin-bufferize together.
This approach is also acceptable for me.
But it remains some pitfall.

Copy link
Owner

@studio3104 studio3104 Jun 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10 seconds is just a default value for the highest-resolution retention period, and it's configurable, right?

Yes. Users can set arbitrary value greater than 10 seconds with flush_interval parameter in configuration.

Ah, no, I meant that, "isn't it a problem to forcibly set it to 10 seconds even if the setting on the graphite side is not 10 seconds?"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, no, I meant that, "isn't it a problem to forcibly set it to 10 seconds even if the setting on the graphite side is not 10 seconds?"

Strictly speaking, there is still problem to time resolution.
But current implementation contains more problem.
With current implementation, Fluentd events cannot wait even if 1 second.
Non-Buffered plugin pass through events with no wait time.

end
end

def emit(tag, es, chain)
es.each do |time, record|
def format(tag, time, record)
[tag, time, record].to_msgpack
end

def write(chunk)
chunk.msgpack_each do |tag, time, record|
emit_tag = tag.dup
filter_record(emit_tag, time, record)
next unless metrics = format_metrics(emit_tag, record)

# implemented to immediate call post method in this loop, because graphite-api.gem has the buffers.
post(metrics, time)
end

chain.next
end

def format_metrics(tag, record)
Expand Down
13 changes: 13 additions & 0 deletions test/plugin/test_out_graphite.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ class GraphiteOutputTest < Test::Unit::TestCase
name_keys dstat.total cpu usage.usr,dstat.total cpu usage.sys,dstat.total cpu usage.idl
name_key_pattern ^((?!hostname).)*$
]
CONFIG_NAME_KEY_PATTERN_FLUSH_INTERVAL_LESS_THAN_10_SECONDS = %[
host localhost
port #{TCP_PORT}
name_key_pattern ^((?!hostname).)*$
flush_interval 5s
]

def setup
Fluent::Test.setup
Expand All @@ -57,6 +63,7 @@ def test_configure
assert_equal d.instance.tag_for, 'prefix'
assert_equal d.instance.name_keys, nil
assert_equal d.instance.name_key_pattern, /^((?!hostname).)*$/
assert_equal d.instance.flush_interval, 60

d = create_driver(CONFIG_NAME_KEYS)
assert_equal d.instance.host, 'localhost'
Expand All @@ -83,6 +90,12 @@ def test_configure
assert_raise(Fluent::ConfigError) { d = create_driver(CONFIG_SPECIFY_BOTH_NAME_KEYS_AND_NAME_KEY_PATTERN) }
end

def test_configure_flush_interval_less_than_10s
d = create_driver(CONFIG_NAME_KEY_PATTERN_FLUSH_INTERVAL_LESS_THAN_10_SECONDS)
# should be overwritten with 10s which is minimum graphite metrics resolution
assert_equal d.instance.flush_interval, 10
end

def test_format_metrics_strings
record = {
'hostname' => 'localhost.localdomain',
Expand Down