From d9196787612ecc01b597ce617fea321326d7f9c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 24 Sep 2024 16:40:52 +0200 Subject: [PATCH 01/49] feat(vrl): add caching feature for VRL This adds additional VRL functions for reading and storing data into caches that can be configured in global options. Caches can store any VRL value and are meant to store data for shorter periods. All data gets TTL (time-to-live) assigned, based on cache configuration and gets removed when that TTL expires. --- Cargo.lock | 10 ++ Cargo.toml | 1 + lib/vector-lib/Cargo.toml | 1 + lib/vector-lib/src/lib.rs | 1 + lib/vector-vrl/tests/Cargo.toml | 1 + lib/vector-vrl/tests/src/main.rs | 2 + lib/vector-vrl/web-playground/Cargo.toml | 1 + lib/vector-vrl/web-playground/src/lib.rs | 1 + lib/vrl-cache/Cargo.toml | 9 + lib/vrl-cache/src/cache_get.rs | 146 ++++++++++++++++ lib/vrl-cache/src/cache_set.rs | 162 ++++++++++++++++++ lib/vrl-cache/src/caches.rs | 62 +++++++ lib/vrl-cache/src/lib.rs | 17 ++ lib/vrl-cache/src/vrl_util.rs | 35 ++++ src/conditions/datadog_search.rs | 1 + src/conditions/mod.rs | 11 +- src/conditions/vrl.rs | 3 + src/config/builder.rs | 14 +- src/config/compiler.rs | 2 + src/config/graph.rs | 1 + src/config/mod.rs | 3 + src/config/transform.rs | 5 + src/config/unit_test/mod.rs | 2 +- src/config/vrl_cache.rs | 19 ++ src/lib.rs | 1 + src/test_util/mock/transforms/basic.rs | 1 + .../mock/transforms/error_definitions.rs | 1 + src/test_util/mock/transforms/noop.rs | 1 + src/topology/builder.rs | 39 ++++- src/topology/schema.rs | 71 ++++++-- src/transforms/aggregate.rs | 1 + src/transforms/aws_ec2_metadata.rs | 1 + src/transforms/dedupe/config.rs | 1 + src/transforms/filter.rs | 4 +- src/transforms/log_to_metric.rs | 1 + src/transforms/lua/mod.rs | 1 + src/transforms/metric_to_log.rs | 1 + src/transforms/reduce/config.rs | 4 +- src/transforms/reduce/transform.rs | 13 +- src/transforms/remap.rs | 48 ++++-- src/transforms/route.rs | 3 +- 
src/transforms/sample/config.rs | 3 +- .../tag_cardinality_limit/config.rs | 1 + src/transforms/throttle.rs | 3 +- src/vrl_cache.rs | 35 ++++ 45 files changed, 697 insertions(+), 47 deletions(-) create mode 100644 lib/vrl-cache/Cargo.toml create mode 100644 lib/vrl-cache/src/cache_get.rs create mode 100644 lib/vrl-cache/src/cache_set.rs create mode 100644 lib/vrl-cache/src/caches.rs create mode 100644 lib/vrl-cache/src/lib.rs create mode 100644 lib/vrl-cache/src/vrl_util.rs create mode 100644 src/config/vrl_cache.rs create mode 100644 src/vrl_cache.rs diff --git a/Cargo.lock b/Cargo.lock index 54090169962d2..72550c6ac633a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10790,6 +10790,7 @@ dependencies = [ "vector-lookup", "vector-stream", "vector-tap", + "vrl-cache", ] [[package]] @@ -10884,6 +10885,7 @@ dependencies = [ "tracing-subscriber", "vector-vrl-functions", "vrl", + "vrl-cache", ] [[package]] @@ -10898,6 +10900,7 @@ dependencies = [ "serde-wasm-bindgen", "vector-vrl-functions", "vrl", + "vrl-cache", "wasm-bindgen", ] @@ -11009,6 +11012,13 @@ dependencies = [ "zstd 0.13.2", ] +[[package]] +name = "vrl-cache" +version = "0.1.0" +dependencies = [ + "vrl", +] + [[package]] name = "vsimd" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index 346cc6acd34df..dea82f31a0dda 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,6 +126,7 @@ members = [ "lib/vector-vrl/functions", "lib/vector-vrl/tests", "lib/vector-vrl/web-playground", + "lib/vrl-cache", "vdev", ] diff --git a/lib/vector-lib/Cargo.toml b/lib/vector-lib/Cargo.toml index 9b510d501724c..a42d4af530fbc 100644 --- a/lib/vector-lib/Cargo.toml +++ b/lib/vector-lib/Cargo.toml @@ -8,6 +8,7 @@ publish = false [dependencies] codecs = { path = "../codecs", default-features = false } enrichment = { path = "../enrichment" } +vrl-cache = { path = "../vrl-cache" } file-source = { path = "../file-source", optional = true } opentelemetry-proto = { path = "../opentelemetry-proto", optional = true } prometheus-parser = { 
path = "../prometheus-parser", optional = true } diff --git a/lib/vector-lib/src/lib.rs b/lib/vector-lib/src/lib.rs index 4bada86f901f4..389441087332a 100644 --- a/lib/vector-lib/src/lib.rs +++ b/lib/vector-lib/src/lib.rs @@ -25,6 +25,7 @@ pub use vector_core::{ pub use vector_lookup as lookup; pub use vector_stream as stream; pub use vector_tap as tap; +pub use vrl_cache; pub mod config { pub use vector_common::config::ComponentKey; diff --git a/lib/vector-vrl/tests/Cargo.toml b/lib/vector-vrl/tests/Cargo.toml index 54b7db24445ac..19cf605cf64ad 100644 --- a/lib/vector-vrl/tests/Cargo.toml +++ b/lib/vector-vrl/tests/Cargo.toml @@ -7,6 +7,7 @@ publish = false [dependencies] enrichment = { path = "../../enrichment" } +vrl-cache = { path = "../../vrl-cache" } vrl.workspace = true vector-vrl-functions = { path = "../../vector-vrl/functions" } diff --git a/lib/vector-vrl/tests/src/main.rs b/lib/vector-vrl/tests/src/main.rs index 962f68b2ef296..fadcb67d1387f 100644 --- a/lib/vector-vrl/tests/src/main.rs +++ b/lib/vector-vrl/tests/src/main.rs @@ -98,6 +98,7 @@ fn main() { let mut functions = vrl::stdlib::all(); functions.extend(vector_vrl_functions::all()); functions.extend(enrichment::vrl_functions()); + functions.extend(vrl_cache::vrl_functions()); run_tests( tests, @@ -132,6 +133,7 @@ fn get_tests(cmd: &Cmd) -> Vec { vector_vrl_functions::all() .into_iter() .chain(enrichment::vrl_functions()) + .chain(vrl_cache::vrl_functions()) .collect(), )) .filter(|test| { diff --git a/lib/vector-vrl/web-playground/Cargo.toml b/lib/vector-vrl/web-playground/Cargo.toml index 4a1ff8db28cc6..5f468cdaec6b0 100644 --- a/lib/vector-vrl/web-playground/Cargo.toml +++ b/lib/vector-vrl/web-playground/Cargo.toml @@ -17,6 +17,7 @@ gloo-utils = { version = "0.2", features = ["serde"] } getrandom = { version = "0.2", features = ["js"] } vector-vrl-functions = { path = "../functions" } enrichment = { path = "../../enrichment" } +vrl-cache = { path = "../../vrl-cache" } [build-dependencies] 
cargo_toml = "0.20.4" diff --git a/lib/vector-vrl/web-playground/src/lib.rs b/lib/vector-vrl/web-playground/src/lib.rs index a434050aefa3c..d59ae02695d15 100644 --- a/lib/vector-vrl/web-playground/src/lib.rs +++ b/lib/vector-vrl/web-playground/src/lib.rs @@ -80,6 +80,7 @@ fn compile(mut input: Input) -> Result { let mut functions = vrl::stdlib::all(); functions.extend(vector_vrl_functions::all()); functions.extend(enrichment::vrl_functions()); + functions.extend(vrl_cache::vrl_functions()); let event = &mut input.event; let state = TypeState::default(); diff --git a/lib/vrl-cache/Cargo.toml b/lib/vrl-cache/Cargo.toml new file mode 100644 index 0000000000000..6213a709ff941 --- /dev/null +++ b/lib/vrl-cache/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "vrl-cache" +version = "0.1.0" +authors = ["Vector Contributors "] +edition = "2021" +publish = false + +[dependencies] +vrl.workspace = true diff --git a/lib/vrl-cache/src/cache_get.rs b/lib/vrl-cache/src/cache_get.rs new file mode 100644 index 0000000000000..adf854b728172 --- /dev/null +++ b/lib/vrl-cache/src/cache_get.rs @@ -0,0 +1,146 @@ +use vrl::prelude::*; + +use crate::{ + caches::{VrlCacheRegistry, VrlCacheSearch}, + vrl_util, +}; + +#[derive(Clone, Copy, Debug)] +pub struct CacheGet; +impl Function for CacheGet { + fn identifier(&self) -> &'static str { + "cache_get" + } + + fn parameters(&self) -> &'static [Parameter] { + &[ + Parameter { + keyword: "cache", + kind: kind::BYTES, + required: true, + }, + Parameter { + keyword: "key", + kind: kind::BYTES, + required: true, + }, + ] + } + + fn examples(&self) -> &'static [Example] { + &[Example { + title: "read from cache", + source: r#"cache_get!("test_cache", "test_key")"#, + result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#), + }] + } + + fn compile( + &self, + state: &TypeState, + ctx: &mut FunctionCompileContext, + arguments: ArgumentList, + ) -> Compiled { + let registry = ctx + .get_external_context_mut::() + 
.ok_or(Box::new(vrl_util::Error::CachesNotLoaded) as Box)?; + + let caches = registry + .cache_ids() + .into_iter() + .map(Value::from) + .collect::>(); + + let cache = arguments + .required_enum("cache", &caches, state)? + .try_bytes_utf8_lossy() + .expect("cache is not valid utf8") + .into_owned(); + let key = arguments.required("key"); + + Ok(CacheGetFn { + cache, + key, + registry: registry.as_readonly(), + } + .as_expr()) + } +} + +#[derive(Debug, Clone)] +pub struct CacheGetFn { + cache: String, + key: Box, + registry: VrlCacheSearch, +} + +impl FunctionExpression for CacheGetFn { + fn resolve(&self, ctx: &mut Context) -> Resolved { + let key = self.key.resolve(ctx)?.try_bytes_utf8_lossy()?.into_owned(); + + Ok(self + .registry + .get_val(&self.cache, &key) + .ok_or_else(|| format!("key not found in cache: {key}"))?) + } + + fn type_def(&self, _: &TypeState) -> TypeDef { + TypeDef::any().fallible() + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + use vrl::compiler::prelude::TimeZone; + use vrl::compiler::state::RuntimeState; + use vrl::compiler::TargetValue; + use vrl::value; + use vrl::value::Secrets; + + use crate::caches::VrlCache; + + use super::*; + + fn get_cache_registry() -> VrlCacheRegistry { + let registry = VrlCacheRegistry::default(); + registry + .caches + .write() + .unwrap() + .insert("test".to_string(), VrlCache::default()); + registry + } + + #[test] + fn get_val() { + let registry = get_cache_registry(); + registry + .caches + .write() + .unwrap() + .get_mut("test") + .unwrap() + .data + .insert("test_key".to_string(), Value::from("test_value")); + let func = CacheGetFn { + cache: "test".to_string(), + key: expr!("test_key"), + registry: registry.as_readonly(), + }; + + let tz = TimeZone::default(); + let object: Value = BTreeMap::new().into(); + let mut target = TargetValue { + value: object, + metadata: value!({}), + secrets: Secrets::new(), + }; + let mut runtime_state = RuntimeState::default(); + let mut ctx = 
Context::new(&mut target, &mut runtime_state, &tz); + + let got = func.resolve(&mut ctx); + + assert_eq!(Ok(value!("test_value")), got); + } +} diff --git a/lib/vrl-cache/src/cache_set.rs b/lib/vrl-cache/src/cache_set.rs new file mode 100644 index 0000000000000..8660a5db4d90a --- /dev/null +++ b/lib/vrl-cache/src/cache_set.rs @@ -0,0 +1,162 @@ +use vrl::prelude::*; + +use crate::{caches::VrlCacheRegistry, vrl_util}; + +#[derive(Clone, Copy, Debug)] +pub struct CacheSet; +impl Function for CacheSet { + fn identifier(&self) -> &'static str { + "cache_set" + } + + fn parameters(&self) -> &'static [Parameter] { + &[ + Parameter { + keyword: "cache", + kind: kind::BYTES, + required: true, + }, + Parameter { + keyword: "key", + kind: kind::BYTES, + required: true, + }, + Parameter { + keyword: "value", + kind: kind::ANY, + required: true, + }, + ] + } + + fn examples(&self) -> &'static [Example] { + &[Example { + title: "write to cache", + source: r#"cache_get!("test_cache", "test_key", "test_value")"#, + result: Ok(""), + }] + } + + fn compile( + &self, + state: &TypeState, + ctx: &mut FunctionCompileContext, + arguments: ArgumentList, + ) -> Compiled { + let registry = ctx + .get_external_context_mut::() + .ok_or(Box::new(vrl_util::Error::CachesNotLoaded) as Box)?; + + let caches = registry + .cache_ids() + .into_iter() + .map(Value::from) + .collect::>(); + + let cache = arguments + .required_enum("cache", &caches, state)? 
+ .try_bytes_utf8_lossy() + .expect("cache is not valid utf8") + .into_owned(); + let key = arguments.required("key"); + let value = arguments.required("value"); + + Ok(CacheSetFn { + cache, + key, + value, + registry: registry.clone(), + } + .as_expr()) + } +} + +#[derive(Debug, Clone)] +pub struct CacheSetFn { + cache: String, + key: Box, + value: Box, + registry: VrlCacheRegistry, +} + +impl FunctionExpression for CacheSetFn { + fn resolve(&self, ctx: &mut Context) -> Resolved { + let key = self.key.resolve(ctx)?.try_bytes_utf8_lossy()?.into_owned(); + let value = self.value.resolve(ctx)?; + self.registry + .caches + .write() + .unwrap() + .get_mut(&self.cache) + .unwrap() + .data + .insert(key, value.clone()); + Ok(value) + } + + fn type_def(&self, _: &TypeState) -> TypeDef { + TypeDef::null().impure().fallible() + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + use vrl::compiler::prelude::TimeZone; + use vrl::compiler::state::RuntimeState; + use vrl::compiler::TargetValue; + use vrl::value; + use vrl::value::Secrets; + + use crate::caches::VrlCache; + + use super::*; + + fn get_cache_registry() -> VrlCacheRegistry { + let registry = VrlCacheRegistry::default(); + registry + .caches + .write() + .unwrap() + .insert("test".to_string(), VrlCache::default()); + registry + } + + #[test] + fn set_val() { + let registry = get_cache_registry(); + let func = CacheSetFn { + cache: "test".to_string(), + key: expr!("test_key"), + value: expr!("test_value"), + registry: registry.clone(), + }; + + let tz = TimeZone::default(); + let object: Value = BTreeMap::new().into(); + let mut target = TargetValue { + value: object, + metadata: value!({}), + secrets: Secrets::new(), + }; + let mut runtime_state = RuntimeState::default(); + let mut ctx = Context::new(&mut target, &mut runtime_state, &tz); + + let got = func.resolve(&mut ctx); + + assert_eq!(Ok(value!("test_value")), got); + assert_eq!( + value!("test_value"), + registry + .caches + .read() + 
.unwrap() + .get("test") + .unwrap() + .data + .get("test_key") + .unwrap() + .clone() + ); + } +} diff --git a/lib/vrl-cache/src/caches.rs b/lib/vrl-cache/src/caches.rs new file mode 100644 index 0000000000000..8b76ec06cc33d --- /dev/null +++ b/lib/vrl-cache/src/caches.rs @@ -0,0 +1,62 @@ +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; + +use vrl::core::Value; + +type CacheMap = HashMap; + +#[derive(Default, Debug)] +pub struct VrlCache { + pub data: HashMap, +} + +#[derive(Clone, Default, Debug)] +pub struct VrlCacheRegistry { + pub caches: Arc>, +} + +/// Eq implementation for caching purposes +impl PartialEq for VrlCacheRegistry { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.caches, &other.caches) + } +} +impl Eq for VrlCacheRegistry {} + +impl VrlCacheRegistry { + /// Return a list of the available caches we can read and write to. + /// + /// # Panics + /// + /// Panics if the RwLock is poisoned. + pub fn cache_ids(&self) -> Vec { + let locked = self.caches.read().unwrap(); + locked.iter().map(|(key, _)| key.clone()).collect() + } + + /// Returns a cheaply clonable handle that provides read-only access to the + /// caches. + pub fn as_readonly(&self) -> VrlCacheSearch { + VrlCacheSearch(self.caches.clone()) + } + + pub fn insert_caches(&self, new_caches: CacheMap) { + let mut caches = self.caches.write().unwrap(); + caches.extend(new_caches); + } +} + +/// Provides read-only access to the VRL caches through `get_val`. Cloning +/// this object is designed to be cheap. The underlying data will be shared +/// by all clones. 
+#[derive(Clone, Default, Debug)] +pub struct VrlCacheSearch(Arc>); + +impl VrlCacheSearch { + pub fn get_val(&self, cache: &String, key: &String) -> Option { + let locked = self.0.read().unwrap(); + locked[cache].data.get(key).cloned() + } +} diff --git a/lib/vrl-cache/src/lib.rs b/lib/vrl-cache/src/lib.rs new file mode 100644 index 0000000000000..be88e6cd00c9c --- /dev/null +++ b/lib/vrl-cache/src/lib.rs @@ -0,0 +1,17 @@ +#![deny(warnings)] + +pub mod cache_get; +pub mod cache_set; +pub mod caches; + +mod vrl_util; + +pub use caches::VrlCacheRegistry; +use vrl::compiler::Function; + +pub fn vrl_functions() -> Vec> { + vec![ + Box::new(cache_get::CacheGet) as _, + Box::new(cache_set::CacheSet) as _, + ] +} diff --git a/lib/vrl-cache/src/vrl_util.rs b/lib/vrl-cache/src/vrl_util.rs new file mode 100644 index 0000000000000..1e7b57438a245 --- /dev/null +++ b/lib/vrl-cache/src/vrl_util.rs @@ -0,0 +1,35 @@ +//! Utilities shared between VRL functions. +use vrl::diagnostic::{Label, Span}; +use vrl::prelude::*; + +#[derive(Debug)] +pub enum Error { + CachesNotLoaded, +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Error::CachesNotLoaded => write!(f, "VRL caches not loaded"), + } + } +} + +impl std::error::Error for Error {} + +impl DiagnosticMessage for Error { + fn code(&self) -> usize { + 111 + } + + fn labels(&self) -> Vec