From 95999cd8db76c19053d565dbd3875ac4f1ef54d1 Mon Sep 17 00:00:00 2001 From: iamvigneshwars Date: Wed, 10 Apr 2024 11:36:12 +0000 Subject: [PATCH] Dataloader testing --- Cargo.lock | 18 +++++ .../charts/processed_data/Chart.yaml | 2 +- processed_data/Cargo.toml | 1 + processed_data/src/graphql/mod.rs | 67 +++++++++++++++---- processed_data/src/main.rs | 19 +++--- 5 files changed, 85 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8506c74..d864bf8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -162,10 +162,13 @@ dependencies = [ "bytes", "chrono", "fnv", + "futures-channel", + "futures-timer", "futures-util", "handlebars", "http 1.1.0", "indexmap 2.2.5", + "lru", "mime", "multer", "num-traits", @@ -1363,6 +1366,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.30" @@ -1886,6 +1895,15 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "lru" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a" +dependencies = [ + "hashbrown 0.12.3", +] + [[package]] name = "matchit" version = "0.7.3" diff --git a/charts/processed_data/charts/processed_data/Chart.yaml b/charts/processed_data/charts/processed_data/Chart.yaml index d50a2bd..9eeea22 100644 --- a/charts/processed_data/charts/processed_data/Chart.yaml +++ b/charts/processed_data/charts/processed_data/Chart.yaml @@ -5,4 +5,4 @@ type: application version: 0.1.0 -appVersion: 0.1.0-rc4 +appVersion: latest diff --git a/processed_data/Cargo.toml b/processed_data/Cargo.toml index 1eee7e9..e9ad768 100644 --- a/processed_data/Cargo.toml +++ b/processed_data/Cargo.toml @@ -11,6 +11,7 @@ async-graphql = { version = "7.0.2", default-features = false, features = [ "chrono", "graphiql", "tracing", + "dataloader", ] } async-graphql-axum = { version = "7.0.2" } aws-credential-types = { version = "0.56.0" } diff --git a/processed_data/src/graphql/mod.rs b/processed_data/src/graphql/mod.rs index bf30fe3..a7148c9 100644 --- a/processed_data/src/graphql/mod.rs +++ b/processed_data/src/graphql/mod.rs @@ -2,9 +2,10 @@ mod entities; use crate::S3Bucket; use async_graphql::{ - ComplexObject, Context, EmptyMutation, EmptySubscription, Object, Schema, SchemaBuilder, + dataloader::{DataLoader, Loader}, ComplexObject, Context, EmptyMutation, EmptySubscription, Object, Schema, SchemaBuilder }; use aws_sdk_s3::presigning::PresigningConfig; +use std::collections::HashMap; use entities::{ AutoProc, AutoProcIntegration, AutoProcScaling, AutoProcScalingStatics, DataCollection, DataProcessing, ProcessingJob, ProcessingJobParameter, @@ -24,29 +25,62 @@ use self::entities::AutoProcProgram; pub type RootSchema = Schema; /// A schema builder for the service -pub fn root_schema_builder() -> SchemaBuilder { - Schema::build(Query, EmptyMutation, EmptySubscription).enable_federation() +pub fn root_schema_builder(database: DatabaseConnection) -> SchemaBuilder { + Schema::build(Query, EmptyMutation, EmptySubscription) + .data(DataLoader::new( + DataCollectionLoader::new(database.clone()), + tokio::spawn, + )) + .data(database) + .enable_federation() } /// The root query of the service #[derive(Debug, Clone, Default)] pub struct Query; +pub struct DataCollectionLoader(DatabaseConnection); + +impl DataCollectionLoader { + fn new(database: DatabaseConnection) -> Self { + Self ( database ) + } +} + +impl Loader for DataCollectionLoader { + type Value = Vec; + type Error = async_graphql::Error; + + async fn load(&self, keys: &[u32]) -> Result, Self::Error> { + + let mut results = HashMap::new(); + let keys_vec: Vec = keys.iter().cloned().collect(); + print!("DEBUG#############: {:?}", keys_vec); + let records = data_collection_file_attachment::Entity::find() + .filter(data_collection_file_attachment::Column::DataCollectionId.is_in(keys_vec)) + .all(&self.0) + .await?; + + for record in records { + let data_collection_id = record.data_collection_id; + let data = DataProcessing::from(record); + + results.entry(data_collection_id).or_insert_with(Vec::new).push(data); + } + + Ok(results) + } +} + #[ComplexObject] impl DataCollection { /// Fetched all the processed data from data collection during a session async fn processed_data( &self, ctx: &Context<'_>, - ) -> Result, async_graphql::Error> { - let database = ctx.data::()?; - Ok(data_collection_file_attachment::Entity::find() - .filter(data_collection_file_attachment::Column::DataCollectionId.eq(self.id)) - .all(database) - .await? - .into_iter() - .map(DataProcessing::from) - .collect()) + ) -> Result>, async_graphql::Error> { + let loader = ctx.data_unchecked::>(); + Ok(loader.load_one(self.id).await?) } /// Fetched all the processing jobs @@ -185,4 +219,13 @@ impl Query { async fn router_data_collection(&self, id: u32) -> DataCollection { DataCollection { id } } + + async fn processed_data( + &self, + ctx: &Context<'_>, + id: u32, + ) -> Result>, async_graphql::Error> { + let loader = ctx.data_unchecked::>(); + Ok(loader.load_one(id).await?) + } } diff --git a/processed_data/src/main.rs b/processed_data/src/main.rs index f6732cf..6850a60 100644 --- a/processed_data/src/main.rs +++ b/processed_data/src/main.rs @@ -239,19 +239,20 @@ async fn main() { Cli::Serve(args) => { setup_telemetry(args.log_level, args.otel_collector_url).unwrap(); let database = setup_database(args.database_url).await.unwrap(); - let schema = root_schema_builder().data(database).finish(); + let schema = root_schema_builder(database).finish(); let router = setup_router(schema); serve(router, args.port).await.unwrap(); } Cli::Schema(args) => { - let schema = root_schema_builder().finish(); - let schema_string = schema.sdl_with_options(SDLExportOptions::new().federation()); - if let Some(path) = args.path { - let mut file = File::create(path).unwrap(); - file.write_all(schema_string.as_bytes()).unwrap(); - } else { - println!("{}", schema_string) - } + // let database = setup_database(args.database_url).await.unwrap(); + // let schema = root_schema_builder(database).finish(); + // let schema_string = schema.sdl_with_options(SDLExportOptions::new().federation()); + // if let Some(path) = args.path { + // let mut file = File::create(path).unwrap(); + // file.write_all(schema_string.as_bytes()).unwrap(); + // } else { + // println!("{}", schema_string) + // } } } }