Skip to content

Commit

Permalink
fix load_pandas_cluster() for individual files. Previous commit descr…
Browse files Browse the repository at this point in the history
…iption was wrong. It was about fixing saving pandas, not loading it.
  • Loading branch information
arthurprevot committed Jun 3, 2024
1 parent 7e39789 commit f0f49b0
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions yaetos/env_dispatchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,17 @@ def load_pandas_cluster(self, fname, file_type, globy, read_func, read_kwargs):
import uuid

bucket_name, bucket_fname, fname_parts = self.split_s3_path(fname)
local_path = 'tmp/s3_copy_' + fname_parts[-1] + '_' + str(uuid.uuid4())
print(f"### ==== load_pandas_cluster 0", bucket_name, bucket_fname, fname_parts)
# local_path = 'tmp/s3_copy_' + fname_parts[-1] + '_' + str(uuid.uuid4())
uuid_path = str(uuid.uuid4())
local_path = 'tmp/s3_copy_' + uuid_path + '_' + fname_parts[-1] + '/' + fname_parts[-1] # First fname_parts[-1] is to show fname part in folder name for easier debugging in AWS. Second is to isolate fname part in sub folder.
local_folder = 'tmp/s3_copy_' + uuid_path + '_' + fname_parts[-1] # to be based on above
os.makedirs(local_folder, exist_ok=True)
print(f"### ==== load_pandas_cluster 1", local_path, local_folder)
cp = CloudPath(fname) # TODO: add way to load it with specific profile_name or client, as in "s3c = boto3.Session(profile_name='default').client('s3')"
if globy:
cfiles = cp.glob(globy)
os.makedirs(local_path, exist_ok=True)
os.makedirs(local_path, exist_ok=True) # assuming it is folder. TODO: confirm in code.
logger.info(f"Copying {len(cfiles)} files from S3 to local '{local_path}'")
for cfile in cfiles:
local_file_path = os.path.join(local_path, cfile.name)
Expand All @@ -162,8 +168,11 @@ def load_pandas_cluster(self, fname, file_type, globy, read_func, read_kwargs):
else:
logger.info("Copying files from S3 '{}' to local '{}'. May take some time.".format(fname, local_path))
local_pathlib = cp.download_to(local_path)
# local_pathlib = cp.download_to(local_folder)
# logger.info(f"### ==== load_pandas_cluster, {local_path}, {local_pathlib}")
print(f"### ==== load_pandas_cluster", local_path, local_pathlib)
local_path = local_path + '/' if local_pathlib.is_dir() else local_path
logger.info("File copy finished")
logger.info(f"File copy finished, to {local_path}")
df = load_dfs(local_path, file_type, globy, read_func, read_kwargs)
return df

Expand Down

0 comments on commit f0f49b0

Please sign in to comment.