From 18ddd6868cd6afe0481a9c5e6cfae820022b0ad9 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Fri, 29 Sep 2023 11:59:04 -0700 Subject: [PATCH] Use tokio_util's StreamReader for simpler impl --- Cargo.toml | 41 ++++--- src/s3/client/listen_bucket_notification.rs | 127 ++++---------------- 2 files changed, 44 insertions(+), 124 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e60073f..c110e9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,37 +11,40 @@ keywords = ["object-storage", "minio", "s3"] categories = ["api-bindings", "web-programming::http-client"] [dependencies] -hyper = { version = "0.14.27", features = ["full"] } -tokio = { version = "1.32.0", features = ["full"] } -derivative = "2.2.0" -multimap = "0.9.0" -urlencoding = "2.1.3" -lazy_static = "1.4.0" -regex = "1.9.4" -chrono = "0.4.27" -sha2 = "0.10.7" +async-recursion = "1.0.4" base64 = "0.21.3" -md5 = "0.7.0" -crc = "3.0.1" byteorder = "1.4.3" -hmac = "0.12.1" -hex = "0.4.3" -futures-core = "0.3.28" bytes = "1.4.0" +chrono = "0.4.27" +crc = "3.0.1" +dashmap = "5.5.3" +derivative = "2.2.0" futures-util = "0.3.28" -xmltree = "0.10.3" +hex = "0.4.3" +hmac = "0.12.1" http = "0.2.9" -dashmap = "5.5.3" +hyper = { version = "0.14.27", features = ["full"] } +lazy_static = "1.4.0" +md5 = "0.7.0" +multimap = "0.9.0" +os_info = "3.7.0" rand = "0.8.5" +regex = "1.9.4" serde = { version = "1.0.188", features = ["derive"] } serde_json = "1.0.105" -async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } -async-recursion = "1.0.4" -os_info = "3.7.0" +sha2 = "0.10.7" +tokio = { version = "1.32.0", features = ["full"] } +tokio-stream = "0.1.14" +tokio-util = { version = "0.7.9", features = ["io"] } +urlencoding = "2.1.3" +xmltree = "0.10.3" [dependencies.reqwest] version = "0.11.20" features = ["native-tls", "blocking", "rustls-tls", "stream"] +[dev-dependencies] +async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } + [[example]] name = "file-uploader" diff --git a/src/s3/client/listen_bucket_notification.rs b/src/s3/client/listen_bucket_notification.rs index 9ec06a9..2df7524 100644 --- a/src/s3/client/listen_bucket_notification.rs +++ b/src/s3/client/listen_bucket_notification.rs @@ -1,9 +1,8 @@ -use std::collections::VecDeque; - -use bytes::{Bytes, BytesMut}; -use futures_core::Stream; use futures_util::stream; use http::Method; +use tokio::io::AsyncBufReadExt; +use tokio_stream::{Stream, StreamExt}; +use tokio_util::io::StreamReader; use crate::s3::{ args::ListenBucketNotificationArgs, @@ -81,112 +80,30 @@ impl Client { let header_map = resp.headers().clone(); - let line = BytesMut::with_capacity(16 * 1024); - let lines: VecDeque = VecDeque::new(); + let body_stream = resp.bytes_stream(); + let body_stream = body_stream + .map(|r| r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))); + let stream_reader = StreamReader::new(body_stream); - // We use a stream::unfold to process the response body. The unfold - // state consists of the current (possibly incomplete) line , a deque of - // (complete) lines extracted from the response body and the response - // itself wrapped in an Option (the Option is to indicate if the - // response body has been fully consumed). The unfold operation here - // generates a stream of notification records. let record_stream = Box::pin(stream::unfold( - (line, lines, Some(resp)), - move |(mut line, mut lines, mut resp_opt)| async move { + stream_reader, + move |mut reader| async move { loop { - // 1. If we have some lines in the deque, deserialize and return them. - while let Some(v) = lines.pop_front() { - let s = match String::from_utf8((&v).to_vec()) { - Err(e) => return Some((Err(e.into()), (line, lines, resp_opt))), - Ok(s) => { - let s = s.trim().to_string(); - // Skip empty strings. - if s.is_empty() { - continue; - } - s + let mut line = String::new(); + match reader.read_line(&mut line).await { + Ok(n) => { + if n == 0 { + return None; } - }; - let records_res: Result = - serde_json::from_str(&s).map_err(|e| e.into()); - return Some((records_res, (line, lines, resp_opt))); - } - - // At this point `lines` is empty. We may have a partial line in - // `line`. We now process the next chunk in the response body. - - if resp_opt.is_none() { - if line.len() > 0 { - // Since we have no more chunks to process, we - // consider this as a complete line and deserialize - // it in the next loop iteration. - lines.push_back(line.freeze()); - line = BytesMut::with_capacity(16 * 1024); - continue; - } - // We have no more chunks to process, no partial line - // and no more lines to return. So we are done. - return None; - } - - // Attempt to read the next chunk of the response. - let next_chunk_res = resp_opt.as_mut().map(|r| r.chunk()).unwrap().await; - let mut done = false; - let chunk = match next_chunk_res { - Err(e) => return Some((Err(e.into()), (line, lines, None))), - Ok(Some(chunk)) => chunk, - Ok(None) => { - done = true; - Bytes::new() - } - }; - - // Now we process the chunk. The `.split()` splits the chunk - // around each newline character. - // - // For e.g. "\nab\nc\n\n" becomes ["", "ab", "c", "", ""]. - // - // This means that a newline was found in the chunk only - // when `.split()` returns at least 2 elements. The main - // tricky situation is when a line is split across chunks. - // We use the length of `lines_in_chunk` to determine if - // this is the case. - let lines_in_chunk = chunk.split(|&v| v == b'\n').collect::>(); - - if lines_in_chunk.len() == 1 { - // No newline found in the chunk. So we just append the - // chunk to the current line and continue to the next - // chunk. - line.extend_from_slice(&chunk); - continue; - } - - // At least one newline was found in the chunk. - for (i, chunk_line) in lines_in_chunk.iter().enumerate() { - if i == 0 { - // The first split component in the chunk completes - // the line. - line.extend_from_slice(chunk_line); - lines.push_back(line.freeze()); - line = BytesMut::with_capacity(16 * 1024); - continue; - } - if i == lines_in_chunk.len() - 1 { - // The last split component in the chunk is a - // partial line. We append it to the current line - // (which will be empty because we just re-created - // it). - line.extend_from_slice(chunk_line); - continue; + let s = line.trim(); + if s.is_empty() { + continue; + } + let records_res: Result = + serde_json::from_str(&s).map_err(|e| e.into()); + return Some((records_res, reader)); } - - lines.push_back(Bytes::copy_from_slice(chunk_line)); - } - - if done { - lines.push_back(line.freeze()); - line = BytesMut::with_capacity(16 * 1024); - resp_opt = None; + Err(e) => return Some((Err(e.into()), reader)), } } },