Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for REDIS ZSET #56

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ This plugin supports the following configuration options plus the <<plugins-{typ
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-batch_count>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-data_type>> |<<string,string>>, one of `["list", "channel", "pattern_channel"]`|Yes
| <<plugins-{type}s-{plugin}-data_type>> |<<string,string>>, one of `["list", "channel", "pattern_channel", "sortedset"]`|Yes
| <<plugins-{type}s-{plugin}-db>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-host>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-key>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-priority_reverse>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No
| <<plugins-{type}s-{plugin}-port>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-threads>> |<<number,number>>|No
Expand All @@ -58,75 +59,86 @@ input plugins.
&nbsp;

[id="plugins-{type}s-{plugin}-batch_count"]
===== `batch_count`
===== `batch_count`

* Value type is <<number,number>>
* Default value is `125`

The number of events to return from Redis using EVAL.

[id="plugins-{type}s-{plugin}-data_type"]
===== `data_type`
===== `data_type`

* This is a required setting.
* Value can be any of: `list`, `channel`, `pattern_channel`
* Value can be any of: `list`, `channel`, `pattern_channel`, `sortedset`
* There is no default value for this setting.

Specify either list or channel. If `redis\_type` is `list`, then we will BLPOP the
key. If `redis\_type` is `channel`, then we will SUBSCRIBE to the key.
key.
Specify either list or channel. If `redis\_type` is `sortedset`, then we will
ZRANGE/ZREVRANGE and ZREM/ZREMRANGEBYRANK the key.
If `redis\_type` is `channel`, then we will SUBSCRIBE to the key.
If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.

[id="plugins-{type}s-{plugin}-db"]
===== `db`
===== `db`

* Value type is <<number,number>>
* Default value is `0`

The Redis database number.

[id="plugins-{type}s-{plugin}-host"]
===== `host`
===== `host`

* Value type is <<string,string>>
* Default value is `"127.0.0.1"`

The hostname of your Redis server.

[id="plugins-{type}s-{plugin}-key"]
===== `key`
===== `key`

* This is a required setting.
* Value type is <<string,string>>
* There is no default value for this setting.

The name of a Redis list or channel.
The name of a Redis list, sortedset or channel.

[id="plugins-{type}s-{plugin}-priority_reverse"]
===== `priority_reverse`

* Value type is <<boolean,boolean>>
* Default value if `false`

When the data_type is `sortedset`, read the sortedset in reverse order.

[id="plugins-{type}s-{plugin}-password"]
===== `password`
===== `password`

* Value type is <<password,password>>
* There is no default value for this setting.

Password to authenticate with. There is no authentication by default.

[id="plugins-{type}s-{plugin}-port"]
===== `port`
===== `port`

* Value type is <<number,number>>
* Default value is `6379`

The port to connect on.

[id="plugins-{type}s-{plugin}-threads"]
===== `threads`
===== `threads`

* Value type is <<number,number>>
* Default value is `1`



[id="plugins-{type}s-{plugin}-timeout"]
===== `timeout`
===== `timeout`

* Value type is <<number,number>>
* Default value is `5`
Expand Down
129 changes: 121 additions & 8 deletions lib/logstash/inputs/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
require "logstash/inputs/threadable"
require 'redis'

# This input will read events from a Redis instance; it supports both Redis channels and lists.
# This input will read events from a Redis instance; it supports both Redis channels, lists and sortedsets.
#
# The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
# the channel commands used by Logstash are found in Redis v1.3.8+.
# While you may be able to make these Redis versions work, the best performance
# and stability will be found in more recent stable versions. Versions 2.6.0+
# are recommended.
#
# The sortedset commands (ZREM, ZREMRANGEBYRANK, ZRANGE, ZREVRANGE) used by Logstash are supported
# in Redis v2.0.0+
#
# For more information about Redis, see <http://redis.io/>
#
# `batch_count` note: If you use the `batch_count` setting, you *must* use a Redis version 2.6.0 or
Expand Down Expand Up @@ -41,10 +45,14 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
# The name of a Redis list or channel.
config :key, :validate => :string, :required => true

# Specify either list or channel. If `redis\_type` is `list`, then we will BLPOP the
# key. If `redis\_type` is `channel`, then we will SUBSCRIBE to the key.
# If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => true
# Pop high scores item first in sortedset. No effect for other data types
config :priority_reverse, :validate => :boolean, :default => false

# Specify either list or channel. If `redis\_type` is `list`, then we will BLPOP
# the key. If `redis\_type` is `channel`, then we will SUBSCRIBE to the key.
# If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
# If `redis\_type` is `sortedset`, then we will ZRANGE/ZREVRANGE
config :data_type, :validate => [ "list", "channel", "pattern_channel", "sortedset" ], :required => true

# The number of events to return from Redis using EVAL.
config :batch_count, :validate => :number, :default => 125
Expand Down Expand Up @@ -76,6 +84,9 @@ def register
if @data_type == 'list' || @data_type == 'dummy'
@run_method = method(:list_runner)
@stop_method = method(:list_stop)
elsif @data_type == 'sortedset'
@run_method = method(:sortedset_runner)
@stop_method = method(:sortedset_stop)
elsif @data_type == 'channel'
@run_method = method(:channel_runner)
@stop_method = method(:subscribe_stop)
Expand All @@ -84,7 +95,9 @@ def register
@stop_method = method(:subscribe_stop)
end

#TODO voir à terme comment fusionner ces deux méthodes
@list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener)
@sortedset_method = batched? ? method(:sortedset_batch_listener) : method(:sortedset_single_listener)

