diff --git a/Cargo.lock b/Cargo.lock
index ef9ca8696c0..2fe87a3614c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2714,6 +2714,7 @@ dependencies = [
  "influxdb3_client",
  "influxdb3_process",
  "influxdb3_server",
+ "influxdb3_sys_events",
  "influxdb3_telemetry",
  "influxdb3_wal",
  "influxdb3_write",
@@ -2891,6 +2892,7 @@ dependencies = [
  "influxdb3_catalog",
  "influxdb3_id",
  "influxdb3_process",
+ "influxdb3_sys_events",
  "influxdb3_telemetry",
  "influxdb3_wal",
  "influxdb3_write",
@@ -2935,6 +2937,23 @@ dependencies = [
  "urlencoding",
 ]
 
+[[package]]
+name = "influxdb3_sys_events"
+version = "0.1.0"
+dependencies = [
+ "arrow",
+ "arrow-array",
+ "async-trait",
+ "chrono",
+ "dashmap",
+ "datafusion",
+ "iox_system_tables",
+ "iox_time",
+ "observability_deps",
+ "proptest",
+ "test-log",
+]
+
 [[package]]
 name = "influxdb3_telemetry"
 version = "0.1.0"
diff --git a/influxdb3/Cargo.toml b/influxdb3/Cargo.toml
index 3b4d0687f77..f86a9b5f6b0 100644
--- a/influxdb3/Cargo.toml
+++ b/influxdb3/Cargo.toml
@@ -32,6 +32,7 @@ influxdb3_server = { path = "../influxdb3_server" }
 influxdb3_wal = { path = "../influxdb3_wal" }
 influxdb3_write = { path = "../influxdb3_write" }
 influxdb3_telemetry = { path = "../influxdb3_telemetry" }
+influxdb3_sys_events = { path = "../influxdb3_sys_events" }
 
 # Crates.io dependencies
 anyhow.workspace = true
diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs
index d386cf9961b..8ab2bcc2c20 100644
--- a/influxdb3/src/commands/serve.rs
+++ b/influxdb3/src/commands/serve.rs
@@ -18,6 +18,7 @@ use influxdb3_server::{
     query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl},
     serve, CommonServerState,
 };
+use influxdb3_sys_events::SysEventStore;
 use influxdb3_telemetry::store::TelemetryStore;
 use influxdb3_wal::{Gen1Duration, WalConfig};
 use influxdb3_write::{
@@ -381,9 +382,10 @@ pub async fn command(config: Config) -> Result<()> {
     // Construct a token to trigger clean shutdown
     let frontend_shutdown = CancellationToken::new();
 
+    let time_provider = Arc::new(SystemProvider::new());
+    let sys_events_store = Arc::new(SysEventStore::new(Arc::clone(&time_provider) as _));
     let object_store: Arc<dyn ObjectStore> =
         make_object_store(&config.object_store_config).map_err(Error::ObjectStoreParsing)?;
-    let time_provider = Arc::new(SystemProvider::new());
 
     let (object_store, parquet_cache) = if !config.disable_parquet_mem_cache {
         let (object_store, parquet_cache) = create_cached_obj_store_and_oracle(
@@ -516,6 +518,7 @@ pub async fn command(config: Config) -> Result<()> {
         concurrent_query_limit: 10,
         query_log_size: config.query_log_size,
         telemetry_store: Arc::clone(&telemetry_store),
+        sys_events_store: Arc::clone(&sys_events_store),
     }));
 
     let listener = TcpListener::bind(*config.http_bind_address)
diff --git a/influxdb3_server/Cargo.toml b/influxdb3_server/Cargo.toml
index 447079ab375..ffd3fa8b2f0 100644
--- a/influxdb3_server/Cargo.toml
+++ b/influxdb3_server/Cargo.toml
@@ -37,6 +37,7 @@ influxdb3_process = { path = "../influxdb3_process", default-features = false }
 influxdb3_wal = { path = "../influxdb3_wal"}
 influxdb3_write = { path = "../influxdb3_write" }
 iox_query_influxql_rewrite = { path = "../iox_query_influxql_rewrite" }
+influxdb3_sys_events = { path = "../influxdb3_sys_events" }
 influxdb3_telemetry = { path = "../influxdb3_telemetry" }
 
 # crates.io Dependencies
diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs
index 3d157775c38..24624436cd7 100644
--- a/influxdb3_server/src/lib.rs
+++ b/influxdb3_server/src/lib.rs
@@ -237,6 +237,7 @@ mod tests {
     use influxdb3_cache::meta_cache::MetaCacheProvider;
     use influxdb3_catalog::catalog::Catalog;
     use influxdb3_id::{DbId, TableId};
+    use influxdb3_sys_events::SysEventStore;
     use influxdb3_telemetry::store::TelemetryStore;
     use influxdb3_wal::WalConfig;
     use influxdb3_write::parquet_cache::test_cached_obj_store_and_oracle;
@@ -798,6 +799,9 @@ mod tests {
             .unwrap(),
         );
 
+        let sys_events_store = Arc::new(SysEventStore::new(Arc::<MockProvider>::clone(
+            &time_provider,
+        )));
         let parquet_metrics_provider: Arc<PersistedFiles> =
             Arc::clone(&write_buffer_impl.persisted_files());
         let sample_telem_store =
@@ -819,6 +823,7 @@ mod tests {
             concurrent_query_limit: 10,
             query_log_size: 10,
             telemetry_store: Arc::clone(&sample_telem_store),
+            sys_events_store: Arc::clone(&sys_events_store),
         });
 
         // bind to port 0 will assign a random available port:
diff --git a/influxdb3_server/src/query_executor.rs b/influxdb3_server/src/query_executor.rs
index 4107463e732..2033f5e8753 100644
--- a/influxdb3_server/src/query_executor.rs
+++ b/influxdb3_server/src/query_executor.rs
@@ -20,6 +20,7 @@ use datafusion_util::config::DEFAULT_SCHEMA;
 use datafusion_util::MemoryStream;
 use influxdb3_cache::meta_cache::{MetaCacheFunction, META_CACHE_UDTF_NAME};
 use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
+use influxdb3_sys_events::SysEventStore;
 use influxdb3_telemetry::store::TelemetryStore;
 use influxdb3_write::last_cache::LastCacheFunction;
 use influxdb3_write::WriteBuffer;
@@ -57,6 +58,7 @@ pub struct QueryExecutorImpl {
     query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
     query_log: Arc<QueryLog>,
     telemetry_store: Arc<TelemetryStore>,
+    sys_events_store: Arc<SysEventStore>,
 }
 
 /// Arguments for [`QueryExecutorImpl::new`]
@@ -70,6 +72,7 @@ pub struct CreateQueryExecutorArgs {
     pub concurrent_query_limit: usize,
     pub query_log_size: usize,
     pub telemetry_store: Arc<TelemetryStore>,
+    pub sys_events_store: Arc<SysEventStore>,
 }
 
 impl QueryExecutorImpl {
@@ -83,6 +86,7 @@ impl QueryExecutorImpl {
             concurrent_query_limit,
             query_log_size,
             telemetry_store,
+            sys_events_store,
         }: CreateQueryExecutorArgs,
     ) -> Self {
         let semaphore_metrics = Arc::new(AsyncSemaphoreMetrics::new(
@@ -103,6 +107,7 @@ impl QueryExecutorImpl {
             query_execution_semaphore,
             query_log,
             telemetry_store,
+            sys_events_store,
         }
     }
 }
@@ -340,6 +345,7 @@ impl QueryDatabase for QueryExecutorImpl {
             Arc::clone(&self.exec),
             Arc::clone(&self.datafusion_config),
             Arc::clone(&self.query_log),
+            Arc::clone(&self.sys_events_store),
         ))))
     }
 
@@ -372,11 +378,13 @@ impl Database {
         exec: Arc<Executor>,
         datafusion_config: Arc<HashMap<String, String>>,
         query_log: Arc<QueryLog>,
+        sys_events_store: Arc<SysEventStore>,
     ) -> Self {
         let system_schema_provider = Arc::new(SystemSchemaProvider::new(
             Arc::clone(&db_schema),
             Arc::clone(&query_log),
             Arc::clone(&write_buffer),
+            Arc::clone(&sys_events_store),
         ));
         Self {
             db_schema,
@@ -616,6 +624,7 @@ mod tests {
     use futures::TryStreamExt;
     use influxdb3_cache::meta_cache::MetaCacheProvider;
     use influxdb3_catalog::catalog::Catalog;
+    use influxdb3_sys_events::SysEventStore;
     use influxdb3_telemetry::store::TelemetryStore;
     use influxdb3_wal::{Gen1Duration, WalConfig};
     use influxdb3_write::{
@@ -697,6 +706,9 @@ mod tests {
         let persisted_files: Arc<PersistedFiles> =
             Arc::clone(&write_buffer_impl.persisted_files());
         let telemetry_store = TelemetryStore::new_without_background_runners(persisted_files);
+        let sys_events_store = Arc::new(SysEventStore::new(Arc::<MockProvider>::clone(
+            &time_provider,
+        )));
         let write_buffer: Arc<dyn WriteBuffer> = write_buffer_impl;
         let metrics = Arc::new(Registry::new());
         let datafusion_config = Arc::new(Default::default());
@@ -709,6 +721,7 @@ mod tests {
             concurrent_query_limit: 10,
             query_log_size: 10,
             telemetry_store,
+            sys_events_store,
         });
 
         (write_buffer, query_executor, time_provider)
diff --git a/influxdb3_server/src/system_tables/mod.rs b/influxdb3_server/src/system_tables/mod.rs
index 41527a291de..0d6057e75bc 100644
--- a/influxdb3_server/src/system_tables/mod.rs
+++ b/influxdb3_server/src/system_tables/mod.rs
@@ -2,6 +2,7 @@ use std::{any::Any, collections::HashMap, sync::Arc};
 
 use datafusion::{catalog::SchemaProvider, datasource::TableProvider, error::DataFusionError};
 use influxdb3_catalog::catalog::DatabaseSchema;
+use influxdb3_sys_events::SysEventStore;
 use influxdb3_write::WriteBuffer;
 use iox_query::query_log::QueryLog;
 use iox_system_tables::SystemTableProvider;
@@ -45,6 +46,7 @@ impl SystemSchemaProvider {
         db_schema: Arc<DatabaseSchema>,
         query_log: Arc<QueryLog>,
         buffer: Arc<dyn WriteBuffer>,
+        _sys_events_store: Arc<SysEventStore>,
     ) -> Self {
         let mut tables = HashMap::<&'static str, Arc<dyn TableProvider>>::new();
         let queries = Arc::new(SystemTableProvider::new(Arc::new(QueriesTable::new(
diff --git a/influxdb3_sys_events/Cargo.toml b/influxdb3_sys_events/Cargo.toml
new file mode 100644
index 00000000000..a25e6b7da36
--- /dev/null
+++ b/influxdb3_sys_events/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "influxdb3_sys_events"
+version.workspace = true
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+
+
+[dependencies]
+# core crates
+iox_time.workspace = true
+iox_system_tables.workspace = true
+observability_deps.workspace = true
+
+# crates.io deps
+arrow.workspace = true
+arrow-array.workspace = true
+async-trait.workspace = true
+chrono.workspace = true
+dashmap.workspace = true
+datafusion.workspace = true
+
+[dev-dependencies]
+test-log.workspace = true
+proptest.workspace = true
diff --git a/influxdb3_sys_events/src/lib.rs b/influxdb3_sys_events/src/lib.rs
new file mode 100644
index 00000000000..d0e5cd7d35d
--- /dev/null
+++ b/influxdb3_sys_events/src/lib.rs
@@ -0,0 +1,225 @@
+use std::fmt::Debug;
+use std::{
+    any::{Any, TypeId},
+    mem::replace,
+    sync::Arc,
+};
+
+use dashmap::DashMap;
+use iox_time::TimeProvider;
+
+const MAX_CAPACITY: usize = 1000;
+
+/// This store captures events for different types of instrumentation.
+/// It is backed by one ring buffer per event type. New events can be
+/// recorded with [`SysEventStore::add`], and all buffered events for a
+/// given event type can be fetched with [`SysEventStore::query`].
+/// Every time a new event type is introduced, a system table has to be set up
+/// for it, following the same pattern as in [`influxdb3_server::system_tables`].
+#[derive(Debug)]
+pub struct SysEventStore {
+    events: dashmap::DashMap<TypeId, Box<dyn Any + Send + Sync>>,
+    time_provider: Arc<dyn TimeProvider>,
+}
+
+impl SysEventStore {
+    pub fn new(time_provider: Arc<dyn TimeProvider>) -> Self {
+        Self {
+            events: DashMap::new(),
+            time_provider,
+        }
+    }
+
+    pub fn add<E>(&self, val: E)
+    where
+        E: 'static + Debug + Sync + Send,
+    {
+        let wrapped = Event {
+            time: self.time_provider.now().timestamp_nanos(),
+            data: val,
+        };
+        let mut buf = self
+            .events
+            .entry(TypeId::of::<RingBuffer<Event<E>>>())
+            .or_insert_with(|| Box::new(RingBuffer::<Event<E>>::new(MAX_CAPACITY)));
+
+        buf.downcast_mut::<RingBuffer<Event<E>>>()
+            .unwrap()
+            .push(wrapped);
+    }
+
+    pub fn query<E>(&self) -> Vec<Event<E>>
+    where
+        E: 'static + Clone + Debug + Sync + Send,
+    {
+        let mut vec = vec![];
+        if let Some(buf) = self.events.get(&TypeId::of::<RingBuffer<Event<E>>>()) {
+            let iter = buf
+                .downcast_ref::<RingBuffer<Event<E>>>()
+                .unwrap()
+                .in_order();
+            for i in iter {
+                vec.push(i.clone());
+            }
+        };
+        vec
+    }
+}
+
+struct RingBuffer<T> {
+    buf: Vec<T>,
+    max: usize,
+    write_index: usize,
+}
+
+impl<T> RingBuffer<T> {
+    pub fn new(capacity: usize) -> Self {
+        Self {
+            buf: Vec::with_capacity(capacity),
+            max: capacity,
+            write_index: 0,
+        }
+    }
+
+    pub fn push(&mut self, val: T) {
+        if !self.reached_max() {
+            self.buf.push(val);
+        } else {
+            let _ = replace(&mut self.buf[self.write_index], val);
+        }
+        self.write_index = (self.write_index + 1) % self.max;
+    }
+
+    pub fn in_order(&self) -> impl Iterator<Item = &T> {
+        let (head, tail) = self.buf.split_at(self.write_index);
+        tail.iter().chain(head.iter())
+    }
+
+    fn reached_max(&self) -> bool {
+        self.buf.len() >= self.max
+    }
+}
+
+/// A wrapper type that adds the event time to the event data
+#[allow(dead_code)]
+#[derive(Default, Clone, Debug)]
+pub struct Event<D> {
+    time: i64,
+    data: D,
+}
+
+impl<D> Event<D> {
+    pub fn new(time: i64, data: D) -> Self {
+        Self { time, data }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use iox_time::{MockProvider, Time};
+    use observability_deps::tracing::debug;
+
+    use crate::{RingBuffer, SysEventStore};
+
+    #[allow(dead_code)]
+    #[derive(Default, Clone, Debug)]
+    struct SampleEvent1 {
+        pub start_time: i64,
+        pub time_taken: u64,
+        pub total_fetched: u64,
+        pub random_name: String,
+    }
+
+    #[allow(dead_code)]
+    #[derive(Default, Clone, Debug)]
+    struct SampleEvent2 {
+        pub start_time: i64,
+        pub time_taken: u64,
+        pub generation_id: u64,
+    }
+
+    #[test]
+    fn test_ring_buffer_not_full_at_less_than_max() {
+        let mut buf = RingBuffer::new(2);
+        buf.push(1);
+
+        let all_results: Vec<&u64> = buf.in_order().collect();
+        let first = *all_results.first().unwrap();
+
+        assert_eq!(1, all_results.len());
+        assert_eq!(&1, first);
+    }
+
+    #[test]
+    fn test_ring_buffer_not_full_at_max() {
+        let mut buf = RingBuffer::new(2);
+        buf.push(1);
+        buf.push(2);
+
+        let all_results: Vec<&u64> = buf.in_order().collect();
+        let first = *all_results.first().unwrap();
+        let second = *all_results.get(1).unwrap();
+
+        assert_eq!(2, all_results.len());
+        assert_eq!(&1, first);
+        assert_eq!(&2, second);
+    }
+
+    #[test]
+    fn test_ring_buffer() {
+        let mut buf = RingBuffer::new(2);
+        buf.push(1);
+        buf.push(2);
+        buf.push(3);
+
+        let all_results: Vec<&u64> = buf.in_order().collect();
+        let first = *all_results.first().unwrap();
+        let second = *all_results.get(1).unwrap();
+
+        assert_eq!(2, all_results.len());
+        assert_eq!(&2, first);
+        assert_eq!(&3, second);
+    }
+
+    #[test_log::test(test)]
+    fn test_event_store() {
+        let event_data = SampleEvent1 {
+            start_time: 0,
+            time_taken: 10,
+            total_fetched: 10,
+            random_name: "foo".to_owned(),
+        };
+
+        let event_data2 = SampleEvent2 {
+            start_time: 0,
+            time_taken: 10,
+            generation_id: 100,
+        };
+
+        let event_data3 = SampleEvent1 {
+            start_time: 0,
+            time_taken: 10,
+            total_fetched: 10,
+            random_name: "boo".to_owned(),
+        };
+
+        let time_provider = MockProvider::new(Time::from_timestamp_nanos(100));
+
+        let event_store = SysEventStore::new(Arc::new(time_provider));
+        event_store.add(event_data);
+
+        event_store.add(event_data2);
+        event_store.add(event_data3);
+        assert_eq!(2, event_store.events.len());
+
+        let all_events = event_store.query::<SampleEvent1>();
+        assert_eq!(2, all_events.len());
+        debug!(all_events = ?all_events, "all events in sys events for type SampleEvent1");
+
+        let all_events = event_store.query::<SampleEvent2>();
+        assert_eq!(1, all_events.len());
+        debug!(all_events = ?all_events, "all events in sys events for type SampleEvent2");
+    }
+}
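
Usage sketch (not part of the diff): the snippet below shows how a caller would record and query events through the new store. The `WalFlushEvent` type and its fields are hypothetical, invented here for illustration; `SysEventStore::new`, `add`, and `query` are the APIs introduced above, and `SystemProvider` is the `iox_time` clock already used in `serve.rs`.

use std::sync::Arc;

use influxdb3_sys_events::SysEventStore;
use iox_time::SystemProvider;

// Hypothetical event type: any `'static + Clone + Debug + Send + Sync` type
// can be stored; the store keeps one ring buffer per type, capped at
// MAX_CAPACITY (1000) entries.
#[derive(Clone, Debug)]
struct WalFlushEvent {
    db_name: String,
    num_ops: u64,
}

fn main() {
    // The store stamps each event with `TimeProvider::now()` on `add`.
    let store = SysEventStore::new(Arc::new(SystemProvider::new()));

    store.add(WalFlushEvent {
        db_name: "foo".to_string(),
        num_ops: 42,
    });

    // `query` clones out all buffered events of the requested type,
    // oldest first.
    for event in store.query::<WalFlushEvent>() {
        println!("{event:?}");
    }
}

Keying the internal `DashMap` by `TypeId` is what makes this work without any registration step: the first `add` for a type lazily creates that type's ring buffer, and once a buffer is full new events overwrite the oldest ones.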