Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(enriching): add memory enrichment table #21348

Merged
merged 54 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
d919678
feat(vrl): add caching feature for VRL
esensar Sep 24, 2024
2a3b3dd
Add reads and writes metrics for VRL cache
esensar Sep 26, 2024
759ec92
Rename `cache_set` to `cache_put`
esensar Sep 26, 2024
18ef6f0
Add `cache_delete` VRL function
esensar Sep 26, 2024
45995ec
Add key not found test case for `cache_get`
esensar Sep 26, 2024
74b1825
Add placeholder implementation for fetching TTL using `cache_get`
esensar Oct 8, 2024
8c46422
Add memory enrichment table
esensar Oct 13, 2024
ec6efad
Add basic sink implementation for memory enrichment table
esensar Oct 15, 2024
a56b51c
Add ttl, refresh and scan intervals implementation for memory enrichm…
esensar Oct 24, 2024
4984db7
Merge branch 'master' into feature/vrl-cache
esensar Oct 24, 2024
65a05f3
Add internal events for memory enrichment table
esensar Oct 24, 2024
f217abf
Remove initial implementation of VRL cache
esensar Oct 24, 2024
724e6f6
Remove table name from memory enrichment table events
esensar Nov 6, 2024
8123227
Add `SinkConfig` impl for `MemoryConfig`
esensar Nov 21, 2024
112b9fc
Hook up enrichment tables as sinks when possible
esensar Dec 4, 2024
e17880a
Fix flushing in memory enrichment table
esensar Dec 4, 2024
5c6f7ac
Fix failing memory table tests
esensar Dec 4, 2024
5032181
Remove enrichment_tables from Graph and use them as sinks
esensar Dec 16, 2024
93e90dd
Wrap memory metadata in a single object for easier mutex usage
esensar Dec 16, 2024
b3c4488
Merge branch 'master' into feature/vrl-cache
esensar Dec 16, 2024
91d1121
Add byte size limit to memory enrichment table
esensar Dec 16, 2024
dfff8f9
Remove unnecessary duplicated key from memory table entries
esensar Dec 16, 2024
cb4dacb
Remove debugging log from memory table
esensar Dec 17, 2024
31c75fc
Ensure `ConfigDiff` takes tables into account
esensar Dec 19, 2024
2d1ed34
Implement running topology changes for enrichment_tables like sinks
esensar Jan 6, 2025
ce0b357
Make memory tables visible in `vector top` as sinks
esensar Jan 7, 2025
8524b6a
Remove unnecessary clone when handling events
esensar Jan 8, 2025
7b4712c
Fix tests after removing clones in `handle_value`
esensar Jan 8, 2025
6aa6a82
Reduce key clones when writing to memory table
esensar Jan 8, 2025
5b94be3
Merge branch 'master' into feature/vrl-cache
esensar Jan 9, 2025
3f86c32
Store data in memory table as JSON strings instead of `Value` objects
esensar Jan 9, 2025
4bd0cff
Enable configuration for disabling key tag in internal metrics
esensar Jan 9, 2025
ed659e9
Add changelog entry
esensar Jan 14, 2025
97f6a5b
Merge branch 'master' into feature/vrl-cache
esensar Jan 14, 2025
3531ee0
Add docs for memory enrichment_table
esensar Jan 14, 2025
38bfdde
Fix typo in memory table docs
esensar Jan 14, 2025
0da8298
Apply suggestions from code review in documentation
esensar Jan 15, 2025
372b3f6
Apply docs suggestions in code too
esensar Jan 15, 2025
cb462f4
Run scan and flush on intervals and not only on writes
esensar Jan 16, 2025
38e8388
Ensure `scan_interval` can't be zero and handle zero `flush_interval`
esensar Jan 16, 2025
7f3a8f6
Rename `sinks_and_table_sinks` to `all_sinks`
esensar Jan 20, 2025
fabfe7c
Use `Option` instead of `0` for optional memory config values
esensar Jan 20, 2025
b57c359
Rename `MemoryTableInternalMetricsConfig` to `InternalMetricsConfig`
esensar Jan 20, 2025
97d6214
Remove enrichment table `unwrap` from topology running
esensar Jan 20, 2025
2986613
Use `expect` instead of `unwrap` when capturing write lock
esensar Jan 20, 2025
bb8d13f
Fix typo in `src/enrichment_tables/memory/table.rs`
esensar Jan 20, 2025
e1e8dc6
Add a how it works section for memory enrichment table
esensar Jan 21, 2025
e5693b9
Add documentation above `as_sink` per discussion on the PR
esensar Jan 21, 2025
d4c7f9f
Fix enrichment memory table tests
esensar Jan 21, 2025
ba2d548
Fix spellcheck error
esensar Jan 21, 2025
0e445d3
Update LICENSE-3rdparty
esensar Jan 22, 2025
012a07b
Fix cue formatting
esensar Jan 22, 2025
9f86f95
Merge remote-tracking branch 'upstream' into feature/vrl-cache
esensar Jan 23, 2025
156e4d7
Use NonZeroU64 new_unchecked to fix MSRV check
esensar Jan 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ dirs-next = { version = "2.0.0", default-features = false, optional = true }
dyn-clone = { version = "1.0.17", default-features = false }
encoding_rs = { version = "0.8.35", default-features = false, features = ["serde"] }
enum_dispatch = { version = "0.3.13", default-features = false }
evmap = { version = "10.0.2", default-features = false, optional = true }
evmap-derive = { version = "0.2.0", default-features = false, optional = true }
exitcode = { version = "1.1.2", default-features = false }
flate2.workspace = true
futures-util = { version = "0.3.29", default-features = false }
Expand Down Expand Up @@ -371,6 +373,7 @@ tokio-tungstenite = { version = "0.20.1", default-features = false, features = [
toml.workspace = true
tonic = { workspace = true, optional = true }
hickory-proto = { workspace = true, optional = true }
thread_local = { version = "1.1.8", default-features = false, optional = true }
typetag = { version = "0.2.18", default-features = false }
url = { version = "2.5.4", default-features = false, features = ["serde"] }
warp = { version = "0.3.7", default-features = false }
Expand Down Expand Up @@ -517,9 +520,10 @@ protobuf-build = ["dep:tonic-build", "dep:prost-build"]
gcp = ["dep:base64", "dep:goauth", "dep:smpl_jwt"]

# Enrichment Tables
enrichment-tables = ["enrichment-tables-geoip", "enrichment-tables-mmdb"]
enrichment-tables = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"]
enrichment-tables-geoip = ["dep:maxminddb"]
enrichment-tables-mmdb = ["dep:maxminddb"]
enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"]

# Codecs
codecs-syslog = ["vector-lib/syslog"]
Expand Down Expand Up @@ -972,7 +976,7 @@ remap-benches = ["transforms-remap"]
transform-benches = ["transforms-filter", "transforms-dedupe", "transforms-reduce", "transforms-route"]
codecs-benches = []
loki-benches = ["sinks-loki"]
enrichment-tables-benches = ["enrichment-tables-geoip", "enrichment-tables-mmdb"]
enrichment-tables-benches = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"]
proptest = ["dep:proptest", "dep:proptest-derive", "vrl/proptest"]

[[bench]]
Expand Down
9 changes: 8 additions & 1 deletion src/api/schema/components/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,14 @@ pub fn update_config(config: &Config) {
}

// Sinks
for (component_key, sink) in config.sinks() {
let table_sinks = config
.enrichment_tables()
.filter_map(|(k, e)| e.as_sink().map(|s| (k, s)))
.collect::<Vec<_>>();
for (component_key, sink) in config
.sinks()
.chain(table_sinks.iter().map(|(key, sink)| (*key, sink)))
{
new_components.insert(
component_key.clone(),
Component::Sink(sink::Sink(sink::Data {
Expand Down
14 changes: 12 additions & 2 deletions src/config/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct ConfigBuilder {

/// All configured enrichment tables.
#[serde(default)]
pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter>,
pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<String>>,

/// All configured sources.
#[serde(default)]
Expand Down Expand Up @@ -106,6 +106,11 @@ impl From<Config> for ConfigBuilder {
.map(|(key, sink)| (key, sink.map_inputs(ToString::to_string)))
.collect();

let enrichment_tables = enrichment_tables
.into_iter()
.map(|(key, table)| (key, table.map_inputs(ToString::to_string)))
.collect();

let tests = tests.into_iter().map(TestDefinition::stringify).collect();

ConfigBuilder {
Expand Down Expand Up @@ -145,11 +150,16 @@ impl ConfigBuilder {
pub fn add_enrichment_table<K: Into<String>, E: Into<EnrichmentTables>>(
&mut self,
key: K,
inputs: &[&str],
enrichment_table: E,
) {
let inputs = inputs
.iter()
.map(|value| value.to_string())
.collect::<Vec<_>>();
self.enrichment_tables.insert(
ComponentKey::from(key.into()),
EnrichmentTableOuter::new(enrichment_table),
EnrichmentTableOuter::new(inputs, enrichment_table),
);
}

Expand Down
20 changes: 18 additions & 2 deletions src/config/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{
OutputId,
};

use indexmap::IndexSet;
use indexmap::{IndexMap, IndexSet};
use vector_lib::id::Inputs;

pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<String>> {
Expand Down Expand Up @@ -52,8 +52,17 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<
graceful_shutdown_duration,
allow_empty: _,
} = builder;
let sinks_and_table_sinks = sinks
esensar marked this conversation as resolved.
Show resolved Hide resolved
.clone()
.into_iter()
.chain(
enrichment_tables
.iter()
.filter_map(|(key, table)| table.as_sink().map(|s| (key.clone(), s))),
)
.collect::<IndexMap<_, _>>();

let graph = match Graph::new(&sources, &transforms, &sinks, schema) {
let graph = match Graph::new(&sources, &transforms, &sinks_and_table_sinks, schema) {
Ok(graph) => graph,
Err(graph_errors) => {
errors.extend(graph_errors);
Expand Down Expand Up @@ -85,6 +94,13 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<
(key, transform.with_inputs(inputs))
})
.collect();
let enrichment_tables = enrichment_tables
.into_iter()
.map(|(key, table)| {
let inputs = graph.inputs_for(&key);
(key, table.with_inputs(inputs))
})
.collect();
let tests = tests
.into_iter()
.map(|test| test.resolve_outputs(&graph))
Expand Down
8 changes: 7 additions & 1 deletion src/config/diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,32 @@ impl ConfigDiff {
self.sources.flip();
self.transforms.flip();
self.sinks.flip();
self.enrichment_tables.flip();
self
}

/// Checks whether or not the given component is present at all.
pub fn contains(&self, key: &ComponentKey) -> bool {
self.sources.contains(key) || self.transforms.contains(key) || self.sinks.contains(key)
self.sources.contains(key)
|| self.transforms.contains(key)
|| self.sinks.contains(key)
|| self.enrichment_tables.contains(key)
}

/// Checks whether or not the given component is changed.
pub fn is_changed(&self, key: &ComponentKey) -> bool {
self.sources.is_changed(key)
|| self.transforms.is_changed(key)
|| self.sinks.is_changed(key)
|| self.enrichment_tables.contains(key)
}

/// Checks whether or not the given component is removed.
pub fn is_removed(&self, key: &ComponentKey) -> bool {
self.sources.is_removed(key)
|| self.transforms.is_removed(key)
|| self.sinks.is_removed(key)
|| self.enrichment_tables.contains(key)
}
}

Expand Down
70 changes: 66 additions & 4 deletions src/config/enrichment_table.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,79 @@
use enum_dispatch::enum_dispatch;
use serde::Serialize;
use vector_lib::config::GlobalOptions;
use vector_lib::configurable::{configurable_component, NamedComponent};
use vector_lib::configurable::{configurable_component, Configurable, NamedComponent, ToValue};
use vector_lib::id::Inputs;

use crate::enrichment_tables::EnrichmentTables;

use super::dot_graph::GraphConfig;
use super::{SinkConfig, SinkOuter};

/// Fully resolved enrichment table component.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct EnrichmentTableOuter {
pub struct EnrichmentTableOuter<T>
where
T: Configurable + Serialize + 'static + ToValue + Clone,
{
#[serde(flatten)]
pub inner: EnrichmentTables,
#[configurable(derived)]
#[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
pub graph: GraphConfig,
#[configurable(derived)]
#[serde(
default = "Inputs::<T>::default",
skip_serializing_if = "Inputs::is_empty"
)]
pub inputs: Inputs<T>,
}

impl EnrichmentTableOuter {
pub fn new<I: Into<EnrichmentTables>>(inner: I) -> Self {
impl<T> EnrichmentTableOuter<T>
where
T: Configurable + Serialize + 'static + ToValue + Clone,
{
pub fn new<I, IET>(inputs: I, inner: IET) -> Self
where
I: IntoIterator<Item = T>,
IET: Into<EnrichmentTables>,
{
Self {
inner: inner.into(),
graph: Default::default(),
inputs: Inputs::from_iter(inputs),
}
}

pub fn as_sink(&self) -> Option<SinkOuter<T>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a reasonable way to do it the way things are currently designed, but it definitely shows the limitations of how we do things with SinkOuter, etc.

If I were to re-approach it, I think I would limit SinkOuter to config deserialization (and drop the generic to just String), and map to some more granular things to build the topology (e.g. something for inputs, something for healthchecks, etc). That way we could try to unify where we handle "things that have inputs", etc.

You certainly don't need to do any of that, but it could be something to explore if you'd like to make this feel cleaner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, not sure how often components such as this one would come up, but it definitely makes sense for components such as this one, that don't really fit just one type.

I can think about it, but I think that would be a too big undertaking for this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Let's add a comment summarizing the limitation's Luke mentioned above.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record, it would be great if everything was just a node in a graph. Chaining sinks would unlock a lot of heavily requested features. Great topic.

self.inner.sink_config().map(|sink| SinkOuter {
graph: self.graph.clone(),
inputs: self.inputs.clone(),
healthcheck_uri: None,
healthcheck: Default::default(),
buffer: Default::default(),
proxy: Default::default(),
inner: sink,
})
}

pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> EnrichmentTableOuter<U>
where
U: Configurable + Serialize + 'static + ToValue + Clone,
{
let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
self.with_inputs(inputs)
}

pub(crate) fn with_inputs<I, U>(self, inputs: I) -> EnrichmentTableOuter<U>
where
I: IntoIterator<Item = U>,
U: Configurable + Serialize + 'static + ToValue + Clone,
{
EnrichmentTableOuter {
inputs: Inputs::from_iter(inputs),
inner: self.inner,
graph: self.graph,
}
}
}
Expand All @@ -36,4 +94,8 @@ pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync
&self,
globals: &GlobalOptions,
) -> crate::Result<Box<dyn vector_lib::enrichment::Table + Send + Sync>>;

fn sink_config(&self) -> Option<Box<dyn SinkConfig>> {
None
}
}
4 changes: 2 additions & 2 deletions src/config/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Graph {
);
}

for (id, config) in sinks.iter() {
for (id, config) in sinks {
graph.nodes.insert(
id.clone(),
Node::Sink {
Expand All @@ -133,7 +133,7 @@ impl Graph {
}
}

for (id, config) in sinks.iter() {
for (id, config) in sinks {
for input in config.inputs.iter() {
if let Err(e) = graph.add_input(input, id, &available_inputs) {
errors.push(e);
Expand Down
2 changes: 1 addition & 1 deletion src/config/loading/config_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Process for ConfigBuilderLoader {
}
Some(ComponentHint::EnrichmentTable) => {
self.builder.enrichment_tables.extend(deserialize_table::<
IndexMap<ComponentKey, EnrichmentTableOuter>,
IndexMap<ComponentKey, EnrichmentTableOuter<_>>,
>(table)?);
}
Some(ComponentHint::Test) => {
Expand Down
Loading
Loading