From e2ac5454a9af5ea043b414011c691dde0acc219d Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Wed, 30 Oct 2024 17:13:21 -0400 Subject: [PATCH 1/7] Fix bugs in multipart upload --- src/streaming/s3.sink.cpp | 20 ++-- tests/unit-tests/CMakeLists.txt | 1 + tests/unit-tests/s3-sink-write-multipart.cpp | 112 +++++++++++++++++++ 3 files changed, 122 insertions(+), 11 deletions(-) create mode 100644 tests/unit-tests/s3-sink-write-multipart.cpp diff --git a/src/streaming/s3.sink.cpp b/src/streaming/s3.sink.cpp index 04ea3c7..81150ab 100644 --- a/src/streaming/s3.sink.cpp +++ b/src/streaming/s3.sink.cpp @@ -128,17 +128,13 @@ zarr::S3Sink::is_multipart_upload_() const void zarr::S3Sink::create_multipart_upload_() { - if (!is_multipart_upload_()) { - multipart_upload_ = {}; - } - - if (!multipart_upload_->upload_id.empty()) { - return; - } + multipart_upload_ = MultiPartUpload{}; + auto connection = connection_pool_->get_connection(); multipart_upload_->upload_id = - connection_pool_->get_connection()->create_multipart_object(bucket_name_, - object_key_); + connection->create_multipart_object(bucket_name_, object_key_); + + connection_pool_->return_connection(std::move(connection)); } bool @@ -148,9 +144,11 @@ zarr::S3Sink::flush_part_() return false; } - auto connection = connection_pool_->get_connection(); + if (!is_multipart_upload_()) { + create_multipart_upload_(); + } - create_multipart_upload_(); + auto connection = connection_pool_->get_connection(); bool retval = false; try { diff --git a/tests/unit-tests/CMakeLists.txt b/tests/unit-tests/CMakeLists.txt index aefbc34..6069b73 100644 --- a/tests/unit-tests/CMakeLists.txt +++ b/tests/unit-tests/CMakeLists.txt @@ -14,6 +14,7 @@ set(tests s3-connection-upload-multipart-object file-sink-write s3-sink-write + s3-sink-write-multipart sink-creator-make-metadata-sinks sink-creator-make-data-sinks array-writer-downsample-writer-config diff --git a/tests/unit-tests/s3-sink-write-multipart.cpp b/tests/unit-tests/s3-sink-write-multipart.cpp new file mode 100644 index 0000000..a335281 --- /dev/null +++ b/tests/unit-tests/s3-sink-write-multipart.cpp @@ -0,0 +1,112 @@ +#include "s3.sink.hh" +#include "unit.test.macros.hh" + +#include +#include + +namespace { +bool +get_credentials(std::string& endpoint, + std::string& bucket_name, + std::string& access_key_id, + std::string& secret_access_key) +{ + char* env = nullptr; + if (!(env = std::getenv("ZARR_S3_ENDPOINT"))) { + LOG_ERROR("ZARR_S3_ENDPOINT not set."); + return false; + } + endpoint = env; + + if (!(env = std::getenv("ZARR_S3_BUCKET_NAME"))) { + LOG_ERROR("ZARR_S3_BUCKET_NAME not set."); + return false; + } + bucket_name = env; + + if (!(env = std::getenv("ZARR_S3_ACCESS_KEY_ID"))) { + LOG_ERROR("ZARR_S3_ACCESS_KEY_ID not set."); + return false; + } + access_key_id = env; + + if (!(env = std::getenv("ZARR_S3_SECRET_ACCESS_KEY"))) { + LOG_ERROR("ZARR_S3_SECRET_ACCESS_KEY not set."); + return false; + } + secret_access_key = env; + + return true; +} +} // namespace + +int +main() +{ + std::string s3_endpoint, bucket_name, s3_access_key_id, + s3_secret_access_key; + + if (!get_credentials( + s3_endpoint, bucket_name, s3_access_key_id, s3_secret_access_key)) { + LOG_WARNING("Failed to get credentials. 
Skipping test."); + return 0; + } + + int retval = 1; + const std::string object_name = "test-object"; + + try { + auto pool = std::make_shared( + 1, s3_endpoint, s3_access_key_id, s3_secret_access_key); + + auto conn = pool->get_connection(); + if (!conn->is_connection_valid()) { + LOG_ERROR("Failed to connect to S3."); + return 1; + } + CHECK(conn->bucket_exists(bucket_name)); + CHECK(conn->delete_object(bucket_name, object_name)); + CHECK(!conn->object_exists(bucket_name, object_name)); + + pool->return_connection(std::move(conn)); + + std::vector data((5 << 20) + 1, std::byte{ 0 }); + { + auto sink = + std::make_unique(bucket_name, object_name, pool); + CHECK(sink->write(0, data)); + CHECK(zarr::finalize_sink(std::move(sink))); + } + + conn = pool->get_connection(); + CHECK(conn->object_exists(bucket_name, object_name)); + pool->return_connection(std::move(conn)); + + // Verify the object size. + { + minio::s3::BaseUrl url(s3_endpoint); + url.https = s3_endpoint.starts_with("https://"); + + minio::creds::StaticProvider provider(s3_access_key_id, + s3_secret_access_key); + + minio::s3::Client client(url, &provider); + minio::s3::StatObjectArgs args; + args.bucket = bucket_name; + args.object = object_name; + + minio::s3::StatObjectResponse resp = client.StatObject(args); + EXPECT_EQ(int, data.size(), resp.size); + } + + // cleanup + conn = pool->get_connection(); + CHECK(conn->delete_object(bucket_name, object_name)); + + retval = 0; + } catch (const std::exception& e) { + LOG_ERROR("Exception: ", e.what()); + } + + return retval; +} \ No newline at end of file From c8b16539d7d5c57658843ec4b994a4afd30156b2 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Thu, 31 Oct 2024 14:16:47 -0400 Subject: [PATCH 2/7] Test S3 streaming --- python/acquire-zarr-py.cpp | 60 +++++++++++++++++++++++-------------- python/tests/test_stream.py | 52 ++++++++++++++++++-------------- 2 files changed, 67 insertions(+), 45 deletions(-) diff --git a/python/acquire-zarr-py.cpp b/python/acquire-zarr-py.cpp index 3683d4f..d8dbaed 100644 --- a/python/acquire-zarr-py.cpp +++ b/python/acquire-zarr-py.cpp @@ -89,7 +89,6 @@ dimension_type_to_str(ZarrDimensionType t) return "UNKNOWN"; } } - } // namespace class PyZarrS3Settings @@ -163,10 +162,10 @@ class PyZarrCompressionSettings } private: - ZarrCompressor compressor_; - ZarrCompressionCodec codec_; - uint8_t level_; - uint8_t shuffle_; + ZarrCompressor compressor_{ ZarrCompressor_None }; + ZarrCompressionCodec codec_{ ZarrCompressionCodec_None }; + uint8_t level_{ 0 }; + uint8_t shuffle_{ 0 }; }; class PyZarrDimensionProperties @@ -218,10 +217,10 @@ class PyZarrStreamSettings std::vector dimensions; - std::string store_path() const { return store_path_; } + const std::string& store_path() const { return store_path_; } void set_store_path(const std::string& path) { store_path_ = path; } - std::optional custom_metadata() const + const std::optional& custom_metadata() const { return custom_metadata_; } @@ -230,13 +229,13 @@ class PyZarrStreamSettings custom_metadata_ = metadata; } - std::optional s3() const { return s3_settings_; } + const std::optional& s3() const { return s3_settings_; } void set_s3(const std::optional& settings) { s3_settings_ = settings; } - std::optional compression() const + const std::optional& compression() const { return compression_settings_; } @@ -287,21 +286,28 @@ class PyZarrStream .version = settings.version(), }; - auto store_path = settings.store_path(); - stream_settings.store_path = store_path.c_str(); + store_path_ = 
settings.store_path(); + stream_settings.store_path = store_path_.c_str(); - std::string metadata; if (settings.custom_metadata()) { - metadata = settings.custom_metadata().value(); - stream_settings.custom_metadata = metadata.c_str(); + custom_metadata_ = settings.custom_metadata().value(); + stream_settings.custom_metadata = custom_metadata_.c_str(); } if (settings.s3().has_value()) { - s3_settings.endpoint = settings.s3()->endpoint().c_str(); - s3_settings.bucket_name = settings.s3()->bucket_name().c_str(); - s3_settings.access_key_id = settings.s3()->access_key_id().c_str(); - s3_settings.secret_access_key = - settings.s3()->secret_access_key().c_str(); + const auto& s3 = settings.s3().value(); + s3_endpoint_ = s3.endpoint(); + s3_settings.endpoint = s3_endpoint_.c_str(); + + s3_bucket_name_ = s3.bucket_name(); + s3_settings.bucket_name = s3_bucket_name_.c_str(); + + s3_access_key_id_ = s3.access_key_id(); + s3_settings.access_key_id = s3_access_key_id_.c_str(); + + s3_secret_access_key_ = s3.secret_access_key(); + s3_settings.secret_access_key = s3_secret_access_key_.c_str(); + stream_settings.s3_settings = &s3_settings; } @@ -315,14 +321,14 @@ class PyZarrStream } const auto& dims = settings.dimensions; + dimension_names_.resize(dims.size()); std::vector dimension_props; - std::vector dimension_names(dims.size()); for (auto i = 0; i < dims.size(); ++i) { const auto& dim = dims[i]; - dimension_names[i] = dim.name(); + dimension_names_[i] = dim.name(); ZarrDimensionProperties properties{ - .name = dimension_names[i].c_str(), + .name = dimension_names_[i].c_str(), .type = dim.type(), .array_size_px = dim.array_size_px(), .chunk_size_px = dim.chunk_size_px(), @@ -372,6 +378,16 @@ class PyZarrStream std::unique_ptr; ZarrStreamPtr stream_; + + std::string store_path_; + std::string custom_metadata_; + + std::vector dimension_names_; + + std::string s3_endpoint_; + std::string s3_bucket_name_; + std::string s3_access_key_id_; + std::string s3_secret_access_key_; }; PYBIND11_MODULE(acquire_zarr, m) diff --git a/python/tests/test_stream.py b/python/tests/test_stream.py index c96d8cf..d616cd0 100644 --- a/python/tests/test_stream.py +++ b/python/tests/test_stream.py @@ -1,9 +1,6 @@ #!/usr/bin/env python3 import dotenv - -dotenv.load_dotenv() - import json from pathlib import Path import os @@ -65,6 +62,25 @@ def settings(): return s +@pytest.fixture(scope="module") +def s3_settings(): + dotenv.load_dotenv() + if ( + "ZARR_S3_ENDPOINT" not in os.environ + or "ZARR_S3_BUCKET_NAME" not in os.environ + or "ZARR_S3_ACCESS_KEY_ID" not in os.environ + or "ZARR_S3_SECRET_ACCESS_KEY" not in os.environ + ): + return None + + yield S3Settings( + endpoint=os.environ["ZARR_S3_ENDPOINT"], + bucket_name=os.environ["ZARR_S3_BUCKET_NAME"], + access_key_id=os.environ["ZARR_S3_ACCESS_KEY_ID"], + secret_access_key=os.environ["ZARR_S3_SECRET_ACCESS_KEY"], + ) + + @pytest.fixture(scope="function") def store_path(tmp_path): yield tmp_path @@ -143,17 +159,6 @@ def get_directory_store(version: ZarrVersion, store_path: str): else: return zarr.DirectoryStoreV3(store_path) -def make_s3_settings(store_path: str): - if "ZARR_S3_ENDPOINT" not in os.environ or "ZARR_S3_BUCKET_NAME" not in os.environ or "ZARR_S3_ACCESS_KEY_ID" not in os.environ or "ZARR_S3_SECRET_ACCESS_KEY" not in os.environ: - return None - - return S3Settings( - endpoint=os.environ["ZARR_S3_ENDPOINT"], - bucket_name=os.environ["ZARR_S3_BUCKET_NAME"], - access_key_id=os.environ["ZARR_S3_ACCESS_KEY_ID"], - 
secret_access_key=os.environ["ZARR_S3_SECRET_ACCESS_KEY"], - ) - @pytest.mark.parametrize( ("version",), @@ -314,19 +319,17 @@ def test_stream_data_to_filesystem( ), ], ) -@pytest.mark.skip(reason="Temporary; needs debugging") def test_stream_data_to_s3( settings: StreamSettings, - store_path: Path, + s3_settings: Optional[S3Settings], request: pytest.FixtureRequest, version: ZarrVersion, compression_codec: Optional[CompressionCodec], ): - s3_settings = make_s3_settings(store_path) if s3_settings is None: pytest.skip("S3 settings not set") - settings.store_path = str(store_path / f"{request.node.name}.zarr") + settings.store_path = f"{request.node.name}.zarr" settings.version = version settings.s3 = s3_settings if compression_codec is not None: @@ -352,17 +355,21 @@ def test_stream_data_to_s3( ) stream.append(data) - del stream # close the stream, flush the files + del stream # close the stream, flush the data s3 = s3fs.S3FileSystem( key=settings.s3.access_key_id, - secret=settings.s3_secret_access_key, - client_kwargs={"endpoint_url": settings.s3_endpoint}, + secret=settings.s3.secret_access_key, + client_kwargs={"endpoint_url": settings.s3.endpoint}, ) store = s3fs.S3Map( root=f"{s3_settings.bucket_name}/{settings.store_path}", s3=s3 ) - cache = zarr.LRUStoreCache(store, max_size=2**28) + cache = ( + zarr.LRUStoreCache(store, max_size=2**28) + if version == ZarrVersion.V2 + else zarr.LRUStoreCacheV3(store, max_size=2**28) + ) group = zarr.group(store=cache) data = group["0"] @@ -387,4 +394,3 @@ def test_stream_data_to_s3( # cleanup s3.rm(store.root, recursive=True) - \ No newline at end of file From 7273c7c32fcb055db613349adf75a3daef3c4d9e Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Thu, 31 Oct 2024 14:24:36 -0400 Subject: [PATCH 3/7] Run S3 tests --- .github/workflows/test.yml | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4557686..5a8ab02 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -131,7 +131,6 @@ jobs: test_python: name: Test Python on ${{ matrix.platform }} runs-on: ${{ matrix.platform }} - timeout-minutes: 20 strategy: fail-fast: false matrix: @@ -139,6 +138,14 @@ jobs: - "ubuntu-latest" - "windows-latest" - "macos-latest" + env: + MINIO_ROOT_USER: admin + MINIO_ROOT_PASSWORD: password + MINIO_URL: http://localhost:9000 + MINIO_ALIAS: myminio + MINIO_BUCKET: acquire-test + MINIO_ACCESS_KEY: acquire + MINIO_SECRET_KEY: 12345678 steps: - uses: actions/checkout@v3 @@ -151,6 +158,26 @@ jobs: with: python-version: "3.10" + - name: Install minio and mcli + run: | + apt update && apt install -y tmux wget + wget https://dl.min.io/server/minio/release/linux-amd64/minio -O /usr/local/bin/minio + wget https://dl.min.io/client/mc/release/linux-amd64/mc -O /usr/local/bin/mcli + chmod +x /usr/local/bin/minio + chmod +x /usr/local/bin/mcli + + - name: Start minio in tmux + run: | + tmux new -d -s minio + tmux send-keys -t minio "MINIO_ROOT_USER=$MINIO_ROOT_USER MINIO_ROOT_PASSWORD=$MINIO_ROOT_PASSWORD minio server /tmp/minio --console-address :9001" Enter + sleep 5 + mcli alias set $MINIO_ALIAS $MINIO_URL $MINIO_ROOT_USER $MINIO_ROOT_PASSWORD + mcli admin user svcacct add --access-key $MINIO_ACCESS_KEY --secret-key $MINIO_SECRET_KEY $MINIO_ALIAS $MINIO_ROOT_USER + + - name: Create a bucket + run: | + mcli mb $MINIO_ALIAS/$MINIO_BUCKET + - name: Install vcpkg run: | git clone https://github.com/microsoft/vcpkg.git @@ -167,4 +194,9 @@ jobs: run: 
python -m pip install ".[testing]" - name: Run tests + env: + ZARR_S3_ENDPOINT: ${{ env.MINIO_URL }} + ZARR_S3_BUCKET_NAME: ${{ env.MINIO_BUCKET }} + ZARR_S3_ACCESS_KEY_ID: ${{ env.MINIO_ACCESS_KEY }} + ZARR_S3_SECRET_ACCESS_KEY: ${{ env.MINIO_SECRET_KEY }} run: python -m pytest -v From 50684f314b47c3b8f872269bf8d8ce6e22dd622f Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Thu, 31 Oct 2024 14:36:44 -0400 Subject: [PATCH 4/7] Test S3 in Python from dedicated S3 test job --- .github/workflows/test.yml | 34 ++++++++-------------------------- 1 file changed, 8 insertions(+), 26 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5a8ab02..0173fc5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -119,7 +119,7 @@ jobs: - name: Build run: cmake --build ${{github.workspace}}/build --config ${{env.BUILD_TYPE}} - - name: Test + - name: Test C++ working-directory: ${{github.workspace}}/build env: ZARR_S3_ENDPOINT: ${{ env.MINIO_URL }} @@ -128,6 +128,13 @@ jobs: ZARR_S3_SECRET_ACCESS_KEY: ${{ env.MINIO_SECRET_KEY }} run: ctest -C ${{env.BUILD_TYPE}} -L s3 --output-on-failure + - name: Install dependencies + run: python -m pip install -U pip "pybind11[global]" cmake build numpy pytest ".[testing]" + + - name: Test Python + run: python -m pytest -v -k test_stream_data_to_s3 + + test_python: name: Test Python on ${{ matrix.platform }} runs-on: ${{ matrix.platform }} @@ -158,26 +165,6 @@ jobs: with: python-version: "3.10" - - name: Install minio and mcli - run: | - apt update && apt install -y tmux wget - wget https://dl.min.io/server/minio/release/linux-amd64/minio -O /usr/local/bin/minio - wget https://dl.min.io/client/mc/release/linux-amd64/mc -O /usr/local/bin/mcli - chmod +x /usr/local/bin/minio - chmod +x /usr/local/bin/mcli - - - name: Start minio in tmux - run: | - tmux new -d -s minio - tmux send-keys -t minio "MINIO_ROOT_USER=$MINIO_ROOT_USER MINIO_ROOT_PASSWORD=$MINIO_ROOT_PASSWORD minio server /tmp/minio --console-address :9001" Enter - sleep 5 - mcli alias set $MINIO_ALIAS $MINIO_URL $MINIO_ROOT_USER $MINIO_ROOT_PASSWORD - mcli admin user svcacct add --access-key $MINIO_ACCESS_KEY --secret-key $MINIO_SECRET_KEY $MINIO_ALIAS $MINIO_ROOT_USER - - - name: Create a bucket - run: | - mcli mb $MINIO_ALIAS/$MINIO_BUCKET - - name: Install vcpkg run: | git clone https://github.com/microsoft/vcpkg.git @@ -194,9 +181,4 @@ jobs: run: python -m pip install ".[testing]" - name: Run tests - env: - ZARR_S3_ENDPOINT: ${{ env.MINIO_URL }} - ZARR_S3_BUCKET_NAME: ${{ env.MINIO_BUCKET }} - ZARR_S3_ACCESS_KEY_ID: ${{ env.MINIO_ACCESS_KEY }} - ZARR_S3_SECRET_ACCESS_KEY: ${{ env.MINIO_SECRET_KEY }} run: python -m pytest -v From aedcd1a6eaf24327a60ae6b1639112c7aaaf6076 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Thu, 31 Oct 2024 15:01:16 -0400 Subject: [PATCH 5/7] Fix s3_settings fixture --- python/tests/test_stream.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/python/tests/test_stream.py b/python/tests/test_stream.py index d616cd0..218dbd1 100644 --- a/python/tests/test_stream.py +++ b/python/tests/test_stream.py @@ -71,14 +71,14 @@ def s3_settings(): or "ZARR_S3_ACCESS_KEY_ID" not in os.environ or "ZARR_S3_SECRET_ACCESS_KEY" not in os.environ ): - return None - - yield S3Settings( - endpoint=os.environ["ZARR_S3_ENDPOINT"], - bucket_name=os.environ["ZARR_S3_BUCKET_NAME"], - access_key_id=os.environ["ZARR_S3_ACCESS_KEY_ID"], - secret_access_key=os.environ["ZARR_S3_SECRET_ACCESS_KEY"], - ) + yield None + 
else: + yield S3Settings( + endpoint=os.environ["ZARR_S3_ENDPOINT"], + bucket_name=os.environ["ZARR_S3_BUCKET_NAME"], + access_key_id=os.environ["ZARR_S3_ACCESS_KEY_ID"], + secret_access_key=os.environ["ZARR_S3_SECRET_ACCESS_KEY"], + ) @pytest.fixture(scope="function") From 8dfe43fd9a69dacddda8263869f04ddd03433c43 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Mon, 4 Nov 2024 09:54:53 -0500 Subject: [PATCH 6/7] Replace brackets in store path on S3 --- python/tests/test_stream.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/tests/test_stream.py b/python/tests/test_stream.py index 218dbd1..fdf2b4e 100644 --- a/python/tests/test_stream.py +++ b/python/tests/test_stream.py @@ -14,6 +14,7 @@ import pytest import zarr from numcodecs import blosc +import re import s3fs from acquire_zarr import ( @@ -26,6 +27,7 @@ Dimension, DimensionType, ZarrVersion, + DataType, ) @@ -329,9 +331,10 @@ def test_stream_data_to_s3( if s3_settings is None: pytest.skip("S3 settings not set") - settings.store_path = f"{request.node.name}.zarr" + settings.store_path = f"{request.node.name}.zarr".replace("[", "").replace("]", "") settings.version = version settings.s3 = s3_settings + settings.data_type = DataType.UINT16 if compression_codec is not None: settings.compression = CompressionSettings( compressor=Compressor.BLOSC1, From ec2bb38b996bffd1bae0cad0ad715ee0752b01f6 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Mon, 4 Nov 2024 10:05:33 -0500 Subject: [PATCH 7/7] add python-dotenv to testing dependencies --- .github/workflows/test.yml | 13 ++++--------- pyproject.toml | 1 + 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0173fc5..968192a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -129,7 +129,10 @@ jobs: run: ctest -C ${{env.BUILD_TYPE}} -L s3 --output-on-failure - name: Install dependencies - run: python -m pip install -U pip "pybind11[global]" cmake build numpy pytest ".[testing]" + run: python -m pip install -U pip "pybind11[global]" cmake build numpy pytest + + - name: Build and install Python bindings + run: python -m pip install ".[testing]" - name: Test Python run: python -m pytest -v -k test_stream_data_to_s3 @@ -145,14 +148,6 @@ jobs: - "ubuntu-latest" - "windows-latest" - "macos-latest" - env: - MINIO_ROOT_USER: admin - MINIO_ROOT_PASSWORD: password - MINIO_URL: http://localhost:9000 - MINIO_ALIAS: myminio - MINIO_BUCKET: acquire-test - MINIO_ACCESS_KEY: acquire - MINIO_SECRET_KEY: 12345678 steps: - uses: actions/checkout@v3 diff --git a/pyproject.toml b/pyproject.toml index a9f6537..8ff9a56 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ testing = [ "s3fs", "tifffile", "zarr", + "python-dotenv", ] [tool.black]
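
Reviewer note on running the new S3 tests locally: the C++ test added in PATCH 1/7 (s3-sink-write-multipart) and the Python test exercised in PATCH 2/7 (test_stream_data_to_s3) both skip themselves unless ZARR_S3_ENDPOINT, ZARR_S3_BUCKET_NAME, ZARR_S3_ACCESS_KEY_ID, and ZARR_S3_SECRET_ACCESS_KEY are set. The Python suite can also pick these up from a .env file through python-dotenv (the dependency added in PATCH 7/7), while the C++ test reads them with std::getenv and therefore needs them exported in the shell before ctest runs. The sketch below is only an illustration, not part of the patches: the helper script name is hypothetical, and the endpoint, bucket, and keys simply mirror the MinIO values wired into the CI workflow in PATCH 3/7.

    # write_env.py -- hypothetical helper: writes the .env file that the
    # s3_settings fixture picks up via dotenv.load_dotenv().
    from pathlib import Path

    env_vars = {
        "ZARR_S3_ENDPOINT": "http://localhost:9000",     # local MinIO server
        "ZARR_S3_BUCKET_NAME": "acquire-test",            # bucket created with mcli mb
        "ZARR_S3_ACCESS_KEY_ID": "acquire",               # MinIO service-account access key
        "ZARR_S3_SECRET_ACCESS_KEY": "12345678",          # MinIO service-account secret key
    }

    # Emit one KEY=value line per variable, the format python-dotenv expects.
    Path(".env").write_text("".join(f"{k}={v}\n" for k, v in env_vars.items()))

With the variables in place, "python -m pytest -v -k test_stream_data_to_s3" runs the streaming test end to end, matching the "Test Python" step in the updated workflow.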