diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index dde5c7eee..620d59c1b 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -288,12 +288,8 @@ impl ObjectStore { { let object_meta: ObjectMetadata = meta.into(); - let encoded_object_name = encode_object_name(&object_meta.name); - if !is_valid_object_name(&encoded_object_name) { - return Err(PutError::new(PutErrorKind::InvalidName)); - } // Fetch any existing object info, if there is any for later use. - let maybe_existing_object_info = match self.info(&encoded_object_name).await { + let maybe_existing_object_info = match self.info(&object_meta.name).await { Ok(object_info) => Some(object_info), Err(_) => None, }; @@ -343,7 +339,13 @@ impl ObjectStore { })?; } let digest = sha256.finish(); + + let encoded_object_name = encode_object_name(&object_meta.name); + if !is_valid_object_name(&encoded_object_name) { + return Err(PutError::new(PutErrorKind::InvalidName)); + } let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name); + let object_info = ObjectInfo { name: object_meta.name, description: object_meta.description, diff --git a/async-nats/tests/object_store.rs b/async-nats/tests/object_store.rs index d2d588564..ca664a1e3 100644 --- a/async-nats/tests/object_store.rs +++ b/async-nats/tests/object_store.rs @@ -105,6 +105,63 @@ mod object_store { ); } + #[tokio::test] + async fn purge_existing_object_on_put() { + async fn at_least_one_message_contains_nuid_in_subject( + client: &async_nats::Client, + stream: &mut async_nats::jetstream::stream::Stream, + nuid: &str, + ) -> bool { + let message_count = stream.info().await.unwrap().state.messages; + stream + .create_consumer(async_nats::jetstream::consumer::push::OrderedConfig { + deliver_subject: client.new_inbox(), + ..Default::default() + }) + .await + .unwrap() + .messages() + .await + .unwrap() + .take(message_count as usize) + .any(|msg| { + let cond = msg.unwrap().message.subject.contains(nuid); + async move { cond } + }) + .await + } + + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + + let jetstream = async_nats::jetstream::new(client.clone()); + + let bucket_name = "bucket"; + let stream_name = format!("OBJ_{}", bucket_name); + + let bucket = jetstream + .create_object_store(async_nats::jetstream::object_store::Config { + bucket: bucket_name.to_owned(), + ..Default::default() + }) + .await + .unwrap(); + + let mut stream = jetstream.get_stream(stream_name).await.unwrap(); + + let info = bucket.put("file", &mut b"foo".as_slice()).await.unwrap(); + + assert!( + at_least_one_message_contains_nuid_in_subject(&client, &mut stream, &info.nuid).await + ); + + bucket.put("file", &mut b"bar".as_slice()).await.unwrap(); + + assert!( + !at_least_one_message_contains_nuid_in_subject(&client, &mut stream, &info.nuid).await + ); + } + #[tokio::test] async fn watch() { let server = nats_server::run_server("tests/configs/jetstream.conf");