Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add streaming support to listen_bucket_notification() API #55

Merged
merged 2 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,40 @@ keywords = ["object-storage", "minio", "s3"]
categories = ["api-bindings", "web-programming::http-client"]

[dependencies]
hyper = { version = "0.14.27", features = ["full"] }
tokio = { version = "1.32.0", features = ["full"] }
derivative = "2.2.0"
multimap = "0.9.0"
urlencoding = "2.1.3"
lazy_static = "1.4.0"
regex = "1.9.4"
chrono = "0.4.27"
sha2 = "0.10.7"
async-recursion = "1.0.4"
base64 = "0.21.3"
md5 = "0.7.0"
crc = "3.0.1"
byteorder = "1.4.3"
hmac = "0.12.1"
hex = "0.4.3"
futures-core = "0.3.28"
bytes = "1.4.0"
chrono = "0.4.27"
crc = "3.0.1"
dashmap = "5.5.3"
derivative = "2.2.0"
futures-util = "0.3.28"
xmltree = "0.10.3"
hex = "0.4.3"
hmac = "0.12.1"
http = "0.2.9"
dashmap = "5.5.3"
hyper = { version = "0.14.27", features = ["full"] }
lazy_static = "1.4.0"
md5 = "0.7.0"
multimap = "0.9.0"
os_info = "3.7.0"
rand = "0.8.5"
regex = "1.9.4"
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.105"
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
async-recursion = "1.0.4"
os_info = "3.7.0"
sha2 = "0.10.7"
tokio = { version = "1.32.0", features = ["full"] }
tokio-stream = "0.1.14"
tokio-util = { version = "0.7.8", features = ["io"] }
urlencoding = "2.1.3"
xmltree = "0.10.3"

[dependencies.reqwest]
version = "0.11.20"
features = ["native-tls", "blocking", "rustls-tls", "stream"]

[dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }

