Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix S3 bugs and test writing to S3 #16

Merged
merged 7 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ jobs:
- name: Build
run: cmake --build ${{github.workspace}}/build --config ${{env.BUILD_TYPE}}

- name: Test
- name: Test C++
working-directory: ${{github.workspace}}/build
env:
ZARR_S3_ENDPOINT: ${{ env.MINIO_URL }}
Expand All @@ -128,10 +128,19 @@ jobs:
ZARR_S3_SECRET_ACCESS_KEY: ${{ env.MINIO_SECRET_KEY }}
run: ctest -C ${{env.BUILD_TYPE}} -L s3 --output-on-failure

- name: Install dependencies
run: python -m pip install -U pip "pybind11[global]" cmake build numpy pytest

- name: Build and install Python bindings
run: python -m pip install ".[testing]"

- name: Test Python
run: python -m pytest -v -k test_stream_data_to_s3


test_python:
name: Test Python on ${{ matrix.platform }}
runs-on: ${{ matrix.platform }}
timeout-minutes: 20
jeskesen marked this conversation as resolved.
Show resolved Hide resolved
strategy:
fail-fast: false
matrix:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ testing = [
"s3fs",
"tifffile",
"zarr",
"python-dotenv",
]

[tool.black]
Expand Down
60 changes: 38 additions & 22 deletions python/acquire-zarr-py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ dimension_type_to_str(ZarrDimensionType t)
return "UNKNOWN";
}
}

} // namespace

class PyZarrS3Settings
Expand Down Expand Up @@ -163,10 +162,10 @@ class PyZarrCompressionSettings
}

private:
ZarrCompressor compressor_;
ZarrCompressionCodec codec_;
uint8_t level_;
uint8_t shuffle_;
ZarrCompressor compressor_{ ZarrCompressor_None };
ZarrCompressionCodec codec_{ ZarrCompressionCodec_None };
uint8_t level_{ 0 };
uint8_t shuffle_{ 0 };
};

class PyZarrDimensionProperties
Expand Down Expand Up @@ -218,10 +217,10 @@ class PyZarrStreamSettings

std::vector<PyZarrDimensionProperties> dimensions;

std::string store_path() const { return store_path_; }
const std::string& store_path() const { return store_path_; }
void set_store_path(const std::string& path) { store_path_ = path; }

std::optional<std::string> custom_metadata() const
const std::optional<std::string>& custom_metadata() const
{
return custom_metadata_;
}
Expand All @@ -230,13 +229,13 @@ class PyZarrStreamSettings
custom_metadata_ = metadata;
}

std::optional<PyZarrS3Settings> s3() const { return s3_settings_; }
const std::optional<PyZarrS3Settings>& s3() const { return s3_settings_; }
void set_s3(const std::optional<PyZarrS3Settings>& settings)
{
s3_settings_ = settings;
}

std::optional<PyZarrCompressionSettings> compression() const
const std::optional<PyZarrCompressionSettings>& compression() const
{
return compression_settings_;
}
Expand Down Expand Up @@ -287,21 +286,28 @@ class PyZarrStream
.version = settings.version(),
};

auto store_path = settings.store_path();
stream_settings.store_path = store_path.c_str();
store_path_ = settings.store_path();
stream_settings.store_path = store_path_.c_str();

std::string metadata;
if (settings.custom_metadata()) {
metadata = settings.custom_metadata().value();
stream_settings.custom_metadata = metadata.c_str();
custom_metadata_ = settings.custom_metadata().value();
stream_settings.custom_metadata = custom_metadata_.c_str();
}

if (settings.s3().has_value()) {
s3_settings.endpoint = settings.s3()->endpoint().c_str();
s3_settings.bucket_name = settings.s3()->bucket_name().c_str();
s3_settings.access_key_id = settings.s3()->access_key_id().c_str();
s3_settings.secret_access_key =
settings.s3()->secret_access_key().c_str();
const auto& s3 = settings.s3().value();
s3_endpoint_ = s3.endpoint();
s3_settings.endpoint = s3_endpoint_.c_str();

s3_bucket_name_ = s3.bucket_name();
s3_settings.bucket_name = s3_bucket_name_.c_str();

s3_access_key_id_ = s3.access_key_id();
s3_settings.access_key_id = s3_access_key_id_.c_str();

s3_secret_access_key_ = s3.secret_access_key();
s3_settings.secret_access_key = s3_secret_access_key_.c_str();

stream_settings.s3_settings = &s3_settings;
}

Expand All @@ -315,14 +321,14 @@ class PyZarrStream
}

const auto& dims = settings.dimensions;
dimension_names_.resize(dims.size());

