feat: sys events buffer added
closes: #25581
praveen-influx committed Nov 29, 2024
1 parent b7fd8e2 commit fa6773d
Showing 9 changed files with 295 additions and 1 deletion.
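
This commit threads a new SysEventStore (from the new influxdb3_sys_events crate) from serve.rs through the query executor and into the system-table provider. The crate's own source is not rendered on this page, so the following is only a hedged sketch of the shape implied by the visible call sites — a constructor taking a shared time provider — plus the crate's dashmap dependency; the `record` method, field layout, and event representation are assumptions for illustration, not the committed implementation.

```rust
// Hedged sketch only: the influxdb3_sys_events source is not rendered in this diff.
// `SysEventStore::new(<time provider>)` is taken from the call sites below; the
// `record` method and the internal layout are assumptions for illustration.
use std::any::{Any, TypeId};
use std::sync::Arc;

use dashmap::DashMap;
use iox_time::{Time, TimeProvider};

/// In-memory buffer of system events, shared across the server behind an `Arc`.
pub struct SysEventStore {
    time_provider: Arc<dyn TimeProvider>,
    /// One bucket of events per concrete event type, keyed by `TypeId`.
    events: DashMap<TypeId, Vec<(Time, Box<dyn Any + Send + Sync>)>>,
}

impl SysEventStore {
    pub fn new(time_provider: Arc<dyn TimeProvider>) -> Self {
        Self {
            time_provider,
            events: DashMap::new(),
        }
    }

    /// Record an event of any type, stamping it with the provider's current time.
    pub fn record<E: Any + Send + Sync>(&self, event: E) {
        let now = self.time_provider.now();
        self.events
            .entry(TypeId::of::<E>())
            .or_insert_with(Vec::new)
            .push((now, Box::new(event)));
    }
}
```

Under that sketch, any component handed an `Arc<SysEventStore>` (as the query executor and system-table provider now are) could record events concurrently without extra locking.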
19 changes: 19 additions & 0 deletions Cargo.lock

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions influxdb3/Cargo.toml
@@ -32,6 +32,7 @@ influxdb3_server = { path = "../influxdb3_server" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_write = { path = "../influxdb3_write" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }
influxdb3_sys_events = { path = "../influxdb3_sys_events" }

# Crates.io dependencies
anyhow.workspace = true
5 changes: 4 additions & 1 deletion influxdb3/src/commands/serve.rs
@@ -18,6 +18,7 @@ use influxdb3_server::{
query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl},
serve, CommonServerState,
};
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
@@ -381,9 +382,10 @@ pub async fn command(config: Config) -> Result<()> {
// Construct a token to trigger clean shutdown
let frontend_shutdown = CancellationToken::new();

let time_provider = Arc::new(SystemProvider::new());
let sys_events_store = Arc::new(SysEventStore::new(Arc::clone(&time_provider) as _));
let object_store: Arc<dyn ObjectStore> =
make_object_store(&config.object_store_config).map_err(Error::ObjectStoreParsing)?;
let time_provider = Arc::new(SystemProvider::new());

let (object_store, parquet_cache) = if !config.disable_parquet_mem_cache {
let (object_store, parquet_cache) = create_cached_obj_store_and_oracle(
@@ -516,6 +518,7 @@ pub async fn command(config: Config) -> Result<()> {
concurrent_query_limit: 10,
query_log_size: config.query_log_size,
telemetry_store: Arc::clone(&telemetry_store),
sys_events_store: Arc::clone(&sys_events_store),
}));

let listener = TcpListener::bind(*config.http_bind_address)
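
Two details of the serve.rs hunk above: the second `let time_provider = Arc::new(SystemProvider::new());` is the line removed by this commit — the declaration moves above the object-store construction so the new event store can share the provider — and the `as _` cast coerces the concrete `Arc<SystemProvider>` into the `Arc<dyn TimeProvider>` the constructor appears to expect (the exact parameter type is an assumption, though it is consistent with both call sites in this commit). A minimal standalone illustration of that wiring:

```rust
use std::sync::Arc;

use influxdb3_sys_events::SysEventStore;
use iox_time::{SystemProvider, TimeProvider};

fn build_sys_events_store() -> (Arc<SystemProvider>, Arc<SysEventStore>) {
    // Keep the concrete provider for the rest of the server...
    let time_provider = Arc::new(SystemProvider::new());
    // ...and hand the store a trait object; this is the coercion `as _` infers in serve.rs.
    let sys_events_store = Arc::new(SysEventStore::new(
        Arc::clone(&time_provider) as Arc<dyn TimeProvider>,
    ));
    (time_provider, sys_events_store)
}
```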
1 change: 1 addition & 0 deletions influxdb3_server/Cargo.toml
@@ -37,6 +37,7 @@ influxdb3_process = { path = "../influxdb3_process", default-features = false }
influxdb3_wal = { path = "../influxdb3_wal"}
influxdb3_write = { path = "../influxdb3_write" }
iox_query_influxql_rewrite = { path = "../iox_query_influxql_rewrite" }
influxdb3_sys_events = { path = "../influxdb3_sys_events" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }

# crates.io Dependencies
5 changes: 5 additions & 0 deletions influxdb3_server/src/lib.rs
@@ -237,6 +237,7 @@ mod tests {
use influxdb3_cache::meta_cache::MetaCacheProvider;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_id::{DbId, TableId};
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::WalConfig;
use influxdb3_write::parquet_cache::test_cached_obj_store_and_oracle;
@@ -798,6 +799,9 @@ mod tests {
.unwrap(),
);

let sys_events_store = Arc::new(SysEventStore::new(Arc::<MockProvider>::clone(
&time_provider,
)));
let parquet_metrics_provider: Arc<PersistedFiles> =
Arc::clone(&write_buffer_impl.persisted_files());
let sample_telem_store =
@@ -819,6 +823,7 @@
concurrent_query_limit: 10,
query_log_size: 10,
telemetry_store: Arc::clone(&sample_telem_store),
sys_events_store: Arc::clone(&sys_events_store),
});

// bind to port 0 will assign a random available port:
13 changes: 13 additions & 0 deletions influxdb3_server/src/query_executor.rs
@@ -20,6 +20,7 @@ use datafusion_util::config::DEFAULT_SCHEMA;
use datafusion_util::MemoryStream;
use influxdb3_cache::meta_cache::{MetaCacheFunction, META_CACHE_UDTF_NAME};
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_write::last_cache::LastCacheFunction;
use influxdb3_write::WriteBuffer;
@@ -57,6 +58,7 @@ pub struct QueryExecutorImpl {
query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
query_log: Arc<QueryLog>,
telemetry_store: Arc<TelemetryStore>,
sys_events_store: Arc<SysEventStore>,
}

/// Arguments for [`QueryExecutorImpl::new`]
@@ -70,6 +72,7 @@ pub struct CreateQueryExecutorArgs {
pub concurrent_query_limit: usize,
pub query_log_size: usize,
pub telemetry_store: Arc<TelemetryStore>,
pub sys_events_store: Arc<SysEventStore>,
}

impl QueryExecutorImpl {
@@ -83,6 +86,7 @@
concurrent_query_limit,
query_log_size,
telemetry_store,
sys_events_store,
}: CreateQueryExecutorArgs,
) -> Self {
let semaphore_metrics = Arc::new(AsyncSemaphoreMetrics::new(
@@ -103,6 +107,7 @@
query_execution_semaphore,
query_log,
telemetry_store,
sys_events_store,
}
}
}
@@ -340,6 +345,7 @@ impl QueryDatabase for QueryExecutorImpl {
Arc::clone(&self.exec),
Arc::clone(&self.datafusion_config),
Arc::clone(&self.query_log),
Arc::clone(&self.sys_events_store),
))))
}

@@ -372,11 +378,13 @@ impl Database {
exec: Arc<Executor>,
datafusion_config: Arc<HashMap<String, String>>,
query_log: Arc<QueryLog>,
sys_events_store: Arc<SysEventStore>,
) -> Self {
let system_schema_provider = Arc::new(SystemSchemaProvider::new(
Arc::clone(&db_schema),
Arc::clone(&query_log),
Arc::clone(&write_buffer),
Arc::clone(&sys_events_store),
));
Self {
db_schema,
@@ -616,6 +624,7 @@ mod tests {
use futures::TryStreamExt;
use influxdb3_cache::meta_cache::MetaCacheProvider;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
@@ -697,6 +706,9 @@

let persisted_files: Arc<PersistedFiles> = Arc::clone(&write_buffer_impl.persisted_files());
let telemetry_store = TelemetryStore::new_without_background_runners(persisted_files);
let sys_events_store = Arc::new(SysEventStore::new(Arc::<MockProvider>::clone(
&time_provider,
)));
let write_buffer: Arc<dyn WriteBuffer> = write_buffer_impl;
let metrics = Arc::new(Registry::new());
let datafusion_config = Arc::new(Default::default());
@@ -709,6 +721,7 @@
concurrent_query_limit: 10,
query_log_size: 10,
telemetry_store,
sys_events_store,
});

(write_buffer, query_executor, time_provider)
2 changes: 2 additions & 0 deletions influxdb3_server/src/system_tables/mod.rs
@@ -2,6 +2,7 @@ use std::{any::Any, collections::HashMap, sync::Arc};

use datafusion::{catalog::SchemaProvider, datasource::TableProvider, error::DataFusionError};
use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_sys_events::SysEventStore;
use influxdb3_write::WriteBuffer;
use iox_query::query_log::QueryLog;
use iox_system_tables::SystemTableProvider;
@@ -45,6 +46,7 @@ impl SystemSchemaProvider {
db_schema: Arc<DatabaseSchema>,
query_log: Arc<QueryLog>,
buffer: Arc<dyn WriteBuffer>,
_sys_events_store: Arc<SysEventStore>,
) -> Self {
let mut tables = HashMap::<&'static str, Arc<dyn TableProvider>>::new();
let queries = Arc::new(SystemTableProvider::new(Arc::new(QueriesTable::new(
25 changes: 25 additions & 0 deletions influxdb3_sys_events/Cargo.toml
@@ -0,0 +1,25 @@
[package]
name = "influxdb3_sys_events"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true


[dependencies]
# core crates
iox_time.workspace = true
iox_system_tables.workspace = true
observability_deps.workspace = true

# crates.io deps
arrow.workspace = true
arrow-array.workspace = true
async-trait.workspace = true
chrono.workspace = true
dashmap.workspace = true
datafusion.workspace = true

[dev-dependencies]
test-log.workspace = true
proptest.workspace = true
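
The store is accepted but not yet read in system_tables/mod.rs (hence the underscore on `_sys_events_store`), and the new crate depends on iox_system_tables, arrow, and datafusion, which together suggest the buffered events are intended to surface as a queryable system table in a follow-up change. A hedged sketch of the kind of conversion such a table would eventually need — the event type and columns here are invented for illustration, not part of this commit:

```rust
use std::sync::Arc;

use arrow::array::{StringArray, TimestampNanosecondArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Invented example event; the real event types are not shown in this diff.
struct ExampleEvent {
    time_ns: i64,
    summary: String,
}

/// Convert buffered events into the Arrow batch a system table would serve.
fn events_to_batch(events: &[ExampleEvent]) -> Result<RecordBatch, ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new(
            "event_time",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
        Field::new("summary", DataType::Utf8, false),
    ]));
    let times = TimestampNanosecondArray::from_iter_values(events.iter().map(|e| e.time_ns));
    let summaries = StringArray::from_iter_values(events.iter().map(|e| e.summary.as_str()));
    RecordBatch::try_new(schema, vec![Arc::new(times), Arc::new(summaries)])
}
```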
Diff for the remaining changed file was not loaded.
