Skip to content

Commit

Permalink
Dataloader testing
Browse files Browse the repository at this point in the history
  • Loading branch information
iamvigneshwars committed Apr 10, 2024
1 parent 295a3ea commit 95999cd
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 22 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion charts/processed_data/charts/processed_data/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ type: application

version: 0.1.0

appVersion: 0.1.0-rc4
appVersion: latest
1 change: 1 addition & 0 deletions processed_data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ async-graphql = { version = "7.0.2", default-features = false, features = [
"chrono",
"graphiql",
"tracing",
"dataloader",
] }
async-graphql-axum = { version = "7.0.2" }
aws-credential-types = { version = "0.56.0" }
Expand Down
67 changes: 55 additions & 12 deletions processed_data/src/graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
mod entities;
use crate::S3Bucket;
use async_graphql::{
ComplexObject, Context, EmptyMutation, EmptySubscription, Object, Schema, SchemaBuilder,
dataloader::{DataLoader, Loader}, ComplexObject, Context, EmptyMutation, EmptySubscription, Object, Schema, SchemaBuilder
};
use aws_sdk_s3::presigning::PresigningConfig;
use std::collections::HashMap;
use entities::{
AutoProc, AutoProcIntegration, AutoProcScaling, AutoProcScalingStatics, DataCollection,
DataProcessing, ProcessingJob, ProcessingJobParameter,
Expand All @@ -24,29 +25,62 @@ use self::entities::AutoProcProgram;
pub type RootSchema = Schema<Query, EmptyMutation, EmptySubscription>;

/// A schema builder for the service
pub fn root_schema_builder() -> SchemaBuilder<Query, EmptyMutation, EmptySubscription> {
Schema::build(Query, EmptyMutation, EmptySubscription).enable_federation()
pub fn root_schema_builder(database: DatabaseConnection) -> SchemaBuilder<Query, EmptyMutation, EmptySubscription> {
Schema::build(Query, EmptyMutation, EmptySubscription)
.data(DataLoader::new(
DataCollectionLoader::new(database.clone()),
tokio::spawn,
))
.data(database)
.enable_federation()
}

/// The root query of the service
#[derive(Debug, Clone, Default)]
pub struct Query;

pub struct DataCollectionLoader(DatabaseConnection);

impl DataCollectionLoader {
fn new(database: DatabaseConnection) -> Self {
Self ( database )
}
}

impl Loader<u32> for DataCollectionLoader {
type Value = Vec<DataProcessing>;
type Error = async_graphql::Error;

async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {

let mut results = HashMap::new();
let keys_vec: Vec<u32> = keys.iter().cloned().collect();
print!("DEBUG#############: {:?}", keys_vec);
let records = data_collection_file_attachment::Entity::find()
.filter(data_collection_file_attachment::Column::DataCollectionId.is_in(keys_vec))
.all(&self.0)
.await?;

for record in records {
let data_collection_id = record.data_collection_id;
let data = DataProcessing::from(record);

results.entry(data_collection_id).or_insert_with(Vec::new).push(data);
}

Ok(results)
}
}

#[ComplexObject]
impl DataCollection {
/// Fetched all the processed data from data collection during a session
async fn processed_data(
&self,
ctx: &Context<'_>,
) -> Result<Vec<DataProcessing>, async_graphql::Error> {
let database = ctx.data::<DatabaseConnection>()?;
Ok(data_collection_file_attachment::Entity::find()
.filter(data_collection_file_attachment::Column::DataCollectionId.eq(self.id))
.all(database)
.await?
.into_iter()
.map(DataProcessing::from)
.collect())
) -> Result<Option<Vec<DataProcessing>>, async_graphql::Error> {
let loader = ctx.data_unchecked::<DataLoader<DataCollectionLoader>>();
Ok(loader.load_one(self.id).await?)
}

/// Fetched all the processing jobs
Expand Down Expand Up @@ -185,4 +219,13 @@ impl Query {
async fn router_data_collection(&self, id: u32) -> DataCollection {
DataCollection { id }
}

async fn processed_data(
&self,
ctx: &Context<'_>,
id: u32,
) -> Result<Option<Vec<DataProcessing>>, async_graphql::Error> {
let loader = ctx.data_unchecked::<DataLoader<DataCollectionLoader>>();
Ok(loader.load_one(id).await?)
}
}
19 changes: 10 additions & 9 deletions processed_data/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,19 +239,20 @@ async fn main() {
Cli::Serve(args) => {
setup_telemetry(args.log_level, args.otel_collector_url).unwrap();
let database = setup_database(args.database_url).await.unwrap();
let schema = root_schema_builder().data(database).finish();
let schema = root_schema_builder(database).finish();
let router = setup_router(schema);
serve(router, args.port).await.unwrap();
}
Cli::Schema(args) => {

Check warning on line 246 in processed_data/src/main.rs

View workflow job for this annotation

GitHub Actions / generate

unused variable: `args`

Check warning on line 246 in processed_data/src/main.rs

View workflow job for this annotation

GitHub Actions / test

unused variable: `args`
let schema = root_schema_builder().finish();
let schema_string = schema.sdl_with_options(SDLExportOptions::new().federation());
if let Some(path) = args.path {
let mut file = File::create(path).unwrap();
file.write_all(schema_string.as_bytes()).unwrap();
} else {
println!("{}", schema_string)
}
// let database = setup_database(args.database_url).await.unwrap();
// let schema = root_schema_builder(database).finish();
// let schema_string = schema.sdl_with_options(SDLExportOptions::new().federation());
// if let Some(path) = args.path {
// let mut file = File::create(path).unwrap();
// file.write_all(schema_string.as_bytes()).unwrap();
// } else {
// println!("{}", schema_string)
// }
}
}
}

0 comments on commit 95999cd

Please sign in to comment.