diff --git a/Cargo.toml b/Cargo.toml index e60073f..68c2de9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,37 +11,40 @@ keywords = ["object-storage", "minio", "s3"] categories = ["api-bindings", "web-programming::http-client"] [dependencies] -hyper = { version = "0.14.27", features = ["full"] } -tokio = { version = "1.32.0", features = ["full"] } -derivative = "2.2.0" -multimap = "0.9.0" -urlencoding = "2.1.3" -lazy_static = "1.4.0" -regex = "1.9.4" -chrono = "0.4.27" -sha2 = "0.10.7" +async-recursion = "1.0.4" base64 = "0.21.3" -md5 = "0.7.0" -crc = "3.0.1" byteorder = "1.4.3" -hmac = "0.12.1" -hex = "0.4.3" -futures-core = "0.3.28" bytes = "1.4.0" +chrono = "0.4.27" +crc = "3.0.1" +dashmap = "5.5.3" +derivative = "2.2.0" futures-util = "0.3.28" -xmltree = "0.10.3" +hex = "0.4.3" +hmac = "0.12.1" http = "0.2.9" -dashmap = "5.5.3" +hyper = { version = "0.14.27", features = ["full"] } +lazy_static = "1.4.0" +md5 = "0.7.0" +multimap = "0.9.0" +os_info = "3.7.0" rand = "0.8.5" +regex = "1.9.4" serde = { version = "1.0.188", features = ["derive"] } serde_json = "1.0.105" -async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } -async-recursion = "1.0.4" -os_info = "3.7.0" +sha2 = "0.10.7" +tokio = { version = "1.32.0", features = ["full"] } +tokio-stream = "0.1.14" +tokio-util = { version = "0.7.8", features = ["io"] } +urlencoding = "2.1.3" +xmltree = "0.10.3" [dependencies.reqwest] version = "0.11.20" features = ["native-tls", "blocking", "rustls-tls", "stream"] +[dev-dependencies] +async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } + [[example]] name = "file-uploader" diff --git a/src/s3/args.rs b/src/s3/args.rs index 7173a99..cb5414a 100644 --- a/src/s3/args.rs +++ b/src/s3/args.rs @@ -19,8 +19,8 @@ use crate::s3::error::Error; use crate::s3::signer::post_presign_v4; use crate::s3::sse::{Sse, SseCustomerKey}; use crate::s3::types::{ - DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, NotificationRecords, - ObjectLockConfig, Part, ReplicationConfig, Retention, RetentionMode, SelectRequest, SseConfig, + DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part, + ReplicationConfig, Retention, RetentionMode, SelectRequest, SseConfig, }; use crate::s3::utils::{ b64encode, check_bucket_name, merge, to_amz_date, to_http_header_value, to_iso8601utc, @@ -1262,18 +1262,18 @@ impl<'a> SelectObjectContentArgs<'a> { } /// Argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API -pub struct ListenBucketNotificationArgs<'a> { - pub extra_headers: Option<&'a Multimap>, - pub extra_query_params: Option<&'a Multimap>, - pub region: Option<&'a str>, - pub bucket: &'a str, - pub prefix: Option<&'a str>, - pub suffix: Option<&'a str>, - pub events: Option>, - pub event_fn: &'a (dyn Fn(NotificationRecords) -> bool + Send + Sync), +#[derive(Clone, Debug)] +pub struct ListenBucketNotificationArgs { + pub extra_headers: Option, + pub extra_query_params: Option, + pub region: Option, + pub bucket: String, + pub prefix: Option, + pub suffix: Option, + pub events: Option>, } -impl<'a> ListenBucketNotificationArgs<'a> { +impl ListenBucketNotificationArgs { /// Returns argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API with given bucket name and callback function for results. /// /// # Examples @@ -1281,35 +1281,19 @@ impl<'a> ListenBucketNotificationArgs<'a> { /// ``` /// use minio::s3::args::*; /// use minio::s3::types::NotificationRecords; - /// let event_fn = |event: NotificationRecords| { - /// for record in event.records.iter() { - /// if let Some(s3) = &record.s3 { - /// if let Some(object) = &s3.object { - /// if let Some(key) = &object.key { - /// println!("{:?} {:?}", record.event_name, key); - /// } - /// } - /// } - /// } - /// true - /// }; - /// let args = ListenBucketNotificationArgs::new("my-bucket", &event_fn).unwrap(); + /// + /// let args = ListenBucketNotificationArgs::new("my-bucket").unwrap(); /// ``` - pub fn new( - bucket_name: &'a str, - event_fn: &'a (dyn Fn(NotificationRecords) -> bool + Send + Sync), - ) -> Result, Error> { + pub fn new(bucket_name: &str) -> Result { check_bucket_name(bucket_name, true)?; - Ok(ListenBucketNotificationArgs { extra_headers: None, extra_query_params: None, region: None, - bucket: bucket_name, + bucket: bucket_name.to_owned(), prefix: None, suffix: None, events: None, - event_fn, }) } } diff --git a/src/s3/client.rs b/src/s3/client.rs index ecb2374..e83ae6f 100644 --- a/src/s3/client.rs +++ b/src/s3/client.rs @@ -23,8 +23,8 @@ use crate::s3::response::*; use crate::s3::signer::{presign_v4, sign_v4_s3}; use crate::s3::sse::SseCustomerKey; use crate::s3::types::{ - Bucket, DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, - NotificationRecords, ObjectLockConfig, Part, ReplicationConfig, RetentionMode, SseConfig, + Bucket, DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig, + Part, ReplicationConfig, RetentionMode, SseConfig, }; use crate::s3::utils::{ from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, merge, sha256_hash, @@ -36,7 +36,7 @@ use dashmap::DashMap; use hyper::http::Method; use os_info; use reqwest::header::HeaderMap; -use std::collections::{HashMap, VecDeque}; +use std::collections::HashMap; use std::fs::File; use std::io::prelude::*; use std::io::Read; @@ -44,6 +44,10 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use xmltree::Element; +mod listen_bucket_notification; + +pub use listen_bucket_notification::*; + fn url_decode( encoding_type: &Option, prefix: Option, @@ -2429,96 +2433,6 @@ impl Client { }) } - pub async fn listen_bucket_notification( - &self, - args: &ListenBucketNotificationArgs<'_>, - ) -> Result { - if self.base_url.is_aws_host() { - return Err(Error::UnsupportedApi(String::from( - "ListenBucketNotification", - ))); - } - - let region = self.get_region(args.bucket, args.region).await?; - - let mut headers = Multimap::new(); - if let Some(v) = &args.extra_headers { - merge(&mut headers, v); - } - - let mut query_params = Multimap::new(); - if let Some(v) = &args.extra_query_params { - merge(&mut query_params, v); - } - if let Some(v) = args.prefix { - query_params.insert(String::from("prefix"), v.to_string()); - } - if let Some(v) = args.suffix { - query_params.insert(String::from("suffix"), v.to_string()); - } - if let Some(v) = &args.events { - for e in v.iter() { - query_params.insert(String::from("events"), e.to_string()); - } - } else { - query_params.insert(String::from("events"), String::from("s3:ObjectCreated:*")); - query_params.insert(String::from("events"), String::from("s3:ObjectRemoved:*")); - query_params.insert(String::from("events"), String::from("s3:ObjectAccessed:*")); - } - - let mut resp = self - .execute( - Method::GET, - ®ion, - &mut headers, - &query_params, - Some(args.bucket), - None, - None, - ) - .await?; - - let header_map = resp.headers().clone(); - - let mut done = false; - let mut buf = VecDeque::::new(); - while !done { - let chunk = match resp.chunk().await? { - Some(v) => v, - None => { - done = true; - Bytes::new() - } - }; - buf.extend(chunk.iter().copied()); - - while !done { - match buf.iter().position(|&v| v == b'\n') { - Some(i) => { - let mut data = vec![0_u8; i + 1]; - #[allow(clippy::needless_range_loop)] - for j in 0..=i { - data[j] = buf.pop_front().ok_or(Error::InsufficientData(i, j))?; - } - let mut line = String::from_utf8(data)?; - line = line.trim().to_string(); - if !line.is_empty() { - let records: NotificationRecords = serde_json::from_str(&line)?; - done = !(args.event_fn)(records); - } - } - None => break, - }; - } - } - - Ok(ListenBucketNotificationResponse::new( - header_map, - ®ion, - args.bucket, - )) - } - /// Executes [ListObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html) S3 API pub async fn list_objects_v1( &self, diff --git a/src/s3/client/listen_bucket_notification.rs b/src/s3/client/listen_bucket_notification.rs new file mode 100644 index 0000000..881cd68 --- /dev/null +++ b/src/s3/client/listen_bucket_notification.rs @@ -0,0 +1,134 @@ +// MinIO Rust Library for Amazon S3 Compatible Cloud Storage +// Copyright 2023 MinIO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! MinIO Extension API for S3 Buckets: ListenBucketNotification + +use futures_util::stream; +use http::Method; +use tokio::io::AsyncBufReadExt; +use tokio_stream::{Stream, StreamExt}; +use tokio_util::io::StreamReader; + +use crate::s3::{ + args::ListenBucketNotificationArgs, + error::Error, + response::ListenBucketNotificationResponse, + types::NotificationRecords, + utils::{merge, Multimap}, +}; + +use super::Client; + +impl Client { + /// Listens for bucket notifications. This is MinIO extension API. This + /// function returns a tuple of `ListenBucketNotificationResponse` and a + /// stream of `NotificationRecords`. The former contains the HTTP headers + /// returned by the server and the latter is a stream of notification + /// records. In normal operation (when there are no errors), the stream + /// never ends. + pub async fn listen_bucket_notification( + &self, + args: ListenBucketNotificationArgs, + ) -> Result< + ( + ListenBucketNotificationResponse, + impl Stream>, + ), + Error, + > { + if self.base_url.is_aws_host() { + return Err(Error::UnsupportedApi(String::from( + "ListenBucketNotification", + ))); + } + + let region = self + .get_region(&args.bucket, args.region.as_deref()) + .await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + if let Some(v) = args.prefix { + query_params.insert(String::from("prefix"), v.to_string()); + } + if let Some(v) = args.suffix { + query_params.insert(String::from("suffix"), v.to_string()); + } + if let Some(v) = &args.events { + for e in v.iter() { + query_params.insert(String::from("events"), e.to_string()); + } + } else { + query_params.insert(String::from("events"), String::from("s3:ObjectCreated:*")); + query_params.insert(String::from("events"), String::from("s3:ObjectRemoved:*")); + query_params.insert(String::from("events"), String::from("s3:ObjectAccessed:*")); + } + + let resp = self + .execute( + Method::GET, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + None, + ) + .await?; + + let header_map = resp.headers().clone(); + + let body_stream = resp.bytes_stream(); + let body_stream = body_stream + .map(|r| r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))); + let stream_reader = StreamReader::new(body_stream); + + let record_stream = Box::pin(stream::unfold( + stream_reader, + move |mut reader| async move { + loop { + let mut line = String::new(); + match reader.read_line(&mut line).await { + Ok(n) => { + if n == 0 { + return None; + } + let s = line.trim(); + if s.is_empty() { + continue; + } + let records_res: Result = + serde_json::from_str(&s).map_err(|e| e.into()); + return Some((records_res, reader)); + } + Err(e) => return Some((Err(e.into()), reader)), + } + } + }, + )); + + Ok(( + ListenBucketNotificationResponse::new(header_map, ®ion, &args.bucket), + record_stream, + )) + } +} diff --git a/tests/tests.rs b/tests/tests.rs index 33aed54..f482090 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -15,6 +15,7 @@ use async_std::task; use chrono::Duration; +use futures_util::stream::StreamExt; use hyper::http::Method; use minio::s3::types::NotificationRecords; use rand::distributions::{Alphanumeric, DistString}; @@ -564,8 +565,14 @@ impl ClientTest { false }; - let args = &ListenBucketNotificationArgs::new(&test_bucket, &event_fn).unwrap(); - client.listen_bucket_notification(args).await.unwrap(); + let args = ListenBucketNotificationArgs::new(&test_bucket).unwrap(); + let (_, mut event_stream) = client.listen_bucket_notification(args).await.unwrap(); + while let Some(event) = event_stream.next().await { + let event = event.unwrap(); + if !event_fn(event) { + break; + } + } }; let spawned_task = task::spawn(listen_task());