From 2fcd32d5496c8517416fe3ce6057319b0929e6dc Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 18 Dec 2024 15:05:35 +0200 Subject: [PATCH 1/7] init --- .../main/java/com/logicalclocks/hsfs/StorageConnector.java | 7 ++++++- python/hsfs/storage_connector.py | 4 +++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java index 91bd36819..3f6a64c62 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java @@ -124,6 +124,9 @@ public static class S3Connector extends StorageConnector { @Getter @Setter protected String bucket; + @Getter @Setter + protected String path; + @Getter @Setter protected String region; @@ -138,7 +141,9 @@ public static class S3Connector extends StorageConnector { @JsonIgnore public String getPath(String subPath) { - return "s3://" + bucket + "/" + (Strings.isNullOrEmpty(subPath) ? "" : subPath); + return "s3://" + bucket + + (Strings.isNullOrEmpty(path) ? "" : "/" + path) + "/" + + (Strings.isNullOrEmpty(subPath) ? "" : subPath); } @Override diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index 57c45167b..23f54d202 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -276,6 +276,7 @@ def __init__( server_encryption_algorithm: Optional[str] = None, server_encryption_key: Optional[str] = None, bucket: Optional[str] = None, + path: Optional[str] = None, region: Optional[str] = None, session_token: Optional[str] = None, iam_role: Optional[str] = None, @@ -290,6 +291,7 @@ def __init__( self._server_encryption_algorithm = server_encryption_algorithm self._server_encryption_key = server_encryption_key self._bucket = bucket + self._path = path self._region = region self._session_token = session_token self._iam_role = iam_role @@ -340,7 +342,7 @@ def iam_role(self) -> Optional[str]: @property def path(self) -> Optional[str]: """If the connector refers to a path (e.g. S3) - return the path of the connector""" - return "s3://" + self._bucket + return "s3://" + self._bucket + ("/" + self._path if self._path else "") @property def arguments(self) -> Optional[Dict[str, Any]]: From 8f031dc33118234b23d6bbe8575adc5c9c0ab6a2 Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Dec 2024 12:24:45 +0200 Subject: [PATCH 2/7] add simple unit test --- python/tests/test_storage_connector.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/python/tests/test_storage_connector.py b/python/tests/test_storage_connector.py index 1811a86f1..235db7836 100644 --- a/python/tests/test_storage_connector.py +++ b/python/tests/test_storage_connector.py @@ -113,7 +113,19 @@ def test_default_path(self, mocker): ) sc.read(data_format="csv") # assert - assert "s3://test-bucket" in mock_engine_read.call_args[0][3] + assert "" == mock_engine_read.call_args[0][3] + + def test_get_path(self, mocker): + mocker.patch("hsfs.engine.get_instance", return_value=spark.Engine()) + sc = storage_connector.S3Connector( + id=1, name="test_connector", featurestore_id=1, bucket="test-bucket", path="abc/def" + ) + + # act + result = sc._get_path("some/location") + + # assert + assert "s3://test-bucket/abc/def/some/location" == result class TestRedshiftConnector: From f2bbc87f5f2c8f27950d7a97847a8c2ce8709faa Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Dec 2024 14:28:41 +0200 Subject: [PATCH 3/7] tests --- .../hsfs/spark/TestStorageConnector.java | 28 +++++++++++++++++++ python/tests/test_storage_connector.py | 2 +- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestStorageConnector.java b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestStorageConnector.java index f4bd0c591..e207484bf 100644 --- a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestStorageConnector.java +++ b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestStorageConnector.java @@ -19,6 +19,7 @@ import com.logicalclocks.hsfs.FeatureStoreException; import com.logicalclocks.hsfs.StorageConnectorType; +import com.logicalclocks.hsfs.StorageConnector.S3Connector; import com.logicalclocks.hsfs.metadata.HopsworksClient; import com.logicalclocks.hsfs.metadata.HopsworksHttpClient; import com.logicalclocks.hsfs.metadata.Option; @@ -245,6 +246,33 @@ void testDefaultPathS3() throws FeatureStoreException, IOException { // reset SparkEngine.setInstance(null); } + + @Test + void testGetPath() throws FeatureStoreException, IOException { + // Arrange + S3Connector connector = new S3Connector(); + connector.setBucket("testBucket"); + + // Act + String path = connector.getPath("some/location"); + + // Assert + Assertions.assertEquals("s3://testBucket/some/location", path); + } + + @Test + void testGetPathStorageConnectorWithPath() throws FeatureStoreException, IOException { + // Arrange + S3Connector connector = new S3Connector(); + connector.setBucket("testBucket"); + connector.setPath("abc/def"); + + // Act + String path = connector.getPath("some/location"); + + // Assert + Assertions.assertEquals("s3://testBucket/abc/def/some/location", path); + } } @Nested diff --git a/python/tests/test_storage_connector.py b/python/tests/test_storage_connector.py index 235db7836..5dc96a546 100644 --- a/python/tests/test_storage_connector.py +++ b/python/tests/test_storage_connector.py @@ -114,7 +114,7 @@ def test_default_path(self, mocker): sc.read(data_format="csv") # assert assert "" == mock_engine_read.call_args[0][3] - + def test_get_path(self, mocker): mocker.patch("hsfs.engine.get_instance", return_value=spark.Engine()) sc = storage_connector.S3Connector( From 836506a08a7f2d38fe1511dd42e831d4ba05f30a Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Dec 2024 14:35:11 +0200 Subject: [PATCH 4/7] python test change --- python/tests/test_storage_connector.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/python/tests/test_storage_connector.py b/python/tests/test_storage_connector.py index 5dc96a546..a1f7e77c3 100644 --- a/python/tests/test_storage_connector.py +++ b/python/tests/test_storage_connector.py @@ -113,9 +113,21 @@ def test_default_path(self, mocker): ) sc.read(data_format="csv") # assert - assert "" == mock_engine_read.call_args[0][3] + assert "s3://test-bucket" in mock_engine_read.call_args[0][3] def test_get_path(self, mocker): + mocker.patch("hsfs.engine.get_instance", return_value=spark.Engine()) + sc = storage_connector.S3Connector( + id=1, name="test_connector", featurestore_id=1, bucket="test-bucket" + ) + + # act + result = sc._get_path("some/location") + + # assert + assert "s3://test-bucket/some/location" == result + + def test_get_path_storage_connector_with_path(self, mocker): mocker.patch("hsfs.engine.get_instance", return_value=spark.Engine()) sc = storage_connector.S3Connector( id=1, name="test_connector", featurestore_id=1, bucket="test-bucket", path="abc/def" From 6e9b868df75839e2892596318ffe1830ca0210af Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Dec 2024 15:11:15 +0200 Subject: [PATCH 5/7] changes to paths --- python/hsfs/storage_connector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index 23f54d202..07a1dd0a6 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -342,7 +342,7 @@ def iam_role(self) -> Optional[str]: @property def path(self) -> Optional[str]: """If the connector refers to a path (e.g. S3) - return the path of the connector""" - return "s3://" + self._bucket + ("/" + self._path if self._path else "") + return os.path.join("s3://" + self._bucket, *os.path.split(self._path if self._path else "")) @property def arguments(self) -> Optional[Dict[str, Any]]: @@ -435,7 +435,7 @@ def read( ) def _get_path(self, sub_path: str) -> str: - return os.path.join(self.path, sub_path) + return os.path.join(self.path, *os.path.split(sub_path)) class RedshiftConnector(StorageConnector): From 505ef7a44fbd9b308cafa1f27deb38204fa9c2f2 Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Dec 2024 15:25:42 +0200 Subject: [PATCH 6/7] use posixpath --- python/hsfs/storage_connector.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index 07a1dd0a6..cba166738 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -22,6 +22,7 @@ import warnings from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional, TypeVar, Union +import posixpath import humps import pandas as pd @@ -342,7 +343,7 @@ def iam_role(self) -> Optional[str]: @property def path(self) -> Optional[str]: """If the connector refers to a path (e.g. S3) - return the path of the connector""" - return os.path.join("s3://" + self._bucket, *os.path.split(self._path if self._path else "")) + return posixpath.join("s3://" + self._bucket, *os.path.split(self._path if self._path else "")) @property def arguments(self) -> Optional[Dict[str, Any]]: @@ -435,7 +436,7 @@ def read( ) def _get_path(self, sub_path: str) -> str: - return os.path.join(self.path, *os.path.split(sub_path)) + return posixpath.join(self.path, *os.path.split(sub_path)) class RedshiftConnector(StorageConnector): From 72b54e04da2a5b52220714336c8244c1e0f9f841 Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Dec 2024 15:27:20 +0200 Subject: [PATCH 7/7] ruff --- python/hsfs/storage_connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index cba166738..01a801be4 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -18,11 +18,11 @@ import base64 import logging import os +import posixpath import re import warnings from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional, TypeVar, Union -import posixpath import humps import pandas as pd