Skip to content

Commit

Permalink
Add builder for ListenBucketNotification
Browse files Browse the repository at this point in the history
- Also update the types used in NotificationRecords
  • Loading branch information
donatello committed Oct 24, 2023
1 parent a03a8bf commit 08d1eb5
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 263 deletions.
37 changes: 0 additions & 37 deletions src/s3/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1054,43 +1054,6 @@ impl<'a> SelectObjectContentArgs<'a> {
}
}

/// Argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API
#[derive(Clone, Debug)]
pub struct ListenBucketNotificationArgs {
pub extra_headers: Option<Multimap>,
pub extra_query_params: Option<Multimap>,
pub region: Option<String>,
pub bucket: String,
pub prefix: Option<String>,
pub suffix: Option<String>,
pub events: Option<Vec<String>>,
}

impl ListenBucketNotificationArgs {
/// Returns argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API with given bucket name and callback function for results.
///
/// # Examples
///
/// ```
/// use minio::s3::args::*;
/// use minio::s3::types::NotificationRecords;
///
/// let args = ListenBucketNotificationArgs::new("my-bucket").unwrap();
/// ```
pub fn new(bucket_name: &str) -> Result<ListenBucketNotificationArgs, Error> {
check_bucket_name(bucket_name, true)?;
Ok(ListenBucketNotificationArgs {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name.to_owned(),
prefix: None,
suffix: None,
events: None,
})
}
}

#[derive(Clone, Debug, Default)]
/// Argument for [upload_part_copy()](crate::s3::client::Client::upload_part_copy) S3 API
pub struct UploadPartCopyArgs<'a> {
Expand Down
2 changes: 2 additions & 0 deletions src/s3/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@
//! Argument builders for [minio::s3::client::Client](crate::s3::client::Client) APIs
mod list_objects;
mod listen_bucket_notification;

pub use list_objects::*;
pub use listen_bucket_notification::*;
139 changes: 139 additions & 0 deletions src/s3/builders/listen_bucket_notification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2023 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use futures_util::Stream;
use http::Method;

use crate::s3::{
client::Client,
error::Error,
response::ListenBucketNotificationResponse,
types::{NotificationRecords, S3Api, S3Request, ToS3Request},
utils::{check_bucket_name, merge, Multimap},
};

/// Argument builder for
/// [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification)
/// API.
#[derive(Clone, Debug, Default)]
pub struct ListenBucketNotification {
client: Option<Client>,

extra_headers: Option<Multimap>,
extra_query_params: Option<Multimap>,
region: Option<String>,
bucket: String,
prefix: Option<String>,
suffix: Option<String>,
events: Option<Vec<String>>,
}

#[async_trait]
impl S3Api for ListenBucketNotification {
type S3Response = (
ListenBucketNotificationResponse,
Box<dyn Stream<Item = Result<NotificationRecords, Error>> + Unpin + Send>,
);
}

impl ToS3Request for ListenBucketNotification {
fn to_s3request(&self) -> Result<S3Request, Error> {
let client = self.client.as_ref().ok_or(Error::NoClientProvided)?;
if client.is_aws_host() {
return Err(Error::UnsupportedApi(String::from(
"ListenBucketNotification",
)));
}

check_bucket_name(&self.bucket, true)?;

let mut headers = Multimap::new();
if let Some(v) = &self.extra_headers {
merge(&mut headers, v);
}

let mut query_params = Multimap::new();
if let Some(v) = &self.extra_query_params {
merge(&mut query_params, v);
}
if let Some(v) = &self.prefix {
query_params.insert(String::from("prefix"), v.to_string());
}
if let Some(v) = &self.suffix {
query_params.insert(String::from("suffix"), v.to_string());
}
if let Some(v) = &self.events {
for e in v.iter() {
query_params.insert(String::from("events"), e.to_string());
}
} else {
query_params.insert(String::from("events"), String::from("s3:ObjectCreated:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectRemoved:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectAccessed:*"));
}

let req = S3Request::new(client, Method::GET)
.region(self.region.as_deref())
.bucket(Some(&self.bucket))
.query_params(query_params)
.headers(headers);
Ok(req)
}
}

impl ListenBucketNotification {
pub fn new(bucket_name: &str) -> ListenBucketNotification {
ListenBucketNotification {
bucket: bucket_name.to_owned(),
..Default::default()
}
}

pub fn client(mut self, client: &Client) -> Self {
self.client = Some(client.clone());
self
}

pub fn extra_headers(mut self, extra_headers: Option<Multimap>) -> Self {
self.extra_headers = extra_headers;
self
}

pub fn extra_query_params(mut self, extra_query_params: Option<Multimap>) -> Self {
self.extra_query_params = extra_query_params;
self
}

pub fn region(mut self, region: Option<String>) -> Self {
self.region = region;
self
}

pub fn prefix(mut self, prefix: Option<String>) -> Self {
self.prefix = prefix;
self
}

pub fn suffix(mut self, suffix: Option<String>) -> Self {
self.suffix = suffix;
self
}

pub fn events(mut self, events: Option<Vec<String>>) -> Self {
self.events = events;
self
}
}
4 changes: 4 additions & 0 deletions src/s3/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ impl Client {
.build()
}