[[example]]
name = "file-uploader"
48 changes: 16 additions & 32 deletions src/s3/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::s3::error::Error;
use crate::s3::signer::post_presign_v4;
use crate::s3::sse::{Sse, SseCustomerKey};
use crate::s3::types::{
DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, NotificationRecords,
ObjectLockConfig, Part, ReplicationConfig, Retention, RetentionMode, SelectRequest, SseConfig,
DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part,
ReplicationConfig, Retention, RetentionMode, SelectRequest, SseConfig,
};
use crate::s3::utils::{
b64encode, check_bucket_name, merge, to_amz_date, to_http_header_value, to_iso8601utc,
Expand Down Expand Up @@ -1262,54 +1262,38 @@ impl<'a> SelectObjectContentArgs<'a> {
}

/// Argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API
pub struct ListenBucketNotificationArgs<'a> {
pub extra_headers: Option<&'a Multimap>,
pub extra_query_params: Option<&'a Multimap>,
pub region: Option<&'a str>,
pub bucket: &'a str,
pub prefix: Option<&'a str>,
pub suffix: Option<&'a str>,
pub events: Option<Vec<&'a str>>,
pub event_fn: &'a (dyn Fn(NotificationRecords) -> bool + Send + Sync),
#[derive(Clone, Debug)]
pub struct ListenBucketNotificationArgs {
pub extra_headers: Option<Multimap>,
pub extra_query_params: Option<Multimap>,
pub region: Option<String>,
pub bucket: String,
pub prefix: Option<String>,
pub suffix: Option<String>,
pub events: Option<Vec<String>>,
}

impl<'a> ListenBucketNotificationArgs<'a> {
impl ListenBucketNotificationArgs {
/// Returns argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API with given bucket name and callback function for results.
///
/// # Examples
///
/// ```
/// use minio::s3::args::*;
/// use minio::s3::types::NotificationRecords;
/// let event_fn = |event: NotificationRecords| {
/// for record in event.records.iter() {
/// if let Some(s3) = &record.s3 {
/// if let Some(object) = &s3.object {
/// if let Some(key) = &object.key {
/// println!("{:?} {:?}", record.event_name, key);
/// }
/// }
/// }
/// }
/// true
/// };
/// let args = ListenBucketNotificationArgs::new("my-bucket", &event_fn).unwrap();
///
/// let args = ListenBucketNotificationArgs::new("my-bucket").unwrap();
/// ```
pub fn new(
bucket_name: &'a str,
event_fn: &'a (dyn Fn(NotificationRecords) -> bool + Send + Sync),
) -> Result<ListenBucketNotificationArgs<'a>, Error> {
pub fn new(bucket_name: &str) -> Result<ListenBucketNotificationArgs, Error> {
check_bucket_name(bucket_name, true)?;

Ok(ListenBucketNotificationArgs {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name,
bucket: bucket_name.to_owned(),
prefix: None,
suffix: None,
events: None,
event_fn,
})
}
}
Expand Down
100 changes: 7 additions & 93 deletions src/s3/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::s3::response::*;
use crate::s3::signer::{presign_v4, sign_v4_s3};
use crate::s3::sse::SseCustomerKey;
use crate::s3::types::{
Bucket, DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig,
NotificationRecords, ObjectLockConfig, Part, ReplicationConfig, RetentionMode, SseConfig,
Bucket, DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig,
Part, ReplicationConfig, RetentionMode, SseConfig,
};
use crate::s3::utils::{
from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, merge, sha256_hash,
Expand All @@ -36,14 +36,18 @@ use dashmap::DashMap;
use hyper::http::Method;
use os_info;
use reqwest::header::HeaderMap;
use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use xmltree::Element;

mod listen_bucket_notification;

pub use listen_bucket_notification::*;

fn url_decode(
encoding_type: &Option<String>,
prefix: Option<String>,
Expand Down Expand Up @@ -2429,96 +2433,6 @@ impl Client {
})
}

pub async fn listen_bucket_notification(
&self,
args: &ListenBucketNotificationArgs<'_>,
) -> Result<ListenBucketNotificationResponse, Error> {
if self.base_url.is_aws_host() {
return Err(Error::UnsupportedApi(String::from(
"ListenBucketNotification",
)));
}

let region = self.get_region(args.bucket, args.region).await?;

let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
merge(&mut headers, v);
}

let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params {
merge(&mut query_params, v);
}
if let Some(v) = args.prefix {
query_params.insert(String::from("prefix"), v.to_string());
}
if let Some(v) = args.suffix {
query_params.insert(String::from("suffix"), v.to_string());
}
if let Some(v) = &args.events {
for e in v.iter() {
query_params.insert(String::from("events"), e.to_string());
}
} else {
query_params.insert(String::from("events"), String::from("s3:ObjectCreated:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectRemoved:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectAccessed:*"));
}

let mut resp = self
.execute(
Method::GET,
&region,
&mut headers,
&query_params,
Some(args.bucket),
None,
None,
)
.await?;

let header_map = resp.headers().clone();

let mut done = false;
let mut buf = VecDeque::<u8>::new();
while !done {
let chunk = match resp.chunk().await? {
Some(v) => v,
None => {
done = true;
Bytes::new()
}
};
buf.extend(chunk.iter().copied());

while !done {
match buf.iter().position(|&v| v == b'\n') {
Some(i) => {
let mut data = vec![0_u8; i + 1];
#[allow(clippy::needless_range_loop)]
for j in 0..=i {
data[j] = buf.pop_front().ok_or(Error::InsufficientData(i, j))?;
}
let mut line = String::from_utf8(data)?;
line = line.trim().to_string();
if !line.is_empty() {
let records: NotificationRecords = serde_json::from_str(&line)?;
done = !(args.event_fn)(records);
}
}
None => break,
};
}
}

Ok(ListenBucketNotificationResponse::new(
header_map,
&region,
args.bucket,
))
}

/// Executes [ListObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html) S3 API
pub async fn list_objects_v1(
&self,
Expand Down
Loading