Skip to content

Commit

Permalink
Delete old objects when calling ObjectStore::put
Browse files Browse the repository at this point in the history
* Delete old objects when calling `ObjectStore::put`
* Add `object_store::purge_existing_object_on_put` test
  • Loading branch information
thinety authored Jan 30, 2025
1 parent 98896d6 commit 7a094ff
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 5 deletions.
12 changes: 7 additions & 5 deletions async-nats/src/jetstream/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,8 @@ impl ObjectStore {
{
let object_meta: ObjectMetadata = meta.into();

let encoded_object_name = encode_object_name(&object_meta.name);
if !is_valid_object_name(&encoded_object_name) {
return Err(PutError::new(PutErrorKind::InvalidName));
}
// Fetch any existing object info, if there is any for later use.
let maybe_existing_object_info = match self.info(&encoded_object_name).await {
let maybe_existing_object_info = match self.info(&object_meta.name).await {
Ok(object_info) => Some(object_info),
Err(_) => None,
};
Expand Down Expand Up @@ -343,7 +339,13 @@ impl ObjectStore {
})?;
}
let digest = sha256.finish();

let encoded_object_name = encode_object_name(&object_meta.name);
if !is_valid_object_name(&encoded_object_name) {
return Err(PutError::new(PutErrorKind::InvalidName));
}
let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name);

let object_info = ObjectInfo {
name: object_meta.name,
description: object_meta.description,
Expand Down
57 changes: 57 additions & 0 deletions async-nats/tests/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,63 @@ mod object_store {
);
}

#[tokio::test]
async fn purge_existing_object_on_put() {
async fn at_least_one_message_contains_nuid_in_subject(
client: &async_nats::Client,
stream: &mut async_nats::jetstream::stream::Stream,
nuid: &str,
) -> bool {
let message_count = stream.info().await.unwrap().state.messages;
stream
.create_consumer(async_nats::jetstream::consumer::push::OrderedConfig {
deliver_subject: client.new_inbox(),
..Default::default()
})
.await
.unwrap()
.messages()
.await
.unwrap()
.take(message_count as usize)
.any(|msg| {
let cond = msg.unwrap().message.subject.contains(nuid);
async move { cond }
})
.await
}

let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();

let jetstream = async_nats::jetstream::new(client.clone());

let bucket_name = "bucket";
let stream_name = format!("OBJ_{}", bucket_name);

let bucket = jetstream
.create_object_store(async_nats::jetstream::object_store::Config {
bucket: bucket_name.to_owned(),
..Default::default()
})
.await
.unwrap();

let mut stream = jetstream.get_stream(stream_name).await.unwrap();

let info = bucket.put("file", &mut b"foo".as_slice()).await.unwrap();

assert!(
at_least_one_message_contains_nuid_in_subject(&client, &mut stream, &info.nuid).await
);

bucket.put("file", &mut b"bar".as_slice()).await.unwrap();

assert!(
!at_least_one_message_contains_nuid_in_subject(&client, &mut stream, &info.nuid).await
);
}

#[tokio::test]
async fn watch() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit 7a094ff

Please sign in to comment.