From ccd1ff4a2557d247eaf19429ca84bea164f0a367 Mon Sep 17 00:00:00 2001
From: Aditya Manthramurthy
Date: Thu, 28 Sep 2023 22:04:41 -0700
Subject: [PATCH 1/2] Modify listen_bucket_notification API to use iteration style

- Add doc on listen_bucket_notification API.
---
 src/s3/args.rs                              |  48 ++---
 src/s3/client.rs                            | 100 +---------
 src/s3/client/listen_bucket_notification.rs | 200 ++++++++++++++++++++
 tests/tests.rs                              |  12 +-
 4 files changed, 233 insertions(+), 127 deletions(-)
 create mode 100644 src/s3/client/listen_bucket_notification.rs

diff --git a/src/s3/args.rs b/src/s3/args.rs
index 7173a99..cb5414a 100644
--- a/src/s3/args.rs
+++ b/src/s3/args.rs
@@ -19,8 +19,8 @@ use crate::s3::error::Error;
 use crate::s3::signer::post_presign_v4;
 use crate::s3::sse::{Sse, SseCustomerKey};
 use crate::s3::types::{
-    DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, NotificationRecords,
-    ObjectLockConfig, Part, ReplicationConfig, Retention, RetentionMode, SelectRequest, SseConfig,
+    DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part,
+    ReplicationConfig, Retention, RetentionMode, SelectRequest, SseConfig,
 };
 use crate::s3::utils::{
     b64encode, check_bucket_name, merge, to_amz_date, to_http_header_value, to_iso8601utc,
@@ -1262,18 +1262,18 @@ impl<'a> SelectObjectContentArgs<'a> {
 }
 
 /// Argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API
-pub struct ListenBucketNotificationArgs<'a> {
-    pub extra_headers: Option<&'a Multimap>,
-    pub extra_query_params: Option<&'a Multimap>,
-    pub region: Option<&'a str>,
-    pub bucket: &'a str,
-    pub prefix: Option<&'a str>,
-    pub suffix: Option<&'a str>,
-    pub events: Option<Vec<&'a str>>,
-    pub event_fn: &'a (dyn Fn(NotificationRecords) -> bool + Send + Sync),
+#[derive(Clone, Debug)]
+pub struct ListenBucketNotificationArgs {
+    pub extra_headers: Option<Multimap>,
+    pub extra_query_params: Option<Multimap>,
+    pub region: Option<String>,
+    pub bucket: String,
+    pub prefix: Option<String>,
+    pub suffix: Option<String>,
+    pub events: Option<Vec<String>>,
 }
 
-impl<'a> ListenBucketNotificationArgs<'a> {
+impl ListenBucketNotificationArgs {
     /// Returns argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API with given bucket name and callback function for results.
     ///
     /// # Examples
@@ -1281,35 +1281,19 @@ impl<'a> ListenBucketNotificationArgs<'a> {
     /// ```
     /// use minio::s3::args::*;
     /// use minio::s3::types::NotificationRecords;
-    /// let event_fn = |event: NotificationRecords| {
-    ///     for record in event.records.iter() {
-    ///         if let Some(s3) = &record.s3 {
-    ///             if let Some(object) = &s3.object {
-    ///                 if let Some(key) = &object.key {
-    ///                     println!("{:?} {:?}", record.event_name, key);
-    ///                 }
-    ///             }
-    ///         }
-    ///     }
-    ///     true
-    /// };
-    /// let args = ListenBucketNotificationArgs::new("my-bucket", &event_fn).unwrap();
+    ///
+    /// let args = ListenBucketNotificationArgs::new("my-bucket").unwrap();
     /// ```
-    pub fn new(
-        bucket_name: &'a str,
-        event_fn: &'a (dyn Fn(NotificationRecords) -> bool + Send + Sync),
-    ) -> Result<ListenBucketNotificationArgs<'a>, Error> {
+    pub fn new(bucket_name: &str) -> Result<ListenBucketNotificationArgs, Error> {
         check_bucket_name(bucket_name, true)?;
-
         Ok(ListenBucketNotificationArgs {
             extra_headers: None,
             extra_query_params: None,
             region: None,
-            bucket: bucket_name,
+            bucket: bucket_name.to_owned(),
             prefix: None,
             suffix: None,
             events: None,
-            event_fn,
         })
     }
 }
diff --git a/src/s3/client.rs b/src/s3/client.rs
index ecb2374..e83ae6f 100644
--- a/src/s3/client.rs
+++ b/src/s3/client.rs
@@ -23,8 +23,8 @@ use crate::s3::response::*;
 use crate::s3::signer::{presign_v4, sign_v4_s3};
 use crate::s3::sse::SseCustomerKey;
 use crate::s3::types::{
-    Bucket, DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig,
-    NotificationRecords, ObjectLockConfig, Part, ReplicationConfig, RetentionMode, SseConfig,
+    Bucket, DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig,
+    Part, ReplicationConfig, RetentionMode, SseConfig,
 };
 use crate::s3::utils::{
     from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, merge, sha256_hash,
@@ -36,7 +36,7 @@ use dashmap::DashMap;
 use hyper::http::Method;
 use os_info;
 use reqwest::header::HeaderMap;
-use std::collections::{HashMap, VecDeque};
+use std::collections::HashMap;
 use std::fs::File;
 use std::io::prelude::*;
 use std::io::Read;
@@ -44,6 +44,10 @@ use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use xmltree::Element;
 
+mod listen_bucket_notification;
+
+pub use listen_bucket_notification::*;
+
 fn url_decode(
     encoding_type: &Option<String>,
     prefix: Option<String>,
@@ -2429,96 +2433,6 @@ impl Client {
         })
     }
 
-    pub async fn listen_bucket_notification(
-        &self,
-        args: &ListenBucketNotificationArgs<'_>,
-    ) -> Result<ListenBucketNotificationResponse, Error> {
-        if self.base_url.is_aws_host() {
-            return Err(Error::UnsupportedApi(String::from(
-                "ListenBucketNotification",
-            )));
-        }
-
-        let region = self.get_region(args.bucket, args.region).await?;
-
-        let mut headers = Multimap::new();
-        if let Some(v) = &args.extra_headers {
-            merge(&mut headers, v);
-        }
-
-        let mut query_params = Multimap::new();
-        if let Some(v) = &args.extra_query_params {
-            merge(&mut query_params, v);
-        }
-        if let Some(v) = args.prefix {
-            query_params.insert(String::from("prefix"), v.to_string());
-        }
-        if let Some(v) = args.suffix {
-            query_params.insert(String::from("suffix"), v.to_string());
-        }
-        if let Some(v) = &args.events {
-            for e in v.iter() {
-                query_params.insert(String::from("events"), e.to_string());
-            }
-        } else {
-            query_params.insert(String::from("events"), String::from("s3:ObjectCreated:*"));
-            query_params.insert(String::from("events"), String::from("s3:ObjectRemoved:*"));
-            query_params.insert(String::from("events"), String::from("s3:ObjectAccessed:*"));
-        }
-
-        let mut resp = self
-            .execute(
-                Method::GET,
-                &region,
-                &mut headers,
-                &query_params,
-                Some(args.bucket),
-                None,
-                None,
-            )
-            .await?;
-
-        let header_map = resp.headers().clone();
-
-        let mut done = false;
-        let mut buf = VecDeque::<u8>::new();
-        while !done {
-            let chunk = match resp.chunk().await? {
-                Some(v) => v,
-                None => {
-                    done = true;
-                    Bytes::new()
-                }
-            };
-            buf.extend(chunk.iter().copied());
-
-            while !done {
-                match buf.iter().position(|&v| v == b'\n') {
-                    Some(i) => {
-                        let mut data = vec![0_u8; i + 1];
-                        #[allow(clippy::needless_range_loop)]
-                        for j in 0..=i {
-                            data[j] = buf.pop_front().ok_or(Error::InsufficientData(i, j))?;
-                        }
-                        let mut line = String::from_utf8(data)?;
-                        line = line.trim().to_string();
-                        if !line.is_empty() {
-                            let records: NotificationRecords = serde_json::from_str(&line)?;
-                            done = !(args.event_fn)(records);
-                        }
-                    }
-                    None => break,
-                };
-            }
-        }
-
-        Ok(ListenBucketNotificationResponse::new(
-            header_map,
-            &region,
-            args.bucket,
-        ))
-    }
-
     /// Executes [ListObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html) S3 API
     pub async fn list_objects_v1(
         &self,
diff --git a/src/s3/client/listen_bucket_notification.rs b/src/s3/client/listen_bucket_notification.rs
new file mode 100644
index 0000000..9ec06a9
--- /dev/null
+++ b/src/s3/client/listen_bucket_notification.rs
@@ -0,0 +1,200 @@
+use std::collections::VecDeque;
+
+use bytes::{Bytes, BytesMut};
+use futures_core::Stream;
+use futures_util::stream;
+use http::Method;
+
+use crate::s3::{
+    args::ListenBucketNotificationArgs,
+    error::Error,
+    response::ListenBucketNotificationResponse,
+    types::NotificationRecords,
+    utils::{merge, Multimap},
+};
+
+use super::Client;
+
+impl Client {
+    /// Listens for bucket notifications. This is a MinIO extension API. This
+    /// function returns a tuple of `ListenBucketNotificationResponse` and a
+    /// stream of `NotificationRecords`. The former contains the HTTP headers
+    /// returned by the server and the latter is a stream of notification
+    /// records. In normal operation (when there are no errors), the stream
+    /// never ends.
+    pub async fn listen_bucket_notification(
+        &self,
+        args: ListenBucketNotificationArgs,
+    ) -> Result<
+        (
+            ListenBucketNotificationResponse,
+            impl Stream<Item = Result<NotificationRecords, Error>>,
+        ),
+        Error,
+    > {
+        if self.base_url.is_aws_host() {
+            return Err(Error::UnsupportedApi(String::from(
+                "ListenBucketNotification",
+            )));
+        }
+
+        let region = self
+            .get_region(&args.bucket, args.region.as_deref())
+            .await?;
+
+        let mut headers = Multimap::new();
+        if let Some(v) = &args.extra_headers {
+            merge(&mut headers, v);
+        }
+
+        let mut query_params = Multimap::new();
+        if let Some(v) = &args.extra_query_params {
+            merge(&mut query_params, v);
+        }
+        if let Some(v) = args.prefix {
+            query_params.insert(String::from("prefix"), v.to_string());
+        }
+        if let Some(v) = args.suffix {
+            query_params.insert(String::from("suffix"), v.to_string());
+        }
+        if let Some(v) = &args.events {
+            for e in v.iter() {
+                query_params.insert(String::from("events"), e.to_string());
+            }
+        } else {
+            query_params.insert(String::from("events"), String::from("s3:ObjectCreated:*"));
+            query_params.insert(String::from("events"), String::from("s3:ObjectRemoved:*"));
+            query_params.insert(String::from("events"), String::from("s3:ObjectAccessed:*"));
+        }
+
+        let resp = self
+            .execute(
+                Method::GET,
+                &region,
+                &mut headers,
+                &query_params,
+                Some(&args.bucket),
+                None,
+                None,
+            )
+            .await?;
+
+        let header_map = resp.headers().clone();
+
+        let line = BytesMut::with_capacity(16 * 1024);
+        let lines: VecDeque<Bytes> = VecDeque::new();
+
+        // We use a stream::unfold to process the response body. The unfold
+        // state consists of the current (possibly incomplete) line, a deque of
+        // (complete) lines extracted from the response body and the response
+        // itself wrapped in an Option (the Option indicates whether the
+        // response body has been fully consumed). The unfold operation here
+        // generates a stream of notification records.
+        let record_stream = Box::pin(stream::unfold(
+            (line, lines, Some(resp)),
+            move |(mut line, mut lines, mut resp_opt)| async move {
+                loop {
+                    // 1. If we have some lines in the deque, deserialize and return the next one.
+                    while let Some(v) = lines.pop_front() {
+                        let s = match String::from_utf8((&v).to_vec()) {
+                            Err(e) => return Some((Err(e.into()), (line, lines, resp_opt))),
+                            Ok(s) => {
+                                let s = s.trim().to_string();
+                                // Skip empty strings.
+                                if s.is_empty() {
+                                    continue;
+                                }
+                                s
+                            }
+                        };
+                        let records_res: Result<NotificationRecords, Error> =
+                            serde_json::from_str(&s).map_err(|e| e.into());
+                        return Some((records_res, (line, lines, resp_opt)));
+                    }
+
+                    // At this point `lines` is empty. We may have a partial line in
+                    // `line`. We now process the next chunk in the response body.
+
+                    if resp_opt.is_none() {
+                        if line.len() > 0 {
+                            // Since we have no more chunks to process, we
+                            // consider this as a complete line and deserialize
+                            // it in the next loop iteration.
+                            lines.push_back(line.freeze());
+                            line = BytesMut::with_capacity(16 * 1024);
+                            continue;
+                        }
+                        // We have no more chunks to process, no partial line
+                        // and no more lines to return. So we are done.
+                        return None;
+                    }
+
+                    // Attempt to read the next chunk of the response.
+                    let next_chunk_res = resp_opt.as_mut().map(|r| r.chunk()).unwrap().await;
+                    let mut done = false;
+                    let chunk = match next_chunk_res {
+                        Err(e) => return Some((Err(e.into()), (line, lines, None))),
+                        Ok(Some(chunk)) => chunk,
+                        Ok(None) => {
+                            done = true;
+                            Bytes::new()
+                        }
+                    };
+
+                    // Now we process the chunk. `.split()` splits the chunk
+                    // around each newline character.
+                    //
+                    // For example, "\nab\nc\n\n" becomes ["", "ab", "c", "", ""].
+                    //
+                    // This means that a newline was found in the chunk only
+                    // when `.split()` returns at least 2 elements. The main
+                    // tricky situation is when a line is split across chunks.
+                    // We use the length of `lines_in_chunk` to determine if
+                    // this is the case.
+                    let lines_in_chunk = chunk.split(|&v| v == b'\n').collect::<Vec<_>>();
+
+                    if lines_in_chunk.len() == 1 {
+                        // No newline found in the chunk. So we just append the
+                        // chunk to the current line and continue to the next
+                        // chunk.
+                        line.extend_from_slice(&chunk);
+                        continue;
+                    }
+
+                    // At least one newline was found in the chunk.
+                    for (i, chunk_line) in lines_in_chunk.iter().enumerate() {
+                        if i == 0 {
+                            // The first split component in the chunk completes
+                            // the line.
+                            line.extend_from_slice(chunk_line);
+                            lines.push_back(line.freeze());
+                            line = BytesMut::with_capacity(16 * 1024);
+                            continue;
+                        }
+                        if i == lines_in_chunk.len() - 1 {
+                            // The last split component in the chunk is a
+                            // partial line. We append it to the current line
+                            // (which will be empty because we just re-created
+                            // it).
+                            line.extend_from_slice(chunk_line);
+                            continue;
+                        }
+
+                        lines.push_back(Bytes::copy_from_slice(chunk_line));
+                    }
+
+                    if done {
+                        lines.push_back(line.freeze());
+                        line = BytesMut::with_capacity(16 * 1024);
+                        resp_opt = None;
+                    }
+                }
+            },
+        ));
+
+        Ok((
+            ListenBucketNotificationResponse::new(header_map, &region, &args.bucket),
+            record_stream,
+        ))
+    }
+}
diff --git a/tests/tests.rs b/tests/tests.rs
index 33aed54..619fa20 100644
--- a/tests/tests.rs
+++ b/tests/tests.rs
@@ -15,6 +15,7 @@
 
 use async_std::task;
 use chrono::Duration;
+use futures_util::stream::StreamExt;
 use hyper::http::Method;
 use minio::s3::types::NotificationRecords;
 use rand::distributions::{Alphanumeric, DistString};
@@ -564,8 +565,15 @@ impl ClientTest {
             false
         };
 
-        let args = &ListenBucketNotificationArgs::new(&test_bucket, &event_fn).unwrap();
-        client.listen_bucket_notification(args).await.unwrap();
+        let args = ListenBucketNotificationArgs::new(&test_bucket).unwrap();
+        let (_, mut event_stream) = client.listen_bucket_notification(args).await.unwrap();
+        while let Some(event) = event_stream.next().await {
+            // println!("event: {:?}", event);
+            let event = event.unwrap();
+            if !event_fn(event) {
+                break;
+            }
+        }
     };
 
     let spawned_task = task::spawn(listen_task());

From dad5a31a81ffe99dff070a4db937bf2dd5cb96fd Mon Sep 17 00:00:00 2001
From: Aditya Manthramurthy
Date: Fri, 29 Sep 2023 11:59:04 -0700
Subject: [PATCH 2/2] Use tokio_util's StreamReader for simpler impl

---
 Cargo.toml                                  |  41 +++---
 src/s3/client/listen_bucket_notification.rs | 142 ++++++--------------
 tests/tests.rs                              |   1 -
 3 files changed, 60 insertions(+), 124 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index e60073f..68c2de9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,37 +11,40 @@ keywords = ["object-storage", "minio", "s3"]
 categories = ["api-bindings", "web-programming::http-client"]
 
 [dependencies]
-hyper = { version = "0.14.27", features = ["full"] }
-tokio = { version = "1.32.0", features = ["full"] }
-derivative = "2.2.0"
-multimap = "0.9.0"
-urlencoding = "2.1.3"
-lazy_static = "1.4.0"
-regex = "1.9.4"
-chrono = "0.4.27"
-sha2 = "0.10.7"
+async-recursion = "1.0.4"
 base64 = "0.21.3"
-md5 = "0.7.0"
-crc = "3.0.1"
 byteorder = "1.4.3"
-hmac = "0.12.1"
-hex = "0.4.3"
-futures-core = "0.3.28"
 bytes = "1.4.0"
+chrono = "0.4.27"
+crc = "3.0.1"
+dashmap = "5.5.3"
+derivative = "2.2.0"
 futures-util = "0.3.28"
-xmltree = "0.10.3"
+hex = "0.4.3"
+hmac = "0.12.1"
 http = "0.2.9"
-dashmap = "5.5.3"
+hyper = { version = "0.14.27", features = ["full"] }
+lazy_static = "1.4.0"
+md5 = "0.7.0"
+multimap = "0.9.0"
+os_info = "3.7.0"
 rand = "0.8.5"
+regex = "1.9.4"
 serde = { version = "1.0.188", features = ["derive"] }
 serde_json = "1.0.105"
-async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
-async-recursion = "1.0.4"
-os_info = "3.7.0"
+sha2 = "0.10.7"
+tokio = { version = "1.32.0", features = ["full"] }
+tokio-stream = "0.1.14"
+tokio-util = { version = "0.7.8", features = ["io"] }
+urlencoding = "2.1.3"
+xmltree = "0.10.3"
 
 [dependencies.reqwest]
 version = "0.11.20"
 features = ["native-tls", "blocking", "rustls-tls", "stream"]
 
+[dev-dependencies]
+async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
+
 [[example]]
 name = "file-uploader"
diff --git a/src/s3/client/listen_bucket_notification.rs b/src/s3/client/listen_bucket_notification.rs
index 9ec06a9..881cd68 100644
--- a/src/s3/client/listen_bucket_notification.rs
+++ b/src/s3/client/listen_bucket_notification.rs
@@ -1,9 +1,25 @@
-use std::collections::VecDeque;
+// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
+// Copyright 2023 MinIO, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! MinIO Extension API for S3 Buckets: ListenBucketNotification
 
-use bytes::{Bytes, BytesMut};
-use futures_core::Stream;
 use futures_util::stream;
 use http::Method;
+use tokio::io::AsyncBufReadExt;
+use tokio_stream::{Stream, StreamExt};
+use tokio_util::io::StreamReader;
 
 use crate::s3::{
     args::ListenBucketNotificationArgs,
@@ -81,112 +97,30 @@ impl Client {
 
         let header_map = resp.headers().clone();
 
-        let line = BytesMut::with_capacity(16 * 1024);
-        let lines: VecDeque<Bytes> = VecDeque::new();
+        let body_stream = resp.bytes_stream();
+        let body_stream = body_stream
+            .map(|r| r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)));
+        let stream_reader = StreamReader::new(body_stream);
 
-        // We use a stream::unfold to process the response body. The unfold
-        // state consists of the current (possibly incomplete) line, a deque of
-        // (complete) lines extracted from the response body and the response
-        // itself wrapped in an Option (the Option indicates whether the
-        // response body has been fully consumed). The unfold operation here
-        // generates a stream of notification records.
         let record_stream = Box::pin(stream::unfold(
-            (line, lines, Some(resp)),
-            move |(mut line, mut lines, mut resp_opt)| async move {
+            stream_reader,
+            move |mut reader| async move {
                 loop {
-                    // 1. If we have some lines in the deque, deserialize and return the next one.
-                    while let Some(v) = lines.pop_front() {
-                        let s = match String::from_utf8((&v).to_vec()) {
-                            Err(e) => return Some((Err(e.into()), (line, lines, resp_opt))),
-                            Ok(s) => {
-                                let s = s.trim().to_string();
-                                // Skip empty strings.
-                                if s.is_empty() {
-                                    continue;
-                                }
-                                s
+                    let mut line = String::new();
+                    match reader.read_line(&mut line).await {
+                        Ok(n) => {
+                            if n == 0 {
+                                return None;
                             }
-                        };
-                        let records_res: Result<NotificationRecords, Error> =
-                            serde_json::from_str(&s).map_err(|e| e.into());
-                        return Some((records_res, (line, lines, resp_opt)));
-                    }
-
-                    // At this point `lines` is empty. We may have a partial line in
-                    // `line`. We now process the next chunk in the response body.
-
-                    if resp_opt.is_none() {
-                        if line.len() > 0 {
-                            // Since we have no more chunks to process, we
-                            // consider this as a complete line and deserialize
-                            // it in the next loop iteration.
-                            lines.push_back(line.freeze());
-                            line = BytesMut::with_capacity(16 * 1024);
-                            continue;
-                        }
-                        // We have no more chunks to process, no partial line
-                        // and no more lines to return. So we are done.
-                        return None;
-                    }
-
-                    // Attempt to read the next chunk of the response.
-                    let next_chunk_res = resp_opt.as_mut().map(|r| r.chunk()).unwrap().await;
-                    let mut done = false;
-                    let chunk = match next_chunk_res {
-                        Err(e) => return Some((Err(e.into()), (line, lines, None))),
-                        Ok(Some(chunk)) => chunk,
-                        Ok(None) => {
-                            done = true;
-                            Bytes::new()
-                        }
-                    };
-
-                    // Now we process the chunk. `.split()` splits the chunk
-                    // around each newline character.
-                    //
-                    // For example, "\nab\nc\n\n" becomes ["", "ab", "c", "", ""].
-                    //
-                    // This means that a newline was found in the chunk only
-                    // when `.split()` returns at least 2 elements. The main
-                    // tricky situation is when a line is split across chunks.
-                    // We use the length of `lines_in_chunk` to determine if
-                    // this is the case.
-                    let lines_in_chunk = chunk.split(|&v| v == b'\n').collect::<Vec<_>>();
-
-                    if lines_in_chunk.len() == 1 {
-                        // No newline found in the chunk. So we just append the
-                        // chunk to the current line and continue to the next
-                        // chunk.
-                        line.extend_from_slice(&chunk);
-                        continue;
-                    }
-
-                    // At least one newline was found in the chunk.
-                    for (i, chunk_line) in lines_in_chunk.iter().enumerate() {
-                        if i == 0 {
-                            // The first split component in the chunk completes
-                            // the line.
-                            line.extend_from_slice(chunk_line);
-                            lines.push_back(line.freeze());
-                            line = BytesMut::with_capacity(16 * 1024);
-                            continue;
-                        }
-                        if i == lines_in_chunk.len() - 1 {
-                            // The last split component in the chunk is a
-                            // partial line. We append it to the current line
-                            // (which will be empty because we just re-created
-                            // it).
-                            line.extend_from_slice(chunk_line);
-                            continue;
+                            let s = line.trim();
+                            if s.is_empty() {
+                                continue;
+                            }
+                            let records_res: Result<NotificationRecords, Error> =
+                                serde_json::from_str(&s).map_err(|e| e.into());
+                            return Some((records_res, reader));
                         }
-
-                        lines.push_back(Bytes::copy_from_slice(chunk_line));
-                    }
-
-                    if done {
-                        lines.push_back(line.freeze());
-                        line = BytesMut::with_capacity(16 * 1024);
-                        resp_opt = None;
+                        Err(e) => return Some((Err(e.into()), reader)),
                     }
                 }
             },
diff --git a/tests/tests.rs b/tests/tests.rs
index 619fa20..f482090 100644
--- a/tests/tests.rs
+++ b/tests/tests.rs
@@ -568,7 +568,6 @@ impl ClientTest {
         let args = ListenBucketNotificationArgs::new(&test_bucket).unwrap();
         let (_, mut event_stream) = client.listen_bucket_notification(args).await.unwrap();
         while let Some(event) = event_stream.next().await {
-            // println!("event: {:?}", event);
             let event = event.unwrap();
             if !event_fn(event) {
                 break;
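
Usage sketch (illustrative only, not part of the patch series): it mirrors the updated test in tests.rs and shows how a caller drives the stream returned by the reworked listen_bucket_notification() instead of passing a callback. A configured `Client` and an existing bucket are assumed; the helper name `watch_bucket` is made up for the example.

```rust
use futures_util::stream::StreamExt;
use minio::s3::args::ListenBucketNotificationArgs;
use minio::s3::client::Client;
use minio::s3::error::Error;

// Hypothetical helper: consumes notification records until the stream ends
// or an error is returned. `client` is assumed to be already configured and
// `bucket_name` to refer to an existing bucket on a MinIO server.
async fn watch_bucket(client: &Client, bucket_name: &str) -> Result<(), Error> {
    let args = ListenBucketNotificationArgs::new(bucket_name)?;

    // The API now returns the response headers plus a stream of
    // NotificationRecords; the stream normally never ends.
    let (_resp, mut event_stream) = client.listen_bucket_notification(args).await?;

    while let Some(event) = event_stream.next().await {
        let records = event?;
        for record in records.records.iter() {
            // Each record describes one S3 event observed on the bucket.
            println!("{:?}", record.event_name);
        }
    }
    Ok(())
}
```

Compared with the earlier callback-based design, the caller decides when to stop (for example with `break`, as the test does once the expected event arrives), and errors surface per item through the stream rather than aborting the whole listen call.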