@identity = "#{@redis_url} #{@data_type}:#{@key}"
@logger.info("Registering Redis", :identity => @identity)
Expand Down Expand Up @@ -112,6 +125,11 @@ def is_list_type?
@data_type == 'list'
end

# private
def is_sortedset_type?
@data_type == 'sortedset'
end

# private
def redis_params
{
Expand All @@ -131,12 +149,13 @@ def internal_redis_builder
# private
def connect
redis = new_redis_instance
load_batch_script(redis) if batched? && is_list_type?
list_load_batch_script(redis) if batched? && is_list_type?
sortedset_load_batch_script(redis) if batched? && is_sortedset_type?
redis
end # def connect

# private
def load_batch_script(redis)
def list_load_batch_script(redis)
#A Redis Lua EVAL script to fetch a count of keys
redis_script = <<EOF
local batchsize = tonumber(ARGV[1])
Expand All @@ -147,6 +166,31 @@ def load_batch_script(redis)
@redis_script_sha = redis.script(:load, redis_script)
end

# private
def sortedset_load_batch_script(redis)
#A Redis Lua EVAL script to fetch a count of keys
redis_script = "local batchsize = tonumber(ARGV[1])\n"
redis_script << "local zcard = tonumber(redis.call('zcard', KEYS[1]))\n"
redis_script << "local result = redis.call('"
if @priority_reverse then
redis_script << 'zrevrange'
else
redis_script << 'zrange'
end
redis_script << "', KEYS[1], 0, batchsize)\n"
redis_script << "redis.call('"

redis_script << "zremrangebyrank', KEYS[1], "
if @priority_reverse then
redis_script << "zcard - batchsize - 1, zcard"
else
redis_script << "0, batchsize"
end
redis_script << ")\nreturn result\n"

@redis_script_sha = redis.script(:load, redis_script)
end

# private
def queue_event(msg, output_queue)
begin
Expand Down Expand Up @@ -214,7 +258,7 @@ def list_batch_listener(redis, output_queue)
rescue ::Redis::CommandError => e
if e.to_s =~ /NOSCRIPT/ then
@logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e);
load_batch_script(redis)
list_load_batch_script(redis)
retry
else
raise e
Expand All @@ -231,6 +275,75 @@ def list_single_listener(redis, output_queue)
queue_event(item.last, output_queue)
end

# private
def sortedset_stop
return if @redis.nil? || [email protected]?

@redis.quit rescue nil
@redis = nil
end

# private
def sortedset_runner(output_queue)
while !stop?
begin
@redis ||= connect
@sortedset_method.call(@redis, output_queue)
rescue ::Redis::BaseError => e
@logger.warn("Redis connection problem", :exception => e)
# Reset the redis variable to trigger reconnect
@redis = nil
# this sleep does not need to be stoppable as its
# in a while !stop? loop
sleep 1
end
end
end

def sortedset_batch_listener(redis, output_queue)
begin
results = redis.evalsha(@redis_script_sha, [@key], [@batch_count-1])
results.each do |item|
queue_event(item, output_queue)
end

if results.size.zero?
sleep BATCH_EMPTY_SLEEP
end
rescue ::Redis::CommandError => e
if e.to_s =~ /NOSCRIPT/ then
@logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e);
sortedset_load_batch_script(redis)
retry
else
raise e
end
end
end

def sortedset_single_listener(redis, output_queue)
redis.watch(@key) do
if @priority_reverse then
item = redis.zrevrange(@key, 0, 0, :timeout => 1)
else
item = redis.zrange(@key, 0, 0, :timeout => 1)
end

if item.size.zero?
sleep BATCH_EMPTY_SLEEP
end

return unless item.size > 0

redis.multi do |multi|
redis.zrem(@key, item)
end

queue_event(item.first, output_queue)
end
end


# private
def subscribe_stop
return if @redis.nil? || [email protected]?
Expand Down
Loading