From 90b08ef7143a78c05e6eb18d7e38a2cb5b508553 Mon Sep 17 00:00:00 2001 From: Aleksey Veresov Date: Thu, 19 Dec 2024 13:55:16 +0100 Subject: [PATCH] [FSTORE-1522] [APPEND] Fix hsml deduplication (#439) * Fix pandas imports in hopsworks_common.util * Fix download * Fix kafka topic validation * Fix import HAS_PANDAS * Remove kafka validation test for exception raising * Ruff --- python/hopsworks_common/core/dataset_api.py | 36 ++++++++++----------- python/hopsworks_common/kafka_topic.py | 5 --- python/hopsworks_common/util.py | 10 ++---- python/tests/test_kafka_topic.py | 18 ----------- 4 files changed, 19 insertions(+), 50 deletions(-) diff --git a/python/hopsworks_common/core/dataset_api.py b/python/hopsworks_common/core/dataset_api.py index 644412d32..bbce0b79d 100644 --- a/python/hopsworks_common/core/dataset_api.py +++ b/python/hopsworks_common/core/dataset_api.py @@ -100,28 +100,26 @@ def download( # Build the path to download the file on the local fs and return to the user, it should be absolute for consistency # Download in CWD if local_path not specified if local_path is None: - local_path = os.path.join(os.getcwd(), os.path.basename(path)) + local_path = os.getcwd() # If local_path specified, ensure it is absolute - else: - if os.path.isabs(local_path): - local_path = os.path.join(local_path, os.path.basename(path)) - else: - local_path = os.path.join( - os.getcwd(), local_path, os.path.basename(path) - ) + elif not os.path.isabs(local_path): + local_path = os.path.join(os.getcwd(), local_path) - if os.path.exists(local_path): - if overwrite: - if os.path.isfile: - os.remove(local_path) - else: - shutil.rmtree(local_path) - else: - raise IOError( - "{} already exists, set overwrite=True to overwrite it".format( - local_path - ) + # If local_path is a directory, download into the directory + if os.path.isdir(local_path): + local_path = os.path.join(local_path, os.path.basename(path)) + + if overwrite: + if os.path.isfile(local_path): + os.remove(local_path) + elif os.path.isdir(local_path): + shutil.rmtree(local_path) + elif os.path.exists(local_path): + raise IOError( + "{} already exists, set overwrite=True to overwrite it".format( + local_path ) + ) file_size = int(self._get(path)["attributes"]["size"]) with _client._send_request( diff --git a/python/hopsworks_common/kafka_topic.py b/python/hopsworks_common/kafka_topic.py index c056917b7..0bf3e65df 100644 --- a/python/hopsworks_common/kafka_topic.py +++ b/python/hopsworks_common/kafka_topic.py @@ -84,11 +84,6 @@ def _validate_topic_config(cls, name, num_replicas, num_partitions): ) ) num_partitions = KAFKA_TOPIC.NUM_PARTITIONS - else: - if num_replicas is not None or num_partitions is not None: - raise ValueError( - "Number of replicas or partitions cannot be changed in existing kafka topics." - ) elif name is None or name == KAFKA_TOPIC.NONE: num_replicas = None num_partitions = None diff --git a/python/hopsworks_common/util.py b/python/hopsworks_common/util.py index 27a3ff8eb..90713c931 100644 --- a/python/hopsworks_common/util.py +++ b/python/hopsworks_common/util.py @@ -27,7 +27,6 @@ import time from datetime import date, datetime, timezone from typing import ( - TYPE_CHECKING, Any, Callable, Dict, @@ -47,7 +46,7 @@ from six import string_types -if TYPE_CHECKING: +if HAS_PANDAS: import pandas as pd @@ -74,9 +73,6 @@ def convert(self, obj): import numpy as np - if HAS_PANDAS: - import pandas as pd - def encode_binary(x): return base64.encodebytes(x).decode("ascii") @@ -88,7 +84,7 @@ def encode_binary(x): else: return obj.tolist(), True - if isinstance(obj, datetime.date) or ( + if isinstance(obj, datetime) or ( HAS_PANDAS and isinstance(obj, pd.Timestamp) ): return obj.isoformat(), True @@ -520,8 +516,6 @@ def _handle_tensor_input(input_tensor): def _handle_dataframe_input(input_ex): - if HAS_PANDAS: - import pandas as pd if HAS_PANDAS and isinstance(input_ex, pd.DataFrame): if not input_ex.empty: return input_ex.iloc[0].tolist() diff --git a/python/tests/test_kafka_topic.py b/python/tests/test_kafka_topic.py index fa91388a9..79a306e25 100644 --- a/python/tests/test_kafka_topic.py +++ b/python/tests/test_kafka_topic.py @@ -17,7 +17,6 @@ import copy import humps -import pytest from hopsworks_common import kafka_topic from hopsworks_common.constants import KAFKA_TOPIC @@ -160,23 +159,6 @@ def test_validate_topic_config_existing_with_name_only(self, backend_fixtures): assert num_repl is None assert num_part is None - def test_validate_topic_config_existing_with_name_and_config( - self, backend_fixtures - ): - # Arrange - json = backend_fixtures["kafka_topic"]["get_existing_with_name_and_config"][ - "response" - ]["kafka_topic_dto"] - - # Act - with pytest.raises(ValueError) as e_info: - num_repl, num_part = kafka_topic.KafkaTopic._validate_topic_config( - json["name"], json["num_replicas"], json["num_partitions"] - ) - - # Assert - assert "Number of replicas or partitions cannot be changed" in str(e_info.value) - def test_validate_topic_config_none(self, backend_fixtures): # Arrange json = backend_fixtures["kafka_topic"]["get_none"]["response"][