Commit ae2f2d8
write to cache, compact log messages
papanikos committed Nov 24, 2021
1 parent c7dc308 commit ae2f2d8
Showing 2 changed files with 50 additions and 37 deletions.
30 changes: 15 additions & 15 deletions src/cirtap/core.py
@@ -229,27 +229,27 @@ def mirror(
         except:
             _logger.debug("Failed to send email")
 
-        # Testing
-        # ten_targets = [
-        #     "100053.5",
-        #     "100.11",
-        #     "100053.4",
-        #     "100.9",
-        #     "1123738.3",
-        #     "1000562.3",
-        #     "100053.8",
-        #     "469009.4",
-        #     "1309411.5",
-        #     "100053.6",
-        # ]
-        # genome_jobs = [job for job in genome_jobs if job in ten_targets]
+        # Testing
+        # ten_targets = [
+        #     "100053.5",
+        #     "1006155.5",  # Missing
+        #     "2030827.47",  # Permission denied
+        #     "100.9",
+        #     "1123738.3",
+        #     "1000562.3",
+        #     "100053.8",
+        #     "469009.4",
+        #     "1309411.5",
+        #     "100053.6",
+        # ]
+        # genome_jobs = [job for job in genome_jobs if job in ten_targets]
 
         try:
             if (
                 len(genome_jobs) != 0 and check_genomes is True
             ) or force_check is True:
                 finished_jobs = mirror_genomes_dir(
-                    genome_jobs, genomes_dir, jobs, progress_bar=progress
+                    genome_jobs, genomes_dir, cache_dir, jobs, progress_bar=progress
                 )
                 new_genomes_processed = processed_genomes.union(set(finished_jobs))
                 processed_genomes_txt = cache_dir / pathlib.Path(
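For context, a minimal sketch (not part of this commit) of how the processed_genomes.txt cache kept under cache_dir could be read back on a later run so that already-synced genomes are skipped. The file name matches the diff; the helper name and the commented-out call below are illustrative only.

import pathlib


def load_processed_genomes(cache_dir):
    # Illustrative helper: return the genome ids already recorded in the cache.
    cache_file = pathlib.Path(cache_dir) / "processed_genomes.txt"
    if not cache_file.exists():
        return set()
    with open(cache_file) as fin:
        return {line.strip() for line in fin if line.strip()}


# Hypothetical use: drop jobs that finished in a previous run.
# processed_genomes = load_processed_genomes(cache_dir)
# genome_jobs = [job for job in genome_jobs if job not in processed_genomes]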
57 changes: 35 additions & 22 deletions src/cirtap/mirror.py
@@ -186,19 +186,17 @@ def sync_single_dir(genomes_dir, genome_id, attempts=3, write_info=True):

             break
 
-        # Specific handling of the '550 - No such file or directory'
+        # Specific handling of the '550 - Missing dir or permission denied'
         # Breaks and returns the genome_id
         except ftplib.error_perm as ftp_err:
             if str(ftp_err).startswith("550"):
-                _logger.debug(
-                    "Skipping {} - No remote dir found".format(genome_id)
-                )
+                _logger.debug("Skipping {} ( {} )".format(genome_id, ftp_err))
                 break
 
         # Try to catch CTRL-C if user doesn't want to proceed
         except KeyboardInterrupt:
-            _logger.debug("Ctrl+C signal detected")
-            _logger.debug(
+            _logger.error("Ctrl+C signal detected")
+            _logger.error(
                 "Removing directory that might contain corrupted files "
                 "at {}".format(local_dirpath)
             )
@@ -210,19 +208,22 @@ def sync_single_dir(genomes_dir, genome_id, attempts=3, write_info=True):
         # Capture text in the generic Exception class
         except Exception as e:
             if attempt == attempts:
-                _logger.debug("Failed syncing {}".format(genome_id))
-                _logger.debug(
+                _logger.error("Failed syncing {}".format(genome_id))
+                _logger.error(
                     "Removing directory that might contain corrupted files "
                     "at {}".format(local_dirpath)
                 )
                 shutil.rmtree(local_dirpath.resolve())
                 raise
             else:
-                _logger.debug("Failed syncing {}".format(genome_id))
-                _logger.debug("Error was : {}".format(e))
+                _logger.warning(
+                    "Attempt {} / {} for {} failed ( {} )".format(
+                        attempt, attempts, genome_id, e
+                    )
+                )
                 attempt += 1
-                _logger.debug(
-                    "Sleeping for {} s before retrying".format(attempt * 60)
+                _logger.warning(
+                    "Sleeping for {}s before retrying".format(attempt * 60)
                 )
                 time.sleep(attempt * 60)

@@ -253,21 +254,33 @@ def create_genome_jobs(genome_summary, genomes_dir, processed_genomes=None):


 def mirror_genomes_dir(
-    all_genome_jobs, local_genomes_dir, procs=1, progress_bar=True
+    all_genome_jobs, local_genomes_dir, cache_dir, procs=1, progress_bar=True,
 ):
 
     parallel_sync = partial(
-        sync_single_dir, local_genomes_dir, write_info=True, retries=3
+        sync_single_dir, local_genomes_dir, write_info=True, attempts=3
     )
 
     results = []
-    with mp.Pool(processes=procs) as pool:
-        if progress_bar:
-            pbar = tqdm(total=len(all_genome_jobs))
-            for res in pool.imap_unordered(parallel_sync, all_genome_jobs):
-                pbar.update()
-                results.append(res)
-        else:
-            for res in pool.imap_unordered(parallel_sync, all_genome_jobs):
-                results.append(res)
+    try:
+        with mp.Pool(processes=procs) as pool:
+            if progress_bar:
+                pbar = tqdm(total=len(all_genome_jobs))
+                for res in pool.imap_unordered(parallel_sync, all_genome_jobs):
+                    pbar.update()
+                    results.append(res)
+            else:
+                for res in pool.imap_unordered(parallel_sync, all_genome_jobs):
+                    results.append(res)
+    except Exception:
+        _logger.error("An error occured. Writing processed_genomes to .cache")
+        processed_genomes_txt = cache_dir / pathlib.Path("processed_genomes.txt")
+        if processed_genomes_txt.exists():
+            open_mode = 'a'
+        else:
+            open_mode = 'w'
+        with open(processed_genomes_txt, mode=open_mode) as fout:
+            for genome_id in results:
+                fout.write("{}\n".format(genome_id))
+        raise
     return results
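A minimal usage sketch of the updated mirror_genomes_dir signature, assuming the new cache_dir argument shown in the diff; the paths and the job list below are placeholders rather than values from the commit. If the pool raises, the ids collected so far are flushed to cache_dir / "processed_genomes.txt" before the exception is re-raised, so a rerun can pick up where it left off.

import pathlib

from cirtap.mirror import mirror_genomes_dir

genomes_dir = pathlib.Path("mirror/genomes")      # placeholder local mirror dir
cache_dir = pathlib.Path("mirror/.cache")         # placeholder cache dir
genome_jobs = ["100053.5", "100.9", "1123738.3"]  # ids as in the test list above

finished = mirror_genomes_dir(
    genome_jobs, genomes_dir, cache_dir, procs=4, progress_bar=True
)
# On success, `finished` holds the processed genome ids; on failure they were
# already appended to cache_dir / "processed_genomes.txt" before the re-raise.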
