From ae2f2d89e6c0b65ff1b04463524ecb73833da572 Mon Sep 17 00:00:00 2001 From: papanikos Date: Wed, 24 Nov 2021 11:04:02 +0100 Subject: [PATCH] write to cache, compact log messages --- src/cirtap/core.py | 30 +++++++++++------------ src/cirtap/mirror.py | 57 +++++++++++++++++++++++++++----------------- 2 files changed, 50 insertions(+), 37 deletions(-) diff --git a/src/cirtap/core.py b/src/cirtap/core.py index 844f64b..2185ce4 100644 --- a/src/cirtap/core.py +++ b/src/cirtap/core.py @@ -229,27 +229,27 @@ def mirror( except: _logger.debug("Failed to send email") - # Testing - # ten_targets = [ - # "100053.5", - # "100.11", - # "100053.4", - # "100.9", - # "1123738.3", - # "1000562.3", - # "100053.8", - # "469009.4", - # "1309411.5", - # "100053.6", - # ] - # genome_jobs = [job for job in genome_jobs if job in ten_targets] +# Testing +# ten_targets = [ +# "100053.5", +# "1006155.5", # Missing +# "2030827.47", # Permission denied +# "100.9", +# "1123738.3", +# "1000562.3", +# "100053.8", +# "469009.4", +# "1309411.5", +# "100053.6", +# ] +# genome_jobs = [job for job in genome_jobs if job in ten_targets] try: if ( len(genome_jobs) != 0 and check_genomes is True ) or force_check is True: finished_jobs = mirror_genomes_dir( - genome_jobs, genomes_dir, jobs, progress_bar=progress + genome_jobs, genomes_dir, cache_dir, jobs, progress_bar=progress ) new_genomes_processed = processed_genomes.union(set(finished_jobs)) processed_genomes_txt = cache_dir / pathlib.Path( diff --git a/src/cirtap/mirror.py b/src/cirtap/mirror.py index 69243e8..c3376bd 100644 --- a/src/cirtap/mirror.py +++ b/src/cirtap/mirror.py @@ -186,19 +186,17 @@ def sync_single_dir(genomes_dir, genome_id, attempts=3, write_info=True): break - # Specific handling of the '550 - No such file or directory' + # Specific handling of the '550 - Missing dir or permission denied' # Breaks and returns the genome_id except ftplib.error_perm as ftp_err: if str(ftp_err).startswith("550"): - _logger.debug( - "Skipping {} - No 
remote dir found".format(genome_id) - ) + _logger.debug("Skipping {} ( {} )".format(genome_id, ftp_err)) break # Try to catch CTRL-C if user doesn't want to proceed except KeyboardInterrupt: - _logger.debug("Ctrl+C signal detected") - _logger.debug( + _logger.error("Ctrl+C signal detected") + _logger.error( "Removing directory that might contain corrupted files " "at {}".format(local_dirpath) ) @@ -210,19 +208,22 @@ def sync_single_dir(genomes_dir, genome_id, attempts=3, write_info=True): # Capture text in the generic Exception class except Exception as e: if attempt == attempts: - _logger.debug("Failed syncing {}".format(genome_id)) - _logger.debug( + _logger.error("Failed syncing {}".format(genome_id)) + _logger.error( "Removing directory that might contain corrupted files " "at {}".format(local_dirpath) ) shutil.rmtree(local_dirpath.resolve()) raise else: - _logger.debug("Failed syncing {}".format(genome_id)) - _logger.debug("Error was : {}".format(e)) + _logger.warning( + "Attempt {} / {} for {} failed ( {} )".format( + attempt, attempts, genome_id, e + ) + ) attempt += 1 - _logger.debug( - "Sleeping for {} s before retrying".format(attempt * 60) + _logger.warning( + "Sleeping for {}s before retrying".format(attempt * 60) ) time.sleep(attempt * 60) @@ -253,21 +254,33 @@ def create_genome_jobs(genome_summary, genomes_dir, processed_genomes=None): def mirror_genomes_dir( - all_genome_jobs, local_genomes_dir, procs=1, progress_bar=True + all_genome_jobs, local_genomes_dir, cache_dir, procs=1, progress_bar=True, ): parallel_sync = partial( - sync_single_dir, local_genomes_dir, write_info=True, retries=3 + sync_single_dir, local_genomes_dir, write_info=True, attempts=3 ) results = [] - with mp.Pool(processes=procs) as pool: - if progress_bar: - pbar = tqdm(total=len(all_genome_jobs)) - for res in pool.imap_unordered(parallel_sync, all_genome_jobs): - pbar.update() - results.append(res) + try: + with mp.Pool(processes=procs) as pool: + if progress_bar: + pbar = 
tqdm(total=len(all_genome_jobs)) + for res in pool.imap_unordered(parallel_sync, all_genome_jobs): + pbar.update() + results.append(res) + else: + for res in pool.imap_unordered(parallel_sync, all_genome_jobs): + results.append(res) + except Exception: + _logger.error("An error occurred. Writing processed_genomes to .cache") + processed_genomes_txt = cache_dir / pathlib.Path("processed_genomes.txt") + if processed_genomes_txt.exists(): + open_mode = 'a' else: - open_mode = 'w' + with open(processed_genomes_txt, mode=open_mode) as fout: + for genome_id in results: + fout.write("{}\n".format(genome_id)) + raise return results