Skip to content

Commit

Permalink
Use tokio_util's StreamReader for simpler impl
Browse files Browse the repository at this point in the history
  • Loading branch information
donatello committed Sep 29, 2023
1 parent ccd1ff4 commit 11b2401
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 124 deletions.
41 changes: 22 additions & 19 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,40 @@ keywords = ["object-storage", "minio", "s3"]
categories = ["api-bindings", "web-programming::http-client"]

[dependencies]
hyper = { version = "0.14.27", features = ["full"] }
tokio = { version = "1.32.0", features = ["full"] }
derivative = "2.2.0"
multimap = "0.9.0"
urlencoding = "2.1.3"
lazy_static = "1.4.0"
regex = "1.9.4"
chrono = "0.4.27"
sha2 = "0.10.7"
async-recursion = "1.0.4"
base64 = "0.21.3"
md5 = "0.7.0"
crc = "3.0.1"
byteorder = "1.4.3"
hmac = "0.12.1"
hex = "0.4.3"
futures-core = "0.3.28"
bytes = "1.4.0"
chrono = "0.4.27"
crc = "3.0.1"
dashmap = "5.5.3"
derivative = "2.2.0"
futures-util = "0.3.28"
xmltree = "0.10.3"
hex = "0.4.3"
hmac = "0.12.1"
http = "0.2.9"
dashmap = "5.5.3"
hyper = { version = "0.14.27", features = ["full"] }
lazy_static = "1.4.0"
md5 = "0.7.0"
multimap = "0.9.0"
os_info = "3.7.0"
rand = "0.8.5"
regex = "1.9.4"
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.105"
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
async-recursion = "1.0.4"
os_info = "3.7.0"
sha2 = "0.10.7"
tokio = { version = "1.32.0", features = ["full"] }
tokio-stream = "0.1.14"
tokio-util = { version = "0.7.8", features = ["io"] }
urlencoding = "2.1.3"
xmltree = "0.10.3"

[dependencies.reqwest]
version = "0.11.20"
features = ["native-tls", "blocking", "rustls-tls", "stream"]

[dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }

[[example]]
name = "file-uploader"
127 changes: 22 additions & 105 deletions src/s3/client/listen_bucket_notification.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::collections::VecDeque;

use bytes::{Bytes, BytesMut};
use futures_core::Stream;
use futures_util::stream;
use http::Method;
use tokio::io::AsyncBufReadExt;
use tokio_stream::{Stream, StreamExt};
use tokio_util::io::StreamReader;

use crate::s3::{
args::ListenBucketNotificationArgs,
Expand Down Expand Up @@ -81,112 +80,30 @@ impl Client {

let header_map = resp.headers().clone();

let line = BytesMut::with_capacity(16 * 1024);
let lines: VecDeque<Bytes> = VecDeque::new();
let body_stream = resp.bytes_stream();
let body_stream = body_stream
.map(|r| r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)));
let stream_reader = StreamReader::new(body_stream);

// We use a stream::unfold to process the response body. The unfold
// state consists of the current (possibly incomplete) line , a deque of
// (complete) lines extracted from the response body and the response
// itself wrapped in an Option (the Option is to indicate if the
// response body has been fully consumed). The unfold operation here
// generates a stream of notification records.
let record_stream = Box::pin(stream::unfold(
(line, lines, Some(resp)),
move |(mut line, mut lines, mut resp_opt)| async move {
stream_reader,
move |mut reader| async move {
loop {
// 1. If we have some lines in the deque, deserialize and return them.
while let Some(v) = lines.pop_front() {
let s = match String::from_utf8((&v).to_vec()) {
Err(e) => return Some((Err(e.into()), (line, lines, resp_opt))),
Ok(s) => {
let s = s.trim().to_string();
// Skip empty strings.
if s.is_empty() {
continue;
}
s
let mut line = String::new();
match reader.read_line(&mut line).await {
Ok(n) => {
if n == 0 {
return None;
}
};
let records_res: Result<NotificationRecords, Error> =
serde_json::from_str(&s).map_err(|e| e.into());
return Some((records_res, (line, lines, resp_opt)));
}

// At this point `lines` is empty. We may have a partial line in
// `line`. We now process the next chunk in the response body.

if resp_opt.is_none() {
if line.len() > 0 {
// Since we have no more chunks to process, we
// consider this as a complete line and deserialize
// it in the next loop iteration.
lines.push_back(line.freeze());
line = BytesMut::with_capacity(16 * 1024);
continue;
}
// We have no more chunks to process, no partial line
// and no more lines to return. So we are done.
return None;
}

// Attempt to read the next chunk of the response.
let next_chunk_res = resp_opt.as_mut().map(|r| r.chunk()).unwrap().await;
let mut done = false;
let chunk = match next_chunk_res {
Err(e) => return Some((Err(e.into()), (line, lines, None))),
Ok(Some(chunk)) => chunk,
Ok(None) => {
done = true;
Bytes::new()
}
};

// Now we process the chunk. The `.split()` splits the chunk
// around each newline character.
//
// For e.g. "\nab\nc\n\n" becomes ["", "ab", "c", "", ""].
//
// This means that a newline was found in the chunk only
// when `.split()` returns at least 2 elements. The main
// tricky situation is when a line is split across chunks.
// We use the length of `lines_in_chunk` to determine if
// this is the case.
let lines_in_chunk = chunk.split(|&v| v == b'\n').collect::<Vec<_>>();

if lines_in_chunk.len() == 1 {
// No newline found in the chunk. So we just append the
// chunk to the current line and continue to the next
// chunk.
line.extend_from_slice(&chunk);
continue;
}

// At least one newline was found in the chunk.
for (i, chunk_line) in lines_in_chunk.iter().enumerate() {
if i == 0 {
// The first split component in the chunk completes
// the line.
line.extend_from_slice(chunk_line);
lines.push_back(line.freeze());
line = BytesMut::with_capacity(16 * 1024);
continue;
}
if i == lines_in_chunk.len() - 1 {
// The last split component in the chunk is a
// partial line. We append it to the current line
// (which will be empty because we just re-created
// it).
line.extend_from_slice(chunk_line);
continue;
let s = line.trim();
if s.is_empty() {
continue;
}
let records_res: Result<NotificationRecords, Error> =
serde_json::from_str(&s).map_err(|e| e.into());
return Some((records_res, reader));
}

lines.push_back(Bytes::copy_from_slice(chunk_line));
}

if done {
lines.push_back(line.freeze());
line = BytesMut::with_capacity(16 * 1024);
resp_opt = None;
Err(e) => return Some((Err(e.into()), reader)),
}
}
},
Expand Down

0 comments on commit 11b2401

Please sign in to comment.