Skip to content

Commit

Permalink
Modify list_bucket API to use iteration style
Browse files Browse the repository at this point in the history
- Add doc on list_bucket_notification API.
  • Loading branch information
donatello committed Sep 29, 2023
1 parent c63d3f9 commit ccd1ff4
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 127 deletions.
48 changes: 16 additions & 32 deletions src/s3/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::s3::error::Error;
use crate::s3::signer::post_presign_v4;
use crate::s3::sse::{Sse, SseCustomerKey};
use crate::s3::types::{
DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, NotificationRecords,
ObjectLockConfig, Part, ReplicationConfig, Retention, RetentionMode, SelectRequest, SseConfig,
DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part,
ReplicationConfig, Retention, RetentionMode, SelectRequest, SseConfig,
};
use crate::s3::utils::{
b64encode, check_bucket_name, merge, to_amz_date, to_http_header_value, to_iso8601utc,
Expand Down Expand Up @@ -1262,54 +1262,38 @@ impl<'a> SelectObjectContentArgs<'a> {
}

/// Argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API
pub struct ListenBucketNotificationArgs<'a> {
pub extra_headers: Option<&'a Multimap>,
pub extra_query_params: Option<&'a Multimap>,
pub region: Option<&'a str>,
pub bucket: &'a str,
pub prefix: Option<&'a str>,
pub suffix: Option<&'a str>,
pub events: Option<Vec<&'a str>>,
pub event_fn: &'a (dyn Fn(NotificationRecords) -> bool + Send + Sync),
#[derive(Clone, Debug)]
pub struct ListenBucketNotificationArgs {
pub extra_headers: Option<Multimap>,
pub extra_query_params: Option<Multimap>,
pub region: Option<String>,
pub bucket: String,
pub prefix: Option<String>,
pub suffix: Option<String>,
pub events: Option<Vec<String>>,
}

impl<'a> ListenBucketNotificationArgs<'a> {
impl ListenBucketNotificationArgs {
/// Returns argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API with given bucket name and callback function for results.
///
/// # Examples
///
/// ```
/// use minio::s3::args::*;
/// use minio::s3::types::NotificationRecords;
/// let event_fn = |event: NotificationRecords| {
/// for record in event.records.iter() {
/// if let Some(s3) = &record.s3 {
/// if let Some(object) = &s3.object {
/// if let Some(key) = &object.key {
/// println!("{:?} {:?}", record.event_name, key);
/// }
/// }
/// }
/// }
/// true
/// };
/// let args = ListenBucketNotificationArgs::new("my-bucket", &event_fn).unwrap();
///
/// let args = ListenBucketNotificationArgs::new("my-bucket").unwrap();
/// ```
pub fn new(
bucket_name: &'a str,
event_fn: &'a (dyn Fn(NotificationRecords) -> bool + Send + Sync),
) -> Result<ListenBucketNotificationArgs<'a>, Error> {
pub fn new(bucket_name: &str) -> Result<ListenBucketNotificationArgs, Error> {
check_bucket_name(bucket_name, true)?;

Ok(ListenBucketNotificationArgs {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name,
bucket: bucket_name.to_owned(),
prefix: None,
suffix: None,
events: None,
event_fn,
})
}
}
Expand Down
100 changes: 7 additions & 93 deletions src/s3/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::s3::response::*;
use crate::s3::signer::{presign_v4, sign_v4_s3};
use crate::s3::sse::SseCustomerKey;
use crate::s3::types::{
Bucket, DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig,
NotificationRecords, ObjectLockConfig, Part, ReplicationConfig, RetentionMode, SseConfig,
Bucket, DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig,
Part, ReplicationConfig, RetentionMode, SseConfig,
};
use crate::s3::utils::{
from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, merge, sha256_hash,
Expand All @@ -36,14 +36,18 @@ use dashmap::DashMap;
use hyper::http::Method;
use os_info;
use reqwest::header::HeaderMap;
use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use xmltree::Element;

mod listen_bucket_notification;

pub use listen_bucket_notification::*;

fn url_decode(
encoding_type: &Option<String>,
prefix: Option<String>,
Expand Down Expand Up @@ -2429,96 +2433,6 @@ impl Client {
})
}

pub async fn listen_bucket_notification(
&self,
args: &ListenBucketNotificationArgs<'_>,
) -> Result<ListenBucketNotificationResponse, Error> {
if self.base_url.is_aws_host() {
return Err(Error::UnsupportedApi(String::from(
"ListenBucketNotification",
)));
}

let region = self.get_region(args.bucket, args.region).await?;

let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
merge(&mut headers, v);
}

let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params {
merge(&mut query_params, v);
}
if let Some(v) = args.prefix {
query_params.insert(String::from("prefix"), v.to_string());
}
if let Some(v) = args.suffix {
query_params.insert(String::from("suffix"), v.to_string());
}
if let Some(v) = &args.events {
for e in v.iter() {
query_params.insert(String::from("events"), e.to_string());
}
} else {
query_params.insert(String::from("events"), String::from("s3:ObjectCreated:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectRemoved:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectAccessed:*"));
}

let mut resp = self
.execute(
Method::GET,
&region,
&mut headers,
&query_params,
Some(args.bucket),
None,
None,
)
.await?;

let header_map = resp.headers().clone();

let mut done = false;
let mut buf = VecDeque::<u8>::new();
while !done {
let chunk = match resp.chunk().await? {
Some(v) => v,
None => {
done = true;
Bytes::new()
}
};
buf.extend(chunk.iter().copied());

while !done {
match buf.iter().position(|&v| v == b'\n') {
Some(i) => {
let mut data = vec![0_u8; i + 1];
#[allow(clippy::needless_range_loop)]
for j in 0..=i {
data[j] = buf.pop_front().ok_or(Error::InsufficientData(i, j))?;
}
let mut line = String::from_utf8(data)?;
line = line.trim().to_string();
if !line.is_empty() {
let records: NotificationRecords = serde_json::from_str(&line)?;
done = !(args.event_fn)(records);
}
}
None => break,
};
}
}

Ok(ListenBucketNotificationResponse::new(
header_map,
&region,
args.bucket,
))
}

/// Executes [ListObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html) S3 API
pub async fn list_objects_v1(
&self,
Expand Down
Loading

0 comments on commit ccd1ff4

Please sign in to comment.