Introduce cursor tracking akin to jdbc input #205

Draft · wants to merge 2 commits into main
49 changes: 47 additions & 2 deletions lib/logstash/inputs/elasticsearch.rb
@@ -75,6 +75,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base

require 'logstash/inputs/elasticsearch/paginated_search'
require 'logstash/inputs/elasticsearch/aggregation'
require 'logstash/inputs/elasticsearch/cursor_tracker'

include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
@@ -126,6 +127,22 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
# by this pipeline input.
config :slices, :validate => :number

# Enable tracking the value of a given field, to be used as a cursor
# TODO: main concerns
# * schedule overlap needs to be disabled (hardcoded as enabled)
# * using anything other than the event's timestamp easily leads to data loss
# * the first synchronization run can take a long time
# * checkpointing is only safe to do after each run (not per document)
config :tracking_field, :validate => :string

# Define the initial seed value of the tracking_field
config :tracking_field_seed, :validate => :string

# The path of the file where the tracking field value will be stored.
# The value is persisted after each scheduled run (and not per result).
# If not set, it defaults to '${path.data}/plugins/inputs/elasticsearch/last_run_value'
config :last_run_metadata_path, :validate => :string
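
As a rough sketch of how these settings fit together, here is the params hash the plugin might receive; the field name, seed, and path values are illustrative and not part of this diff:

    # Hypothetical plugin parameters; the ":last_value" token in the query is
    # replaced with the persisted cursor before each run (see inject_cursor below).
    params = {
      "query"                  => '{ "query": { "range": { "@timestamp": { "gt": ":last_value" } } } }',
      "tracking_field"         => "[@timestamp]",
      "tracking_field_seed"    => "1970-01-01T00:00:00Z",
      "last_run_metadata_path" => "/var/lib/logstash/plugins/inputs/elasticsearch/last_run_value"
    }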

# If set, include Elasticsearch document information such as index, type, and
# the id in the event.
#
@@ -261,6 +278,10 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
# exactly once.
config :schedule, :validate => :string

# Allow scheduled runs to overlap (enabled by default). Setting to false will
# only start a new scheduled run after the previous one completes.
config :schedule_overlap, :validate => :boolean, :default => true

# If set, the _source of each hit will be added nested under the target instead of at the top-level
config :target, :validate => :field_reference

@@ -331,18 +352,30 @@ def register

setup_query_executor

setup_cursor_tracker

@client
end

def run(output_queue)
if @schedule
scheduler.cron(@schedule) { @query_executor.do_run(output_queue) }
scheduler.cron(@schedule, :overlap => @schedule_overlap) do
@query_executor.do_run(output_queue, get_query_object())
@cursor_tracker.checkpoint_cursor
end
scheduler.join
else
@query_executor.do_run(output_queue)
@query_executor.do_run(output_queue, get_query_object())
@cursor_tracker.checkpoint_cursor
end
end

def get_query_object
  injected_query = @cursor_tracker.inject_cursor(@query)
  @logger.debug("new query is #{injected_query}")
  LogStash::Json.load(injected_query)
end
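
A minimal standalone sketch of what get_query_object does, using the stdlib JSON module in place of LogStash::Json; the range query and checkpoint value are illustrative:

    require 'json'

    # The configured query carries a literal ":last_value" placeholder;
    # inject_cursor (added in cursor_tracker.rb below) substitutes the
    # persisted cursor into it before the query is parsed and executed.
    query      = '{ "query": { "range": { "@timestamp": { "gt": ":last_value" } } } }'
    last_value = "2024-01-01T10:05:00Z" # assumed checkpoint value

    injected     = query.gsub(":last_value", last_value)
    query_object = JSON.parse(injected)
    # query_object => {"query"=>{"range"=>{"@timestamp"=>{"gt"=>"2024-01-01T10:05:00Z"}}}}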

##
# This can be called externally from the query_executor
public
@@ -351,6 +384,7 @@ def push_hit(hit, output_queue, root_field = '_source')
set_docinfo_fields(hit, event) if @docinfo
decorate(event)
output_queue << event
@cursor_tracker.record_last_value(event)
end

def set_docinfo_fields(hit, event)
@@ -664,6 +698,17 @@ def setup_query_executor
end
end

def setup_cursor_tracker
if @tracking_field
@tracking_field_seed ||= Time.now.utc.iso8601
@cursor_tracker = CursorTracker.new(last_run_metadata_path: @last_run_metadata_path,
tracking_field: @tracking_field,
tracking_field_seed: @tracking_field_seed)
else
@cursor_tracker = NoopCursorTracker.new
end
end

module URIOrEmptyValidator
##
# @override to provide :uri_or_empty validator
17 changes: 10 additions & 7 deletions lib/logstash/inputs/elasticsearch/aggregation.rb
@@ -13,12 +13,7 @@ def initialize(client, plugin)
@plugin_params = plugin.params

@size = @plugin_params["size"]
@query = @plugin_params["query"]
@retries = @plugin_params["retries"]
@agg_options = {
:index => @index,
:size => 0
}.merge(:body => @query)

@plugin = plugin
end
@@ -33,10 +28,18 @@ def retryable(job_name, &block)
false
end

def do_run(output_queue)
def aggregation_options(query_object)
{
:index => @index,
:size => 0,
:body => query_object
}
end

def do_run(output_queue, query_object)
logger.info("Aggregation starting")
r = retryable(AGGREGATION_JOB) do
@client.search(@agg_options)
@client.search(aggregation_options(query_object))
end
@plugin.push_hit(r, output_queue, 'aggregations') if r
end
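
For a concrete picture of the request this now builds per run, here is a sketch of the options hash handed to @client.search; the index name and aggregation are assumptions, not part of this diff:

    require 'json'

    # Illustrative reconstruction of aggregation_options(query_object).
    query_object = JSON.parse('{ "aggs": { "hosts": { "terms": { "field": "host.name" } } } }')

    options = {
      :index => "logs-*",  # stands in for @index from the plugin configuration
      :size  => 0,         # suppress hits; only the aggregation result is needed
      :body  => query_object
    }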
55 changes: 55 additions & 0 deletions lib/logstash/inputs/elasticsearch/cursor_tracker.rb
@@ -0,0 +1,55 @@
require 'fileutils'

module LogStash; module Inputs; class Elasticsearch
class NoopCursorTracker
include LogStash::Util::Loggable
def checkpoint_cursor; end

def converge_last_value; end

def record_last_value(event); end

def inject_cursor(query_json); return query_json; end
end

class CursorTracker
include LogStash::Util::Loggable

attr_reader :last_value

def initialize(last_run_metadata_path:, tracking_field:, tracking_field_seed:)
@last_run_metadata_path = last_run_metadata_path
@last_run_metadata_path ||= ::File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "elasticsearch", "last_run_value")
FileUtils.mkdir_p ::File.dirname(@last_run_metadata_path)
@last_value_hashmap = Java::java.util.concurrent.ConcurrentHashMap.new
# fall back to the seed when no checkpoint file can be read yet
@last_value = (IO.read(@last_run_metadata_path) rescue nil) || tracking_field_seed
@tracking_field = tracking_field
logger.info "Starting value for cursor field \"#{@tracking_field}\": #{@last_value}"
end

def checkpoint_cursor
converge_last_value
IO.write(@last_run_metadata_path, @last_value)
@last_value_hashmap.clear
end

def converge_last_value
return if @last_value_hashmap.empty?
# TODO: this implicitly assumes that converging the value across slices means picking the highest, and we can't assume that for every tracking field
new_last_value = @last_value_hashmap.reduceValues(1, lambda { |v1, v2| Time.parse(v1) < Time.parse(v2) ? v2 : v1 })
return if new_last_value == @last_value
@last_value = new_last_value
logger.info "New cursor value for field \"#{@tracking_field}\" is: #{new_last_value}"
end

def record_last_value(event)
value = event.get(@tracking_field)
logger.trace? && logger.trace("storing last_value of #{@tracking_field} for #{Thread.current.object_id}: #{value}")
@last_value_hashmap.put(Thread.current.object_id, value)
end

def inject_cursor(query_json)
query_json.gsub(":last_value", @last_value)
end
end
end; end; end
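
A self-contained rehearsal of the converge-and-checkpoint flow above, with a plain Hash standing in for the ConcurrentHashMap; thread ids, timestamps, and the temp path are all made up. Note that it inherits the assumption flagged in the TODO: the reducer always keeps the highest value.

    require 'time'
    require 'tempfile'

    # Each slice records the last value it saw, keyed by thread id
    # (record_last_value); the checkpoint keeps the highest timestamp and
    # persists it (converge_last_value + checkpoint_cursor).
    last_value_by_thread = {
      1001 => "2024-01-01T10:00:00Z",
      1002 => "2024-01-01T10:05:00Z",
      1003 => "2024-01-01T09:59:00Z"
    }

    new_last_value = last_value_by_thread.values.reduce do |v1, v2|
      Time.parse(v1) < Time.parse(v2) ? v2 : v1
    end

    path = Tempfile.new("last_run_value").path # stand-in for last_run_metadata_path
    IO.write(path, new_last_value)
    puts IO.read(path) # => 2024-01-01T10:05:00Z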
10 changes: 7 additions & 3 deletions lib/logstash/inputs/elasticsearch/paginated_search.rb
@@ -21,10 +21,14 @@ def initialize(client, plugin)
@pipeline_id = plugin.pipeline_id
end

def do_run(output_queue)
return retryable_search(output_queue) if @slices.nil? || @slices <= 1
def do_run(output_queue, query)
@query = query

retryable_slice_search(output_queue)
if @slices.nil? || @slices <= 1
retryable_search(output_queue)
else
retryable_slice_search(output_queue)
end
end

def retryable(job_name, &block)
Expand Down