std::vector<ZarrDimensionProperties> dimension_props;
std::vector<std::string> dimension_names(dims.size());
for (auto i = 0; i < dims.size(); ++i) {
const auto& dim = dims[i];
dimension_names[i] = dim.name();
dimension_names_[i] = dim.name();
ZarrDimensionProperties properties{
.name = dimension_names[i].c_str(),
.name = dimension_names_[i].c_str(),
.type = dim.type(),
.array_size_px = dim.array_size_px(),
.chunk_size_px = dim.chunk_size_px(),
Expand Down Expand Up @@ -372,6 +378,16 @@ class PyZarrStream
std::unique_ptr<ZarrStream, decltype(ZarrStreamDeleter)>;

ZarrStreamPtr stream_;

std::string store_path_;
std::string custom_metadata_;

std::vector<std::string> dimension_names_;

std::string s3_endpoint_;
std::string s3_bucket_name_;
std::string s3_access_key_id_;
std::string s3_secret_access_key_;
};

PYBIND11_MODULE(acquire_zarr, m)
Expand Down
55 changes: 32 additions & 23 deletions python/tests/test_stream.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
#!/usr/bin/env python3

import dotenv

dotenv.load_dotenv()

import json
from pathlib import Path
import os
Expand All @@ -17,6 +14,7 @@
import pytest
import zarr
from numcodecs import blosc
import re
import s3fs

from acquire_zarr import (
Expand All @@ -29,6 +27,7 @@
Dimension,
DimensionType,
ZarrVersion,
DataType,
)


Expand Down Expand Up @@ -65,6 +64,25 @@ def settings():
return s


@pytest.fixture(scope="module")
def s3_settings():
dotenv.load_dotenv()
if (
"ZARR_S3_ENDPOINT" not in os.environ
or "ZARR_S3_BUCKET_NAME" not in os.environ
or "ZARR_S3_ACCESS_KEY_ID" not in os.environ
or "ZARR_S3_SECRET_ACCESS_KEY" not in os.environ
):
yield None
else:
yield S3Settings(
endpoint=os.environ["ZARR_S3_ENDPOINT"],
bucket_name=os.environ["ZARR_S3_BUCKET_NAME"],
access_key_id=os.environ["ZARR_S3_ACCESS_KEY_ID"],
secret_access_key=os.environ["ZARR_S3_SECRET_ACCESS_KEY"],
)


@pytest.fixture(scope="function")
def store_path(tmp_path):
yield tmp_path
Expand Down Expand Up @@ -143,17 +161,6 @@ def get_directory_store(version: ZarrVersion, store_path: str):
else:
return zarr.DirectoryStoreV3(store_path)

def make_s3_settings(store_path: str):
if "ZARR_S3_ENDPOINT" not in os.environ or "ZARR_S3_BUCKET_NAME" not in os.environ or "ZARR_S3_ACCESS_KEY_ID" not in os.environ or "ZARR_S3_SECRET_ACCESS_KEY" not in os.environ:
return None

return S3Settings(
endpoint=os.environ["ZARR_S3_ENDPOINT"],
bucket_name=os.environ["ZARR_S3_BUCKET_NAME"],
access_key_id=os.environ["ZARR_S3_ACCESS_KEY_ID"],
secret_access_key=os.environ["ZARR_S3_SECRET_ACCESS_KEY"],
)


@pytest.mark.parametrize(
("version",),
Expand Down Expand Up @@ -314,21 +321,20 @@ def test_stream_data_to_filesystem(
),
],
)
@pytest.mark.skip(reason="Temporary; needs debugging")
def test_stream_data_to_s3(
settings: StreamSettings,
store_path: Path,
s3_settings: Optional[S3Settings],
request: pytest.FixtureRequest,
version: ZarrVersion,
compression_codec: Optional[CompressionCodec],
):
s3_settings = make_s3_settings(store_path)
if s3_settings is None:
pytest.skip("S3 settings not set")

settings.store_path = str(store_path / f"{request.node.name}.zarr")
settings.store_path = f"{request.node.name}.zarr".replace("[", "").replace("]", "")
settings.version = version
settings.s3 = s3_settings
settings.data_type = DataType.UINT16
if compression_codec is not None:
settings.compression = CompressionSettings(
compressor=Compressor.BLOSC1,
Expand All @@ -352,17 +358,21 @@ def test_stream_data_to_s3(
)
stream.append(data)

del stream # close the stream, flush the files
del stream # close the stream, flush the data

s3 = s3fs.S3FileSystem(
key=settings.s3.access_key_id,
secret=settings.s3_secret_access_key,
client_kwargs={"endpoint_url": settings.s3_endpoint},
secret=settings.s3.secret_access_key,
client_kwargs={"endpoint_url": settings.s3.endpoint},
)
store = s3fs.S3Map(
root=f"{s3_settings.bucket_name}/{settings.store_path}", s3=s3
)
cache = zarr.LRUStoreCache(store, max_size=2**28)
cache = (
zarr.LRUStoreCache(store, max_size=2**28)
if version == ZarrVersion.V2
jeskesen marked this conversation as resolved.
Show resolved Hide resolved
else zarr.LRUStoreCacheV3(store, max_size=2**28)
)
group = zarr.group(store=cache)

data = group["0"]
Expand All @@ -387,4 +397,3 @@ def test_stream_data_to_s3(

# cleanup
s3.rm(store.root, recursive=True)

20 changes: 9 additions & 11 deletions src/streaming/s3.sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,13 @@ zarr::S3Sink::is_multipart_upload_() const
void
zarr::S3Sink::create_multipart_upload_()
{
if (!is_multipart_upload_()) {
multipart_upload_ = {};
}

if (!multipart_upload_->upload_id.empty()) {
return;
}
multipart_upload_ = MultiPartUpload{};

auto connection = connection_pool_->get_connection();
multipart_upload_->upload_id =
connection_pool_->get_connection()->create_multipart_object(bucket_name_,
object_key_);
connection->create_multipart_object(bucket_name_, object_key_);

connection_pool_->return_connection(std::move(connection));
}

bool
Expand All @@ -148,9 +144,11 @@ zarr::S3Sink::flush_part_()
return false;
}

auto connection = connection_pool_->get_connection();
if (!is_multipart_upload_()) {
create_multipart_upload_();
}

create_multipart_upload_();
auto connection = connection_pool_->get_connection();

bool retval = false;
try {
Expand Down
1 change: 1 addition & 0 deletions tests/unit-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ set(tests
s3-connection-upload-multipart-object
file-sink-write
s3-sink-write
s3-sink-write-multipart
sink-creator-make-metadata-sinks
sink-creator-make-data-sinks
array-writer-downsample-writer-config
Expand Down
Loading
Loading