Skip to content

Commit 72ff620

Browse files
authored
[ENH] Add a delete_many call to the storage API. (#5020)
## Description of changes This adds a delete-many call to storage that can batch a delete of many keys to S3 storage. The goal is to reduce GC's CPU overhead from many serial deletes. ## Test plan - [X] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes N/A
1 parent 837d824 commit 72ff620

File tree

3 files changed

+135
-4
lines changed

3 files changed

+135
-4
lines changed

rust/storage/src/admissioncontrolleds3.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -489,8 +489,15 @@ impl AdmissionControlledS3Storage {
489489
self.storage.list_prefix(prefix).await
490490
}
491491

492-
pub async fn delete(&self, prefix: &str, options: DeleteOptions) -> Result<(), StorageError> {
493-
self.storage.delete(prefix, options).await
492+
pub async fn delete(&self, key: &str, options: DeleteOptions) -> Result<(), StorageError> {
493+
self.storage.delete(key, options).await
494+
}
495+
496+
pub async fn delete_many<S: AsRef<str> + std::fmt::Debug, I: IntoIterator<Item = S>>(
497+
&self,
498+
keys: I,
499+
) -> Result<crate::s3::DeletedObjects, StorageError> {
500+
self.storage.delete_many(keys).await
494501
}
495502
}
496503

rust/storage/src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,18 @@ impl Storage {
316316
}
317317
}
318318

319+
pub async fn delete_many<S: AsRef<str> + std::fmt::Debug, I: IntoIterator<Item = S>>(
320+
&self,
321+
keys: I,
322+
) -> Result<crate::s3::DeletedObjects, StorageError> {
323+
match self {
324+
Storage::ObjectStore(_) => Err(StorageError::NotImplemented),
325+
Storage::S3(s3) => s3.delete_many(keys).await,
326+
Storage::Local(_) => Err(StorageError::NotImplemented),
327+
Storage::AdmissionControlledS3(ac) => ac.delete_many(keys).await,
328+
}
329+
}
330+
319331
pub async fn rename(&self, src_key: &str, dst_key: &str) -> Result<(), StorageError> {
320332
match self {
321333
Storage::ObjectStore(object_store) => object_store.rename(src_key, dst_key).await,

rust/storage/src/s3.rs

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ use aws_sdk_s3::error::SdkError;
2323
use aws_sdk_s3::operation::create_bucket::CreateBucketError;
2424
use aws_sdk_s3::operation::get_object::GetObjectOutput;
2525
use aws_sdk_s3::primitives::ByteStream;
26-
use aws_sdk_s3::types::CompletedMultipartUpload;
27-
use aws_sdk_s3::types::CompletedPart;
26+
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier};
2827
use aws_smithy_types::byte_stream::Length;
2928
use bytes::Bytes;
3029
use chroma_config::registry::Registry;
@@ -43,6 +42,12 @@ use std::time::Duration;
4342
use tokio::io::AsyncReadExt;
4443
use tracing::Instrument;
4544

45+
#[derive(Clone, Debug, Default)]
46+
pub struct DeletedObjects {
47+
pub deleted: Vec<String>,
48+
pub errors: Vec<StorageError>,
49+
}
50+
4651
#[derive(Clone)]
4752
pub struct S3Storage {
4853
pub(super) bucket: String,
@@ -659,6 +664,67 @@ impl S3Storage {
659664
}
660665
}
661666

667+
#[tracing::instrument(skip(self, keys), level = "trace")]
668+
pub async fn delete_many<S: AsRef<str> + std::fmt::Debug, I: IntoIterator<Item = S>>(
669+
&self,
670+
keys: I,
671+
) -> Result<DeletedObjects, StorageError> {
672+
tracing::trace!("Deleting objects from S3");
673+
let mut objects = vec![];
674+
for key in keys {
675+
objects.push(
676+
ObjectIdentifier::builder()
677+
.key(key.as_ref())
678+
.build()
679+
.map_err(|err| StorageError::Generic {
680+
source: Arc::new(err),
681+
})?,
682+
);
683+
}
684+
let delete = Delete::builder()
685+
.set_objects(Some(objects))
686+
.build()
687+
.map_err(|err| StorageError::Generic {
688+
source: Arc::new(err),
689+
})?;
690+
691+
let req = self
692+
.client
693+
.delete_objects()
694+
.bucket(&self.bucket)
695+
.delete(delete);
696+
697+
match req.send().await {
698+
Ok(resp) => {
699+
let mut out = DeletedObjects::default();
700+
for deleted in resp.deleted() {
701+
if let Some(key) = deleted.key.clone() {
702+
out.deleted.push(key);
703+
}
704+
}
705+
for error in resp.errors() {
706+
out.errors.push(if Some("NoSuchKey") == error.code() {
707+
StorageError::NotFound {
708+
path: error.key.clone().unwrap_or(String::new()),
709+
source: Arc::new(StorageError::Message {
710+
message: format!("{error:#?}"),
711+
}),
712+
}
713+
} else {
714+
StorageError::Message {
715+
message: format!("{error:#?}"),
716+
}
717+
});
718+
}
719+
tracing::trace!("Successfully deleted objects from S3");
720+
Ok(out)
721+
}
722+
Err(e) => Err(StorageError::Generic {
723+
source: Arc::new(e),
724+
}),
725+
}
726+
}
727+
662728
#[tracing::instrument(skip(self), level = "trace")]
663729
pub async fn rename(&self, src_key: &str, dst_key: &str) -> Result<(), StorageError> {
664730
// S3 doesn't have a native rename operation, so we need to copy and delete
@@ -1214,4 +1280,50 @@ mod tests {
12141280
assert_eq!(format!("test/{:02x}", i), *result, "index = {}", i);
12151281
}
12161282
}
1283+
1284+
#[tokio::test]
1285+
async fn test_k8s_integration_delete_many() {
1286+
let storage = setup_with_bucket(1024 * 1024 * 8, 1024 * 1024 * 8).await;
1287+
1288+
// Create every other file (0, 2, 4, 6, 8, 10, 12, 14)
1289+
let mut created_files = Vec::new();
1290+
for i in (0..16).step_by(2) {
1291+
let key = format!("test/{:02x}", i);
1292+
storage
1293+
.oneshot_upload(
1294+
&key,
1295+
0,
1296+
|_| Box::pin(ready(Ok(ByteStream::from(Bytes::new())))) as _,
1297+
PutOptions {
1298+
if_not_exists: true,
1299+
if_match: None,
1300+
priority: StorageRequestPriority::P0,
1301+
},
1302+
)
1303+
.await
1304+
.unwrap();
1305+
created_files.push(key);
1306+
}
1307+
1308+
// Try to delete all files (0-15), including ones that don't exist
1309+
let mut all_keys = Vec::new();
1310+
for i in 0..16 {
1311+
all_keys.push(format!("test/{:02x}", i));
1312+
}
1313+
let all_key_refs: Vec<&str> = all_keys.iter().map(|s| s.as_str()).collect();
1314+
1315+
let delete_result = storage.delete_many(&all_key_refs).await.unwrap();
1316+
1317+
// Verify that every created file appears in the deleted files
1318+
for created_file in &created_files {
1319+
assert!(
1320+
delete_result.deleted.contains(created_file),
1321+
"Created file {} should be in deleted files",
1322+
created_file
1323+
);
1324+
}
1325+
1326+
eprintln!("Successfully deleted: {:#?}", delete_result.deleted);
1327+
eprintln!("Errors for non-existent files: {:#?}", delete_result.errors);
1328+
}
12171329
}

0 commit comments

Comments
 (0)