diff --git a/Cargo.lock b/Cargo.lock
index d864bf8..904be68 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -180,8 +180,6 @@ dependencies = [
  "serde_urlencoded",
  "static_assertions_next",
  "thiserror",
- "tracing",
- "tracing-futures",
 ]
 
 [[package]]
@@ -1292,7 +1290,6 @@ checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
 dependencies = [
  "futures-channel",
  "futures-core",
- "futures-executor",
  "futures-io",
  "futures-sink",
  "futures-task",
@@ -3769,18 +3766,6 @@ dependencies = [
  "valuable",
 ]
 
-[[package]]
-name = "tracing-futures"
-version = "0.2.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
-dependencies = [
- "futures",
- "futures-task",
- "pin-project",
- "tracing",
-]
-
 [[package]]
 name = "tracing-log"
 version = "0.2.0"
diff --git a/charts/processed_data/charts/processed_data/Chart.yaml b/charts/processed_data/charts/processed_data/Chart.yaml
index fa912a5..f2d6f74 100644
--- a/charts/processed_data/charts/processed_data/Chart.yaml
+++ b/charts/processed_data/charts/processed_data/Chart.yaml
@@ -5,4 +5,4 @@
 type: application
 
 version: 0.1.0
-appVersion: 0.1.0-rc5
+appVersion: 0.1.0-rc6
diff --git a/processed_data/Cargo.toml b/processed_data/Cargo.toml
index e9ad768..b97c1fa 100644
--- a/processed_data/Cargo.toml
+++ b/processed_data/Cargo.toml
@@ -10,7 +10,6 @@ anyhow = "1.0.81"
 async-graphql = { version = "7.0.2", default-features = false, features = [
     "chrono",
     "graphiql",
-    "tracing",
     "dataloader",
 ] }
 async-graphql-axum = { version = "7.0.2" }
diff --git a/processed_data/src/graphql/mod.rs b/processed_data/src/graphql/mod.rs
index 185cf0e..6d1916f 100644
--- a/processed_data/src/graphql/mod.rs
+++ b/processed_data/src/graphql/mod.rs
@@ -32,7 +32,15 @@ pub fn root_schema_builder(
 ) -> SchemaBuilder<Query, EmptyMutation, EmptySubscription> {
     Schema::build(Query, EmptyMutation, EmptySubscription)
         .data(DataLoader::new(
-            DataCollectionLoader::new(database.clone()),
+            ProcessedDataLoader::new(database.clone()),
+            tokio::spawn,
+        ))
+        .data(DataLoader::new(
+            ProcessingJobDataLoader::new(database.clone()),
+            tokio::spawn,
+        ))
+        .data(DataLoader::new(
+            ProcessingJobParameterDataLoader::new(database.clone()),
             tokio::spawn,
         ))
         .data(database)
@@ -43,18 +51,33 @@
 #[derive(Debug, Clone, Default)]
 pub struct Query;
 
-pub struct DataCollectionLoader(DatabaseConnection);
+pub struct ProcessedDataLoader(DatabaseConnection);
+pub struct ProcessingJobDataLoader(DatabaseConnection);
+pub struct ProcessingJobParameterDataLoader(DatabaseConnection);
 
-impl DataCollectionLoader {
+impl ProcessingJobDataLoader {
     fn new(database: DatabaseConnection) -> Self {
         Self(database)
     }
 }
 
-impl Loader<u32> for DataCollectionLoader {
+impl ProcessedDataLoader {
+    fn new(database: DatabaseConnection) -> Self {
+        Self(database)
+    }
+}
+
+impl ProcessingJobParameterDataLoader {
+    fn new(database: DatabaseConnection) -> Self {
+        Self(database)
+    }
+}
+
+impl Loader<u32> for ProcessedDataLoader {
     type Value = DataProcessing;
     type Error = async_graphql::Error;
 
+    #[instrument(name = "load_processed_data", skip(self))]
     async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
         let mut results = HashMap::new();
         let keys_vec: Vec<u32> = keys.iter().cloned().collect();
@@ -74,6 +97,58 @@ impl Loader<u32> for DataCollectionLoader {
     }
 }
 
+impl Loader<u32> for ProcessingJobDataLoader {
+    type Value = Vec<ProcessingJob>;
+    type Error = async_graphql::Error;
+
+    #[instrument(name = "load_processing_job", skip(self))]
+    async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
+        let mut results = HashMap::new();
+        let keys_vec: Vec<u32> = keys.iter().cloned().collect();
+        let records = processing_job::Entity::find()
+            .filter(processing_job::Column::DataCollectionId.is_in(keys_vec))
+            .all(&self.0)
+            .await?;
+
+        for record in records {
+            let data_collection_id = record.data_collection_id.unwrap();
+            let data = ProcessingJob::from(record);
+
+            results
+                .entry(data_collection_id)
+                .or_insert_with(Vec::new)
+                .push(data)
+        }
+        Ok(results)
+    }
+}
+
+impl Loader<u32> for ProcessingJobParameterDataLoader {
+    type Value = Vec<ProcessingJobParameter>;
+    type Error = async_graphql::Error;
+
+    #[instrument(name = "load_processing_job_parameter", skip(self))]
+    async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
+        let mut results = HashMap::new();
+        let keys_vec: Vec<u32> = keys.iter().cloned().collect();
+        let records = processing_job_parameter::Entity::find()
+            .filter(processing_job_parameter::Column::ProcessingJobId.is_in(keys_vec))
+            .all(&self.0)
+            .await?;
+
+        for record in records {
+            let processing_job_id = record.processing_job_id.unwrap();
+            let data = ProcessingJobParameter::from(record);
+            results
+                .entry(processing_job_id)
+                .or_insert_with(Vec::new)
+                .push(data)
+        }
+
+        Ok(results)
+    }
+}
+
 #[ComplexObject]
 impl DataCollection {
     /// Fetched all the processed data from data collection during a session
@@ -81,7 +156,7 @@ impl DataCollection {
         &self,
         ctx: &Context<'_>,
     ) -> Result<Option<DataProcessing>, async_graphql::Error> {
-        let loader = ctx.data_unchecked::<DataLoader<DataCollectionLoader>>();
+        let loader = ctx.data_unchecked::<DataLoader<ProcessedDataLoader>>();
         Ok(loader.load_one(self.id).await?)
     }
 
@@ -89,15 +164,9 @@ impl DataCollection {
     /// Fetches all the processing jobs
     async fn processing_jobs(
         &self,
         ctx: &Context<'_>,
-    ) -> async_graphql::Result<Vec<ProcessingJob>, async_graphql::Error> {
-        let database = ctx.data::<DatabaseConnection>()?;
-        Ok(processing_job::Entity::find()
-            .filter(processing_job::Column::DataCollectionId.eq(self.id))
-            .all(database)
-            .await?
-            .into_iter()
-            .map(ProcessingJob::from)
-            .collect())
+    ) -> async_graphql::Result<Option<Vec<ProcessingJob>>, async_graphql::Error> {
+        let loader = ctx.data_unchecked::<DataLoader<ProcessingJobDataLoader>>();
+        Ok(loader.load_one(self.id).await?)
     }
 
     /// Fetches all the automatic process
@@ -141,15 +210,9 @@ impl ProcessingJob {
     async fn parameters(
         &self,
         ctx: &Context<'_>,
-    ) -> async_graphql::Result<Vec<ProcessingJobParameter>> {
-        let database = ctx.data::<DatabaseConnection>()?;
-        Ok(processing_job_parameter::Entity::find()
-            .filter(processing_job_parameter::Column::ProcessingJobId.eq(self.processing_job_id))
-            .all(database)
-            .await?
-            .into_iter()
-            .map(ProcessingJobParameter::from)
-            .collect())
+    ) -> async_graphql::Result<Option<Vec<ProcessingJobParameter>>> {
+        let loader = ctx.data_unchecked::<DataLoader<ProcessingJobParameterDataLoader>>();
+        Ok(loader.load_one(self.processing_job_id).await?)
    }
 }