From 085dc12307641c5cbfc232e659562cf8f337a908 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Mon, 20 Apr 2015 15:08:21 +0200 Subject: [PATCH] use LogStash::Json for json serialization, some cleanups first spec added spec dependencies plugins and bumped version to 0.1.4 fixed unused allow_time_override closes #20 --- lib/logstash/outputs/influxdb.rb | 31 ++++++++++++----------- logstash-output-influxdb.gemspec | 2 ++ spec/outputs/influxdb_spec.rb | 42 ++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 14 deletions(-) diff --git a/lib/logstash/outputs/influxdb.rb b/lib/logstash/outputs/influxdb.rb index 8b055f7..8fdb87d 100644 --- a/lib/logstash/outputs/influxdb.rb +++ b/lib/logstash/outputs/influxdb.rb @@ -1,8 +1,8 @@ # encoding: utf-8 require "logstash/namespace" require "logstash/outputs/base" +require "logstash/json" require "stud/buffer" -require "json" # This output lets you output Metrics to InfluxDB # @@ -94,14 +94,14 @@ def register @query_params = "u=#{@user}&p=#{@password.value}&time_precision=#{@time_precision}" @base_url = "http://#{@host}:#{@port}/db/#{@db}/series" @url = "#{@base_url}?#{@query_params}" - + buffer_initialize( :max_items => @flush_size, :max_interval => @idle_flush_time, :logger => @logger ) end # def register - + public def receive(event) return unless output?(event) @@ -126,21 +126,26 @@ def receive(event) # ] event_hash = {} event_hash['name'] = event.sprintf(@series) + sprintf_points = Hash[@data_points.map {|k,v| [event.sprintf(k), event.sprintf(v)]}] if sprintf_points.has_key?('time') - @logger.error("Cannot override value of time without 'allow_override_time'. Using event timestamp") unless @allow_override_time + unless @allow_time_override + logger.error("Cannot override value of time without 'allow_time_override'. Using event timestamp") + sprintf_points['time'] = event.timestamp.to_i + end else sprintf_points['time'] = event.timestamp.to_i end + @coerce_values.each do |column, value_type| if sprintf_points.has_key?(column) begin case value_type when "integer" - @logger.debug("Converting column #{column} to type #{value_type}: Current value: #{sprintf_points[column]}") + @logger.debug? and @logger.debug("Converting column #{column} to type #{value_type}: Current value: #{sprintf_points[column]}") sprintf_points[column] = sprintf_points[column].to_i when "float" - @logger.debug("Converting column #{column} to type #{value_type}: Current value: #{sprintf_points[column]}") + @logger.debug? and @logger.debug("Converting column #{column} to type #{value_type}: Current value: #{sprintf_points[column]}") sprintf_points[column] = sprintf_points[column].to_f else @logger.error("Don't know how to convert to #{value_type}") @@ -150,17 +155,15 @@ def receive(event) end end end + event_hash['columns'] = sprintf_points.keys event_hash['points'] = [] event_hash['points'] << sprintf_points.values + buffer_receive(event_hash) end # def receive -# def flush; return; end - def flush(events, teardown=false) - # Avoid creating a new string for newline every time - newline = "\n".freeze - + def flush(events, teardown = false) # seen_series stores a list of series and associated columns # we've seen for each event # so that we can attempt to batch up points for a given series. @@ -182,13 +185,13 @@ def flush(events, teardown=false) seen_series[ev['name']] = ev['columns'] event_collection << ev end - rescue - @logger.info("Error adding event to collection", :exception => e) + rescue => e + @logger.warn("Error adding event to collection", :exception => e) next end end - post(event_collection.to_json) + post(LogStash::Json.dump(event_collection)) end # def receive_bulk def post(body) diff --git a/logstash-output-influxdb.gemspec b/logstash-output-influxdb.gemspec index ba340e0..7b27ab8 100644 --- a/logstash-output-influxdb.gemspec +++ b/logstash-output-influxdb.gemspec @@ -26,5 +26,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'ftw', ['~> 0.0.40'] s.add_development_dependency 'logstash-devutils' + s.add_development_dependency 'logstash-input-generator' + s.add_development_dependency 'logstash-filter-kv' end diff --git a/spec/outputs/influxdb_spec.rb b/spec/outputs/influxdb_spec.rb index 6bf5945..7e16f4c 100644 --- a/spec/outputs/influxdb_spec.rb +++ b/spec/outputs/influxdb_spec.rb @@ -1 +1,43 @@ require "logstash/devutils/rspec/spec_helper" +require "logstash/outputs/influxdb" + +describe LogStash::Outputs::InfluxDB do + + let(:pipeline) { LogStash::Pipeline.new(config) } + + context "complete pipeline run with 2 events" do + + let(:config) do <<-CONFIG + input { + generator { + message => "foo=1 bar=2 time=3" + count => 2 + type => "generator" + } + } + + filter { + kv { } + } + + output { + influxdb { + host => "localhost" + user => "someuser" + password => "somepwd" + allow_time_override => true + data_points => {"foo" => "%{foo}" "bar" => "%{bar}" "time" => "%{time}"} + } + } + CONFIG + end + + let(:json_result) { "[{\"name\":\"logstash\",\"columns\":[\"foo\",\"bar\",\"time\"],\"points\":[[\"1\",\"2\",\"3\"],[\"1\",\"2\",\"3\"]]}]" } + + it "should receive 2 events, flush and call post with 2 items json array" do + expect_any_instance_of(LogStash::Outputs::InfluxDB).to receive(:post).with(json_result) + pipeline.run + end + + end +end