Skip to content

ES|QL support #233

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a307db2
ES|QL support: ESQL executor implementation, response type to accept …
mashhurs Apr 8, 2025
9c35f22
Merge with upstream, warn if query doesn't include METADATA which DSL…
mashhurs Apr 9, 2025
6f99055
Run unit tests with the LS version which actually supports the ES|QL.
mashhurs Apr 10, 2025
086a592
Add query type to the agent. DRY of supported ES/LS versions.
mashhurs Apr 10, 2025
e30e0f9
Remove query type from user-agent since it is useless, put back accid…
mashhurs Apr 10, 2025
7746c14
Initial docs added for ES|QL.
mashhurs Apr 10, 2025
76303d8
Update query to include condition with string.
mashhurs Apr 11, 2025
1fb29f7
Tested escaped chars cases, uses orignal query.
mashhurs Apr 12, 2025
5d47f2f
Integration tests added.
mashhurs Apr 14, 2025
c291e24
Skip the ESQL test if LS with the ES client which doesn't support ESQ…
mashhurs Apr 14, 2025
22e72e9
Add comments on response type and query params about ES|QL acceptance…
mashhurs Apr 14, 2025
af6e24a
Update spec/inputs/integration/elasticsearch_esql_spec.rb
mashhurs Apr 21, 2025
4ce6fa4
Integration test skip condition correction.
mashhurs Apr 21, 2025
4ed69ff
Introduce query_params option to accept drop_null_columns, set defaul…
mashhurs Apr 24, 2025
0725f98
Fix the failed integration test.
mashhurs Apr 25, 2025
cfb36f3
Request dropping null columns and filter out null values. Consider se…
mashhurs May 1, 2025
a92a71e
Apply suggestions from code review
mashhurs May 7, 2025
d4f559d
Apply code review suggestions: to use decorator as a proc call, doc s…
mashhurs May 7, 2025
65eb675
Rename warning msg field name to avoid conflicts. Generate a target a…
mashhurs May 8, 2025
789f467
Ignore sub-fields with warninigs and keep only parent.
mashhurs May 8, 2025
fefe6a0
Introduce at high level which other params such as , etc.. follow it…
mashhurs May 26, 2025
e108c87
Add a tech preview fior the ESQL.
mashhurs May 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 5.2.0
- ES|QL support [#233](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/233)

## 5.1.0
- Add "cursor"-like index tracking [#205](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/205)

Expand Down
109 changes: 105 additions & 4 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,101 @@ The next scheduled run:
* uses {ref}/point-in-time-api.html#point-in-time-api[Point in time (PIT)] + {ref}/paginate-search-results.html#search-after[Search after] to paginate through all the data, and
* updates the value of the field at the end of the pagination.

[id="plugins-{type}s-{plugin}-esql"]
==== ES|QL support
{es} Query Language (ES|QL) provides a SQL-like interface for querying your {es} data.

To use {esql}, this plugin needs to be installed in {ls} 8.17.4 or newer, and must be connected to {es} 8.11 or newer.

To configure {esql} query in the plugin, set the `response_type` to `esql` and provide your {esql} query in the `query` parameter.

IMPORTANT: {esql} is evolving and may still have limitations with regard to result size or supported field types. We recommend understanding https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-limitations.html[ES|QL current limitations] before using it in production environments.

The following is a basic scheduled ES|QL query that runs hourly:
[source, ruby]
input {
elasticsearch {
id => hourly_cron_job
hosts => [ 'https://..']
api_key => '....'
response_type => 'esql'
query => '
FROM food-index
| WHERE spicy_level = "hot" AND @timestamp > NOW() - 1 hour
| LIMIT 500
'
schedule => '0 * * * *' # every hour at min 0
}
}

Set `config.support_escapes: true` in `logstash.yml` if you need to escape special chars in the query.

NOTE: With ES|QL query, {ls} doesn't generate `event.original`.

[id="plugins-{type}s-{plugin}-esql-event-mapping"]
===== Mapping ES|QL result to {ls} event
ES|QL returns query results in a structured tabular format, where data is organized into _columns_ (fields) and _values_ (entries).
The plugin maps each value entry to an event, populating corresponding fields.
For example, a query might produce a table like:

[cols="2,1,1,1,2",options="header"]
|===
|`timestamp` |`user_id` | `action` | `status.code` | `status.desc`

|2025-04-10T12:00:00 |123 |login |200 | Success
|2025-04-10T12:05:00 |456 |purchase |403 | Forbidden (unauthorized user)
|===

For this case, the plugin emits two events look like
[source, json]
[
{
"timestamp": "2025-04-10T12:00:00",
"user_id": 123,
"action": "login",
"status": {
"code": 200,
"desc": "Success"
}
},
{
"timestamp": "2025-04-10T12:05:00",
"user_id": 456,
"action": "purchase",
"status": {
"code": 403,
"desc": "Forbidden (unauthorized user)"
}
}
]

NOTE: If your index has a mapping with sub-objects where `status.code` and `status.desc` actually dotted fields, they appear in {ls} events as a nested structure.

[id="plugins-{type}s-{plugin}-esql-multifields"]
===== Conflict on multi-fields

ES|QL query fetches all parent and sub-fields fields if your {es} index has https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/multi-fields[multi-fields] or https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/subobjects[subobjects].
Since {ls} events cannot contain parent field's concrete value and sub-field values together, the plugin cannot map the result to {ls} event and produces `_elasticsearch_input_failure` tagged failed event.
We recommend using the `RENAME` (or `DROP`) keyword in your ES|QL query explicitly rename the fields to overcome this issue.
To illustrate the situation with example, assuming your mapping has a time `time` field with `time.min` and `time.max` sub-fields as following:
[source, ruby]
"properties": {
"time": { "type": "long" },
"time.min": { "type": "long" },
"time.max": { "type": "long" }
}

The ES|QL result will contain all three fields but the plugin cannot map them into {ls} event.

This a common occurence if your template or mapping follows the pattern of always indexing strings as "text" (`field`) + " keyword" (`field.keyword`) multi-field. In this case it's recommended to do `KEEP field` if the string is identical and there is only one subfield as the engine will optimize and retrieve the keyword, otherwise you can do `KEEP field.keyword | RENAME field.keyword as field` .
To avoid this, you can use the `RENAME` keyword to rename the `time` parent field to get all three fields with unique fields.
[source, ruby]
...
query => 'FROM my-index | RENAME time AS time.current'
...

For comprehensive ES|QL syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[{es} ES|QL documentation].

[id="plugins-{type}s-{plugin}-options"]
==== Elasticsearch Input configuration options

Expand Down Expand Up @@ -257,7 +352,7 @@ Please check out <<plugins-{type}s-{plugin}-obsolete-options>> for details.
| <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No
| <<plugins-{type}s-{plugin}-proxy>> |<<uri,uri>>|No
| <<plugins-{type}s-{plugin}-query>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations"]`|No
| <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations","esql"]`|No
| <<plugins-{type}s-{plugin}-request_timeout_seconds>> | <<number,number>>|No
| <<plugins-{type}s-{plugin}-schedule>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-schedule_overlap>> |<<boolean,boolean>>|No
Expand Down Expand Up @@ -498,26 +593,32 @@ environment variables e.g. `proxy => '${LS_PROXY:}'`.
* Value type is <<string,string>>
* Default value is `'{ "sort": [ "_doc" ] }'`

The query to be executed. Read the {ref}/query-dsl.html[Elasticsearch query DSL
documentation] for more information.
The query to be executed.
Accepted query shape is DSL or ES|QL (when `response_type => 'esql'`).
Read the {ref}/query-dsl.html[{es} query DSL documentation] or {ref}/esql.html[{es} ES|QL documentation] for more information.

When <<plugins-{type}s-{plugin}-search_api>> resolves to `search_after` and the query does not specify `sort`,
the default sort `'{ "sort": { "_shard_doc": "asc" } }'` will be added to the query. Please refer to the {ref}/paginate-search-results.html#search-after[Elasticsearch search_after] parameter to know more.

[id="plugins-{type}s-{plugin}-response_type"]
===== `response_type`

* Value can be any of: `hits`, `aggregations`
* Value can be any of: `hits`, `aggregations`, `esql`
* Default value is `hits`

Which part of the result to transform into Logstash events when processing the
response from the query.

The default `hits` will generate one event per returned document (i.e. "hit").

When set to `aggregations`, a single Logstash event will be generated with the
contents of the `aggregations` object of the query's response. In this case the
`hits` object will be ignored. The parameter `size` will be always be set to
0 regardless of the default or user-defined value set in this plugin.

When using the `esql` setting, the query must be a valid ES|QL string.
When this setting is active, `index`, `size`, `slices`, `search_api`, `docinfo`, `docinfo_target` and `docinfo_fields` parameters are not allowed.

[id="plugins-{type}s-{plugin}-request_timeout_seconds"]
===== `request_timeout_seconds`

Expand Down
89 changes: 66 additions & 23 deletions lib/logstash/inputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
require 'logstash/inputs/elasticsearch/paginated_search'
require 'logstash/inputs/elasticsearch/aggregation'
require 'logstash/inputs/elasticsearch/cursor_tracker'
require 'logstash/inputs/elasticsearch/esql'

include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
Expand All @@ -96,15 +97,18 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
# The index or alias to search.
config :index, :validate => :string, :default => "logstash-*"

# The query to be executed. Read the Elasticsearch query DSL documentation
# for more info
# https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
# The query to be executed. DSL or ES|QL (when `response_type => 'esql'`) query shape is accepted.
# Read the following documentations for more info
# Query DSL: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
# ES|QL: https://www.elastic.co/guide/en/elasticsearch/reference/current/esql.html
config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }'

# This allows you to speccify the response type: either hits or aggregations
# where hits: normal search request
# aggregations: aggregation request
config :response_type, :validate => ['hits', 'aggregations'], :default => 'hits'
# This allows you to speccify the response type: one of [hits, aggregations, esql]
# where
# hits: normal search request
# aggregations: aggregation request
# esql: ES|QL request
config :response_type, :validate => %w[hits aggregations esql], :default => 'hits'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Migrating to query_type with auto-detection of ESQL queries would be pretty straight-forward with the NormalizeConfigSupport mixin:

Suggested change
config :response_type, :validate => %w[hits aggregations esql], :default => 'hits'
config :response_type, :validate => %w[hits aggregations], :deprecated => "use `query_type`"
config :query_type, :validate => %w[hits aggregations esql] # default depends on query shape
   def register
+    @query_type = normalize_config("query_type") do |normalizer|
+      normalizer.with_deprecated_alias("response_type")
+    end || (@query.start_with?('{') ? 'hits' : 'esql')

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking to add the deprecation right after this ES|QL change.
One agreement we need to decide is naming. I personally do not like hits, aggregations along with esql. They indicate different contexts. I had options dsl_search, dsl_aggregation and esql.
Let me please know your opinion: I can either apply with change if we quickly come with agreement or create an issue follow up right after this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking to add the deprecation right after this ES|QL change.

If someone starts using this feature, I would rather that their never-possible-before configuration feels "stable" and doesn't require them to go back and deal with deprecation warnings for things that we knew about before shipping the feature.

They indicate different contexts

This is a very good point.

The current response_type only makes sense in the context of DSL-based queries.

So: what if we were to keep response_type, but constrain its use to query_type => dsl?

This would mean:

  • query_type => dsl: allows use of response_type
  • query_type => esql: prohibits use of response_type
  • unspecified query_type could have a sensible default based on the shape of query:
    • if it looks like JSON, then it's dsl
    • if it looks like ES|QL then it's esql
    • else we error helpfully

Copy link
Contributor Author

@mashhurs mashhurs May 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introducing query_type and keep using response_type was my initial design and we with @jsvd thinking if we can still simplify without introducing new param (and came to agreement in our 1:1 to support wth response_type and deprecate it in the future).

However, considering the behavior and user experience, I do also strongly support this (introducing query_type at high level which other params follow) structural (query type at the high level, then depth details such as what response shape going to be parsed, etc..) logic.
I have applied it with this commit.

FYI: current CI snapshot unit test steps are broken (CIs with release versions are fine) due to core openssl.jar and uri gem miss but I have run on my local with local LS to verify change and unit/integration tests.


# This allows you to set the maximum number of hits returned per scroll.
config :size, :validate => :number, :default => 1000
Expand Down Expand Up @@ -286,6 +290,9 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze
INTERNAL_ORIGIN_HEADER = { 'x-elastic-product-origin' => 'logstash-input-elasticsearch'}.freeze

LS_ESQL_SUPPORT_VERSION = "8.17.4" # the version started using elasticsearch-ruby v8
ES_ESQL_SUPPORT_VERSION = "8.11.0"

def initialize(params={})
super(params)

Expand All @@ -302,10 +309,17 @@ def register
fill_hosts_from_cloud_id
setup_ssl_params!

@base_query = LogStash::Json.load(@query)
if @slices
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
if @response_type == 'esql'
validate_ls_version_for_esql_support!
validate_esql_query!
not_allowed_options = original_params.keys & %w(index size slices search_api, docinfo, docinfo_target, docinfo_fields)
raise(LogStash::ConfigurationError, "Configured #{not_allowed_options} params are not allowed while using ES|QL query") if not_allowed_options&.size > 1
else
@base_query = LogStash::Json.load(@query)
if @slices
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
end
end

@retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")
Expand Down Expand Up @@ -341,6 +355,8 @@ def register

test_connection!

validate_es_for_esql_support!

setup_serverless

setup_search_api
Expand All @@ -363,16 +379,6 @@ def run(output_queue)
end
end

def get_query_object
if @cursor_tracker
query = @cursor_tracker.inject_cursor(@query)
@logger.debug("new query is #{query}")
else
query = @query
end
LogStash::Json.load(query)
end

##
# This can be called externally from the query_executor
public
Expand All @@ -383,6 +389,23 @@ def push_hit(hit, output_queue, root_field = '_source')
record_last_value(event)
end

def decorate_event(event)
decorate(event)
end

private

def get_query_object
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review note: moved to private area

return @query if @response_type == 'esql'
if @cursor_tracker
query = @cursor_tracker.inject_cursor(@query)
@logger.debug("new query is #{query}")
else
query = @query
end
LogStash::Json.load(query)
end

def record_last_value(event)
@cursor_tracker.record_last_value(event) if @tracking_field
end
Expand Down Expand Up @@ -414,8 +437,6 @@ def set_docinfo_fields(hit, event)
event.set(@docinfo_target, docinfo_target)
end

private

def hosts_default?(hosts)
hosts.nil? || ( hosts.is_a?(Array) && hosts.empty? )
end
Expand Down Expand Up @@ -675,6 +696,8 @@ def setup_query_executor
end
when 'aggregations'
LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self)
when 'esql'
LogStash::Inputs::Elasticsearch::Esql.new(@client, self)
end
end

Expand Down Expand Up @@ -714,6 +737,26 @@ def get_transport_client_class
::Elastic::Transport::Transport::HTTP::Manticore
end

def validate_ls_version_for_esql_support!
if Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create(LS_ESQL_SUPPORT_VERSION)
fail("Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least #{LS_ESQL_SUPPORT_VERSION}")
end
end

def validate_esql_query!
fail(LogStash::ConfigurationError, "`query` cannot be empty") if @query.strip.empty?
source_commands = %w[FROM ROW SHOW]
contains_source_command = source_commands.any? { |source_command| @query.strip.start_with?(source_command) }
fail(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}") unless contains_source_command
end

def validate_es_for_esql_support!
return unless @response_type == 'esql'
# make sure connected ES supports ES|QL (8.11+)
es_supports_esql = Gem::Version.create(es_version) >= Gem::Version.create(ES_ESQL_SUPPORT_VERSION)
fail("Connected Elasticsearch #{es_version} version does not supports ES|QL. ES|QL feature requires at least Elasticsearch #{ES_ESQL_SUPPORT_VERSION} version.") unless es_supports_esql
end

module URIOrEmptyValidator
##
# @override to provide :uri_or_empty validator
Expand Down
Loading