From f0f49b0ad52fab2ddda24219cd9698cb31016cae Mon Sep 17 00:00:00 2001 From: Arthur Prevot Date: Mon, 3 Jun 2024 18:19:17 +0200 Subject: [PATCH] fix load_pandas_cluster() for individual files. Previous commit description was wrong. It was about fixing saving pandas, not loading it. --- yaetos/env_dispatchers.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/yaetos/env_dispatchers.py b/yaetos/env_dispatchers.py index da95b80c..43f8c61b 100644 --- a/yaetos/env_dispatchers.py +++ b/yaetos/env_dispatchers.py @@ -149,11 +149,17 @@ def load_pandas_cluster(self, fname, file_type, globy, read_func, read_kwargs): import uuid bucket_name, bucket_fname, fname_parts = self.split_s3_path(fname) - local_path = 'tmp/s3_copy_' + fname_parts[-1] + '_' + str(uuid.uuid4()) + print(f"### ==== load_pandas_cluster 0", bucket_name, bucket_fname, fname_parts) + # local_path = 'tmp/s3_copy_' + fname_parts[-1] + '_' + str(uuid.uuid4()) + uuid_path = str(uuid.uuid4()) + local_path = 'tmp/s3_copy_' + uuid_path + '_' + fname_parts[-1] + '/' + fname_parts[-1] # First fname_parts[-1] is to show fname part in folder name for easier debugging in AWS. Second is to isolate fname part in sub folder. + local_folder = 'tmp/s3_copy_' + uuid_path + '_' + fname_parts[-1] # to be based on above + os.makedirs(local_folder, exist_ok=True) + print(f"### ==== load_pandas_cluster 1", local_path, local_folder) cp = CloudPath(fname) # TODO: add way to load it with specific profile_name or client, as in "s3c = boto3.Session(profile_name='default').client('s3')" if globy: cfiles = cp.glob(globy) - os.makedirs(local_path, exist_ok=True) + os.makedirs(local_path, exist_ok=True) # assuming it is folder. TODO: confirm in code. logger.info(f"Copying {len(cfiles)} files from S3 to local '{local_path}'") for cfile in cfiles: local_file_path = os.path.join(local_path, cfile.name) @@ -162,8 +168,11 @@ def load_pandas_cluster(self, fname, file_type, globy, read_func, read_kwargs): else: logger.info("Copying files from S3 '{}' to local '{}'. May take some time.".format(fname, local_path)) local_pathlib = cp.download_to(local_path) + # local_pathlib = cp.download_to(local_folder) + # logger.info(f"### ==== load_pandas_cluster, {local_path}, {local_pathlib}") + print(f"### ==== load_pandas_cluster", local_path, local_pathlib) local_path = local_path + '/' if local_pathlib.is_dir() else local_path - logger.info("File copy finished") + logger.info(f"File copy finished, to {local_path}") df = load_dfs(local_path, file_type, globy, read_func, read_kwargs) return df