Report input level dropped/filtered event metrics #42325

Open
andrewkroh opened this issue Jan 16, 2025 · 29 comments

@andrewkroh
Member

Describe the enhancement:

Today, metrics related to event publishing are available only in an aggregated form that combines metrics from all inputs. This makes it difficult to know which specific input source is responsible for dropped or filtered events.

The input metrics should contain data about the number of events dropped or filtered by each input. This would help narrow down the source of problems to a particular input (and hence a particular integration data stream in the case of Elastic Agent).

I believe this would require the beat publisher client (from libbeat/publisher) to provide a way for inputs to subscribe to this data.

Describe a specific use case for the enhancement or feature:

  • Diagnostic dumps from Elastic Agent will contain dropped and filtered metrics that are specific to an integration data stream (via input_metrics.json).
  • Input metric dashboards in the Elastic Agent fleet integration can display dropped/filtered charts, providing a visual representation of the issue.
  • Standalone beat users can access detailed information about which input is causing drops through the HTTP monitoring API or periodic logs when logging_metrics_namespaces: [stats, inputs] is used. This would allow them to quickly identify and resolve issues.
@elasticmachine
Collaborator

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@cmacknz
Member

cmacknz commented Jan 16, 2025

We already know the target index/datastream for each event in the output, I wonder if we could instead have per data stream stats at the output level? That might get the same result with less internal pipeline wiring.

@andrewkroh
Member Author

I like the idea. However, for standalone Beats users this might not be as useful since most users are writing all data to filebeat-x.x.x. But if it's significantly easier, then perhaps that's a good trade-off.

@andrewkroh
Member Author

On a related note, the warning messages that are logged when an event is dropped do not contain the index. We should log the index name and the pipeline (if set) as structured data.

client.log.Warnw(fmt.Sprintf("Cannot index event '%s' (status=%v): %s, dropping event!", encodedEvent, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
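For illustration, a minimal sketch of what that could look like, keeping the existing message and adding structured fields (the encodedEvent field names below are assumptions, not confirmed API):

client.log.Warnw(
    fmt.Sprintf("Cannot index event '%s' (status=%v): %s, dropping event!", encodedEvent, itemStatus, itemMessage),
    "index", encodedEvent.index,       // assumed: target index/data stream tracked on the encoded event
    "pipeline", encodedEvent.pipeline, // assumed: ingest pipeline, if set
    logp.TypeKey, logp.EventType,
)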

@nimarezainia
Contributor

We already know the target index/datastream for each event in the output, I wonder if we could instead have per data stream stats at the output level? That might get the same result with less internal pipeline wiring.

even for dropped events?

per datastream stats would be very useful.

@AndersonQ
Member

Hello folks,

I started looking at it and I have a few questions to ask.

The intent is to have dropped and filtered metrics per input. For completeness, I believe we should also have published, or just follow the current approach and have total, published, filtered, dropped. Do you have any objections?

So far, from what I've seen, the easiest approach would be to add those to the pipeline metrics.

Making the pipeline or the output communicate back to the input would require either a big change or some hack. So if the aim is just to have the metrics, rather than having them in a specific place, I'd avoid putting them on the inputs. Is that OK?

@AndersonQ
Member

Adding the new metrics to the pipeline, they would go to the beat_metrics.json file in the diagnostics and would look like this:

  "libbeat": {
    "pipeline": {
      "clients": 2,
      "events": {
        "active": 0,
        "dropped": 0,
        "failed": 0,
        "filtered": 4,
        "published": 1984,
        "retry": 1600,
        "total": 1988
      },
      "inputs": {
        "logfile-system-6ecb8afb-3445-41e8-a7bd-914959b160d4": {
          "events": {
            "failed": 0,
            "filtered": 0,
            "published": 1984,
            "total": 0
          }
        }
      },

@andrewkroh
Member Author

It would be easier to consume if it were present in the HTTP /inputs/ data or input_metrics.json because all of the data would be in one place. Looking further down the "pipeline", how will we handle ingesting that data into Elasticsearch?

The metrics above are ingested into metrics-elastic_agent.filebeat-* as one doc per sampling. To make effective use of this data in ES, I think we would need to split the input section into one doc per input. If it gets added to HTTP /inputs/ then it lands in metrics-elastic-agent.filebeat_inputs-* which already creates one doc per input.

So if we do put the data into libbeat pipeline metrics, I would still probably recommend doing an optimistic "join" to bring those pipeline metrics into the HTTP /inputs/ response (which also affects input_metrics.json). The way this could work is that when generating the response data for /inputs/, in addition to grabbing a metric snapshot from the dataset monitoring namespace (which contains input metrics), that it also grabs a snapshot of default stats namespace, and it adds the associated pipeline metrics into each input.

This is the code that would need to be modified to do this. I'd be happy to take care of this part if you did the work of aggregating the metrics per input in the pipeline.

func filteredSnapshot(r *monitoring.Registry, requestedType string) []map[string]any {
    metrics := monitoring.CollectStructSnapshot(r, monitoring.Full, false)
    filtered := make([]map[string]any, 0, len(metrics))
    for _, ifc := range metrics {
        m, ok := ifc.(map[string]any)
        if !ok {
            continue
        }
        // Require all entries to have an 'input' and 'id' to be accessed through this API.
        if id, ok := m["id"].(string); !ok || id == "" {
            continue
        }
        if inputType, ok := m["input"].(string); !ok || (requestedType != "" && !strings.EqualFold(inputType, requestedType)) {
            continue
        }
        filtered = append(filtered, m)
    }
    return filtered
}
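For illustration, one possible shape for that optimistic join, assuming the per-input pipeline counters end up under libbeat.pipeline.inputs in the stats namespace (as in the JSON sample above) and are keyed by the same id exposed by /inputs/; mapLookup is a hypothetical helper:

// Sketch only: merge per-input pipeline event counters into the /inputs/ entries.
func attachPipelineMetrics(inputs []map[string]any) {
    statsRegistry := monitoring.GetNamespace("stats").GetRegistry()
    snapshot := monitoring.CollectStructSnapshot(statsRegistry, monitoring.Full, false)

    perInput, ok := mapLookup(snapshot, "libbeat", "pipeline", "inputs")
    if !ok {
        return
    }
    for _, entry := range inputs {
        id, _ := entry["id"].(string)
        if pipelineMetrics, ok := perInput[id].(map[string]any); ok {
            entry["events"] = pipelineMetrics["events"]
        }
    }
}

// mapLookup descends nested map[string]any keys, returning false if any level is missing.
func mapLookup(m map[string]any, keys ...string) (map[string]any, bool) {
    current := m
    for _, key := range keys {
        next, ok := current[key].(map[string]any)
        if !ok {
            return nil, false
        }
        current = next
    }
    return current, true
}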

The API response would then look something like

[
  {
    "events": { # NEW METRICS
      "failed": 0,
      "filtered": 0,
      "published": 1984,
      "total": 0
    },
    "batch_read_period": {
      "histogram": {
        "count": 6388,
        "max": 1108938500,
        "mean": 121099029.8828125,
        "median": 96635900,
        "min": 6726500,
        "p75": 102812500,
        "p95": 114321550,
        "p99": 1096386525,
        "p999": 1108896965,
        "stddev": 163688364.02907938
      }
    },
    "discarded_events_total": 0,
    "errors_total": 0,
    "id": "winlog-system.security-58eacef7-041e-49fe-b276-58f0b5f9b2c2",
    "input": "winlog",
    "provider": "Security",
    "received_events_count": {
      "histogram": {
        "count": 6389,
        "max": 100,
        "mean": 95.3232421875,
        "median": 100,
        "min": 3,
        "p75": 100,
        "p95": 100,
        "p99": 100,
        "p999": 100,
        "stddev": 15.715736752558572
      }
    },
    "received_events_total": 611509,
    "source_lag_time": {
      "histogram": {
        "count": 611509,
        "max": 23587791100,
        "mean": 2871361958.3984375,
        "median": 1054898450,
        "min": 53831700,
        "p75": 4440275750,
        "p95": 8395522725,
        "p99": 18535951875,
        "p999": 23581712035.000004,
        "stddev": 3709899503.770204
      }
    }
  }
]

@AndersonQ
Member

Hi, thanks. I'm just out of a chat with Fae and she explained to me better how those metrics end up in ES, which is aligned with what you just said. :)

Reporting them in a way that they reach ES in an efficient form seems to be a key part of this task. It also seems to require changing metricbeat to collect them.

I see now why having them inside the pipeline metrics isn't good. But the pipeline still seems the best place to collect them. As long as the metrics get reported in the right place, where they are collected won't be a problem.

I'll tinker a bit more. Probably Monday I'll have something more concrete.

@AndersonQ
Member

Hello folks,

I've been doing a mix of POC and investigation to see how deep this rabbit hole goes.

TL;DR:

  • need to extend the metricbeat beat module to collect the new data. It needs to query /inputs/, which it currently doesn't. Also, the schema it fetches is hard coded, so any addition will require the module to be updated. It should be its own task.
  • add the new dashboards; I believe this can also be its own issue
  • there isn't a single place to get the metrics for all inputs in standalone mode. So either we require the user to configure the input to add metadata, or we'd have to change every single input.

I confess I'm more inclined to require the users to configure their inputs to add the needed metadata, as the agent already does. It'd allow a much more generic approach that would work for any input.

What do you think @nimarezainia and @andrewkroh?

The long version:

As I said before, collecting the per-input data in the pipeline would be ideal, as it'd be a single place to modify and all the inputs would benefit from it. However, if the beats aren't running under agent, the needed information (inputID or streamID) might not be there, so it would not work for standalone. What does seem to always be there is the input type, which would end up aggregating the metrics of several inputs of the same type in one place.

Making the input and pipeline communicate is possible with either a significant refactor or some hacky solution. I've experimented with that; it's possible without a big refactor, but it'd require all inputs to be updated to receive the status of each event.

Another option would be to require the inputs to add the metadata to the events for the new metrics to be available. That could be a simpler solution, even though it adds responsibility on the user side.

Regardless of the solution, it'll also require adding to the metricbeat beat module the ability to query the /inputs endpoint, which it does not do right now.

@jlind23
Collaborator

jlind23 commented Feb 4, 2025

@AndersonQ before jumping straight into the implementation details can we please work on an RFC here to make sure we have all potential solutions in mind?

cc @pierrehilbert

@AndersonQ
Member

@jlind23 that's what I'm going for. I just needed to better understand the current state of things before proposing something that would be complex to implement, or proposing something without enough information to compare different approaches.

@cmacknz
Member

cmacknz commented Feb 4, 2025

@AndersonQ before jumping straight into the implementation details can we please work on an RFC here to make sure we have all potential solutions in mind?

Just to clarify, I don't think we need a full RFC sent out to multiple teams. What we need is more like a 1-2 page summary of what the final design will look like, and we can do some prototyping to find out what it should look like. That can be either a Google doc or just a comment here, depending on how easy it is to summarize.

@AndersonQ
Member

Just to clarify, I don't think we need a full RFC sent out to multiple teams. What we need is more like a 1-2 page summary of what the final design will look like, and we can do some prototyping to find out what it should look like. That can be either a Google doc or just a comment here, depending on how easy it is to summarize.

I was going to suggest that; I think there isn't enough for an RFC. I'll reorganise what I started and put it here.

@AndersonQ
Member

AndersonQ commented Feb 5, 2025

Problem Statement

Currently the metrics related to event publishing come from the pipeline and the output. Both of them report the metrics for all events processed by them, without discerning which input produced the events. Those are the metrics currently* available:

  • Pipeline: active, filtered, published, total, dropped
  • Output: acked, active, batches, total

While this allows understanding and investigating issues in the pipeline and the output, it does not help determine whether a specific input is misbehaving, i.e. whether the filtered or dropped events come mostly from a specific input, which might indicate a misconfiguration or another input-specific problem.

*there are more, but not relevant for this discussion

How to expose and consume the new metrics

Elastic Agent users

They will benefit from the new metrics and dashboards out of the box. No extra configuration will be required, and the new or modified dashboards will be added by the Elastic Agent integration. The diagnostics will also include the new metrics in each components/componentID/input_metrics.json.

Beats standalone users

  • If they have monitoring.elasticsearch enabled
  • If they have http.enabled, the metrics will be available through the /inputs endpoint
  • If they have logging.metrics enabled with the inputs namespace, the new metrics will be logged:
logging.metrics:
  enabled: true
  namespaces: [inputs]
  • The setup command creates the new dashboards

Proposed design

The publishing pipeline already has the desired metrics, however they are not input specific. If they were, it’d solve the question of how to collect them.

When running in managed mode, the Elastic Agent configures the inputs to add metadata to the events that can be used by the publishing pipeline to know which input generated the event. The metadata can easily be added to standalone beats configuration and the lack of such metadata would only mean the input specific metrics would not be tracked.

The main advantage of using the publishing pipeline to track the new metrics is containing the changes to a single place, without having to change every single input and avoiding having to ensure any future input would adhere to collect these metrics as well. Besides, making the publishing pipeline report back the status of each event would require either a big change in the code or a hacky solution.

The solution is to have the publishing pipeline collect the metrics and aggregate them by event.Meta["input_id"]. If an event does not contain the necessary metadata, it will not count toward the input-specific metrics. As the metadata is added by a processor in the input, an input whose events lack input_id in the metadata simply won't have input-specific metrics.

The required config would be:

      processors:
        - add_fields:
            fields:
                input_id: INPUT_ID
            target: '@metadata'

The new metrics will be stored in a new monitoring namespace, internal, to avoid any conflict with the existing metrics and to avoid having to change each input to support the new metrics.
Using a new, global, independent namespace for the metrics makes it simple and safe to collect them and merge them with the existing input metrics already reported by:

  • the /inputs monitoring endpoint
  • the Elastic Agent’s diagnostic components/componentID/input_metrics.json
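A minimal sketch of what the pipeline-side aggregation could look like under these assumptions (the internal namespace, the inputs.<id>.events.filtered naming, and the countFiltered hook are all illustrative, not an agreed layout):

var (
    inputMetricsMu sync.Mutex
    inputFiltered  = map[string]*monitoring.Uint{}
)

// countFiltered is a hypothetical hook called by the pipeline when processors drop an event.
func countFiltered(e beat.Event) {
    raw, err := e.Meta.GetValue("input_id")
    id, ok := raw.(string)
    if err != nil || !ok || id == "" {
        return // no input_id metadata: the event is not attributed to any input
    }

    inputMetricsMu.Lock()
    defer inputMetricsMu.Unlock()
    counter, exists := inputFiltered[id]
    if !exists {
        reg := monitoring.GetNamespace("internal").GetRegistry()
        // note: IDs containing dots would create nested registries here and need escaping
        counter = monitoring.NewUint(reg, "inputs."+id+".events.filtered")
        inputFiltered[id] = counter
    }
    counter.Inc()
}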

They might also be added to the Beats stack monitoring index (.monitoring-beats-), as the current input metrics aren't sent to ES; this would require extending that index's mapping. The current format of the input metrics seems good and prevents a field explosion. Using the API schema as the ES index mapping is safe because each inputID does not create a field: there is no key in the API JSON schema that is the inputID; instead, each input metric has an id and input field which identify the input it came from. E.g.:

{"id":"my-filestream-id","input":"filestream","bytes_processed_total":0,"events_dropped_total":0,"events_filtered_total":0,"events_published_total":0,"events_processed_total":0,"files_active":0,"files_closed_total":0,"files_opened_total":0,"messages_read_total":0,"messages_truncated_total":0,"processing_errors_total":0,"processing_time":{"histogram":{"count":0,"max":0,"mean":0,"median":0,"min":0,"p75":0,"p95":0,"p99":0,"p999":0,"stddev":0}}}

In addition, the current Agent and standalone Beats dashboards need to be updated, or new dashboards need to be created. The Agent dashboards will require changes in the Elastic Agent package, and the standalone Beats dashboards changes in the setup command.

Inputs without an ID, or with an empty one, cannot have these metrics tracked. However, any solution would have this restriction: without a way to uniquely identify the input, it would not be possible to distinguish its metrics from those of other inputs.

Alternative Designs

Input ID passed to the publishing pipeline as an argument or status returned as a return parameter to the input

The publish pipeline interface is modified to receive the inputID as a parameter or to return the event status as a return parameter. While this is a more explicit way of passing down the inputID or returning the status to the input, it would require changing not only the publishing pipeline Client interface but also several other similar interfaces the various beats and inputs use between the event being produced and sent to the publishing pipeline. It would also require changes to all inputs.

Publish pipeline 'sends back' the final status of the event via an alternative channel

Either the publishing pipeline has a communication channel the inputs can subscribe to, or beat.Event could be modified to allow the inputs to read the status of the event. The former would require all inputs to be modified to subscribe to the event status. The latter would be a hacky solution that, besides requiring changes to all inputs, would change beat.Event, which is widely used throughout the beats code.

Testing

Besides unit tests, integration or e2e tests need to be added:

  • On the beats, to ensure the metrics are collected and published as expected
  • On metricbeat, to ensure the changes to the beat module work as expected
  • On the Elastic Agent, to ensure that when metric collection is enabled the new metrics are collected

Open questions

  • Does it need to also cover the OTel receivers that are not beats?
  • Does it work out of the box if the inputs are used as OTel receivers?
    • I checked how the metrics are reported when running as a beat receiver and I've already adjusted the implementation. So as long as the input and the current publish pipeline are used, the metrics will be there when the beats run as OTel receivers.

@nimarezainia, @cmacknz, @andrewkroh what do you think?
As I said, I believe requiring the users to add the input_id isn't that bad, considering the alternatives would require a much bigger change, making it take a lot longer to be available for all the inputs we currently have.

cc: @pierrehilbert, @jlind23

@cmacknz
Member

cmacknz commented Feb 5, 2025

The solution is to have the publishing pipeline collect the metrics and aggregate them by event.Meta[“input_id”]

You need the input type + the input ID. For standalone agents the input ID can be the empty string.

They might also be added to the Beats stack monitoring index (.monitoring-beats-), as the current input metrics aren't sent to ES; this would require extending that index's mapping.

Integrating any of this with Beats stack monitoring should be out of scope here. We want to augment the input metrics in their current form, not make them available in new places. If we want to do that it can be done separately. Focus on making the new metrics available in the places where input metrics are currently available.

      processors:
        - add_fields:
            fields:
                input_id: INPUT_ID
            target: '@metadata'

The processor-based approach will add some small performance overhead; in our benchmarks we see the additional processors Elastic Agent adds, which a standalone Filebeat doesn't, show up as a cost.

Can you avoid the processor and just unconditionally include this metadata in the Publish call to the pipeline?

Either the publishing pipeline has a communication channel the inputs can subscribe to

This does exist, it's the EventListener in the pipeline client and is the basis of input state updates like the filestream registry.

// Callbacks for when events are added / acknowledged
EventListener EventListener

You would have to subscribe every input to it, which isn't as nice as just putting metadata in the pipeline. It also doesn't tell you why the event was dropped and adding that would be complicated I think.

// EventListener can be registered with a Client when connecting to the pipeline.
// The EventListener will be informed when events are added or dropped by the processors,
// and when an event has been ACKed by the outputs.
//
// Due to event publishing and ACKing are asynchronous operations, the
// operations on EventListener are normally executed in different go routines. ACKers
// are required to be multi-threading safe.
type EventListener interface {
    // AddEvent informs the listener that a new event has been sent to the client.
    // AddEvent is called after the processors have handled the event. If the
    // event has been dropped by the processor `published` will be set to false.
    // This allows the ACKer to do some bookkeeping for dropped events.
    AddEvent(event Event, published bool)

    // ACK Events from the output and pipeline queue are forwarded to ACKEvents.
    // The number of reported events only matches the known number of events downstream.
    // ACKers might need to keep track of dropped events by themselves.
    ACKEvents(n int)

    // ClientClosed informs the ACKer that the Client used to publish to the pipeline has been closed.
    // No new events should be published anymore. The ACKEvents method still will be called as long
    // as there are pending events for the client in the pipeline. The Close signal can be used
    // to suppress any ACK event propagation if required.
    // Close might be called from another go-routine than AddEvent and ACKEvents.
    ClientClosed()
}
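For illustration, a minimal listener satisfying the interface above could count per-input outcomes; an input would pass it via the EventListener field of its beat.ClientConfig when connecting to the pipeline. This is only a sketch under those assumptions, and, as noted, it cannot tell why an event was dropped:

// inputEventCounter is a sketch of a per-input EventListener.
type inputEventCounter struct {
    published *monitoring.Uint // events accepted into the pipeline
    filtered  *monitoring.Uint // events dropped by processors before the queue
    acked     *monitoring.Uint // events acknowledged downstream
}

func (c *inputEventCounter) AddEvent(event beat.Event, published bool) {
    if published {
        c.published.Inc()
    } else {
        c.filtered.Inc()
    }
}

func (c *inputEventCounter) ACKEvents(n int) { c.acked.Add(uint64(n)) }

func (c *inputEventCounter) ClientClosed() {}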

You will still need to detect dropped events in two places regardless:

  1. After processors run, to detect filtered-out events:
     c.eventListener.AddEvent(e, publish)
  2. At the end of the pipeline for batch handling, unless you want to modify every output:
     // All events have been acknowledged by the output.
     ACK()
     // Give up on these events permanently without sending.
     Drop()

@AndersonQ
Member

Can you avoid the processor and just unconditionally include this metadata in the Publish call to the pipeline?

I don't see a way of doing that without having to change every input or something close to that :/

I think the ideal scenario would be to have the inputID and type in the event struct, to be used by anything manipulating the event, but that isn't the case.

Besides, if those metrics' primary use case is for integrations, then the agent already adds the necessary metadata.

You would have to subscribe every input to it, which isn't as nice as just putting metadata in the pipeline. It also doesn't tell you why the event was dropped and adding that would be complicated I think.

Exactly

You will still need to detect dropped events in two places regardless:

I believe you're mixing filtered and dropped here. The pipeline already counts:

  • onFilteredOut: when the processors filter them out
  • onDroppedOnPublish: when the event isn't published

I'm just splitting those by input.

@AndersonQ
Member

I got something working :)

[screenshot]

@andrewkroh did you have any more specific idea for the charts to add to the dashboards? Or shall I just come up with something and show you?

@nimarezainia @cmacknz any suggestion for dashboard/chart?

As I said before, for the beats running under agent, it works out of the box; for standalone beats they'd need to add an add_fields processor to add stream_id matching the input ID:

filebeat.inputs:
- type: filestream
  id: my-filestream-id
  enabled: true
  paths:
    - /tmp/*.log
  processors:
    - add_fields:
        fields:
          stream_id: my-filestream-id
        target: '@metadata'

It's using stream_id because what the agent adds as input_id is the inputID from the policy, but later on, when it's converted to the beats config, this inputID isn't used; what is passed to the beats as the input ID is the streamID.

@cmacknz
Member

cmacknz commented Feb 7, 2025

As I said before, for the beats running under agent, it works out of the box; for standalone beats they'd need to add an add_fields processor to add stream_id matching the input ID:

I'm still not convinced you need to rely on processors at all; you should just be able to write directly to the event metadata in the publish client, which every input has an instance of.

if c.processors != nil {
    var err error
    event, err = c.processors.Run(event)
    publish = event != nil
    if err != nil {
        // If we introduce a dead-letter queue, this is where we should
        // route the event to it.
        c.logger.Errorf("Failed to publish event: %v", err)
    }
}
if event != nil {
    e = *event
}
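For illustration, a sketch of that idea just before the processors run, assuming the pipeline client is handed the input ID when the input connects (the c.inputID field is hypothetical):

if c.inputID != "" {
    if event.Meta == nil {
        event.Meta = mapstr.M{}
    }
    event.Meta["input_id"] = c.inputID // hypothetical field set when the input connects to the pipeline
}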

It's using stream_id because what the agent adds as input_id is the inputID from the policy, but later on, when it's converted to the beats config, this inputID isn't used; what is passed to the beats as the input ID is the streamID.

stream_id is an Elastic Agent only concept, it shouldn't be mixed into the Beats code. You should only need to look at the id property of the input itself, as the control protocol translation code does the job of converting a stream ID into a beat input with the stream ID as the input ID.

Also, looking at the generation code, we actually include the stream_id as a processor already. We can probably remove this. I think it was added to support the original shipper. add_fields potentially clones the event IIRC, and there is a noticeable performance overhead from each additional processor we add.

if streamID := streamExpected.GetId(); streamID != "" {
    sourceStream := generateAddFieldsProcessor(mapstr.M{"stream_id": streamID}, "@metadata")
    processors = append(processors, sourceStream)
}

@andrewkroh
Member Author

@andrewkroh did you have any more specific idea for the charts to add to the dashboards? Or shall I just come up with something and show you?

The current "Agent Metrics" dashboard has output level metrics (e.g. active, filtered, published, total, dropped) broken down by the sub-process (e.g. component.id). I think we should have a very similar view where instead it splits by the input ID. @strawgate was showing me a refreshed "Agent Metrics" dashboard so I would defer to him about where exactly these visualizations should go.

@AndersonQ
Copy link
Member

I'm still not convinced you need to rely on processors at all; you should just be able to write directly to the event metadata in the publish client, which every input has an instance of.

I haven't found a way for the pipeline client to know the inputID for each event without either relying on some event field or metadata, or having to change the inputs to add it to the beat.Event, or changing the method signature to include the inputID.
It's an option, but it will require much more effort to have all inputs comply with that.

stream_id is an Elastic Agent only concept, it shouldn't be mixed into the Beats code. You should only need to look at the id property of the input itself, as the control protocol translation code does the job of converting a stream ID into a beat input with the stream ID as the input ID.

At the pipeline client, right now, there is no access to the inputID. The input_id in the event metadata is the agent's inputID, which isn't the same ID the input uses. That's why I had to change to stream_id.

@andrewkroh is there a specific input or a set of inputs that would be the most important to have the new metrics? If we go the route of modifying each input, knowing that would be useful.

@andrewkroh
Member Author

is there a specific input or a set of inputs that would be the most important to have the new metrics?

httpjson, cel, aws-s3, and logfile are the main ones I usually find myself debugging. In terms of the source code of log data streams at elastic/integrations@6bc0dc8, these are the top used inputs:

[screenshot: top used inputs]

@AndersonQ
Member

@andrewkroh thanks!

I was talking to Craig and he really wants to avoid relying on the event metadata added by processors, so we decided to go the route that will require changes per input. Well, perhaps for some inputs there is a generic place that would make the new metrics work for a set of inputs. Anyway, we'll need to choose the inputs to add the new metrics to and check case by case.

@AndersonQ
Member

@andrewkroh, @cmacknz, @flexitrev, @pierrehilbert

While I was doing the integration tests, I discovered that the pipeline client metrics for dropped/failed events don't make much sense. The queue hardly ever rejects events; @faec can explain better what this metric was intended for. We were talking yesterday and she was telling me this metric is rather confusing and commonly misunderstood.

As I said in the beginning, properly tracking each event per input, from the input all the way to the output, would require a considerable amount of change, as the current architecture isn't meant for that. Also, it wouldn't be reusable in OTel. So I'd like to bring this up to discuss whether it's indeed what should be pursued.

As I was talking to @faec, she was saying it'd perhaps be better to direct the efforts to having this done in OTel instead of doing it in the current beats, even with some hacky solution. Even using some new event metadata, without a processor adding it, would require changes to the inputs and outputs, and I would not assume the additional overhead is negligible. Besides, it'd create a contract between input and output to use a new metadata field, and it would not be portable to OTel.

The filtered metrics are there; yesterday I got them into the current filestream dashboard, see below. Looking at this dashboard, in addition to httpjson, cel, aws-s3, and logfile, we could try to cover all of them with the pipeline filtered metric. Not all of them publish an events-processed metric, so it wouldn't make sense to have a filtered metric for those, but for the others it's a valuable metric.

[screenshot: filestream dashboard with filtered metrics]

@cmacknz
Member

cmacknz commented Feb 14, 2025

None of the existing pipeline metrics we rely on will apply in the OTel collector, but that's not a reason not to iterate on things that already exist. I would agree we should not embark on a massive refactor of the Beats pipeline at this time, but we can take on smaller work to provide a better view into the existing metrics, or enhance the existing metrics to be more accurate.

While I was doing the integration tests, I discovered that the pipeline client metrics for dropped/failed events don't make much sense. The queue hardly ever rejects events; @faec can explain better what this metric was intended for. We were talking yesterday and she was telling me this metric is rather confusing and commonly misunderstood.

I don't think there was ever an intent to track events dropped by the queue; events get dropped by the output or filtered out by processors, not the queue.

What specific metrics are you talking about here, and where in the code are they currently defined? If they don't make sense, should we shift the focus to fixing or removing them?

I originally suggested adding the metrics to the output, primarily to avoid having to create the feedback loop from the output back to the input metrics which I suspect is the hardest part of this. Is that still viable?

We track detailed stats for each batch we attempt to send and we have a reference to the underlying beat.Event when that happens. Can we just take the existing metrics and break them down by datastream or input ID? I think that would accomplish the goal of making this visible just in a slightly less convenient way.

func (client *Client) applyItemStatus(
    event publisher.Event,

Taking the existing metrics and breaking them down by index or source doesn't create any net new problems, the metrics being broken down are in the same place with respect to any equivalence in our OTel collector.

@AndersonQ
Member

Can we just take the existing metrics and break them down by datastream or input ID?

It isn't as simple as it looks. Using the current streamID or inputID, we had agreed, isn't ideal because we want to remove the processors which add them. It's possible to have the inputs add it directly to the event's metadata, and now that #42559 is merged, it'll be available. However, it isn't straightforward, and I'd say we'd need to evaluate the overhead it might cause to check each event's status once ES sends its response. There might be events from different inputs in a batch, as far as I know. We should also consider whether it should be added to other outputs and whether it's actually possible to do so.

Anyway, tracking dropped events is quite different from tracking filtered ones. Filtering happens when the processors run, and this metric is already being collected. So I believe we should treat them as different tasks.

In conclusion, what I'm getting is that we want to explore how to track dropped events, so it's worth coming up with a proposal for that.

@cmacknz
Member

cmacknz commented Feb 14, 2025

However, it isn't straightforward, and I'd say we'd need to evaluate the overhead it might cause to check each event's status once ES sends its response.

We already iterate over every single event because ES sets an HTTP status code per event in a _bulk request. That is what this function is doing:

func (client *Client) applyItemStatus(
    event publisher.Event,
    itemStatus int,
    itemMessage []byte,
    stats *bulkResultStats,
) bool {
    encodedEvent := event.EncodedEvent.(*encodedEvent)
    if itemStatus < 300 {
        if encodedEvent.deadLetter {
            // This was ingested into the dead letter index, not the original target
            stats.deadLetter++
        } else {
            stats.acked++
        }
        return false // no retry needed
    }
    if itemStatus == 409 {
        // 409 is used to indicate there is already an event with the same ID, or
        // with identical Time Series Data Stream dimensions when TSDS is active.
        stats.duplicates++
        return false // no retry needed
    }
    if itemStatus == http.StatusTooManyRequests {
        stats.fails++
        stats.tooMany++
        return true
    }
    if itemStatus < 500 {
        // hard failure, apply policy action
        if encodedEvent.deadLetter {
            // Fatal error while sending an already-failed event to the dead letter
            // index, drop.
            client.pLogDeadLetter.Add()
            client.log.Errorw(fmt.Sprintf("Can't deliver to dead letter index event '%s' (status=%v): %s", encodedEvent, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
            stats.nonIndexable++
            return false
        }
        if client.deadLetterIndex == "" {
            // Fatal error and no dead letter index, drop.
            client.pLogIndex.Add()
            client.log.Warnw(fmt.Sprintf("Cannot index event '%s' (status=%v): %s, dropping event!", encodedEvent, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
            stats.nonIndexable++
            return false
        }
        // Send this failure to the dead letter index and "retry".
        // We count this as a "retryable failure", and then if the dead letter
        // ingestion succeeds it is counted in the "deadLetter" counter
        // rather than the "acked" counter.
        client.pLogIndexTryDeadLetter.Add()
        client.log.Warnw(fmt.Sprintf("Cannot index event '%s' (status=%v): %s, trying dead letter index", encodedEvent, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
        encodedEvent.setDeadLetter(client.deadLetterIndex, itemStatus, string(itemMessage))
    }
    // Everything else gets retried.
    stats.fails++
    return true
}

We'd be incrementing different counters, which doesn't worry me, but we do have to sanity check it. We need to avoid having to decode the pre-serialized event; it looks like #42559 does what we'd need already.
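For illustration, a sketch of how the existing bulk stats could be broken down by source without decoding the event; the perSource map and the forSource helper are hypothetical additions, not existing code:

// Sketch: per-source buckets alongside the existing aggregate counters.
type bulkResultStats struct {
    acked, duplicates, fails, tooMany, nonIndexable, deadLetter int
    perSource map[string]*bulkResultStats // keyed by input ID or target index
}

func (s *bulkResultStats) forSource(key string) *bulkResultStats {
    if key == "" {
        return s // unattributed events only count in the aggregate stats
    }
    if s.perSource == nil {
        s.perSource = map[string]*bulkResultStats{}
    }
    sub, ok := s.perSource[key]
    if !ok {
        sub = &bulkResultStats{}
        s.perSource[key] = sub
    }
    return sub
}

// Inside applyItemStatus each increment would then also update the bucket, e.g.:
//   stats.acked++
//   stats.forSource(encodedEvent.index).acked++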

If what you are saying is that filtered events are now tracked and supporting dropped events becomes a separate issue or set of PRs, then sure.

@AndersonQ
Member

If what you are saying is that filtered events are now tracked and supporting dropped events becomes a separate issue or set of PRs, then sure.

yes, that's what I meant.

We'd be incrementing different counters, which doesn't worry me, but we do have to sanity check it.

I was looking at that yesterday. It's doable, but I would not assume the overhead is negligible. I just don't want to inadvertently decrease output performance in order to track dropped events per input.

@cmacknz
Member

cmacknz commented Feb 14, 2025

Yeah, we should measure the impact in the end, but the point I'm trying to make is that it doesn't seem like such a bad idea that we shouldn't even try. You could probably get some idea of this with a microbenchmark, since the impact will be contained to that one function we know is on the hot path.
