Clean up redirected stdout files from a crash before re-redirecting #2311

Merged 2 commits on Jul 26, 2024
Changes from 1 commit
py/desispec/io/util.py (2 additions, 0 deletions)
@@ -1134,6 +1134,8 @@ def backup_filename(filename):

Returns:
New filename.N, or filename if original file didn't already exist

if filename=='/dev/null' or filename doesn't exist, just return filename
"""
if filename == '/dev/null' or not os.path.exists(filename):
return filename
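For reference, a small sketch of the behavior this docstring describes; the filenames are hypothetical, not part of this change.

    from desispec.io.util import backup_filename

    backup_filename('/dev/null')    # special-cased: returned unchanged, nothing is renamed
    backup_filename('no_such.log')  # file doesn't exist, so the name is returned as-is
    backup_filename('run.log')      # if run.log exists, it is renamed to run.log.N and that name is returned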
py/desispec/parallel.py (50 additions, 18 deletions)
@@ -10,6 +10,7 @@
import os
import sys
import time
import glob
import io
from contextlib import contextmanager
import logging
@@ -380,6 +381,13 @@ def stdouterr_redirected(to=None, comm=None, overwrite=False):
# The DESI loggers.
desi_loggers = desiutil.log._desiutil_log_root

def _rank_filename(base, rank):
"""
Return standard name for output file of individual rank.
Rank can be a wildcard to generate a glob string.
"""
return f"{base}-rank{rank}"

def _redirect(out_to, err_to):

# Flush the C-level buffers
@@ -424,6 +432,29 @@ def _redirect(out_to, err_to):
ch.setFormatter(formatter)
logger.addHandler(ch)

def _combine_individual_outputs(to, nproc=None):
"""
Combine individual per-rank {to}-rank{n} files into a single {to} file

If nproc is specified, assume there are exactly that many per-rank
files; otherwise glob to find out what is there.
"""
if nproc is None:
individual_files = sorted(glob.glob(_rank_filename(to, '*')))
else:
individual_files = [_rank_filename(to, p) for p in range(nproc)]

with open(to, "w") as outfile:
for p, fname in enumerate(individual_files):
outfile.write("================ Start of Process {} ================\n".format(p))
with open(fname) as infile:
outfile.write(infile.read())
outfile.write("================= End of Process {} =================\n\n".format(p))

# only remove input files after successfully finishing merging
for fname in individual_files:
os.remove(fname)

# redirect both stdout and stderr to the same file

if to is None:
@@ -432,22 +463,30 @@ def _redirect(out_to, err_to):
if rank == 0:
log = get_logger()
log.info("Begin log redirection to {} at {}".format(to, time.asctime()))
if not overwrite:
backup_filename(to)

#- all ranks wait for logfile backup
if comm is not None:
comm.barrier()

# Save the original file descriptors so we can restore them later
saved_fd_out = os.dup(fd_out)
saved_fd_err = os.dup(fd_err)

try:
pto = to
if to != "/dev/null":
pto = "{}_{}".format(to, rank)
# Determine individual per-rank output filenames and check if there are
# leftover per-rank files from a previous crash that need to be merged
# before proceeding. Leftovers might have come from a run with a different
# nproc, so don't enforce that here.
pto = to
if to != "/dev/null":
pto = _rank_filename(to, rank)
if rank == 0 and os.path.exists(pto):
_combine_individual_outputs(to)

# backup previous output file if needed
if rank == 0 and not overwrite:
backup_filename(to)

# all ranks wait for logfile backup
if comm is not None:
comm.barrier()

try:
# open python file, which creates low-level POSIX file
# descriptor.
file = open(pto, "w")
@@ -475,14 +514,7 @@ def _redirect(out_to, err_to):

# concatenate per-process files
if rank == 0 and to != "/dev/null":
with open(to, "w") as outfile:
for p in range(nproc):
outfile.write("================ Start of Process {} ================\n".format(p))
fname = "{}_{}".format(to, p)
with open(fname) as infile:
outfile.write(infile.read())
outfile.write("================= End of Process {} =================\n\n".format(p))
os.remove(fname)
_combine_individual_outputs(to, nproc)

if nproc > 1:
comm.barrier()
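For context, a minimal usage sketch of stdouterr_redirected under MPI; the script and filenames are hypothetical and not part of this PR, and it assumes mpi4py and desispec are installed.

    from mpi4py import MPI
    from desispec.parallel import stdouterr_redirected

    comm = MPI.COMM_WORLD

    # Each rank writes to its own file, job.log-rank{rank}. When the context
    # exits, rank 0 concatenates the per-rank files into job.log with
    # "Start/End of Process" banners and then removes them.
    with stdouterr_redirected(to="job.log", comm=comm):
        print(f"hello from rank {comm.rank}")

    # With this change, if a previous run crashed and left job.log-rank* files
    # behind, rank 0 merges them into job.log (and backs it up unless
    # overwrite=True) before the new redirection begins.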