Skip to content

Commit

Permalink
Clippy fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
donatello committed Apr 3, 2024
1 parent 54b671e commit 8b35d13
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 55 deletions.
6 changes: 2 additions & 4 deletions src/s3/builders/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,8 @@ impl ToS3Request for GetObject {

let client = self.client.clone().ok_or(Error::NoClientProvided)?;

if let Some(_) = &self.ssec {
if !client.is_secure() {
return Err(Error::SseTlsRequired(None));
}
if self.ssec.is_some() && !client.is_secure() {
return Err(Error::SseTlsRequired(None));
}

let mut headers = Multimap::new();
Expand Down
2 changes: 1 addition & 1 deletion src/s3/builders/list_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl ToStream for ListObjectsV1 {
}

impl ToS3Request for ListObjectsV1 {
fn to_s3request<'a>(&'a self) -> Result<S3Request<'a>, Error> {
fn to_s3request(&self) -> Result<S3Request<'_>, Error> {
check_bucket_name(&self.bucket, true)?;

let mut headers = Multimap::new();
Expand Down
6 changes: 5 additions & 1 deletion src/s3/builders/object_content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ impl SegmentedBytes {
self.total_size
}

/// Returns `true` when the buffer holds no bytes (its total size is zero).
pub fn is_empty(&self) -> bool {
self.total_size == 0
}

pub fn append(&mut self, bytes: Bytes) {
let last_segment = self.segments.last_mut();
if let Some(last_segment) = last_segment {
Expand Down Expand Up @@ -201,7 +205,7 @@ impl SegmentedBytes {
let mut buf = BytesMut::with_capacity(self.total_size);
for segment in &self.segments {
for bytes in segment {
buf.extend_from_slice(&bytes);
buf.extend_from_slice(bytes);
}
}
buf.freeze()
Expand Down
72 changes: 34 additions & 38 deletions src/s3/builders/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ impl ToS3Request for UploadPart {
.object(Some(&self.object))
.query_params(query_params)
.headers(headers)
.body(Some(self.data.clone().into()));
.body(Some(self.data.clone()));

Ok(req)
}
Expand Down Expand Up @@ -796,17 +796,35 @@ impl PutObjectContent {
.region(self.region.clone());

let create_mpu_resp = create_mpu.send().await?;
let upload_id = create_mpu_resp.upload_id;

// This ensures that if we fail to complete the upload due to an error,
// we abort the upload.
let mut droppable_upload_id = DroppableUploadId {
client: client.clone(),
bucket: self.bucket.clone(),
object: self.object.clone(),
upload_id: Some(upload_id.clone()),
};

let res = self
.send_mpu(
psize,
expected_parts,
create_mpu_resp.upload_id.clone(),
object_size,
seg_bytes,
)
.await;
if res.is_err() {
// If we failed to complete the multipart upload, we should abort it.
let _ =
AbortMultipartUpload::new(&self.bucket, &self.object, &create_mpu_resp.upload_id)
.client(&client)
.send()
.await;
}
res
}

async fn send_mpu(
&mut self,
psize: u64,
expected_parts: Option<u16>,
upload_id: String,
object_size: Option<u64>,
seg_bytes: SegmentedBytes,
) -> Result<PutObjectResponse2, Error> {
let mut done = false;
let mut part_number = 0;
let mut parts: Vec<Part> = if let Some(pc) = expected_parts {
Expand Down Expand Up @@ -860,12 +878,7 @@ impl PutObjectContent {

// Complete the multipart upload.
let complete_mpu = self.to_complete_multipart_upload(&upload_id, parts);
complete_mpu.send().await.map(|v| {
// We are done with the upload. Clear the upload ID to prevent a
// useless abort multipart upload call.
droppable_upload_id.upload_id = None;
v
})
complete_mpu.send().await
}
}

Expand Down Expand Up @@ -931,23 +944,6 @@ impl PutObjectContent {
}
}

/// Guard that records an in-flight multipart upload so a best-effort abort
/// can be issued via its `Drop` impl if the upload does not complete.
struct DroppableUploadId {
client: Client,
bucket: String,
object: String,
// `Some` while the upload is pending; the owner clears this to `None`
// after a successful completion to suppress the abort-on-drop.
upload_id: Option<String>,
}

impl Drop for DroppableUploadId {
    /// Best-effort abort of an incomplete multipart upload.
    ///
    /// NOTE(review): `send()` appears to be async — it is `.await`ed at other
    /// call sites in this file — so the future constructed here is dropped
    /// without ever being polled, meaning the abort request is likely never
    /// sent. Confirm, and if so, perform the abort at an async call site
    /// (e.g. after `send_mpu` fails) rather than in `Drop`.
    fn drop(&mut self) {
        if let Some(upload_id) = &self.upload_id {
            let _ = AbortMultipartUpload::new(&self.bucket, &self.object, &upload_id)
                .client(&self.client)
                .send();
        }
    }
}

pub const MIN_PART_SIZE: u64 = 5 * 1024 * 1024; // 5 MiB
pub const MAX_PART_SIZE: u64 = 1024 * MIN_PART_SIZE; // 5 GiB
pub const MAX_OBJECT_SIZE: u64 = 1024 * MAX_PART_SIZE; // 5 TiB
Expand Down Expand Up @@ -977,8 +973,8 @@ fn calc_part_info(
}

match (object_size, part_size) {
(None, None) => return Err(Error::MissingPartSize),
(None, Some(part_size)) => return Ok((part_size, None)),
(None, None) => Err(Error::MissingPartSize),
(None, Some(part_size)) => Ok((part_size, None)),
(Some(object_size), None) => {
let mut psize: u64 = (object_size as f64 / MAX_MULTIPART_COUNT as f64).ceil() as u64;

Expand All @@ -994,7 +990,7 @@ fn calc_part_info(
1
};

return Ok((psize, Some(part_count)));
Ok((psize, Some(part_count)))
}
(Some(object_size), Some(part_size)) => {
let part_count = (object_size as f64 / part_size as f64).ceil() as u16;
Expand All @@ -1006,7 +1002,7 @@ fn calc_part_info(
));
}

return Ok((part_size, Some(part_count)));
Ok((part_size, Some(part_count)))
}
}
}
9 changes: 5 additions & 4 deletions src/s3/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ impl ClientBuilder {
/// Creates a builder given a base URL for the MinIO service or other AWS S3
/// compatible object storage service.
pub fn new(base_url: BaseUrl) -> Self {
    // Struct-update syntax: take every other field from `Default` and
    // override only the base URL. The scraped span interleaved the old
    // mutate-a-default body with this one; only a single body is valid Rust.
    Self {
        base_url,
        ..Default::default()
    }
}

/// Set the credential provider. If not set anonymous access is used.
Expand Down Expand Up @@ -413,7 +414,7 @@ impl Client {
let url =
self.base_url
.build_url(method, region, query_params, bucket_name, object_name)?;
self.build_headers(headers, query_params, region, &url, &method, body);
self.build_headers(headers, query_params, region, &url, method, body);

let mut req = self.client.request(method.clone(), url.to_string());

Expand Down
4 changes: 2 additions & 2 deletions src/s3/response/list_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ fn parse_list_objects_contents(
} else {
vec![]
};
let mut merged = MergeXmlElements::new(&children1, &children2);
while let Some(content) = merged.next() {
let merged = MergeXmlElements::new(&children1, &children2);
for content in merged {
let etype = encoding_type.as_ref().cloned();
let key = url_decode(&etype, Some(content.get_child_text_or_error("Key")?))?.unwrap();
let last_modified = Some(from_iso8601utc(
Expand Down
2 changes: 1 addition & 1 deletion src/s3/response/listen_bucket_notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl FromS3Response
continue;
}
let records_res: Result<NotificationRecords, Error> =
serde_json::from_str(&s).map_err(|e| e.into());
serde_json::from_str(s).map_err(|e| e.into());
return Some((records_res, reader));
}
Err(e) => return Some((Err(e.into()), reader)),
Expand Down
8 changes: 4 additions & 4 deletions src/s3/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,15 @@ pub mod aws_date_format {
where
S: Serializer,
{
serializer.serialize_str(&to_iso8601utc(date.clone()))
serializer.serialize_str(&to_iso8601utc(*date))
}

/// Deserializes a `UtcTime` from its ISO 8601 (UTC) string representation.
///
/// # Errors
///
/// Returns a deserializer error when the input is not a string or cannot be
/// parsed by `from_iso8601utc`.
pub fn deserialize<'de, D>(deserializer: D) -> Result<UtcTime, D::Error>
where
    D: Deserializer<'de>,
{
    let s = String::deserialize(deserializer)?;
    // Return the mapped `Result` directly — wrapping it in `Ok(...?)` is
    // redundant (clippy: needless_question_mark). The scraped span carried
    // both the old and new forms; only one statement is valid here.
    from_iso8601utc(&s).map_err(serde::de::Error::custom)
}
}

Expand Down Expand Up @@ -501,7 +501,7 @@ pub mod xml {
fn get_first(&self, tag: &str) -> Option<usize> {
let tag: String = tag.to_string();
let is = self.children.get(&tag)?;
is.first().map(|v| *v)
is.first().copied()
}

fn get(&self, tag: &str) -> Option<&Vec<usize>> {
Expand Down Expand Up @@ -575,7 +575,7 @@ pub mod xml {
self.child_element_index
.get(tag)
.unwrap_or(&vec![])
.into_iter()
.iter()
.map(|i| (*i, self.inner.children[*i].as_element().unwrap().into()))
.collect()
}
Expand Down

0 comments on commit 8b35d13

Please sign in to comment.