diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java index 91bd36819..3f6a64c62 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java @@ -124,6 +124,9 @@ public static class S3Connector extends StorageConnector { @Getter @Setter protected String bucket; + @Getter @Setter + protected String path; + @Getter @Setter protected String region; @@ -138,7 +141,9 @@ public static class S3Connector extends StorageConnector { @JsonIgnore public String getPath(String subPath) { - return "s3://" + bucket + "/" + (Strings.isNullOrEmpty(subPath) ? "" : subPath); + return "s3://" + bucket + + (Strings.isNullOrEmpty(path) ? "" : "/" + path) + "/" + + (Strings.isNullOrEmpty(subPath) ? "" : subPath); } @Override diff --git a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestStorageConnector.java b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestStorageConnector.java index f4bd0c591..e207484bf 100644 --- a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestStorageConnector.java +++ b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestStorageConnector.java @@ -19,6 +19,7 @@ import com.logicalclocks.hsfs.FeatureStoreException; import com.logicalclocks.hsfs.StorageConnectorType; +import com.logicalclocks.hsfs.StorageConnector.S3Connector; import com.logicalclocks.hsfs.metadata.HopsworksClient; import com.logicalclocks.hsfs.metadata.HopsworksHttpClient; import com.logicalclocks.hsfs.metadata.Option; @@ -245,6 +246,33 @@ void testDefaultPathS3() throws FeatureStoreException, IOException { // reset SparkEngine.setInstance(null); } + + @Test + void testGetPath() throws FeatureStoreException, IOException { + // Arrange + S3Connector connector = new S3Connector(); + connector.setBucket("testBucket"); + + // Act + String path = connector.getPath("some/location"); + + // Assert + Assertions.assertEquals("s3://testBucket/some/location", path); + } + + @Test + void testGetPathStorageConnectorWithPath() throws FeatureStoreException, IOException { + // Arrange + S3Connector connector = new S3Connector(); + connector.setBucket("testBucket"); + connector.setPath("abc/def"); + + // Act + String path = connector.getPath("some/location"); + + // Assert + Assertions.assertEquals("s3://testBucket/abc/def/some/location", path); + } } @Nested diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index 57c45167b..01a801be4 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -18,6 +18,7 @@ import base64 import logging import os +import posixpath import re import warnings from abc import ABC, abstractmethod @@ -276,6 +277,7 @@ def __init__( server_encryption_algorithm: Optional[str] = None, server_encryption_key: Optional[str] = None, bucket: Optional[str] = None, + path: Optional[str] = None, region: Optional[str] = None, session_token: Optional[str] = None, iam_role: Optional[str] = None, @@ -290,6 +292,7 @@ def __init__( self._server_encryption_algorithm = server_encryption_algorithm self._server_encryption_key = server_encryption_key self._bucket = bucket + self._path = path self._region = region self._session_token = session_token self._iam_role = iam_role @@ -340,7 +343,7 @@ def iam_role(self) -> Optional[str]: @property def path(self) -> Optional[str]: """If the connector refers to a path (e.g. S3) - return the path of the connector""" - return "s3://" + self._bucket + return posixpath.join("s3://" + self._bucket, *os.path.split(self._path if self._path else "")) @property def arguments(self) -> Optional[Dict[str, Any]]: @@ -433,7 +436,7 @@ def read( ) def _get_path(self, sub_path: str) -> str: - return os.path.join(self.path, sub_path) + return posixpath.join(self.path, *os.path.split(sub_path)) class RedshiftConnector(StorageConnector): diff --git a/python/tests/test_storage_connector.py b/python/tests/test_storage_connector.py index 1811a86f1..a1f7e77c3 100644 --- a/python/tests/test_storage_connector.py +++ b/python/tests/test_storage_connector.py @@ -115,6 +115,30 @@ def test_default_path(self, mocker): # assert assert "s3://test-bucket" in mock_engine_read.call_args[0][3] + def test_get_path(self, mocker): + mocker.patch("hsfs.engine.get_instance", return_value=spark.Engine()) + sc = storage_connector.S3Connector( + id=1, name="test_connector", featurestore_id=1, bucket="test-bucket" + ) + + # act + result = sc._get_path("some/location") + + # assert + assert "s3://test-bucket/some/location" == result + + def test_get_path_storage_connector_with_path(self, mocker): + mocker.patch("hsfs.engine.get_instance", return_value=spark.Engine()) + sc = storage_connector.S3Connector( + id=1, name="test_connector", featurestore_id=1, bucket="test-bucket", path="abc/def" + ) + + # act + result = sc._get_path("some/location") + + # assert + assert "s3://test-bucket/abc/def/some/location" == result + class TestRedshiftConnector: def test_from_response_json(self, backend_fixtures):