Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kafka sink): Fix Kafka partition key on metric events #20246

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

jerryjvl
Copy link

@jerryjvl jerryjvl commented Apr 7, 2024

Currently, the Kafka sink implementation of key_field uses a simple lookup into tags for metrics. Unfortunately, this does not work because the round-trip through parsing the configuration results in a forced . prefix, which only works if the tag name also contains an explicit dot at the start.

Fixes: #20217

A workaround that I used in production:

[transforms.remap]
type = "remap"
input = [ "...some source..." ]
source = `.tags.".metric_name" = .name`

[sinks.kafka]
type = "kafka"
input = [ "remap" ]
key_field = ".metric_name"
...

This fix tries to improve the situation by:

  • Supporting .name explicitly, so the hack above can be removed entirely
  • Supporting .tags.<tag-name> explicitly, so that config can also be implemented in a way that will remain compatible when the Metric type structure eventually implements Value compatibility for VRL-style de-referencing.
  • Supporting the current behaviour as a fall-through for anyone running an existing pipeline using key_field so there is at least one version that can be used as a graceful cross-over for deprecation of the previous behaviour.
  • Implementing some unit-tests for these behaviours.

@bits-bot
Copy link

bits-bot commented Apr 7, 2024

CLA assistant check
All committers have signed the CLA.

@github-actions github-actions bot added the domain: sinks Anything related to the Vector's sinks label Apr 7, 2024
@jerryjvl
Copy link
Author

jerryjvl commented Apr 14, 2024

Aside on the implementation:

  • The nested matches are a bit clunky -- maybe there is shorter idiomatic way to achieve this? In a proper implementation of Value semantics across Metrics, this'd be recursive traversal, but I don't think implementing that is in-scope for this bug-fix. I tried to do enough checking so that the eventual implementation of Value does not have to deal with me having introduced any weird edge-cases.
  • Should I make any edits to the documentation to key_field to document the extended behaviour and/or clarify supported patterns for both Logs vs Metrics? Decided I should do this regardless.

@jerryjvl jerryjvl marked this pull request as ready for review April 14, 2024 06:05
@jerryjvl jerryjvl requested a review from a team as a code owner April 14, 2024 06:05
@jerryjvl jerryjvl requested review from a team as code owners April 15, 2024 05:33
@github-actions github-actions bot added the domain: external docs Anything related to Vector's external, public documentation label Apr 15, 2024
Copy link
Contributor

@estherk15 estherk15 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed and approved by documentation

Copy link
Member

@lukesteensen lukesteensen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this @jerryjvl! With some help from @bruceg I was able to track down the closest equivalent to this logic that exists, in the implementation of VrlTarget:

/// Retrieves a value from a the provided metric using the path.
/// Currently the root path and the following paths are supported:
/// - name
/// - namespace
/// - timestamp
/// - kind
/// - tags
/// - tags.<tagname>
/// - type
///
/// Any other paths result in a `MetricPathError::InvalidPath` being returned.
fn target_get_metric<'a>(
path: &OwnedValuePath,
value: &'a Value,
) -> Result<Option<&'a Value>, String> {
if path.is_root() {
return Ok(Some(value));
}
let value = value.get(path);
for paths in path.to_alternative_components(MAX_METRIC_PATH_DEPTH) {
match paths.as_slice() {
["name"] | ["kind"] | ["type"] | ["tags", _] => return Ok(value),
["namespace"] | ["timestamp"] | ["tags"] => {
if let Some(value) = value {
return Ok(Some(value));
}
}
_ => {
return Err(MetricPathError::InvalidPath {
path: &path.to_string(),
expected: VALID_METRIC_PATHS_GET,
}
.to_string())
}
}
}
// We only reach this point if we have requested a tag that doesn't exist or an empty
// field.
Ok(None)
}

I agree that something like this should exist as a method on Metric, where it'd be easy to reuse here. The implementation above is not quite suitable, since it operates against the precomputed Value representation of the metric instead of the Metric type itself.

It seems like the best path would be to merge these two implementation into one method on Metric, using the logic from the existing VrlTarget method but applying it to the Metric instead of the Value.

Does that make sense to you as a next iteration? I'm happy to push some example code if that would be helpful.

@jerryjvl
Copy link
Author

jerryjvl commented Apr 26, 2024

Does that make sense to you as a next iteration? I'm happy to push some example code if that would be helpful.

I think that's enough context for me to give it a try; thanks!

I'd still like to maintain the fall-through logic in the key_field lookup to use the value directly as a tag-name when the consolidated logic returns an Ok(None) so that the fix does not immediately break any existing production work-arounds... notably the one we implemented to work around the bug :)

@jszwedko jszwedko added the meta: awaiting author Pull requests that are awaiting their author. label May 3, 2024
@lukesteensen
Copy link
Member

I'd still like to maintain the fall-through logic in the key_field lookup to use the value directly as a tag-name when the consolidated logic returns an Ok(None) so that the fix does not immediately break any existing production work-arounds... notably the one we implemented to work around the bug :)

Yeah that makes sense to me! Definitely don't want to introduce a breaking change unnecessarily. Ideally we could separate out that logic so that we can cleanly deprecate and remove it in future releases.

@jerryjvl
Copy link
Author

jerryjvl commented May 5, 2024

@lukesteensen I have some work-in-progress locally, but I'm running into some complexities that I want to query to make sure I'm not going about this completely the wrong way. (I'm neither a Rust veteran nor a Vector source-code veteran yet).

I have effectively created get() and get_mut() methods on the Metric implementation with the same signature as exist on LogEvent, which makes the implementation of the get_key() method very trivial and symmetrical.

But when I'm trying to consolidate the logic required for get_key() and target_get_metric() into this shared method on Metric, the biggest issues I have are with the assumption target_get_metric makes that it is operating on a Value.

It means that for an is_root() path, I'm forced to consider re-processing the Metric into a Value, so I can return it as such; when called from get_key() this is unavoidable, but from target_get_metric() I'm effectively then duplicating work. However, I cannot implement the logic with the assumption of a pre-made Value existing because that's not true for get_key(). There are a few other places where this same impedance mismatch pinches at a simple implementation.

I don't know how deep this distinction goes, but it seems for logging, data gets transformed into Values at a much deeper level, whereas for Metrics it's maybe a bit more opportunistic? And the last thing I want to do here is add technical debt by creating duplicated runtime work, with potential performance side-effects.

I don't know the Vector code-base well enough (yet) to know if there is an easy upstream location where these calls could more easily be consolidated without the trade-off downsides I'm struggling with at the level I'm trying to (minimally) intervene at.

@lukesteensen
Copy link
Member

@jerryjvl I see what you mean, thanks for clarifying. Sorry for the delayed response, but here's roughly how I would think about it:

It seems to me that the precomputed Value for metrics in VrlTarget is something of a convenience for implementing these methods given that we know we are in the context of VRL and expect some processing to be happening that will justify the cost of the conversion. More optimal would likely be to lazily convert fields of the metric as needed, but that kind of change is far outside the scope of this PR.

Given the different contexts, it may be simpler to just have two different but related methods. For the non-VRL use case, we should not need to worry about things like returning the root path or other composite Values, mostly just simple strings. So it may make sense to have a method that is more focused on that simple use case and maybe down the line we can figure out a nice way to unify with the more general VRL target method.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
domain: external docs Anything related to Vector's external, public documentation domain: sinks Anything related to the Vector's sinks meta: awaiting author Pull requests that are awaiting their author.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Metrics sent to a Kafka Sink partitioning on key_field does not lookup tags correctly
5 participants