Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Sorted Set (ZSET) in REDIS #83

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.5.2
- Support for REDIS Sorted Set(ZSET) [#56](https://github.com/logstash-plugins/logstash-input-redis/issues/56)

## 3.5.1
- [DOC] Reordered config option to alpha order [#79](https://github.com/logstash-plugins/logstash-input-redis/issues/79)

Expand Down
17 changes: 14 additions & 3 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-batch_count>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-command_map>> |<<hash,hash>>|No
| <<plugins-{type}s-{plugin}-data_type>> |<<string,string>>, one of `["list", "channel", "pattern_channel"]`|Yes
| <<plugins-{type}s-{plugin}-data_type>> |<<string,string>>, one of `["list", "channel", "pattern_channel", "sortedset"]`|Yes
| <<plugins-{type}s-{plugin}-db>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-host>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-path>> |<<string,string>>|No
Expand All @@ -54,6 +54,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-ssl>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-threads>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-timeout>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-reverse_order>> |<<boolean,boolean>>|No
|=======================================================================

Also see <<plugins-{type}s-{plugin}-common-options>> for a list of options supported by all
Expand Down Expand Up @@ -84,12 +85,13 @@ Redis allows for the renaming or disabling of commands in its protocol, see: ht
===== `data_type`

* This is a required setting.
* Value can be any of: `list`, `channel`, `pattern_channel`
* Value can be any of: `list`, `channel`, `pattern_channel`, `sortedset`
* There is no default value for this setting.

Specify either list or channel. If `data_type` is `list`, then we will BLPOP the
key. If `data_type` is `channel`, then we will SUBSCRIBE to the key.
If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
If `data_type` is `sortedset`, then we will ZRANGE/ZREVRANGE and ZREM/ZREMRANGEBYRANK the key.

[id="plugins-{type}s-{plugin}-db"]
===== `db`
Expand Down Expand Up @@ -123,7 +125,7 @@ The unix socket path of your Redis server.
* Value type is <<string,string>>
* There is no default value for this setting.

The name of a Redis list or channel.
The name of a Redis list, channel or sortedset.

[id="plugins-{type}s-{plugin}-password"]
===== `password`
Expand Down Expand Up @@ -166,6 +168,15 @@ Enable SSL support.

Initial connection timeout in seconds.

[id="plugins-{type}s-{plugin}-reverse_order"]
===== `reverse_order`

* Value type is <<boolean,boolean>>
* Default value if `false`

When the data_type is `sortedset`, read the sortedset in reverse order.


[id="plugins-{type}s-{plugin}-common-options"]
include::{include_path}/{type}.asciidoc[]

Expand Down
111 changes: 109 additions & 2 deletions lib/logstash/inputs/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
require "logstash/inputs/threadable"
require 'redis'

# This input will read events from a Redis instance; it supports both Redis channels and lists.
# This input will read events from a Redis instance; it supports both Redis channels, lists and sortedsets.
# The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
# the channel commands used by Logstash are found in Redis v1.3.8+.
# The sortedset commands (ZREM, ZREMRANGEBYRANK, ZRANGE, ZREVRANGE) used by Logstash
# are supported in Redis v2.0.0+.
# While you may be able to make these Redis versions work, the best performance
# and stability will be found in more recent stable versions. Versions 2.6.0+
# are recommended.
Expand Down Expand Up @@ -48,10 +50,14 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
# The name of a Redis list or channel.
config :key, :validate => :string, :required => true

# Pop high scores item first in sortedset. No effect for other data types
config :reverse_order, :validate => :boolean, :default => false

# Specify either list or channel. If `data_type` is `list`, then we will BLPOP the
# key. If `data_type` is `channel`, then we will SUBSCRIBE to the key.
# If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => true
# If `data_type` is `sortedset`, then we will ZRANGE/ZREVRANGE to the key.
config :data_type, :validate => [ "list", "channel", "pattern_channel", "sortedset" ], :required => true

# The number of events to return from Redis using EVAL.
config :batch_count, :validate => :number, :default => 125
Expand Down Expand Up @@ -92,8 +98,12 @@ def register
elsif @data_type == 'pattern_channel'
@run_method = method(:pattern_channel_runner)
@stop_method = method(:subscribe_stop)
elsif @data_type == 'sortedset'
@run_method = method(:sortedset_runner)
@stop_method = method(:sortedset_stop)
end

@sortedset_method = batched? ? method(:sortedset_batch_listener) : method(:sortedset_single_listener)
@list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener)

@identity = "#{@redis_url} #{@data_type}:#{@key}"
Expand Down Expand Up @@ -122,6 +132,11 @@ def is_list_type?
@data_type == 'list'
end

# private
def is_sortedset_type?
@data_type == 'sortedset'
end

# private
def redis_params
if @path.nil?
Expand Down Expand Up @@ -164,6 +179,7 @@ def connect
end

load_batch_script(redis) if batched? && is_list_type?
load_batch_script_sortedset(redis) if batched? && is_sortedset_type?
redis
end # def connect

Expand All @@ -179,6 +195,29 @@ def load_batch_script(redis)
@redis_script_sha = redis.script(:load, redis_script)
end

# private
def load_batch_script_sortedset(redis)
#A Redis Lua EVAL script to fetch a count of keys
if @reverse_order then
redis_script = <<EOF
local batchsize = tonumber(ARGV[1])
local zcard = tonumber(redis.call('zcard', KEYS[1]))
local result = redis.call('zrevrange', KEYS[1], 0, batchsize)
redis.call('zremrangebyrank', KEYS[1], zcard - batchsize - 1, zcard)
return result
EOF
else
redis_script = <<EOF
local batchsize = tonumber(ARGV[1])
local zcard = tonumber(redis.call('zcard', KEYS[1]))
local result = redis.call('zrange', KEYS[1], 0, batchsize)
redis.call('zremrangebyrank', KEYS[1], 0, batchsize)
return result
EOF
end
@redis_script_sha = redis.script(:load, redis_script)
end

# private
def queue_event(msg, output_queue, channel=nil)
begin
Expand Down Expand Up @@ -264,6 +303,74 @@ def list_single_listener(redis, output_queue)
queue_event(item.last, output_queue)
end

# private
def sortedset_stop
return if @redis.nil? || [email protected]?

@redis.quit rescue nil
@redis = nil
end

# private
def sortedset_runner(output_queue)
while !stop?
begin
@redis ||= connect
@sortedset_method.call(@redis, output_queue)
rescue ::Redis::BaseError => e
@logger.warn("Redis connection problem", :exception => e)
# Reset the redis variable to trigger reconnect
@redis = nil
# this sleep does not need to be stoppable as its
# in a while !stop? loop
sleep 1
end
end
end

def sortedset_batch_listener(redis, output_queue)
begin
results = redis.evalsha(@redis_script_sha, [@key], [@batch_count-1])
results.each do |item|
queue_event(item, output_queue)
end

if results.size.zero?
sleep BATCH_EMPTY_SLEEP
end
rescue ::Redis::CommandError => e
if e.to_s =~ /NOSCRIPT/ then
@logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e);
load_batch_script_sortedset(redis)
retry
else
raise e
end
end
end

def sortedset_single_listener(redis, output_queue)
redis.watch(@key) do
if @reverse_order then
item = redis.zrevrange(@key, 0, @batch_count, :timeout => 1)
else
item = redis.zrange(@key, 0, @batch_count, :timeout => 1)
end

if item.size.zero?
sleep BATCH_EMPTY_SLEEP
end

return unless item.size > 0

redis.multi do |multi|
redis.zrem(@key, item)
end

queue_event(item.first, output_queue)
end
end

# private
def subscribe_stop
return if @redis.nil? || [email protected]?
Expand Down
Loading