Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small improvements to object store #52

Open
wants to merge 7 commits into
base: object-store-0.10.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 0 additions & 46 deletions .github/workflows/arrow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -151,49 +151,3 @@ jobs:
run: cargo build -p arrow --no-default-features --features=json,csv,ipc,ffi --target wasm32-unknown-unknown
- name: Build wasm32-wasi
run: cargo build -p arrow --no-default-features --features=json,csv,ipc,ffi --target wasm32-wasi

clippy:
name: Clippy
runs-on: ubuntu-latest
container:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
- name: Setup Clippy
run: rustup component add clippy
- name: Clippy arrow-buffer with all features
run: cargo clippy -p arrow-buffer --all-targets --all-features -- -D warnings
- name: Clippy arrow-data with all features
run: cargo clippy -p arrow-data --all-targets --all-features -- -D warnings
- name: Clippy arrow-schema with all features
run: cargo clippy -p arrow-schema --all-targets --all-features -- -D warnings
- name: Clippy arrow-array with all features
run: cargo clippy -p arrow-array --all-targets --all-features -- -D warnings
- name: Clippy arrow-select with all features
run: cargo clippy -p arrow-select --all-targets --all-features -- -D warnings
- name: Clippy arrow-cast with all features
run: cargo clippy -p arrow-cast --all-targets --all-features -- -D warnings
- name: Clippy arrow-ipc with all features
run: cargo clippy -p arrow-ipc --all-targets --all-features -- -D warnings
- name: Clippy arrow-csv with all features
run: cargo clippy -p arrow-csv --all-targets --all-features -- -D warnings
- name: Clippy arrow-json with all features
run: cargo clippy -p arrow-json --all-targets --all-features -- -D warnings
- name: Clippy arrow-avro with all features
run: cargo clippy -p arrow-avro --all-targets --all-features -- -D warnings
- name: Clippy arrow-string with all features
run: cargo clippy -p arrow-string --all-targets --all-features -- -D warnings
- name: Clippy arrow-ord with all features
run: cargo clippy -p arrow-ord --all-targets --all-features -- -D warnings
- name: Clippy arrow-arith with all features
run: cargo clippy -p arrow-arith --all-targets --all-features -- -D warnings
- name: Clippy arrow-row with all features
run: cargo clippy -p arrow-row --all-targets --all-features -- -D warnings
- name: Clippy arrow with all features
run: cargo clippy -p arrow --all-features --all-targets -- -D warnings
- name: Clippy arrow-integration-test with all features
run: cargo clippy -p arrow-integration-test --all-targets --all-features -- -D warnings
- name: Clippy arrow-integration-testing with all features
run: cargo clippy -p arrow-integration-testing --all-targets --all-features -- -D warnings
14 changes: 0 additions & 14 deletions .github/workflows/arrow_flight.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,3 @@ jobs:
run: ./arrow-flight/regen.sh
- name: Verify workspace clean (if this fails, run ./arrow-flight/regen.sh and check in results)
run: git diff --exit-code