pub fn is_aws_host(&self) -> bool {
self.base_url.is_aws_host()
}

fn build_headers(
&self,
headers: &mut Multimap,
Expand Down
108 changes: 3 additions & 105 deletions src/s3/client/listen_bucket_notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,7 @@

//! MinIO Extension API for S3 Buckets: ListenBucketNotification
use futures_util::stream;
use http::Method;
use tokio::io::AsyncBufReadExt;
use tokio_stream::{Stream, StreamExt};
use tokio_util::io::StreamReader;

use crate::s3::{
args::ListenBucketNotificationArgs,
error::Error,
response::ListenBucketNotificationResponse,
types::NotificationRecords,
utils::{merge, Multimap},
};
use crate::s3::builders::ListenBucketNotification;

use super::Client;

Expand All @@ -38,97 +26,7 @@ impl Client {
/// returned by the server and the latter is a stream of notification
/// records. In normal operation (when there are no errors), the stream
/// never ends.
pub async fn listen_bucket_notification(
&self,
args: ListenBucketNotificationArgs,
) -> Result<
(
ListenBucketNotificationResponse,
impl Stream<Item = Result<NotificationRecords, Error>>,
),
Error,
> {
if self.base_url.is_aws_host() {
return Err(Error::UnsupportedApi(String::from(
"ListenBucketNotification",
)));
}

let region = self
.get_region(&args.bucket, args.region.as_deref())
.await?;

let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
merge(&mut headers, v);
}

let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params {
merge(&mut query_params, v);
}
if let Some(v) = args.prefix {
query_params.insert(String::from("prefix"), v.to_string());
}
if let Some(v) = args.suffix {
query_params.insert(String::from("suffix"), v.to_string());
}
if let Some(v) = &args.events {
for e in v.iter() {
query_params.insert(String::from("events"), e.to_string());
}
} else {
query_params.insert(String::from("events"), String::from("s3:ObjectCreated:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectRemoved:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectAccessed:*"));
}

let resp = self
.execute(
Method::GET,
&region,
&mut headers,
&query_params,
Some(&args.bucket),
None,
None,
)
.await?;

let header_map = resp.headers().clone();

let body_stream = resp.bytes_stream();
let body_stream = body_stream
.map(|r| r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)));
let stream_reader = StreamReader::new(body_stream);

let record_stream = Box::pin(stream::unfold(
stream_reader,
move |mut reader| async move {
loop {
let mut line = String::new();
match reader.read_line(&mut line).await {
Ok(n) => {
if n == 0 {
return None;
}
let s = line.trim();
if s.is_empty() {
continue;
}
let records_res: Result<NotificationRecords, Error> =
serde_json::from_str(&s).map_err(|e| e.into());
return Some((records_res, reader));
}
Err(e) => return Some((Err(e.into()), reader)),
}
}
},
));

Ok((
ListenBucketNotificationResponse::new(header_map, &region, &args.bucket),
record_stream,
))
pub fn listen_bucket_notification(&self, bucket: &str) -> ListenBucketNotification {
ListenBucketNotification::new(bucket).client(self)
}
}
24 changes: 2 additions & 22 deletions src/s3/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ use crate::s3::utils::{
};

mod list_objects;
mod listen_bucket_notification;

pub use list_objects::{
ListObjectVersionsResponse, ListObjectsResponse, ListObjectsV1Response, ListObjectsV2Response,
};
pub use listen_bucket_notification::ListenBucketNotificationResponse;

#[derive(Debug)]
/// Response of [list_buckets()](crate::s3::client::Client::list_buckets) API
Expand Down Expand Up @@ -566,28 +568,6 @@ impl SelectObjectContentResponse {
}
}

#[derive(Clone, Debug)]
/// Response of [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API
pub struct ListenBucketNotificationResponse {
pub headers: HeaderMap,
pub region: String,
pub bucket_name: String,
}

impl ListenBucketNotificationResponse {
pub fn new(
headers: HeaderMap,
region: &str,
bucket_name: &str,
) -> ListenBucketNotificationResponse {
ListenBucketNotificationResponse {
headers,
region: region.to_string(),
bucket_name: bucket_name.to_string(),
}
}
}

/// Response of [delete_bucket_encryption()](crate::s3::client::Client::delete_bucket_encryption) API
pub type DeleteBucketEncryptionResponse = BucketResponse;

Expand Down
Loading

0 comments on commit 08d1eb5

Please sign in to comment.