diff --git a/py/desispec/io/util.py b/py/desispec/io/util.py index c9857fadc..320ea8bac 100644 --- a/py/desispec/io/util.py +++ b/py/desispec/io/util.py @@ -1134,6 +1134,8 @@ def backup_filename(filename): Returns: New filename.N, or filename if original file didn't already exist + + if filename=='/dev/null' or filename doesn't exist, just return filename """ if filename == '/dev/null' or not os.path.exists(filename): return filename diff --git a/py/desispec/parallel.py b/py/desispec/parallel.py index 7a5083391..ad5e97692 100644 --- a/py/desispec/parallel.py +++ b/py/desispec/parallel.py @@ -10,6 +10,7 @@ import os import sys import time +import glob import io from contextlib import contextmanager import logging @@ -380,6 +381,13 @@ def stdouterr_redirected(to=None, comm=None, overwrite=False): # The DESI loggers. desi_loggers = desiutil.log._desiutil_log_root + def _rank_filename(base, rank): + """ + Return standard name for output file of individual rank. + Rank can be a wildcard to generate a glob string. + """ + return f"{base}-rank{rank}" + def _redirect(out_to, err_to): # Flush the C-level buffers @@ -424,6 +432,29 @@ def _redirect(out_to, err_to): ch.setFormatter(formatter) logger.addHandler(ch) + def _combine_individual_outputs(to, nproc=None): + """ + Combine individual {to}-rank{n} files into a single {to} file + + If nproc is specified, assume there are exactly that many per-rank + files; otherwise glob to find out what is there. 
+ """ + if nproc is None: + individual_files = sorted(glob.glob(_rank_filename(to, '*'))) + else: + individual_files = [_rank_filename(to, p) for p in range(nproc)] + + with open(to, "w") as outfile: + for p, fname in enumerate(individual_files): + outfile.write("================ Start of Process {} ================\n".format(p)) + with open(fname) as infile: + outfile.write(infile.read()) + outfile.write("================= End of Process {} =================\n\n".format(p)) + + # only remove input files after successfully finishing merging + for fname in individual_files: + os.remove(fname) + # redirect both stdout and stderr to the same file if to is None: @@ -432,22 +463,30 @@ def _redirect(out_to, err_to): if rank == 0: log = get_logger() log.info("Begin log redirection to {} at {}".format(to, time.asctime())) - if not overwrite: - backup_filename(to) - - #- all ranks wait for logfile backup - if comm is not None: - comm.barrier() # Save the original file descriptors so we can restore them later saved_fd_out = os.dup(fd_out) saved_fd_err = os.dup(fd_err) - try: - pto = to - if to != "/dev/null": - pto = "{}_{}".format(to, rank) + # Determine individual per-rank output filenames and check if there are + # leftover per-rank files from a previous crash that need to be merged + # before proceeding. Leftovers might have come from a run with a different + # nproc, so don't enforce that here. + pto = to + if to != "/dev/null": + pto = _rank_filename(to, rank) + if rank == 0 and os.path.exists(pto): + _combine_individual_outputs(to) + + # backup previous output file if needed + if rank == 0 and not overwrite: + backup_filename(to) + + # all ranks wait for logfile backup + if comm is not None: + comm.barrier() + try: # open python file, which creates low-level POSIX file # descriptor. 
file = open(pto, "w") @@ -475,14 +514,7 @@ def _redirect(out_to, err_to): # concatenate per-process files if rank == 0 and to != "/dev/null": - with open(to, "w") as outfile: - for p in range(nproc): - outfile.write("================ Start of Process {} ================\n".format(p)) - fname = "{}_{}".format(to, p) - with open(fname) as infile: - outfile.write(infile.read()) - outfile.write("================= End of Process {} =================\n\n".format(p)) - os.remove(fname) + _combine_individual_outputs(to, nproc) if nproc > 1: comm.barrier()