clippy:
name: Clippy
runs-on: ubuntu-latest
container:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
- name: Setup Clippy
run: rustup component add clippy
- name: Run clippy
run: cargo clippy -p arrow-flight --all-targets --all-features -- -D warnings
2 changes: 1 addition & 1 deletion .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ jobs:
run: |
rustup toolchain install ${{ matrix.rust }}
rustup default ${{ matrix.rust }}
rustup component add rustfmt clippy
rustup component add rustfmt
- name: Cache Cargo
uses: actions/cache@v4
with:
Expand Down
34 changes: 1 addition & 33 deletions .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,39 +34,6 @@ on:
- .github/**

jobs:
clippy:
name: Clippy
runs-on: ubuntu-latest
container:
image: amd64/rust
defaults:
run:
working-directory: object_store
steps:
- uses: actions/checkout@v4
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
- name: Setup Clippy
run: rustup component add clippy
# Run different tests for the library on its own as well as
# all targets to ensure that it still works in the absence of
# features that might be enabled by dev-dependencies of other
# targets.
- name: Run clippy with default features
run: cargo clippy -- -D warnings
- name: Run clippy with aws feature
run: cargo clippy --features aws -- -D warnings
- name: Run clippy with gcp feature
run: cargo clippy --features gcp -- -D warnings
- name: Run clippy with azure feature
run: cargo clippy --features azure -- -D warnings
- name: Run clippy with http feature
run: cargo clippy --features http -- -D warnings
- name: Run clippy with all features
run: cargo clippy --all-features -- -D warnings
- name: Run clippy with all features and all targets
run: cargo clippy --all-features --all-targets -- -D warnings

# test doc links still work
#
# Note that since object_store is not part of the main workspace,
Expand Down Expand Up @@ -141,6 +108,7 @@ jobs:
echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:3.3.0)" >> $GITHUB_ENV
echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket test-object-lock --object-lock-enabled-for-bucket
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

KMS_KEY=$(aws --endpoint-url=http://localhost:4566 kms create-key --description "test key")
Expand Down
14 changes: 0 additions & 14 deletions .github/workflows/parquet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,3 @@ jobs:
run: |
cd parquet/pytest
pytest -v
clippy:
name: Clippy
runs-on: ubuntu-latest
container:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
- name: Setup Clippy
run: rustup component add clippy
- name: Run clippy
run: cargo clippy -p parquet --all-targets --all-features -- -D warnings
14 changes: 0 additions & 14 deletions .github/workflows/parquet_derive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,3 @@ jobs:
uses: ./.github/actions/setup-builder
- name: Test
run: cargo test -p parquet_derive

clippy:
name: Clippy
runs-on: ubuntu-latest
container:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
- name: Setup Clippy
run: rustup component add clippy
- name: Run clippy
run: cargo clippy -p parquet_derive --all-features -- -D warnings
32 changes: 24 additions & 8 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::s3::{
CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult,
ListResponse,
ListResponse, PartMetadata,
};
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
Expand Down Expand Up @@ -62,6 +62,7 @@ use std::sync::Arc;
const VERSION_HEADER: &str = "x-amz-version-id";
const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256";
const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-amz-meta-";
const ALGORITHM: &str = "x-amz-checksum-algorithm";

/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -349,10 +350,9 @@ impl<'a> Request<'a> {
let payload_sha256 = sha256.finish();

if let Some(Checksum::SHA256) = self.config.checksum {
self.builder = self.builder.header(
"x-amz-checksum-sha256",
BASE64_STANDARD.encode(payload_sha256),
);
self.builder = self
.builder
.header(SHA256_CHECKSUM, BASE64_STANDARD.encode(payload_sha256));
}
self.payload_sha256 = Some(payload_sha256);
}
Expand Down Expand Up @@ -534,8 +534,11 @@ impl S3Client {
location: &Path,
opts: PutMultipartOpts,
) -> Result<MultipartId> {
let response = self
.request(Method::POST, location)
let mut reqquest = self.request(Method::POST, location);
if let Some(algorithm) = self.config.checksum {
reqquest = reqquest.header(ALGORITHM, &algorithm.to_string().to_uppercase());
}
let response = reqquest
.query(&[("uploads", "")])
.with_encryption_headers()
.with_attributes(opts.attributes)
Expand Down Expand Up @@ -569,8 +572,21 @@ impl S3Client {
.idempotent(true)
.send()
.await?;
let checksum = response
.headers()
.get(SHA256_CHECKSUM)
.and_then(|v| v.to_str().ok())
.map(|v| v.to_string());

let e_tag = get_etag(response.headers()).context(MetadataSnafu)?;

let content_id = if self.config.checksum == Some(Checksum::SHA256) {
let meta = PartMetadata { e_tag, checksum };
quick_xml::se::to_string(&meta).unwrap()
} else {
e_tag
};

let content_id = get_etag(response.headers()).context(MetadataSnafu)?;
Ok(PartId { content_id })
}

Expand Down
60 changes: 60 additions & 0 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,66 @@ mod tests {

const NON_EXISTENT_NAME: &str = "nonexistentname";

#[tokio::test]
async fn write_multipart_file_with_signature() {
maybe_skip_integration!();

let store = AmazonS3Builder::from_env()
.with_checksum_algorithm(Checksum::SHA256)
.build()
.unwrap();

let str = "test.bin";
let path = Path::parse(str).unwrap();
let opts = PutMultipartOpts::default();
let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();

upload
.put_part(PutPayload::from(vec![0u8; 10_000_000]))
.await
.unwrap();
upload
.put_part(PutPayload::from(vec![0u8; 5_000_000]))
.await
.unwrap();

let res = upload.complete().await.unwrap();
assert!(res.e_tag.is_some(), "Should have valid etag");

store.delete(&path).await.unwrap();
}

#[tokio::test]
async fn write_multipart_file_with_signature_object_lock() {
maybe_skip_integration!();

let bucket = "test-object-lock";
let store = AmazonS3Builder::from_env()
.with_bucket_name(bucket)
.with_checksum_algorithm(Checksum::SHA256)
.build()
.unwrap();

let str = "test.bin";
let path = Path::parse(str).unwrap();
let opts = PutMultipartOpts::default();
let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();

upload
.put_part(PutPayload::from(vec![0u8; 10_000_000]))
.await
.unwrap();
upload
.put_part(PutPayload::from(vec![0u8; 5_000_000]))
.await
.unwrap();

let res = upload.complete().await.unwrap();
assert!(res.e_tag.is_some(), "Should have valid etag");

store.delete(&path).await.unwrap();
}

#[tokio::test]
async fn s3_test() {
maybe_skip_integration!();
Expand Down
27 changes: 24 additions & 3 deletions object_store/src/client/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,32 @@ pub struct CompleteMultipartUpload {
pub part: Vec<MultipartPart>,
}

#[derive(Serialize, Deserialize)]
pub(crate) struct PartMetadata {
pub e_tag: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub checksum: Option<String>,
}

impl From<Vec<PartId>> for CompleteMultipartUpload {
fn from(value: Vec<PartId>) -> Self {
let part = value
.into_iter()
.enumerate()
.map(|(part_number, part)| MultipartPart {
e_tag: part.content_id,
part_number: part_number + 1,
.map(|(part_idx, part)| {
let md = match quick_xml::de::from_str::<PartMetadata>(&part.content_id) {
Ok(md) => md,
// fallback to old way
Err(_) => PartMetadata {
e_tag: part.content_id.clone(),
checksum: None,
},
};
MultipartPart {
e_tag: md.e_tag,
part_number: part_idx + 1,
checksum_sha256: md.checksum,
}
})
.collect();
Self { part }
Expand All @@ -118,6 +136,9 @@ pub struct MultipartPart {
pub e_tag: String,
#[serde(rename = "PartNumber")]
pub part_number: usize,
#[serde(rename = "ChecksumSHA256")]
#[serde(skip_serializing_if = "Option::is_none")]
pub checksum_sha256: Option<String>,
}

#[derive(Debug, Deserialize)]
Expand Down
6 changes: 3 additions & 3 deletions object_store/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,13 @@ impl WriteMultipart {

/// Flush final chunk, and await completion of all in-flight requests
pub async fn finish(mut self) -> Result<PutResult> {
self.wait_for_capacity(0).await?;

if !self.buffer.is_empty() {
let part = std::mem::take(&mut self.buffer);
self.put_part(part.into())
self.upload.put_part(part.into()).await?;
}

self.wait_for_capacity(0).await?;

match self.upload.complete().await {
Err(e) => {
self.tasks.shutdown().await;
Expand Down
Loading