Skip to content

Commit

Permalink
cleanup redirected stdout crash before re-redirecting
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephen Bailey authored and Stephen Bailey committed Jul 26, 2024
1 parent 9269927 commit c443da5
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 18 deletions.
2 changes: 2 additions & 0 deletions py/desispec/io/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,8 @@ def backup_filename(filename):
Returns:
New filename.N, or filename if original file didn't already exist
if filename=='/dev/null' or filename doesn't exist, just return filename
"""
if filename == '/dev/null' or not os.path.exists(filename):
return filename
Expand Down
68 changes: 50 additions & 18 deletions py/desispec/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import os
import sys
import time
import glob
import io
from contextlib import contextmanager
import logging
Expand Down Expand Up @@ -380,6 +381,13 @@ def stdouterr_redirected(to=None, comm=None, overwrite=False):
# The DESI loggers.
desi_loggers = desiutil.log._desiutil_log_root

def _rank_filename(base, rank):
"""
Return standard name for output file of individual rank.
Rank can be a wildcard to generate a glob string.
"""
return f"{base}-rank{rank}"

def _redirect(out_to, err_to):

# Flush the C-level buffers
Expand Down Expand Up @@ -424,6 +432,29 @@ def _redirect(out_to, err_to):
ch.setFormatter(formatter)
logger.addHandler(ch)

def _combine_individual_outputs(to, nproc=None):
"""
Combine individual {to}_{n} files into a single {to} file
If nproc is specified, assume there are exactly that many per-rank
files; otherwise glob to find out what is there.
"""
if nproc is None:
individual_files = sorted(glob.glob(_rank_filename(to, '*')))
else:
individual_files = [_rank_filename(to, p) for p in range(nproc)]

with open(to, "w") as outfile:
for p, fname in enumerate(individual_files):
outfile.write("================ Start of Process {} ================\n".format(p))
with open(fname) as infile:
outfile.write(infile.read())
outfile.write("================= End of Process {} =================\n\n".format(p))

# only remove input files after successfully finishing merging
for fname in individual_files:
os.remove(fname)

# redirect both stdout and stderr to the same file

if to is None:
Expand All @@ -432,22 +463,30 @@ def _redirect(out_to, err_to):
if rank == 0:
log = get_logger()
log.info("Begin log redirection to {} at {}".format(to, time.asctime()))
if not overwrite:
backup_filename(to)

#- all ranks wait for logfile backup
if comm is not None:
comm.barrier()

# Save the original file descriptors so we can restore them later
saved_fd_out = os.dup(fd_out)
saved_fd_err = os.dup(fd_err)

try:
pto = to
if to != "/dev/null":
pto = "{}_{}".format(to, rank)
# Determine individual per-rank output filenames and check if there are
# leftover per-rank files from a previous crash that need to be merged
# before proceeding. Leftovers might have come from a run with a different
# nproc, so don't enforce that here.
pto = to
if to != "/dev/null":
pto = _rank_filename(to, rank)
if rank == 0 and os.path.exists(pto):
_combine_individual_outputs(to)

# backup previous output file if needed
if rank == 0 and not overwrite:
backup_filename(to)

# all ranks wait for logfile backup
if comm is not None:
comm.barrier()

try:
# open python file, which creates low-level POSIX file
# descriptor.
file = open(pto, "w")
Expand Down Expand Up @@ -475,14 +514,7 @@ def _redirect(out_to, err_to):

# concatenate per-process files
if rank == 0 and to != "/dev/null":
with open(to, "w") as outfile:
for p in range(nproc):
outfile.write("================ Start of Process {} ================\n".format(p))
fname = "{}_{}".format(to, p)
with open(fname) as infile:
outfile.write(infile.read())
outfile.write("================= End of Process {} =================\n\n".format(p))
os.remove(fname)
_combine_individual_outputs(to, nproc)

if nproc > 1:
comm.barrier()
Expand Down

0 comments on commit c443da5

Please sign in to comment.