Skip to content

Commit

Permalink
s3s-fs: fix list_objects_v2 on Windows (#101)
Browse files Browse the repository at this point in the history
  • Loading branch information
amunra authored Nov 8, 2023
1 parent 8ffaeff commit 54124ba
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 9 deletions.
33 changes: 29 additions & 4 deletions crates/s3s-fs/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::collections::VecDeque;
use std::io;
use std::ops::Neg;
use std::ops::Not;
use std::path::PathBuf;
use std::path::{Path, PathBuf};

use tokio::fs;
use tokio::io::AsyncSeekExt;
Expand All @@ -23,9 +23,32 @@ use futures::TryStreamExt;
use md5::{Digest, Md5};
use numeric_cast::NumericCast;
use rust_utils::default::default;
use std::path::Component;
use std::string::ToString;
use tracing::debug;
use uuid::Uuid;

fn normalize_path(path: &Path, delimiter: &str) -> Option<String> {
let mut normalized = String::new();
let mut first = true;
for component in path.components() {
match component {
Component::RootDir | Component::CurDir | Component::ParentDir | Component::Prefix(_) => {
return None;
}
Component::Normal(name) => {
let name = name.to_str()?;
if !first {
normalized.push_str(delimiter);
}
normalized.push_str(name);
first = false;
}
}
}
Some(normalized)
}

#[async_trait::async_trait]
impl S3 for FileSystem {
#[tracing::instrument]
Expand Down Expand Up @@ -335,10 +358,12 @@ impl S3 for FileSystem {
} else {
let file_path = entry.path();
let key = try_!(file_path.strip_prefix(&path));
let Some(key) = key.to_str() else { continue };
let delimiter = input.delimiter.as_ref().map_or("/", |d| d.as_str());
let Some(key_str) = normalize_path(key, delimiter) else { continue };

if let Some(ref prefix) = input.prefix {
if !key.starts_with(prefix) {
let prefix_path: PathBuf = prefix.split(delimiter).collect();
if !key.starts_with(prefix_path) {
continue;
}
}
Expand All @@ -348,7 +373,7 @@ impl S3 for FileSystem {
let size = metadata.len();

let object = Object {
key: Some(key.to_owned()),
key: Some(key_str),
last_modified: Some(last_modified),
size: try_!(i64::try_from(size)),
..Default::default()
Expand Down
82 changes: 77 additions & 5 deletions crates/s3s-fs/tests/it_aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,90 @@ async fn delete_bucket(c: &Client, bucket: &str) -> Result<()> {
Ok(())
}

macro_rules! log_and_unwrap {
($result:expr) => {
match $result {
Ok(ans) => {
debug!(?ans);
ans
}
Err(err) => {
error!(?err);
return Err(err.into());
}
}
};
}

#[tokio::test]
#[tracing::instrument]
async fn test_list_buckets() -> Result<()> {
let c = Client::new(config());
let result = c.list_buckets().send().await;
let response1 = log_and_unwrap!(c.list_buckets().send().await);
assert!(response1.buckets().is_some());

let bucket1 = format!("test-list-buckets-1-{}", Uuid::new_v4());
let bucket1_str = bucket1.as_str();
let bucket2 = format!("test-list-buckets-2-{}", Uuid::new_v4());
let bucket2_str = bucket2.as_str();

create_bucket(&c, bucket1_str).await?;
create_bucket(&c, bucket2_str).await?;

let response2 = log_and_unwrap!(c.list_buckets().send().await);
assert!(response2.buckets().is_some());
let bucket_names: Vec<_> = response2
.buckets()
.unwrap()
.iter()
.filter_map(|bucket| bucket.name())
.collect();
assert!(bucket_names.contains(&bucket1_str));
assert!(bucket_names.contains(&bucket2_str));

Ok(())
}

match result {
Ok(ref ans) => debug!(?ans),
Err(ref err) => error!(?err),
#[tokio::test]
#[tracing::instrument]
async fn test_list_objects_v2() -> Result<()> {
let c = Client::new(config());
let bucket = format!("test-list-objects-v2-{}", Uuid::new_v4());
let bucket_str = bucket.as_str();
create_bucket(&c, bucket_str).await?;

let test_prefix = "/this/is/a/test/";
let key1 = "this/is/a/test/path/file1.txt";
let key2 = "this/is/a/test/path/file2.txt";
{
let content = "hello world\nनमस्ते दुनिया\n";
let crc32c = base64_simd::STANDARD.encode_to_string(crc32c::crc32c(content.as_bytes()).to_be_bytes());
c.put_object()
.bucket(bucket_str)
.key(key1)
.body(ByteStream::from_static(content.as_bytes()))
.checksum_crc32_c(crc32c.as_str())
.send()
.await?;
c.put_object()
.bucket(bucket_str)
.key(key2)
.body(ByteStream::from_static(content.as_bytes()))
.checksum_crc32_c(crc32c.as_str())
.send()
.await?;
}

result?;
let result = c.list_objects_v2().bucket(bucket_str).prefix(test_prefix).send().await;

let response = log_and_unwrap!(result);

assert!(response.contents().is_some());
let contents: Vec<_> = response.contents().unwrap().iter().filter_map(|obj| obj.key()).collect();
assert!(!contents.is_empty());
assert!(contents.contains(&key1));
assert!(contents.contains(&key2));

Ok(())
}

Expand Down

0 comments on commit 54124ba

Please sign in to comment.