Question: id_key in @type elasticsearch_data_stream doesn't function as expected ... #1045

Open
1 task done
mafazely opened this issue Jan 14, 2024 · 3 comments

mafazely commented Jan 14, 2024

Problem

I would like to use the gen_id feature to avoid duplicate documents in Elasticsearch. Currently, I employ Fluent Bit on my servers to read and parse logs. These logs are then sent to Fluentd, acting as an aggregator, which subsequently pushes them to the Elasticsearch cluster.
However, I've encountered an issue where the id_key doesn't function as expected on the elasticsearch_data_stream. This results in the addition of a _hash field to the documents in Elasticsearch, but it is not being recognized as the _id field.
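For readers unfamiliar with content-based IDs, here is a minimal Python sketch of the idea behind hashing a record to get a stable ID. It is an illustration only: the function name `content_id`, the seed format, and the key sorting are assumptions made for this sketch and are not taken from the elasticsearch_genid filter's source.

```python
import base64
import hashlib


def content_id(record: dict, separator: str = "_") -> str:
    """Derive a deterministic ID from every key/value pair in the record."""
    # Keys are sorted so that field order does not change the hash. This is a
    # choice made for this sketch; the real filter may build its seed differently.
    seed = separator.join(f"{key}={record[key]}" for key in sorted(record))
    digest = hashlib.sha1(seed.encode("utf-8")).digest()
    return base64.b64encode(digest).decode("ascii")


first = {"message": "disk full", "host": "web-1"}
retry = {"host": "web-1", "message": "disk full"}
other = {"host": "web-2", "message": "disk full"}

print(content_id(first) == content_id(retry))  # same content, same ID
print(content_id(first) == content_id(other))  # different content, different ID
```

If such an ID were sent as the document _id with a create operation, a resend of the same record would be expected to conflict with the existing document instead of producing a duplicate.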

Steps to replicate

<system>
  log_level info
  workers 8
</system>

<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<filter data.**>
  @type record_transformer
  enable_ruby true
  <record>
    ...
  </record>
</filter>

<filter data.**>
  @type elasticsearch_genid
  use_record_as_seed true
  record_keys []
  use_entire_record true
  hash_type sha1
  hash_id_key _hash
  separator _
  inc_time_as_key false
  inc_tag_as_key false
</filter>

<filter stream.**>
  @type elasticsearch_genid
  use_record_as_seed true
  record_keys []
  use_entire_record true
  hash_type sha1
  hash_id_key _hash
  separator _
  inc_time_as_key false
  inc_tag_as_key false
</filter>

<match data.**>
  @type copy
  <store>
    @type prometheus
    <metric>
      name fluentd_output_status_num_records_total
      type counter
      desc The total number of processed records for data streams
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </store>
  <store>
    @type elasticsearch_data_stream
    host sample_host:9200
    scheme https
    ssl_verify false
    id_key _hash
    write_operation create
    remove_keys _hash
    user sample_user
    password sample_pass
    include_tag_key true
    include_timestamp true
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    data_stream_name data
    buffer_type memory
    retry_forever true
    overflow_action block
    <buffer>
      @type memory
      flush_thread_count 1
      flush_interval 10s
      chunk_limit_size 4M
      total_limit_size 512M
      retry_forever true
    </buffer>
  </store>
</match>

<match stream.**>
  @type copy
  <store>
    @type prometheus
    <metric>
      name fluentd_output_status_num_records_total
      type counter
      desc The total number of processed records for data streams
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </store>
  <store>
    @type elasticsearch_data_stream
    hosts sample_host:9200
    scheme https
    ssl_verify false
    id_key _hash
    write_operation create
    remove_keys _hash
    user sample_user
    password sample_pass
    include_tag_key true
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    data_stream_name stream_log
    buffer_type memory
    retry_forever true
    overflow_action block
    <buffer>
      @type memory
      flush_thread_count 1
      flush_interval 10s
      chunk_limit_size 4M
      total_limit_size 512M
      retry_forever true
    </buffer>
  </store>
</match>

Expected Behavior or What you need to ask

I expected _hash to be used as the document _id so that Elasticsearch can check the uniqueness of documents, but this does not happen.
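To make the expectation concrete, here is a hedged Python sketch of the bulk request lines I would expect for one record. It assumes the Elasticsearch `_bulk` API's `create` action accepts an explicit `_id` when the target is a data stream. The helper `bulk_create_lines` is hypothetical and is not part of the plugin.

```python
import json


def bulk_create_lines(stream: str, record: dict, id_key: str, remove_keys: list[str]) -> str:
    """Build one create action plus its source line for the _bulk API."""
    action = {"create": {"_index": stream}}
    # Promote the value stored under id_key to the document _id, if present.
    if id_key in record:
        action["create"]["_id"] = record[id_key]
    # Drop the helper fields so they are not indexed as regular fields.
    source = {k: v for k, v in record.items() if k not in remove_keys}
    return json.dumps(action) + "\n" + json.dumps(source) + "\n"


body = bulk_create_lines(
    "data",
    {"@timestamp": "2024-01-14T00:00:00Z", "message": "disk full", "_hash": "abc123"},
    id_key="_hash",
    remove_keys=["_hash"],
)
print(body)
```

In this sketch the action line carries `_id` and the source line no longer contains `_hash`. What I actually see is the opposite: `_hash` stays in the stored document and is not used as `_id`.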

Using Fluentd and ES plugin versions

  • Debian GNU/Linux 11 (bullseye)
  • Bare Metal
  • fluent-package 5.0.2 fluentd 1.16.3
  • ES plugin: fluent-plugin-elasticsearch 5.4.2, 5.4.0 (full gem list below)
*** LOCAL GEMS ***
abbrev (default: 0.1.1)
addressable (2.8.5)
async (1.31.0)
async-http (0.61.0)
async-io (1.38.0)
async-pool (0.4.0)
aws-eventstream (1.2.0)
aws-partitions (1.785.0)
aws-sdk-core (3.178.0)
aws-sdk-kms (1.71.0)
aws-sdk-s3 (1.129.0)
aws-sdk-sqs (1.61.0)
aws-sigv4 (1.6.0)
base64 (0.2.0, default: 0.1.1)
benchmark (default: 0.2.1)
bigdecimal (default: 3.1.3)
bindata (2.4.15)
bundler (default: 2.5.3, 2.3.26)
cgi (default: 0.3.6)
cmetrics (0.3.3)
concurrent-ruby (1.2.2)
console (1.23.2)
cool.io (1.8.0)
csv (default: 3.2.6)
date (default: 3.3.3)
debug (1.7.1)
delegate (default: 0.3.0)
did_you_mean (default: 1.6.3)
digest (default: 3.1.1)
digest-crc (0.6.5)
digest-murmurhash (1.1.1)
drb (default: 2.1.1)
elastic-transport (8.3.0)
elasticsearch (8.11.0, 8.8.0)
elasticsearch-api (8.11.0, 8.8.0)
english (default: 0.7.2)
erb (default: 4.0.2)
error_highlight (default: 0.5.1)
etc (default: 1.4.2)
excon (0.108.0, 0.104.0)
faraday (2.7.12)
faraday-excon (2.1.0)
faraday-net_http (3.0.2)
faraday_middleware-aws-sigv4 (1.0.1)
fcntl (default: 1.0.2)
ffi (1.15.5)
fiber-annotation (0.2.0)
fiber-local (1.0.0)
fiddle (default: 1.1.1)
fileutils (1.7.2, default: 1.7.0)
find (default: 0.1.1)
fluent-config-regexp-type (1.0.0)
fluent-diagtool (1.0.3)
fluent-logger (0.9.0)
fluent-plugin-calyptia-monitoring (0.1.3)
fluent-plugin-elasticsearch (5.4.2, 5.4.0)
fluent-plugin-flowcounter-simple (0.1.0)
fluent-plugin-kafka (0.19.2)
fluent-plugin-metrics-cmetrics (0.1.2)
fluent-plugin-opensearch (1.1.4)
fluent-plugin-prometheus (2.1.0)
fluent-plugin-prometheus_pushgateway (0.1.1)
fluent-plugin-record-modifier (2.2.0, 2.1.1)
fluent-plugin-rewrite-tag-filter (2.4.0)
fluent-plugin-s3 (1.7.2)
fluent-plugin-sd-dns (0.1.0)
fluent-plugin-systemd (1.0.5)
fluent-plugin-td (1.2.0)
fluent-plugin-utmpx (0.5.0)
fluent-plugin-webhdfs (1.5.0)
fluentd (1.16.3)
forwardable (default: 1.3.3)
getoptlong (default: 0.2.0)
hirb (0.7.3)
http_parser.rb (0.8.0)
httpclient (2.8.3)
io-console (default: 0.6.0)
io-nonblock (default: 0.2.0)
io-wait (default: 0.3.0)
ipaddr (default: 1.2.5)
irb (default: 1.6.2)
jmespath (1.6.2)
json (default: 2.6.3)
linux-utmpx (0.3.0)
logger (default: 1.5.3)
ltsv (0.1.2)
matrix (0.4.2)
mini_portile2 (2.8.2)
minitest (5.16.3)
msgpack (1.7.2)
multi_json (1.15.0)
mutex_m (default: 0.1.2)
net-ftp (0.2.0)
net-http (default: 0.3.2)
net-imap (0.3.4)
net-pop (0.1.2)
net-protocol (default: 0.2.1)
net-smtp (0.3.3)
nio4r (2.6.1)
nkf (default: 0.1.2)
observer (default: 0.1.1)
oj (3.16.1)
open-uri (default: 0.3.0)
open3 (default: 0.1.2)
opensearch-api (2.2.0)
opensearch-ruby (2.1.0)
opensearch-transport (2.1.0)
openssl (default: 3.1.0)
optparse (default: 0.3.1)
ostruct (default: 0.5.5)
parallel (1.20.1)
pathname (default: 0.2.1)
power_assert (2.0.3)
pp (default: 0.4.0)
prettyprint (default: 0.1.1)
prime (0.1.2)
prometheus-client (2.1.0)
protocol-hpack (1.4.2)
protocol-http (0.25.0)
protocol-http1 (0.16.0)
protocol-http2 (0.15.1)
pstore (default: 0.1.2)
psych (default: 5.0.1)
public_suffix (5.0.4)
racc (default: 1.6.2)
rake (13.1.0, 13.0.6)
rbs (2.8.2)
rdkafka (0.12.0)
rdoc (default: 6.5.0)
readline (default: 0.0.3)
readline-ext (default: 0.1.5)
reline (default: 0.3.2)
resolv (default: 0.2.2)
resolv-replace (default: 0.1.1)
rexml (3.2.6, 3.2.5)
rinda (default: 0.1.1)
rss (0.2.9)
ruby-kafka (1.5.0)
ruby-progressbar (1.13.0)
ruby2_keywords (default: 0.0.5)
rubygems-update (3.5.3)
rubyzip (1.3.0)
securerandom (default: 0.2.2)
serverengine (2.3.2)
set (default: 1.0.3)
shellwords (default: 0.1.0)
sigdump (0.2.5)
singleton (default: 0.1.1)
stringio (default: 3.0.4)
strptime (0.2.5)
strscan (default: 3.0.5)
syntax_suggest (default: 1.0.2)
syslog (default: 0.1.1)
systemd-journal (1.4.2)
td (0.17.1)
td-client (1.0.8)
td-logger (0.3.28)
tempfile (default: 0.1.3)
test-unit (3.5.7)
time (default: 0.2.2)
timeout (default: 0.3.1)
timers (4.3.5)
tmpdir (default: 0.1.3)
traces (0.11.1)
tsort (default: 0.1.1)
typeprof (0.21.3)
tzinfo (2.0.6)
tzinfo-data (1.2023.3)
un (default: 0.2.1)
uri (0.12.2, default: 0.12.1)
weakref (default: 0.1.2)
webhdfs (0.10.2)
webrick (1.8.1)
yajl-ruby (1.4.3)
yaml (default: 0.2.1)
zip-zip (0.3)
zlib (default: 3.0.0)
  • ES version 8.11.4

@mafazely (Author)

Any recommendations?

castorsky commented Mar 25, 2024

Same here. id_key and remove_keys are ignored by elasticsearch_data_stream.

ElasticSearch 8.5
Fluentd 1.16.4 (container)

sturmf commented May 3, 2024

I would also need this functionality. Is it true that this feature is not supported for data streams or are we missing some configuration?

Is there a technical reason for this, or has no one had the time to implement it yet?
