Skip to content

Commit

Permalink
Cleanup upload logic for better efficiency and more indepth timing
Browse files Browse the repository at this point in the history
  • Loading branch information
ChillFish8 committed Apr 3, 2022
1 parent 9b18cc3 commit 5058714
Showing 1 changed file with 60 additions and 48 deletions.
108 changes: 60 additions & 48 deletions src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::hash::Hash;
use std::sync::Arc;
use std::time::Instant;
use bytes::Bytes;
use once_cell::sync::OnceCell;
use uuid::Uuid;
use poem_openapi::Object;
use tokio::sync::{Semaphore, SemaphorePermit};
use tokio::time::Instant;
use crate::cache::{Cache, global_cache};

use crate::config::{BucketConfig, ImageKind};
Expand Down Expand Up @@ -62,6 +62,9 @@ pub struct UploadInfo {
/// The time spent processing the image in seconds.
processing_time: f32,

/// The time spent uploading the image to the persistent store.
io_time: f32,

/// The crc32 checksum of the uploaded image.
checksum: u32,

Expand All @@ -77,7 +80,7 @@ pub struct UploadInfo {

pub struct BucketController {
bucket_id: u32,
cache: Option<Cache>,
cache: Option<Arc<Cache>>,
global_limiter: Option<Arc<Semaphore>>,
config: BucketConfig,
pipeline: PipelineController,
Expand All @@ -96,7 +99,7 @@ impl BucketController {
) -> Self {
Self {
bucket_id,
cache,
cache: cache.map(Arc::new),
global_limiter,
limiter: config.max_concurrency.map(Semaphore::new),
config,
Expand All @@ -114,44 +117,27 @@ impl BucketController {
debug!("Uploading processed image with kind: {:?} and is {} bytes in size.", kind, data.len());

let _permit = get_optional_permit(&self.global_limiter, &self.limiter).await?;
let start = Instant::now();

let processing_start = Instant::now();
let checksum = crc32fast::hash(&data);
let pipeline = self.pipeline.clone();
let result = tokio::task::spawn_blocking(move || {
pipeline.on_upload(kind, data)
}).await??;
let processing_time = processing_start.elapsed();

let mut image_upload_info = vec![];
let image_id = Uuid::new_v4();
for store_entry in result.result.to_store {
self.storage
.store(
self.bucket_id,
image_id,
store_entry.kind,
store_entry.sizing_id,
store_entry.data.clone(),
).await?;

image_upload_info.push(ImageUploadInfo { sizing_id: store_entry.sizing_id });
if let Some(ref cache) = self.cache {
let cache_key = self.cache_key(
store_entry.sizing_id,
image_id,
store_entry.kind,
);

cache.insert(cache_key, store_entry.data);
}
}
let io_start = Instant::now();
let image_upload_info = self.concurrent_upload(image_id, result.result.to_store).await?;
let io_time = io_start.elapsed();

Ok(UploadInfo {
checksum,
image_id,
bucket_id: self.bucket_id,
images: image_upload_info,
processing_time: start.elapsed().as_secs_f32(),
processing_time: processing_time.as_secs_f32(),
io_time: io_time.as_secs_f32(),
})
}

Expand Down Expand Up @@ -227,26 +213,7 @@ impl BucketController {
pipeline.on_fetch(desired_kind, retrieved_kind, data, sizing_id, custom_sizing)
}).await??;

let mut tasks = vec![];
for store_entry in result.result.to_store {
let storage = self.storage.clone();
let bucket_id = self.bucket_id;
let t = tokio::spawn(async move {
storage.store(
bucket_id,
image_id,
store_entry.kind,
store_entry.sizing_id,
store_entry.data,
).await
});

tasks.push(t);
}

for task in tasks {
task.await??;
}
self.concurrent_upload(image_id, result.result.to_store).await?;

Ok(result.result.response)
}
Expand Down Expand Up @@ -288,7 +255,7 @@ impl BucketController {
) -> anyhow::Result<Option<Bytes>> {
let maybe_cache_backend = self.cache
.as_ref()
.map(Some)
.map(|v| Some(v.as_ref()))
.unwrap_or_else(global_cache);

let cache_key = self.cache_key(sizing_id, image_id, fetch_kind);
Expand All @@ -314,4 +281,49 @@ impl BucketController {

Ok(maybe_existing)
}

async fn concurrent_upload(
&self,
image_id: Uuid,
to_store: Vec<StoreEntry>,
) -> anyhow::Result<Vec<ImageUploadInfo>> {
let mut image_upload_info = vec![];
let mut tasks = vec![];
for store_entry in to_store {
image_upload_info.push(ImageUploadInfo { sizing_id: store_entry.sizing_id });
let storage = self.storage.clone();
let bucket_id = self.bucket_id;
let cache = self.cache.clone();
let cache_key = self.cache_key(
store_entry.sizing_id,
image_id,
store_entry.kind,
);

let t = tokio::spawn(async move {
storage.store(
bucket_id,
image_id,
store_entry.kind,
store_entry.sizing_id,
store_entry.data.clone(),
).await?;

if let Some(ref cache) = cache {
cache.insert(cache_key, store_entry.data);
}

Ok::<_, anyhow::Error>(())
});

tasks.push(t);
}

for task in tasks {
task.await??;
}

Ok(image_upload_info)
}
}

0 comments on commit 5058714

Please sign in to comment.