Skip to content

Commit

Permalink
Optimize s3 url generation
Browse files Browse the repository at this point in the history
  • Loading branch information
iamvigneshwars committed Apr 22, 2024
1 parent 708cb79 commit f215d1b
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 84 deletions.
2 changes: 1 addition & 1 deletion charts/processed_data/charts/processed_data/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ type: application

version: 0.1.0

appVersion: 0.1.0-rc13
appVersion: 0.2.0-rc2
19 changes: 19 additions & 0 deletions charts/processed_data/templates/ispyb-secret.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
apiVersion: bitnami.com/v1alpha1
kind: SealedSecret
metadata:
annotations:
sealedsecrets.bitnami.com/namespace-wide: "true"
creationTimestamp: null
name: ispyb
namespace: graph
spec:
encryptedData:
password: AgAE9My2sdmPfJCuKFPt5O6CBCAGytIlcrChly3IH2s70PlM/B41Lj4dXId1pCGsZkCtx0CTOfDp6+oJbyP0LA8i7qJkZFqBB/bZpsEU3HR/ki/5//xXyepKoUSbFd3Gu3UFNBVZCTqnMClms9HFVyFaoARYKc3gKLO/CqSbFl+7dWurXO4DC0VvQpmHEZ3MVIrU7b53YPV9r1VbrTntvFwvdM/SBXhNuZVQVeHJmrdbiewSbRLUxSoRWy394PfdHV05LWOVcKkGOHbh9PoJuefu+5j6Xc3H1P1YWteH/0/u67Cnjx2193LU4GJxexoQjBUY6O5EkeOwE5N1hM7xyNPPndCQEHYfSHSZWKR9m6Q8OGAMp/JHIiOiwX/li81Q9s30x0pcNRNjr//+Bxo1eVI1TzXlQ1M8RO2r4q774p7jGtXzo7fhBfjPHm3K0LD4WJovemVybjj2eAwlI+i3b3K56/s6JqdOa42LydxjxZu6LXlWLUKpxE3uQ9SE7RyHq1thWdD2OnR+BjXfoYIaoGVcTiJQpk34zorSgIpwN4WqdTFFM9x/yCrMTAl2gA7AjTNGrTlaTjU7ypbzjwubGDwxCW3XXRTqdlTi0ShEsyT2yqIZQWvibbTUc5GICHsyfcjudmarWZ4VDR9UV4XPRb8cHEJehVhBC8ugxg3ioCv3+AsLv5MOLZfVC+vAHDGyJ6x+XRRZdY8BwJOzEss=
template:
metadata:
annotations:
sealedsecrets.bitnami.com/namespace-wide: "true"
creationTimestamp: null
name: ispyb
namespace: graph

