From 0f22bbe9899351cf87a111e038f296c28995922c Mon Sep 17 00:00:00 2001 From: iamvigneshwars Date: Fri, 12 Apr 2024 13:18:28 +0000 Subject: [PATCH] Update query for scaling statistics --- processed_data/src/graphql/entities.rs | 32 ++++- processed_data/src/graphql/mod.rs | 180 +++++++++++-------------- processed_data/src/main.rs | 1 + processed_data/src/route_handlers.rs | 1 + 4 files changed, 109 insertions(+), 105 deletions(-) diff --git a/processed_data/src/graphql/entities.rs b/processed_data/src/graphql/entities.rs index 944b41a..6a3f65d 100644 --- a/processed_data/src/graphql/entities.rs +++ b/processed_data/src/graphql/entities.rs @@ -74,7 +74,7 @@ impl From for ProcessingJobParameter { /// Represents an auto processed job #[derive(Clone, Debug, PartialEq, SimpleObject)] -#[graphql(name = "AutoProc", unresolvable, complex)] +#[graphql(name = "AutoProc", unresolvable)] pub struct AutoProc { /// An opaque unique identifier for the auto processing pub auto_proc_id: u32, @@ -114,7 +114,7 @@ impl From for AutoProc { /// Represents an auto processed program #[derive(Clone, Debug, PartialEq, SimpleObject)] -#[graphql(name = "AutoProcProgram", unresolvable, complex)] +#[graphql(name = "AutoProcProgram", unresolvable)] pub struct AutoProcProgram { /// An opaque unique identifier for the auto processing program pub auto_proc_program_id: u32, @@ -170,7 +170,7 @@ impl From for AutoProcIntegration { /// Represents and auto processing scaling #[derive(Clone, Debug, PartialEq, SimpleObject)] -#[graphql(name = "AutoProcScaling", unresolvable, complex)] +#[graphql(name = "AutoProcScaling", unresolvable)] pub struct AutoProcScaling { /// An opaque unique identifier for the auto processing scaling pub auto_proc_scaling_id: u32, @@ -266,17 +266,39 @@ pub struct DataCollection { pub id: u32, } -/// Datasets subgraph extension +/// Combines processing job and its paremeters #[derive(Debug, Clone, SimpleObject)] #[graphql(name = "ProcessJob", unresolvable = "processingJobId")] pub struct ProcessJob { + #[graphql(flatten)] + /// Represents Processing Job table pub processing_job: ProcessingJob, + /// Represents Processing Job Parameters table pub parameters: Option, } +/// Combines auto proc integration and its programs #[derive(Debug, Clone, SimpleObject)] -#[graphql(name = "AutoProcessing", unresolvable = "auto_proc_integration_id")] +#[graphql( + name = "AutoProcessing", + unresolvable = "autoProcIntegrationId", + complex +)] pub struct AutoProcessing { + #[graphql(flatten)] + /// Represents auto proc integration table pub auto_proc_integration: AutoProcIntegration, + /// Represents auto proc program table pub auto_proc_program: Option, } + +/// Combines autoproc and its scaling and statistics +#[derive(Debug, Clone, SimpleObject)] +#[graphql(name = "AutoProcess", unresolvable = "autoProcId", complex)] +pub struct AutoProcess { + #[graphql(flatten)] + /// Represents autoproc table + pub auto_proc: AutoProc, + /// Represents auto proc scaling table + pub auto_proc_scaling: Option, +} diff --git a/processed_data/src/graphql/mod.rs b/processed_data/src/graphql/mod.rs index 9858217..44a98df 100644 --- a/processed_data/src/graphql/mod.rs +++ b/processed_data/src/graphql/mod.rs @@ -7,17 +7,16 @@ use async_graphql::{ }; use aws_sdk_s3::presigning::PresigningConfig; use entities::{ - AutoProc, AutoProcIntegration, AutoProcScaling, AutoProcScalingStatics, AutoProcessing, - DataCollection, DataProcessing, ProcessJob, ProcessingJob, ProcessingJobParameter, + AutoProc, AutoProcIntegration, AutoProcScaling, AutoProcScalingStatics, AutoProcess, + AutoProcessing, DataCollection, DataProcessing, ProcessJob, ProcessingJob, + ProcessingJobParameter, }; use models::{ auto_proc, auto_proc_integration, auto_proc_program, auto_proc_scaling, auto_proc_scaling_statistics, data_collection_file_attachment, processing_job, processing_job_parameter, }; -use sea_orm::{ - ColumnTrait, DatabaseConnection, EntityTrait, JoinType, QueryFilter, QuerySelect, RelationTrait, -}; +use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; use std::collections::HashMap; use std::time::Duration; use tracing::{instrument, Span}; @@ -28,7 +27,9 @@ use self::entities::AutoProcProgram; /// The GraphQL schema exposed by the service pub type RootSchema = Schema; +/// router handler extension pub trait AddDataLoadersExt { + /// Adds dataloader to graphql request fn add_data_loaders(self, database: DatabaseConnection) -> Self; } @@ -39,7 +40,7 @@ impl AddDataLoadersExt for async_graphql::Request { tokio::spawn, )) .data(DataLoader::new( - ProcessingJobDataLoader::new(database.clone()), + ProcessJobDataLoader::new(database.clone()), tokio::spawn, )) .data(DataLoader::new( @@ -47,11 +48,7 @@ impl AddDataLoadersExt for async_graphql::Request { tokio::spawn, )) .data(DataLoader::new( - AutoProcDataLoader::new(database.clone()), - tokio::spawn, - )) - .data(DataLoader::new( - AutoProcScalingDataLoader::new(database.clone()), + AutoProcessingDataLoader::new(database.clone()), tokio::spawn, )) .data(DataLoader::new( @@ -78,42 +75,51 @@ pub fn root_schema_builder() -> SchemaBuilder Self { Self { database, @@ -122,6 +128,7 @@ impl ProcessingJobDataLoader { } } +#[allow(clippy::missing_docs_in_private_items)] impl ProcessedDataLoader { fn new(database: DatabaseConnection) -> Self { Self { @@ -131,6 +138,7 @@ impl ProcessedDataLoader { } } +#[allow(clippy::missing_docs_in_private_items)] impl AutoProcIntegrationDataLoader { fn new(database: DatabaseConnection) -> Self { Self { @@ -140,16 +148,8 @@ impl AutoProcIntegrationDataLoader { } } -impl AutoProcDataLoader { - fn new(database: DatabaseConnection) -> Self { - Self { - database, - parent_span: Span::current(), - } - } -} - -impl AutoProcScalingDataLoader { +#[allow(clippy::missing_docs_in_private_items)] +impl AutoProcessingDataLoader { fn new(database: DatabaseConnection) -> Self { Self { database, @@ -158,6 +158,7 @@ impl AutoProcScalingDataLoader { } } +#[allow(clippy::missing_docs_in_private_items)] impl AutoProcScalingOverall { fn new(database: DatabaseConnection) -> Self { Self { @@ -167,6 +168,7 @@ impl AutoProcScalingOverall { } } +#[allow(clippy::missing_docs_in_private_items)] impl AutoProcScalingInnerShell { fn new(database: DatabaseConnection) -> Self { Self { @@ -176,6 +178,7 @@ impl AutoProcScalingInnerShell { } } +#[allow(clippy::missing_docs_in_private_items)] impl AutoProcScalingOuterShell { fn new(database: DatabaseConnection) -> Self { Self { @@ -210,13 +213,13 @@ impl Loader for ProcessedDataLoader { } } -impl Loader for ProcessingJobDataLoader { +impl Loader for ProcessJobDataLoader { type Value = Vec; type Error = async_graphql::Error; - #[instrument(name = "load_processing_job", skip(self))] + #[instrument(name = "load_process_job", skip(self))] async fn load(&self, keys: &[u32]) -> Result, Self::Error> { - let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data"); + let span = tracing::info_span!(parent: &self.parent_span, "load_process_job"); let _span = span.enter(); let mut results = HashMap::new(); let keys_vec: Vec = keys.to_vec(); @@ -249,19 +252,11 @@ impl Loader for AutoProcIntegrationDataLoader { #[instrument(name = "load_auto_proc_integration", skip(self))] async fn load(&self, keys: &[u32]) -> Result, Self::Error> { - let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data"); + let span = tracing::info_span!(parent: &self.parent_span, "load_auto_proc_integration"); let _span = span.enter(); let mut results = HashMap::new(); let keys_vec: Vec = keys.to_vec(); let records = auto_proc_integration::Entity::find() - // .join_rev( - // JoinType::InnerJoin, - // auto_proc_program::Entity::belongs_to(auto_proc_integration::Entity) - // .from(auto_proc_program::Column::AutoProcProgramId) - // .to(auto_proc_integration::Column::AutoProcProgramId) - // .into() - // ) - // .join(JoinType::InnerJoin, auto_proc_program::Relation::AutoProcIntegration.def()) .find_also_related(auto_proc_program::Entity) .filter(auto_proc_integration::Column::DataCollectionId.is_in(keys_vec)) .all(&self.database) @@ -285,50 +280,31 @@ impl Loader for AutoProcIntegrationDataLoader { } } -impl Loader for AutoProcDataLoader { - type Value = AutoProc; +impl Loader for AutoProcessingDataLoader { + type Value = AutoProcess; type Error = async_graphql::Error; - #[instrument(name = "load_auto_proc", skip(self))] + #[instrument(name = "load_process", skip(self))] async fn load(&self, keys: &[u32]) -> Result, Self::Error> { - let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data"); + let span = tracing::info_span!(parent: &self.parent_span, "load_process"); let _span = span.enter(); let mut results = HashMap::new(); let keys_vec: Vec = keys.to_vec(); let records = auto_proc::Entity::find() .filter(auto_proc::Column::AutoProcProgramId.is_in(keys_vec)) + .find_also_related(auto_proc_scaling::Entity) .all(&self.database) - .await?; - - for record in records { - let program_id = record.auto_proc_program_id.unwrap(); - let data = AutoProc::from(record); - results.insert(program_id, data); - } - - Ok(results) - } -} - -impl Loader for AutoProcScalingDataLoader { - type Value = AutoProcScaling; - type Error = async_graphql::Error; - - #[instrument(name = "load_auto_proc_scaling", skip(self))] - async fn load(&self, keys: &[u32]) -> Result, Self::Error> { - let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data"); - let _span = span.enter(); - let mut results = HashMap::new(); - let keys_vec: Vec = keys.to_vec(); - let records = auto_proc_scaling::Entity::find() - .filter(auto_proc_scaling::Column::AutoProcId.is_in(keys_vec)) - .all(&self.database) - .await?; + .await? + .into_iter() + .map(|(auto_proc, scaling)| AutoProcess { + auto_proc: AutoProc::from(auto_proc), + auto_proc_scaling: scaling.map(AutoProcScaling::from), + }) + .collect::>(); for record in records { - let auto_proc_id = record.auto_proc_id.unwrap(); - let data = AutoProcScaling::from(record); - results.insert(auto_proc_id, data); + let program_id = record.auto_proc.auto_proc_program_id.unwrap(); + results.insert(program_id, record); } Ok(results) @@ -339,9 +315,9 @@ impl Loader for AutoProcScalingOverall { type Value = AutoProcScalingStatics; type Error = async_graphql::Error; - #[instrument(name = "load_auto_proc_scaling_statics", skip(self))] + #[instrument(name = "load_auto_proc_overall", skip(self))] async fn load(&self, keys: &[u32]) -> Result, Self::Error> { - let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data"); + let span = tracing::info_span!(parent: &self.parent_span, "load_auto_proc_overall"); let _span = span.enter(); let mut results = HashMap::new(); let keys_vec: Vec = keys.to_vec(); @@ -365,9 +341,9 @@ impl Loader for AutoProcScalingInnerShell { type Value = AutoProcScalingStatics; type Error = async_graphql::Error; - #[instrument(name = "load_auto_proc_scaling_statics", skip(self))] + #[instrument(name = "load_auto_proc_innershell", skip(self))] async fn load(&self, keys: &[u32]) -> Result, Self::Error> { - let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data"); + let span = tracing::info_span!(parent: &self.parent_span, "load_auto_proc_innershedll"); let _span = span.enter(); let mut results = HashMap::new(); let keys_vec: Vec = keys.to_vec(); @@ -391,9 +367,10 @@ impl Loader for AutoProcScalingOuterShell { type Value = AutoProcScalingStatics; type Error = async_graphql::Error; - #[instrument(name = "load_auto_proc_scaling_statics", skip(self))] + #[instrument(name = "load_auto_proc_scaling_outershell", skip(self))] async fn load(&self, keys: &[u32]) -> Result, Self::Error> { - let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data"); + let span = + tracing::info_span!(parent: &self.parent_span, "load_auto_proc_scaling_outershell"); let _span = span.enter(); let mut results = HashMap::new(); let keys_vec: Vec = keys.to_vec(); @@ -429,7 +406,7 @@ impl DataCollection { &self, ctx: &Context<'_>, ) -> async_graphql::Result>, async_graphql::Error> { - let loader = ctx.data_unchecked::>(); + let loader = ctx.data_unchecked::>(); loader.load_one(self.id).await } @@ -463,48 +440,51 @@ impl DataProcessing { } #[ComplexObject] -impl AutoProcProgram { +impl AutoProcessing { /// Fetched the automatic process - async fn auto_proc(&self, ctx: &Context<'_>) -> async_graphql::Result> { - let loader = ctx.data_unchecked::>(); - loader.load_one(self.auto_proc_program_id).await - } -} - -#[ComplexObject] -impl AutoProc { - /// Fetches the scaling for automatic process - async fn scaling(&self, ctx: &Context<'_>) -> async_graphql::Result> { - let loader = ctx.data_unchecked::>(); - loader.load_one(self.auto_proc_id).await + async fn auto_proc(&self, ctx: &Context<'_>) -> async_graphql::Result> { + let loader = ctx.data_unchecked::>(); + let id = self.auto_proc_integration.auto_proc_program_id; + loader.load_one(id.unwrap()).await } } #[ComplexObject] -impl AutoProcScaling { - /// Fetches the scaling statistics +impl AutoProcess { + /// Fetches the overall scaling statistics type async fn overall( &self, ctx: &Context<'_>, ) -> async_graphql::Result> { let loader = ctx.data_unchecked::>(); - loader.load_one(self.auto_proc_scaling_id).await + let id = as Clone>::clone(&self.auto_proc_scaling) + .unwrap() + .auto_proc_id; + loader.load_one(id.unwrap()).await } + /// Fetches the innershell scaling statistics type async fn inner_shell( &self, ctx: &Context<'_>, ) -> async_graphql::Result> { let loader = ctx.data_unchecked::>(); - loader.load_one(self.auto_proc_scaling_id).await + let id = as Clone>::clone(&self.auto_proc_scaling) + .unwrap() + .auto_proc_id; + loader.load_one(id.unwrap()).await } + /// Fetches the outershell scaling statistics type async fn outer_shell( &self, ctx: &Context<'_>, ) -> async_graphql::Result> { let loader = ctx.data_unchecked::>(); - loader.load_one(self.auto_proc_scaling_id).await + let id = as Clone>::clone(&self.auto_proc_scaling) + .unwrap() + .auto_proc_id; + loader.load_one(id.unwrap()).await } } diff --git a/processed_data/src/main.rs b/processed_data/src/main.rs index aad8308..f648afa 100644 --- a/processed_data/src/main.rs +++ b/processed_data/src/main.rs @@ -126,6 +126,7 @@ struct SchemaArgs { /// The path to write the schema to, if not set the schema will be printed to stdout #[arg(short, long)] path: Option, + /// The URL of the ISPyB instance which should be connected to #[arg(long, env = "DATABASE_URL")] database_url: Url, } diff --git a/processed_data/src/route_handlers.rs b/processed_data/src/route_handlers.rs index bb5ff74..f5b33ba 100644 --- a/processed_data/src/route_handlers.rs +++ b/processed_data/src/route_handlers.rs @@ -17,6 +17,7 @@ use crate::graphql::AddDataLoadersExt; pub struct GraphQLHandler { /// The GraphQL executor used to process the request executor: E, + /// Database connection database: DatabaseConnection, }