Skip to content

Commit

Permalink
Augmentation of gfal-cp debuggin for StageOutImpl and GFAL2Impl scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
anpicci committed Oct 16, 2024
1 parent 33d304f commit eebab42
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 29 deletions.
109 changes: 89 additions & 20 deletions src/python/WMCore/Storage/Backends/GFAL2Impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ def __init__(self, stagein=False):
# GFAL2 is not build under COMP environment and it had failures with mixed environment.
self.setups = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '%s'"
self.removeCommand = self.setups % '. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 %s '
self.copyCommand = self.setups % '. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy -t 2400 -T 2400 -p %(checksum)s %(options)s %(source)s %(destination)s'
self.copyOpts = '-t 2400 -T 2400 -p -v --abort-on-failure %(checksum)s %(options)s %(source)s %(destination)s'
self.copyCommand = self.setups % ('. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy ' + self.copyOpts)

Expand Down Expand Up @@ -79,21 +78,16 @@ def createRemoveFileCommand(self, pfn):
else:
return self.removeCommand % self.createFinalPFN(pfn)

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def buildCopyCommandDict(self, sourcePFN, targetPFN, options=None, checksums=None):
"""
_createStageOutCommand_
Build a gfal-copy command
Build the gfal-cp command for stageOut
gfal-copy options used:
-t maximum time for the operation to terminate
-T global timeout for the transfer operation
-p if the destination directory does not exist, create it
-K checksum algorithm to use, or algorithm:value
-v enable the verbose mode (-v for warning level)
--abort-on-failure abort the whole copy as soon as one failure is encountered
:sourcePFN: str, PFN of the source file
:targetPFN: str, destination PFN
:options: str, additional options for gfal-cp
:checksums: dict, collect checksums according to the algorithms saved as keys
"""
result = "#!/bin/bash\n"


copyCommandDict = {'checksum': '', 'options': '', 'source': '', 'destination': ''}

useChecksum = (checksums is not None and 'adler32' in checksums and not self.stageIn)
Expand All @@ -113,27 +107,102 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No
copyCommandDict['checksum'] = "-K adler32"

copyCommandDict['options'] = ' '.join(unknown)

copyCommandDict['source'] = self.createFinalPFN(sourcePFN)
copyCommandDict['destination'] = self.createFinalPFN(targetPFN)

return copyCommandDict

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
"""
Create gfal-cp command for stageOut
:sourcePFN: str, PFN of the source file
:targetPFN: str, destination PFN
:options: str, additional options for gfal-cp
:checksums: dict, collect checksums according to the algorithms saved as keys
"""


copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums)
copyCommand = self.copyCommand % copyCommandDict
result += copyCommand
result = "#!/bin/bash\n" + copyCommand

if _CheckExitCodeOption:
result += """
EXIT_STATUS=$?
echo "gfal-copy exit status: $EXIT_STATUS"
if [[ $EXIT_STATUS != 0 ]]; then
echo "ERROR: gfal-copy exited with $EXIT_STATUS"
echo "Cleaning up failed file:"
%s
echo "ERROR: gfal-copy exited with $EXIT_STATUS"
echo "Cleaning up failed file:"
{remove_command}
fi
exit $EXIT_STATUS
""" % self.createRemoveFileCommand(targetPFN)

""".format(
remove_command=self.createRemoveFileCommand(targetPFN)
)

return result

def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
"""
Debug a failed gfal-cp command for stageOut, without re-running it,
providing information on the environment and the certifications
:sourcePFN: str, PFN of the source file
:targetPFN: str, destination PFN
:options: str, additional options for gfal-cp
:checksums: dict, collect checksums according to the algorithms saved as keys
"""

copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums)
copyCommand = self.copyCommand % copyCommandDict

result = "#!/bin/bash\n"
result += """
echo
echo
echo "-----------------------------------------------------------"
echo "==========================================================="
echo
echo "Debugging information on failing gfal-copy command"
echo
echo "Current date and time: $(date +"%Y-%m-%d %H:%M:%S")"
echo "gfal-copy command which failed: {copy_command}"
echo "Hostname: $(hostname -f)"
echo "OS: $(uname -r -s)"
echo
echo "GFAL environment variables:"
env | grep ^GFAL
echo
echo "PYTHON environment variables:"
env | grep ^PYTHON
echo
echo "LD_* environment variables:"
env | grep ^LD_
echo
echo "gfal-copy location: $(which gfal-copy)"
echo "Source PFN: {source}"
echo "Target PFN: {destination}"
echo
echo
echo "Information for credentials in the environment"
echo "Bearer token content: $BEARER_TOKEN"
echo "Bearer token file: $BEARER_TOKEN_FILE"
echo "httokendecode path: $(which httokendecode)"
echo "httokendecode: $httokendecode"
echo
echo "VOMS proxy info:"
voms-proxy-info -all
echo "==========================================================="
echo "-----------------------------------------------------------"
echo
""".format(
copy_command=copyCommand,
source=copyCommandDict['source'],
destination=copyCommandDict['destination'],
)
return result

def removeFile(self, pfnToRemove):
"""
_removeFile_
Expand Down
30 changes: 21 additions & 9 deletions src/python/WMCore/Storage/StageOutImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def executeCommand(self, command):
msg = "Command exited with status: %s\nOutput message: %s" % (exitCode, output)
logging.info(msg)
except Exception as ex:
raise StageOutError(str(ex), Command=command, ExitCode=60311)
raise StageOutError(str(ex), Command=command, ExitCode=60311) from ex

if exitCode:
msg = "Command exited non-zero, ExitCode:%s\nOutput: %s " % (exitCode, output)
Expand Down Expand Up @@ -131,7 +131,6 @@ def createOutputDirectory(self, targetPFN):
If no directory is required, do not implement this method
"""
pass

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
"""
Expand All @@ -143,6 +142,13 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No
"""
raise NotImplementedError("StageOutImpl.createStageOutCommand")

def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
"""
Build a shell command that will report in the logs the details about
failing stageOut commands
"""
raise NotImplementedError("StageOutImpl.createDebuggingCommand")

def removeFile(self, pfnToRemove):
"""
_removeFile_
Expand Down Expand Up @@ -183,7 +189,7 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None):
# destination may also need PFN changed
# i.e. if we are staging in a file from an SE
targetPFN = self.createTargetName(protocol, targetPFN)

self.numRetries = 0
# //
# // Create the output directory if implemented
# //
Expand Down Expand Up @@ -212,22 +218,28 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None):
# // Run the command
# //

stageOutEx = None # variable to store the possible StageOutError
for retryCount in range(self.numRetries + 1):
try:
logging.info("Running the stage out...")
self.executeCommand(command)
break
break # This line won't be reached due to the raised error
except StageOutError as ex:
msg = "Attempt %s to stage out failed.\n" % retryCount
msg += "Automatically retrying in %s secs\n " % self.retryPause
msg += "Error details:\n%s\n" % str(ex)
logging.error(msg)
if retryCount == self.numRetries:
# //
# // last retry, propagate exception
# //
raise ex
# Last retry, propagate the information outside of the for loop
stageOutEx = ex
msg += "Automatically retrying in %s secs\n " % self.retryPause
time.sleep(self.retryPause)

# This block will now always be executed after retries are exhausted
if stageOutEx is not None:
logging.error("Maximum number of retries exhausted. Further details on the failed command reported below.")
command = self.createDebuggingCommand(sourcePFN, targetPFN, options, checksums)
self.executeCommand(command)
raise stageOutEx from None

# should never reach this point
return

0 comments on commit eebab42

Please sign in to comment.