diff --git a/CHANGELOG.md b/CHANGELOG.md index 65fc096..03d7391 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 3.5.2 + - Support for REDIS Sorted Set(ZSET) [#56](https://github.com/logstash-plugins/logstash-input-redis/issues/56) + ## 3.5.1 - [DOC] Reordered config option to alpha order [#79](https://github.com/logstash-plugins/logstash-input-redis/issues/79) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index cc8340e..64c051c 100755 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -44,7 +44,7 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |<>|No -| <> |<>, one of `["list", "channel", "pattern_channel"]`|Yes +| <> |<>, one of `["list", "channel", "pattern_channel", "sortedset"]`|Yes | <> |<>|No | <> |<>|No | <> |<>|No @@ -54,6 +54,7 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No |======================================================================= Also see <> for a list of options supported by all @@ -84,12 +85,13 @@ Redis allows for the renaming or disabling of commands in its protocol, see: ht ===== `data_type` * This is a required setting. - * Value can be any of: `list`, `channel`, `pattern_channel` + * Value can be any of: `list`, `channel`, `pattern_channel`, `sortedset` * There is no default value for this setting. Specify either list or channel. If `data_type` is `list`, then we will BLPOP the key. If `data_type` is `channel`, then we will SUBSCRIBE to the key. If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key. +If `data_type` is `sortedset`, then we will ZRANGE/ZREVRANGE and ZREM/ZREMRANGEBYRANK the key. [id="plugins-{type}s-{plugin}-db"] ===== `db` @@ -123,7 +125,7 @@ The unix socket path of your Redis server. * Value type is <> * There is no default value for this setting. -The name of a Redis list or channel. +The name of a Redis list, channel or sortedset. [id="plugins-{type}s-{plugin}-password"] ===== `password` @@ -166,6 +168,15 @@ Enable SSL support. Initial connection timeout in seconds. +[id="plugins-{type}s-{plugin}-reverse_order"] +===== `reverse_order` + + * Value type is <> + * Default value if `false` + +When the data_type is `sortedset`, read the sortedset in reverse order. + + [id="plugins-{type}s-{plugin}-common-options"] include::{include_path}/{type}.asciidoc[] diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index 9722d8b..8282bdc 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -4,9 +4,11 @@ require "logstash/inputs/threadable" require 'redis' -# This input will read events from a Redis instance; it supports both Redis channels and lists. +# This input will read events from a Redis instance; it supports both Redis channels, lists and sortedsets. # The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and # the channel commands used by Logstash are found in Redis v1.3.8+. +# The sortedset commands (ZREM, ZREMRANGEBYRANK, ZRANGE, ZREVRANGE) used by Logstash +# are supported in Redis v2.0.0+. # While you may be able to make these Redis versions work, the best performance # and stability will be found in more recent stable versions. Versions 2.6.0+ # are recommended. @@ -48,10 +50,14 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable # The name of a Redis list or channel. config :key, :validate => :string, :required => true + # Pop high scores item first in sortedset. No effect for other data types + config :reverse_order, :validate => :boolean, :default => false + # Specify either list or channel. If `data_type` is `list`, then we will BLPOP the # key. If `data_type` is `channel`, then we will SUBSCRIBE to the key. # If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key. - config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => true + # If `data_type` is `sortedset`, then we will ZRANGE/ZREVRANGE to the key. + config :data_type, :validate => [ "list", "channel", "pattern_channel", "sortedset" ], :required => true # The number of events to return from Redis using EVAL. config :batch_count, :validate => :number, :default => 125 @@ -92,8 +98,12 @@ def register elsif @data_type == 'pattern_channel' @run_method = method(:pattern_channel_runner) @stop_method = method(:subscribe_stop) + elsif @data_type == 'sortedset' + @run_method = method(:sortedset_runner) + @stop_method = method(:sortedset_stop) end + @sortedset_method = batched? ? method(:sortedset_batch_listener) : method(:sortedset_single_listener) @list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener) @identity = "#{@redis_url} #{@data_type}:#{@key}" @@ -122,6 +132,11 @@ def is_list_type? @data_type == 'list' end + # private + def is_sortedset_type? + @data_type == 'sortedset' + end + # private def redis_params if @path.nil? @@ -164,6 +179,7 @@ def connect end load_batch_script(redis) if batched? && is_list_type? + load_batch_script_sortedset(redis) if batched? && is_sortedset_type? redis end # def connect @@ -179,6 +195,29 @@ def load_batch_script(redis) @redis_script_sha = redis.script(:load, redis_script) end + # private + def load_batch_script_sortedset(redis) + #A Redis Lua EVAL script to fetch a count of keys + if @reverse_order then + redis_script = < e + @logger.warn("Redis connection problem", :exception => e) + # Reset the redis variable to trigger reconnect + @redis = nil + # this sleep does not need to be stoppable as its + # in a while !stop? loop + sleep 1 + end + end + end + + def sortedset_batch_listener(redis, output_queue) + begin + results = redis.evalsha(@redis_script_sha, [@key], [@batch_count-1]) + results.each do |item| + queue_event(item, output_queue) + end + + if results.size.zero? + sleep BATCH_EMPTY_SLEEP + end + rescue ::Redis::CommandError => e + if e.to_s =~ /NOSCRIPT/ then + @logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e); + load_batch_script_sortedset(redis) + retry + else + raise e + end + end + end + + def sortedset_single_listener(redis, output_queue) + redis.watch(@key) do + if @reverse_order then + item = redis.zrevrange(@key, 0, @batch_count, :timeout => 1) + else + item = redis.zrange(@key, 0, @batch_count, :timeout => 1) + end + + if item.size.zero? + sleep BATCH_EMPTY_SLEEP + end + + return unless item.size > 0 + + redis.multi do |multi| + redis.zrem(@key, item) + end + + queue_event(item.first, output_queue) + end + end + # private def subscribe_stop return if @redis.nil? || !@redis.connected? diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index 6675a8e..50849c3 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -24,6 +24,47 @@ def process(conf, event_count) expect(events.map{|evt| evt.get("sequence")}).to eq((0..event_count.pred).to_a) end +def sortedset_populate(key, event_count) + require "logstash/event" + redis = Redis.new(:host => "localhost") + + event_count_1 = event_count / 2 + event_count_2 = event_count - event_count_1 + + # Add half events in default order + event_count_1.times do |value| + event = LogStash::Event.new("sequence" => value) + Stud.try(10.times) do + redis.zadd(key, value, event.to_json) + end + end + + # Add half events in reverse order + event_count_2.times do |value| + value = event_count - value - 1 + event = LogStash::Event.new("sequence" => value) + Stud.try(10.times) do + redis.zadd(key, value, event.to_json) + end + end +end + +def sortedset_process(conf, event_count) + events = input(conf) do |pipeline, queue| + event_count.times.map{queue.pop} + end + + expect(events.map{|evt| evt.get("sequence")}).to eq((0..event_count.pred).to_a) +end + +def sortedsetrev_process(conf, event_count) + events = input(conf) do |pipeline, queue| + event_count.times.map{queue.pop} + end + + expect(events.map{|evt| evt.get("sequence")}).to eq((0..event_count.pred).to_a.reverse) +end + # integration tests --------------------- describe "inputs/redis", :redis => true do @@ -63,6 +104,83 @@ def process(conf, event_count) populate(key, event_count) process(conf, event_count) end + + it "should read events from a sortedset in default order" do + key = SecureRandom.hex + event_count = 1000 + rand(50) + # event_count = 100 + conf = <<-CONFIG + input { + redis { + type => "blah" + key => "#{key}" + data_type => "sortedset" + batch_count => 1 + } + } + CONFIG + + sortedset_populate(key, event_count) + sortedset_process(conf, event_count) + end + + it "should read events from a sortedset in reverse order" do + key = SecureRandom.hex + event_count = 1000 + rand(50) + # event_count = 100 + conf = <<-CONFIG + input { + redis { + type => "blah" + key => "#{key}" + data_type => "sortedset" + batch_count => 1 + reverse_order => true + } + } + CONFIG + + sortedset_populate(key, event_count) + sortedsetrev_process(conf, event_count) + end + + it "should read events from a sortedset in default order using batch_count" do + key = SecureRandom.hex + event_count = 1000 + rand(50) + # event_count = 100 + conf = <<-CONFIG + input { + redis { + type => "blah" + key => "#{key}" + data_type => "sortedset" + } + } + CONFIG + + sortedset_populate(key, event_count) + sortedset_process(conf, event_count) + end + + it "should read events from a sortedset in reverse order using batch_count" do + key = SecureRandom.hex + event_count = 1000 + rand(50) + # event_count = 100 + conf = <<-CONFIG + input { + redis { + type => "blah" + key => "#{key}" + data_type => "sortedset" + reverse_order => true + } + } + CONFIG + + sortedset_populate(key, event_count) + sortedsetrev_process(conf, event_count) + end + end # unit tests --------------------- @@ -264,6 +382,98 @@ def process(conf, event_count) end end + context 'runtime for sortedset data_type' do + let(:data_type) { 'sortedset' } + before do + subject.register + end + + context 'close when redis is unset' do + let(:quit_calls) { [:quit, :unsubscribe, :punsubscribe, :connection, :disconnect!] } + + it 'does not attempt to quit' do + allow(redis).to receive(:nil?).and_return(true) + quit_calls.each do |call| + expect(redis).not_to receive(call) + end + expect {subject.do_stop}.not_to raise_error + end + end + + context "when the batch size is greater than 1" do + let(:batch_count) { 10 } + let(:rates) { [] } + + before do + allow(redis).to receive(:connected?).and_return(connected.last) + allow(redis).to receive(:script) + allow(redis).to receive(:quit) + end + + it 'calling the run method, adds events to the queue' do + expect(redis).to receive(:evalsha).at_least(:once).and_return(['a', 'b']) + + tt = Thread.new do + sleep 0.01 + subject.do_stop + end + + subject.run(accumulator) + + tt.join + expect(accumulator.size).to be > 0 + end + end + + context "when there is no data" do + let(:batch_count) { 10 } + let(:rates) { [] } + + it 'will throttle the loop' do + allow(redis).to receive(:evalsha) do + rates.unshift Time.now.to_f + [] + end + allow(redis).to receive(:connected?).and_return(connected.last) + allow(redis).to receive(:script) + allow(redis).to receive(:quit) + + tt = Thread.new do + sleep 1 + subject.do_stop + end + + subject.run(accumulator) + + tt.join + + inters = [] + rates.each_cons(2) do |x, y| + inters << x - y + end + + expect(accumulator.size).to eq(0) + inters.each do |delta| + expect(delta).to be_within(0.03).of(LogStash::Inputs::Redis::BATCH_EMPTY_SLEEP) + end + end + end + + it 'multiple close calls, calls to redis once' do + subject.use_redis(redis) + allow(redis).to receive(:blpop).and_return(['foo', 'l1']) + expect(redis).to receive(:connected?).and_return(connected.last) + quit_calls.each do |call| + expect(redis).to receive(call).at_most(:once) + end + + subject.do_stop + connected.push(false) #can't use let block here so push to array + expect {subject.do_stop}.not_to raise_error + subject.do_stop + end + end + context 'for the subscribe data_types' do def run_it_thread(inst) Thread.new(inst) do |subj| @@ -396,7 +606,7 @@ def close_thread(inst, rt) describe LogStash::Inputs::Redis do context "when using data type" do - ["list", "channel", "pattern_channel"].each do |data_type| + ["list", "channel", "pattern_channel", "sortedset"].each do |data_type| context data_type do it_behaves_like "an interruptible input plugin" do let(:config) { {'key' => 'foo', 'data_type' => data_type } }