diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb
index 59aa036..0b03c6c 100644
--- a/lib/logstash/inputs/elasticsearch.rb
+++ b/lib/logstash/inputs/elasticsearch.rb
@@ -75,6 +75,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base

   require 'logstash/inputs/elasticsearch/paginated_search'
   require 'logstash/inputs/elasticsearch/aggregation'
+  require 'logstash/inputs/elasticsearch/cursor_tracker'

   include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
   include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
@@ -126,6 +127,22 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
   # by this pipeline input.
   config :slices, :validate => :number

+  # Enable tracking the value of a given field to be used as a cursor
+  # TODO: main concerns
+  # * schedule overlap needs to be disabled (hardcoded as enabled)
+  # * using anything other than _event.timestamp easily leads to data loss
+  # * the first "synchronization" run can take a long time
+  # * checkpointing is only safe to do after each run (not per document)
+  config :tracking_field, :validate => :string
+
+  # Define the initial seed value of the tracking_field
+  config :tracking_field_seed, :validate => :string
+
+  # The location where the tracking field value will be stored.
+  # The value is persisted after each scheduled run (and not per result).
+  # If not set, it defaults to '${path.data}/plugins/inputs/elasticsearch/last_run_value'
+  config :last_run_metadata_path, :validate => :string
+
   # If set, include Elasticsearch document information such as index, type, and
   # the id in the event.
   #
@@ -261,6 +278,10 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
   # exactly once.
   config :schedule, :validate => :string

+  # Allow scheduled runs to overlap (enabled by default). Setting to false will
+  # only start a new scheduled run after the previous one completes.
+  config :schedule_overlap, :validate => :boolean, :default => true
+
   # If set, the _source of each hit will be added nested under the target instead of at the top-level
   config :target, :validate => :field_reference
@@ -331,18 +352,30 @@ def register

     setup_query_executor

+    setup_cursor_tracker
+
     @client
   end

   def run(output_queue)
     if @schedule
-      scheduler.cron(@schedule) { @query_executor.do_run(output_queue) }
+      scheduler.cron(@schedule, :overlap => @schedule_overlap) do
+        @query_executor.do_run(output_queue, get_query_object)
+        @cursor_tracker.checkpoint_cursor
+      end
       scheduler.join
     else
-      @query_executor.do_run(output_queue)
+      @query_executor.do_run(output_queue, get_query_object)
+      @cursor_tracker.checkpoint_cursor
     end
   end

+  def get_query_object
+    injected_query = @cursor_tracker.inject_cursor(@query)
+    @logger.debug("new query is #{injected_query}")
+    LogStash::Json.load(injected_query)
+  end
+
   ##
   # This can be called externally from the query_executor
   public
@@ -351,6 +384,7 @@ def push_hit(hit, output_queue, root_field = '_source')
     set_docinfo_fields(hit, event) if @docinfo
     decorate(event)
     output_queue << event
+    @cursor_tracker.record_last_value(event)
   end

   def set_docinfo_fields(hit, event)
@@ -664,6 +698,17 @@ def setup_query_executor
     end
   end

+  def setup_cursor_tracker
+    if @tracking_field
+      @tracking_field_seed ||= Time.now.utc.iso8601
+      @cursor_tracker = CursorTracker.new(last_run_metadata_path: @last_run_metadata_path,
+                                          tracking_field: @tracking_field,
+                                          tracking_field_seed: @tracking_field_seed)
+    else
+      @cursor_tracker = NoopCursorTracker.new
+    end
+  end
+
   module URIOrEmptyValidator
     ##
     # @override to provide :uri_or_empty validator
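For context on how the cursor reaches Elasticsearch: before every run, get_query_object asks the tracker to substitute the ":last_value" placeholder in the configured query string and then parses the result. Below is a minimal plain-Ruby sketch of that round trip, using the stdlib JSON module in place of LogStash::Json; the query and timestamp are illustrative only, not taken from the patch.

require 'json'

# Hypothetical query as a user might configure it; ":last_value" is the
# placeholder the tracker replaces via String#gsub (see inject_cursor below).
configured_query = '{ "query": { "range": { "@timestamp": { "gt": ":last_value" } } } }'
last_value = "2024-05-01T00:00:00Z" # the seed, or the value checkpointed by the previous run

injected = configured_query.gsub(":last_value", last_value)
query_object = JSON.parse(injected)
# query_object => {"query"=>{"range"=>{"@timestamp"=>{"gt"=>"2024-05-01T00:00:00Z"}}}}
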
diff --git a/lib/logstash/inputs/elasticsearch/aggregation.rb b/lib/logstash/inputs/elasticsearch/aggregation.rb
index 0855cb8..d479d89 100644
--- a/lib/logstash/inputs/elasticsearch/aggregation.rb
+++ b/lib/logstash/inputs/elasticsearch/aggregation.rb
@@ -13,12 +13,7 @@ def initialize(client, plugin)
       @plugin_params = plugin.params

       @size = @plugin_params["size"]
-      @query = @plugin_params["query"]
       @retries = @plugin_params["retries"]
-      @agg_options = {
-        :index => @index,
-        :size => 0
-      }.merge(:body => @query)
       @plugin = plugin
     end

@@ -33,10 +28,18 @@ def retryable(job_name, &block)
       false
     end

-    def do_run(output_queue)
+    def aggregation_options(query_object)
+      {
+        :index => @index,
+        :size => 0,
+        :body => query_object
+      }
+    end
+
+    def do_run(output_queue, query_object)
       logger.info("Aggregation starting")
       r = retryable(AGGREGATION_JOB) do
-        @client.search(@agg_options)
+        @client.search(aggregation_options(query_object))
       end
       @plugin.push_hit(r, output_queue, 'aggregations') if r
     end
diff --git a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb
new file mode 100644
index 0000000..1ad7b69
--- /dev/null
+++ b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb
@@ -0,0 +1,56 @@
+require 'fileutils'
+require 'time' # Time.parse is used in converge_last_value
+
+module LogStash; module Inputs; class Elasticsearch
+  class NoopCursorTracker
+    include LogStash::Util::Loggable
+    def checkpoint_cursor; end
+
+    def converge_last_value; end
+
+    def record_last_value(event); end
+
+    def inject_cursor(query_json); return query_json; end
+  end
+
+  class CursorTracker
+    include LogStash::Util::Loggable
+
+    attr_reader :last_value
+
+    def initialize(last_run_metadata_path:, tracking_field:, tracking_field_seed:)
+      @last_run_metadata_path = last_run_metadata_path
+      @last_run_metadata_path ||= ::File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs",
+                                              "elasticsearch", "last_run_value")
+      FileUtils.mkdir_p ::File.dirname(@last_run_metadata_path)
+      @last_value_hashmap = Java::java.util.concurrent.ConcurrentHashMap.new
+      @last_value = (IO.read(@last_run_metadata_path) rescue nil) || tracking_field_seed
+      @tracking_field = tracking_field
+      logger.info "Starting value for cursor field \"#{@tracking_field}\": #{@last_value}"
+    end
+
+    def checkpoint_cursor
+      converge_last_value
+      IO.write(@last_run_metadata_path, @last_value)
+      @last_value_hashmap.clear
+    end
+
+    def converge_last_value
+      return if @last_value_hashmap.empty?
+      # TODO: this implicitly assumes that the way to converge the value among slices is to pick the highest, and we can't assume that
+      new_last_value = @last_value_hashmap.reduceValues(1, lambda { |v1, v2| Time.parse(v1) < Time.parse(v2) ? v2 : v1 })
+      return if new_last_value == @last_value
+      @last_value = new_last_value
+      logger.info "New cursor value for field \"#{@tracking_field}\" is: #{new_last_value}"
+    end
+
+    def record_last_value(event)
+      value = event.get(@tracking_field)
+      logger.trace? && logger.trace("storing last_value of #{@tracking_field} for #{Thread.current.object_id}: #{value}")
+      @last_value_hashmap.put(Thread.current.object_id, value)
+    end
+
+    def inject_cursor(query_json)
+      query_json.gsub(":last_value", @last_value)
+    end
+  end
+end; end; end
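The sliced-search case is the subtle part of CursorTracker: each slice runs on its own thread and records its own candidate value into the ConcurrentHashMap, and converge_last_value then reduces them to a single cursor by picking the highest timestamp (the TODO above flags that assumption). A plain-Ruby sketch of the same reduction, with illustrative thread ids and values:

require 'time'

# Stand-in for the per-thread ConcurrentHashMap; the ids and timestamps
# here are illustrative only.
last_value_by_thread = {
  1001 => "2024-05-01T10:00:00Z", # last hit seen by slice 0's thread
  1002 => "2024-05-01T10:05:00Z", # last hit seen by slice 1's thread
}

# Equivalent of reduceValues with the "pick the highest" lambda.
new_last_value = last_value_by_thread.values.max_by { |v| Time.parse(v) }
# new_last_value => "2024-05-01T10:05:00Z"
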
"elasticsearch", "last_run_value") + FileUtils.mkdir_p ::File.dirname(@last_run_metadata_path) + @last_value_hashmap = Java::java.util.concurrent.ConcurrentHashMap.new + @last_value = IO.read(@last_run_metadata_path) rescue nil || tracking_field_seed + @tracking_field = tracking_field + logger.info "Starting value for cursor field \"#{@tracking_field}\": #{@last_value}" + end + + def checkpoint_cursor + converge_last_value + IO.write(@last_run_metadata_path, @last_value) + @last_value_hashmap.clear + end + + def converge_last_value + return if @last_value_hashmap.empty? + # TODO this implicitly assumes that the way to converge the value among slices is to pick the highest and we can't assume that + new_last_value = @last_value_hashmap.reduceValues(1, lambda { |v1, v2| Time.parse(v1) < Time.parse(v2) ? v2 : v1 }) + return if new_last_value == @last_value + @last_value = new_last_value + logger.info "New cursor value for field \"#{@tracking_field}\" is: #{new_last_value}" + end + + def record_last_value(event) + value = event.get(@tracking_field) + logger.trace? && logger.trace("storing last_value if #{@tracking_field} for #{Thread.current.object_id}: #{value}") + @last_value_hashmap.put(Thread.current.object_id, value) + end + + def inject_cursor(query_json) + query_json.gsub(":last_value", @last_value) + end + end +end; end; end diff --git a/lib/logstash/inputs/elasticsearch/paginated_search.rb b/lib/logstash/inputs/elasticsearch/paginated_search.rb index 2e8236b..cb1cbdc 100644 --- a/lib/logstash/inputs/elasticsearch/paginated_search.rb +++ b/lib/logstash/inputs/elasticsearch/paginated_search.rb @@ -21,10 +21,14 @@ def initialize(client, plugin) @pipeline_id = plugin.pipeline_id end - def do_run(output_queue) - return retryable_search(output_queue) if @slices.nil? || @slices <= 1 + def do_run(output_queue, query) + @query = query - retryable_slice_search(output_queue) + if @slices.nil? || @slices <= 1 + retryable_search(output_queue) + else + retryable_slice_search(output_queue) + end end def retryable(job_name, &block)