diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 9d0d284..2433734 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -42,10 +42,11 @@ This plugin supports the following configuration options plus the <> |<>|No -| <> |<>, one of `["list", "channel", "pattern_channel"]`|Yes +| <> |<>, one of `["list", "channel", "pattern_channel", "sortedset"]`|Yes | <> |<>|No | <> |<>|No | <> |<>|Yes +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No @@ -58,7 +59,7 @@ input plugins.   [id="plugins-{type}s-{plugin}-batch_count"] -===== `batch_count` +===== `batch_count` * Value type is <> * Default value is `125` @@ -66,18 +67,21 @@ input plugins. The number of events to return from Redis using EVAL. [id="plugins-{type}s-{plugin}-data_type"] -===== `data_type` +===== `data_type` * This is a required setting. - * Value can be any of: `list`, `channel`, `pattern_channel` + * Value can be any of: `list`, `channel`, `pattern_channel`, `sortedset` * There is no default value for this setting. Specify either list or channel. If `redis\_type` is `list`, then we will BLPOP the -key. If `redis\_type` is `channel`, then we will SUBSCRIBE to the key. +key. +Specify either list or channel. If `redis\_type` is `sortedset`, then we will +ZRANGE/ZREVRANGE and ZREM/ZREMRANGEBYRANK the key. +If `redis\_type` is `channel`, then we will SUBSCRIBE to the key. If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key. [id="plugins-{type}s-{plugin}-db"] -===== `db` +===== `db` * Value type is <> * Default value is `0` @@ -85,7 +89,7 @@ If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key. The Redis database number. [id="plugins-{type}s-{plugin}-host"] -===== `host` +===== `host` * Value type is <> * Default value is `"127.0.0.1"` @@ -93,16 +97,24 @@ The Redis database number. The hostname of your Redis server. [id="plugins-{type}s-{plugin}-key"] -===== `key` +===== `key` * This is a required setting. * Value type is <> * There is no default value for this setting. -The name of a Redis list or channel. +The name of a Redis list, sortedset or channel. + +[id="plugins-{type}s-{plugin}-priority_reverse"] +===== `priority_reverse` + + * Value type is <> + * Default value if `false` + +When the data_type is `sortedset`, read the sortedset in reverse order. [id="plugins-{type}s-{plugin}-password"] -===== `password` +===== `password` * Value type is <> * There is no default value for this setting. @@ -110,7 +122,7 @@ The name of a Redis list or channel. Password to authenticate with. There is no authentication by default. [id="plugins-{type}s-{plugin}-port"] -===== `port` +===== `port` * Value type is <> * Default value is `6379` @@ -118,7 +130,7 @@ Password to authenticate with. There is no authentication by default. The port to connect on. [id="plugins-{type}s-{plugin}-threads"] -===== `threads` +===== `threads` * Value type is <> * Default value is `1` @@ -126,7 +138,7 @@ The port to connect on. [id="plugins-{type}s-{plugin}-timeout"] -===== `timeout` +===== `timeout` * Value type is <> * Default value is `5` diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index 203f854..bdde9dc 100644 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -4,13 +4,17 @@ require "logstash/inputs/threadable" require 'redis' -# This input will read events from a Redis instance; it supports both Redis channels and lists. +# This input will read events from a Redis instance; it supports both Redis channels, lists and sortedsets. +# # The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and # the channel commands used by Logstash are found in Redis v1.3.8+. # While you may be able to make these Redis versions work, the best performance # and stability will be found in more recent stable versions. Versions 2.6.0+ # are recommended. # +# The sortedset commands (ZREM, ZREMRANGEBYRANK, ZRANGE, ZREVRANGE) used by Logstash are supported +# in Redis v2.0.0+ +# # For more information about Redis, see # # `batch_count` note: If you use the `batch_count` setting, you *must* use a Redis version 2.6.0 or @@ -41,10 +45,14 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable # The name of a Redis list or channel. config :key, :validate => :string, :required => true - # Specify either list or channel. If `redis\_type` is `list`, then we will BLPOP the - # key. If `redis\_type` is `channel`, then we will SUBSCRIBE to the key. - # If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key. - config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => true + # Pop high scores item first in sortedset. No effect for other data types + config :priority_reverse, :validate => :boolean, :default => false + + # Specify either list or channel. If `redis\_type` is `list`, then we will BLPOP + # the key. If `redis\_type` is `channel`, then we will SUBSCRIBE to the key. + # If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key. + # If `redis\_type` is `sortedset`, then we will ZRANGE/ZREVRANGE + config :data_type, :validate => [ "list", "channel", "pattern_channel", "sortedset" ], :required => true # The number of events to return from Redis using EVAL. config :batch_count, :validate => :number, :default => 125 @@ -76,6 +84,9 @@ def register if @data_type == 'list' || @data_type == 'dummy' @run_method = method(:list_runner) @stop_method = method(:list_stop) + elsif @data_type == 'sortedset' + @run_method = method(:sortedset_runner) + @stop_method = method(:sortedset_stop) elsif @data_type == 'channel' @run_method = method(:channel_runner) @stop_method = method(:subscribe_stop) @@ -84,7 +95,9 @@ def register @stop_method = method(:subscribe_stop) end + #TODO voir à terme comment fusionner ces deux méthodes @list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener) + @sortedset_method = batched? ? method(:sortedset_batch_listener) : method(:sortedset_single_listener) @identity = "#{@redis_url} #{@data_type}:#{@key}" @logger.info("Registering Redis", :identity => @identity) @@ -112,6 +125,11 @@ def is_list_type? @data_type == 'list' end + # private + def is_sortedset_type? + @data_type == 'sortedset' + end + # private def redis_params { @@ -131,12 +149,13 @@ def internal_redis_builder # private def connect redis = new_redis_instance - load_batch_script(redis) if batched? && is_list_type? + list_load_batch_script(redis) if batched? && is_list_type? + sortedset_load_batch_script(redis) if batched? && is_sortedset_type? redis end # def connect # private - def load_batch_script(redis) + def list_load_batch_script(redis) #A Redis Lua EVAL script to fetch a count of keys redis_script = < e if e.to_s =~ /NOSCRIPT/ then @logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e); - load_batch_script(redis) + list_load_batch_script(redis) retry else raise e @@ -231,6 +275,75 @@ def list_single_listener(redis, output_queue) queue_event(item.last, output_queue) end + # private + def sortedset_stop + return if @redis.nil? || !@redis.connected? + + @redis.quit rescue nil + @redis = nil + end + + # private + def sortedset_runner(output_queue) + while !stop? + begin + @redis ||= connect + @sortedset_method.call(@redis, output_queue) + rescue ::Redis::BaseError => e + @logger.warn("Redis connection problem", :exception => e) + # Reset the redis variable to trigger reconnect + @redis = nil + # this sleep does not need to be stoppable as its + # in a while !stop? loop + sleep 1 + end + end + end + + def sortedset_batch_listener(redis, output_queue) + begin + results = redis.evalsha(@redis_script_sha, [@key], [@batch_count-1]) + results.each do |item| + queue_event(item, output_queue) + end + + if results.size.zero? + sleep BATCH_EMPTY_SLEEP + end + rescue ::Redis::CommandError => e + if e.to_s =~ /NOSCRIPT/ then + @logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e); + sortedset_load_batch_script(redis) + retry + else + raise e + end + end + end + + def sortedset_single_listener(redis, output_queue) + redis.watch(@key) do + if @priority_reverse then + item = redis.zrevrange(@key, 0, 0, :timeout => 1) + else + item = redis.zrange(@key, 0, 0, :timeout => 1) + end + + if item.size.zero? + sleep BATCH_EMPTY_SLEEP + end + + return unless item.size > 0 + + redis.multi do |multi| + redis.zrem(@key, item) + end + + queue_event(item.first, output_queue) + end + end + + # private def subscribe_stop return if @redis.nil? || !@redis.connected? diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index d4b6812..add7525 100644 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -4,7 +4,7 @@ require 'logstash/inputs/redis' require 'securerandom' -def populate(key, event_count) +def list_populate(key, event_count) require "logstash/event" redis = Redis.new(:host => "localhost") event_count.times do |value| @@ -15,7 +15,42 @@ def populate(key, event_count) end end -def process(conf, event_count) + +def sortedset_populate(key, event_count) + require "logstash/event" + redis = Redis.new(:host => "localhost") + + + event_count_1 = event_count / 2 + event_count_2 = event_count - event_count_1 + + # Add half events in default order + event_count_1.times do |value| + event = LogStash::Event.new("sequence" => value) + Stud.try(10.times) do + redis.zadd(key, value, event.to_json) + end + end + + # Add half events in reverse order + event_count_2.times do |value| + value = event_count - value - 1 + event = LogStash::Event.new("sequence" => value) + Stud.try(10.times) do + redis.zadd(key, value, event.to_json) + end + end +end + +def list_process(conf, event_count) + events = input(conf) do |pipeline, queue| + event_count.times.map{queue.pop} + end + + expect(events.map{|evt| evt.get("sequence")}).to eq((0..event_count.pred).to_a) +end + +def sortedset_process(conf, event_count) events = input(conf) do |pipeline, queue| event_count.times.map{queue.pop} end @@ -23,6 +58,14 @@ def process(conf, event_count) expect(events.map{|evt| evt.get("sequence")}).to eq((0..event_count.pred).to_a) end +def sortedsetrev_process(conf, event_count) + events = input(conf) do |pipeline, queue| + event_count.times.map{queue.pop} + end + + expect(events.map{|evt| evt.get("sequence")}).to eq((0..event_count.pred).to_a.reverse) +end + # integration tests --------------------- describe "inputs/redis", :redis => true do @@ -42,8 +85,47 @@ def process(conf, event_count) } CONFIG - populate(key, event_count) - process(conf, event_count) + list_populate(key, event_count) + list_process(conf, event_count) + end + + it "should read events from a sortedset in default order" do + key = SecureRandom.hex + event_count = 1000 + rand(50) + # event_count = 100 + conf = <<-CONFIG + input { + redis { + type => "blah" + key => "#{key}" + data_type => "sortedset" + batch_count => 1 + } + } + CONFIG + + sortedset_populate(key, event_count) + sortedset_process(conf, event_count) + end + + it "should read events from a sortedset in reverse order" do + key = SecureRandom.hex + event_count = 1000 + rand(50) + # event_count = 100 + conf = <<-CONFIG + input { + redis { + type => "blah" + key => "#{key}" + data_type => "sortedset" + batch_count => 1 + priority_reverse => true + } + } + CONFIG + + sortedset_populate(key, event_count) + sortedsetrev_process(conf, event_count) end it "should read events from a list using batch_count (default 125)" do @@ -59,8 +141,45 @@ def process(conf, event_count) } CONFIG - populate(key, event_count) - process(conf, event_count) + list_populate(key, event_count) + list_process(conf, event_count) + end + + it "should read events from a sortedset in default order using batch_count" do + key = SecureRandom.hex + event_count = 1000 + rand(50) + # event_count = 100 + conf = <<-CONFIG + input { + redis { + type => "blah" + key => "#{key}" + data_type => "sortedset" + } + } + CONFIG + + sortedset_populate(key, event_count) + sortedset_process(conf, event_count) + end + + it "should read events from a sortedset in reverse order using batch_count" do + key = SecureRandom.hex + event_count = 1000 + rand(50) + # event_count = 100 + conf = <<-CONFIG + input { + redis { + type => "blah" + key => "#{key}" + data_type => "sortedset" + priority_reverse => true + } + } + CONFIG + + sortedset_populate(key, event_count) + sortedsetrev_process(conf, event_count) end end @@ -305,7 +424,7 @@ def close_thread(inst, rt) describe LogStash::Inputs::Redis do context "when using data type" do - ["list", "channel", "pattern_channel"].each do |data_type| + ["list", "channel", "sortedset", "pattern_channel"].each do |data_type| context data_type do it_behaves_like "an interruptible input plugin" do let(:config) { {'key' => 'foo', 'data_type' => data_type } }