14 changes: 14 additions & 0 deletions charts/processed_data/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,17 @@ processed-data:
secretName: ispyb
secretKey: password
otelCollectorUrl: http://federation-opentelemetry-collector:4317
affinity:
podAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app.kubernetes.io/instance
operator: In
values:
- ispyb
- key: app.kubernetes.io/name
operator: In
values:
- mariadb-galera
topologyKey: kubernetes.io/hostname
28 changes: 14 additions & 14 deletions processed_data/src/graphql/entities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ impl From<Option<FileType>> for AttachmentFileType {
#[graphql(name = "AutoProcFileAttachment", unresolvable, complex)]
pub struct AutoProcFileAttachment {
/// An opaque unique identifier for the autoproc file attachment
#[graphql(skip)]
pub id: u32,
/// An opaque unique identifier for auto proc program
#[graphql(skip)]
Expand All @@ -130,7 +131,6 @@ pub struct AutoProcFileAttachment {
#[graphql(skip)]
pub file_path: Option<String>,
}

impl From<auto_proc_program_attachment::Model> for AutoProcFileAttachment {
fn from(value: auto_proc_program_attachment::Model) -> Self {
Self {
Expand All @@ -143,6 +143,19 @@ impl From<auto_proc_program_attachment::Model> for AutoProcFileAttachment {
}
}

impl AutoProcFileAttachment {
/// S3 bucket object key
pub fn object_key(&self) -> String {
let mut key = std::path::PathBuf::from(
<Option<String> as Clone>::clone(&self.file_path)
.unwrap()
.to_string(),
);
key.push(<Option<String> as Clone>::clone(&self.file_name).unwrap());
key.to_string_lossy().to_string()
}
}

/// Represents a processing job
#[derive(Clone, Debug, PartialEq, SimpleObject)]
#[graphql(name = "ProcessingJobs", unresolvable)]
Expand Down Expand Up @@ -272,19 +285,6 @@ impl From<auto_proc_scaling_statistics::Model> for AutoProcScalingStatics {
}
}

impl AutoProcFileAttachment {
/// S3 bucket object key
pub fn object_key(&self) -> String {
let mut key = std::path::PathBuf::from(
<Option<String> as Clone>::clone(&self.file_path)
.unwrap()
.to_string(),
);
key.push(<Option<String> as Clone>::clone(&self.file_name).unwrap());
key.to_string_lossy().to_string()
}
}

/// Datasets subgraph extension
#[derive(SimpleObject)]
#[graphql(name = "DataCollection", complex)]
Expand Down
62 changes: 24 additions & 38 deletions processed_data/src/graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use async_graphql::{
dataloader::{DataLoader, Loader},
ComplexObject, Context, EmptyMutation, EmptySubscription, Object, Schema, SchemaBuilder,
};
use aws_sdk_s3::{presigning::PresigningConfig, Client};
use aws_sdk_s3::presigning::PresigningConfig;
use entities::{
AutoProcFileAttachment, AutoProcScalingStatics, AutoProcessing, DataCollection, ProcessingJob,
StatisticsType,
Expand All @@ -29,23 +29,13 @@ pub type RootSchema = Schema<Query, EmptyMutation, EmptySubscription>;
/// router handler extension
pub trait AddDataLoadersExt {
/// Adds dataloader to graphql request
fn add_data_loaders(
self,
database: DatabaseConnection,
s3_client: Client,
s3_bucket: S3Bucket,
) -> Self;
fn add_data_loaders(self, database: DatabaseConnection) -> Self;
}

impl AddDataLoadersExt for async_graphql::Request {
fn add_data_loaders(
self,
database: DatabaseConnection,
s3_client: Client,
s3_bucket: S3Bucket,
) -> Self {
fn add_data_loaders(self, database: DatabaseConnection) -> Self {
self.data(DataLoader::new(
FileAttachmentDataLoader::new(database.clone(), s3_client, s3_bucket),
FileAttachmentDataLoader::new(database.clone()),
tokio::spawn,
))
.data(DataLoader::new(
Expand Down Expand Up @@ -77,8 +67,6 @@ pub struct Query;
pub struct FileAttachmentDataLoader {
database: DatabaseConnection,
parent_span: Span,
s3_client: Client,
s3_bucket: S3Bucket,
}
/// DataLoader for Process Job
#[allow(clippy::missing_docs_in_private_items)]
Expand Down Expand Up @@ -111,12 +99,10 @@ impl ProcessingJobDataLoader {

#[allow(clippy::missing_docs_in_private_items)]
impl FileAttachmentDataLoader {
fn new(database: DatabaseConnection, s3_client: Client, s3_bucket: S3Bucket) -> Self {
fn new(database: DatabaseConnection) -> Self {
Self {
database,
parent_span: Span::current(),
s3_client,
s3_bucket,
}
}
}
Expand Down Expand Up @@ -372,25 +358,6 @@ impl DataCollection {
}
}

#[ComplexObject]
impl AutoProcFileAttachment {
/// Gives downloadable link for the processed image in the s3 bucket
async fn file_url(&self, ctx: &Context<'_>) -> async_graphql::Result<String> {
let s3_client = ctx.data::<aws_sdk_s3::Client>()?;
let bucket = ctx.data::<S3Bucket>()?;
let object_uri = s3_client
.get_object()
.bucket(bucket.clone())
.key(self.object_key())
.presigned(PresigningConfig::expires_in(Duration::from_secs(10 * 60))?)
.await?
.uri()
.clone();
let object_url = Url::parse(&object_uri.to_string())?;
Ok(object_url.to_string())
}
}

#[ComplexObject]
impl AutoProcessing {
/// Fetches the overall scaling statistics type
Expand Down Expand Up @@ -442,6 +409,25 @@ impl AutoProcessing {
}
}

#[ComplexObject]
impl AutoProcFileAttachment {
/// Gives downloadable link for the processed image in the s3 bucket
async fn file_url(&self, ctx: &Context<'_>) -> async_graphql::Result<String> {
let s3_client = ctx.data::<aws_sdk_s3::Client>()?;
let bucket = ctx.data::<S3Bucket>()?;
let object_uri = s3_client
.get_object()
.bucket(bucket.clone())
.key(self.object_key())
.presigned(PresigningConfig::expires_in(Duration::from_secs(10 * 60))?)
.await?
.uri()
.clone();
let object_url = Url::parse(&object_uri.to_string())?;
Ok(object_url.to_string())
}
}

#[Object]
impl Query {
/// Reference datasets resolver for the router
Expand Down
16 changes: 7 additions & 9 deletions processed_data/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,7 @@ async fn setup_database(database_url: Url) -> Result<DatabaseConnection, Transac
}

/// Creates an [`axum::Router`] serving GraphiQL, synchronous GraphQL and GraphQL subscriptions
fn setup_router(
schema: RootSchema,
database: DatabaseConnection,
s3_client: aws_sdk_s3::Client,
s3_bucket: S3Bucket,
) -> Router {
fn setup_router(schema: RootSchema, database: DatabaseConnection) -> Router {
#[allow(clippy::missing_docs_in_private_items)]
const GRAPHQL_ENDPOINT: &str = "/";

Expand All @@ -159,7 +154,7 @@ fn setup_router(
get(Html(
GraphiQLSource::build().endpoint(GRAPHQL_ENDPOINT).finish(),
))
.post(GraphQLHandler::new(schema, database, s3_client, s3_bucket)),
.post(GraphQLHandler::new(schema, database)),
)
.layer(OtelInResponseLayer)
.layer(OtelAxumLayer::default())
Expand Down Expand Up @@ -248,8 +243,11 @@ async fn main() {
setup_telemetry(args.log_level, args.otel_collector_url).unwrap();
let database = setup_database(args.database_url).await.unwrap();
let s3_client = Client::from_s3_client_args(args.s3_client);
let schema = root_schema_builder().finish();
let router = setup_router(schema, database, s3_client, args.s3_bucket);
let schema = root_schema_builder()
.data(s3_client)
.data(args.s3_bucket)
.finish();
let router = setup_router(schema, database);
serve(router, args.port).await.unwrap();
}
Cli::Schema(args) => {
Expand Down
26 changes: 4 additions & 22 deletions processed_data/src/route_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use axum::{
use sea_orm::DatabaseConnection;
use std::{future::Future, pin::Pin};

use crate::{graphql::AddDataLoadersExt, S3Bucket};
use crate::graphql::AddDataLoadersExt;

/// An [`Handler`] which executes an [`Executor`] including the [`Authorization<Bearer>`] in the [`async_graphql::Context`]
#[derive(Debug, Clone)]
Expand All @@ -19,26 +19,12 @@ pub struct GraphQLHandler<E: Executor> {
executor: E,
/// Database connection
database: DatabaseConnection,
/// S3 client
s3_client: aws_sdk_s3::Client,
/// S3 Bucket
s3_bucket: S3Bucket,
}

impl<E: Executor> GraphQLHandler<E> {
/// Constructs an instance of the handler with the provided schema.
pub fn new(
executor: E,
database: DatabaseConnection,
s3_client: aws_sdk_s3::Client,
s3_bucket: S3Bucket,
) -> Self {
Self {
executor,
database,
s3_client,
s3_bucket,
}
pub fn new(executor: E, database: DatabaseConnection) -> Self {
Self { executor, database }
}
}

Expand All @@ -54,11 +40,7 @@ where
match request {
Ok(request) => GraphQLResponse::from(
self.executor
.execute(request.into_inner().add_data_loaders(
self.database,
self.s3_client,
self.s3_bucket,
))
.execute(request.into_inner().add_data_loaders(self.database))
.await,
)
.into_response(),
Expand Down

0 comments on commit f215d1b

Please sign in to comment.