Skip to content

Commit

Permalink
Merge pull request #12081 from anpicci/devb_11731
Browse files Browse the repository at this point in the history
Addition of a new method to StageOutImpl to log details about failing gfal commands
  • Loading branch information
amaltaro authored Oct 17, 2024
2 parents 393357b + b8e502b commit 64c182f
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 68 deletions.
125 changes: 94 additions & 31 deletions src/python/WMCore/Storage/Backends/GFAL2Impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ def __init__(self, stagein=False):
# Commands that follow the separator are executed without env -i, which leaves us with
# a mixed environment of COMP and system python.
# GFAL2 is not built under the COMP environment and it has failed in such a mixed environment.
self.setups = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '%s'"
self.removeCommand = self.setups % '. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 %s '
self.copyCommand = self.setups % '. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy -t 2400 -T 2400 -p %(checksum)s %(options)s %(source)s %(destination)s'
self.copyOpts = '-t 2400 -T 2400 -p -v --abort-on-failure %(checksum)s %(options)s %(source)s %(destination)s'
self.copyCommand = self.setups % ('. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy ' + self.copyOpts)
self.setups = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'"
self.removeCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 {}')
self.copyOpts = '-t 2400 -T 2400 -p -v --abort-on-failure {checksum} {options} {source} {destination}'
self.copyCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy ' + self.copyOpts)

def createFinalPFN(self, pfn):
"""
Expand All @@ -38,9 +37,9 @@ def createFinalPFN(self, pfn):
if pfn.startswith('file:'):
return pfn
elif os.path.isfile(pfn):
return "file://%s" % os.path.abspath(pfn)
return "file://{}".format(os.path.abspath(pfn))
elif pfn.startswith('/'):
return "file://%s" % os.path.abspath(pfn)
return "file://{}".format(os.path.abspath(pfn))
return pfn

def createSourceName(self, protocol, pfn):
Expand Down Expand Up @@ -75,24 +74,20 @@ def createRemoveFileCommand(self, pfn):
Command is interrupted if time expires before it finishes
"""
if os.path.isfile(pfn):
return "/bin/rm -f %s" % os.path.abspath(pfn)
return "/bin/rm -f {}".format(os.path.abspath(pfn))
else:
return self.removeCommand % self.createFinalPFN(pfn)
return self.removeCommand.format(self.createFinalPFN(pfn))

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):

def buildCopyCommandDict(self, sourcePFN, targetPFN, options=None, checksums=None):
"""
_createStageOutCommand_
Build a gfal-copy command
Build the gfal-cp command for stageOut
gfal-copy options used:
-t maximum time for the operation to terminate
-T global timeout for the transfer operation
-p if the destination directory does not exist, create it
-K checksum algorithm to use, or algorithm:value
-v enable the verbose mode (-v for warning level)
--abort-on-failure abort the whole copy as soon as one failure is encountered
:sourcePFN: str, PFN of the source file
:targetPFN: str, destination PFN
:options: str, additional options for gfal-cp
:checksums: dict, collect checksums according to the algorithms saved as keys
"""
result = "#!/bin/bash\n"

copyCommandDict = {'checksum': '', 'options': '', 'source': '', 'destination': ''}

Expand All @@ -107,31 +102,99 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No

if not args.nochecksum:
if useChecksum:
checksums['adler32'] = "%08x" % int(checksums['adler32'], 16)
copyCommandDict['checksum'] = "-K adler32:%s" % checksums['adler32']
checksums['adler32'] = "{:08x}".format(int(checksums['adler32'], 16))
copyCommandDict['checksum'] = "-K adler32:{}".format(checksums['adler32'])
else:
copyCommandDict['checksum'] = "-K adler32"

copyCommandDict['options'] = ' '.join(unknown)

copyCommandDict['source'] = self.createFinalPFN(sourcePFN)
copyCommandDict['destination'] = self.createFinalPFN(targetPFN)

copyCommand = self.copyCommand % copyCommandDict
result += copyCommand
return copyCommandDict

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
"""
Create gfal-cp command for stageOut
:sourcePFN: str, PFN of the source file
:targetPFN: str, destination PFN
:options: str, additional options for gfal-cp
:checksums: dict, collect checksums according to the algorithms saved as keys
"""

copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums)
copyCommand = self.copyCommand.format_map(copyCommandDict)
result = "#!/bin/bash\n" + copyCommand

if _CheckExitCodeOption:
result += """
EXIT_STATUS=$?
echo "gfal-copy exit status: $EXIT_STATUS"
if [[ $EXIT_STATUS != 0 ]]; then
echo "ERROR: gfal-copy exited with $EXIT_STATUS"
echo "Cleaning up failed file:"
%s
echo "ERROR: gfal-copy exited with $EXIT_STATUS"
echo "Cleaning up failed file:"
{remove_command}
fi
exit $EXIT_STATUS
""" % self.createRemoveFileCommand(targetPFN)
""".format(remove_command=self.createRemoveFileCommand(targetPFN))

return result

def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
    """
    Build a bash script that reports debugging details about a failed
    gfal-copy stage out, without re-running the copy itself.

    The script prints the gfal-copy command that failed, host and OS
    details, the GFAL/PYTHON/LD_* environment variables, the location of
    the gfal-copy executable and the credentials (token and VOMS proxy)
    visible in the environment.

    :sourcePFN: str, PFN of the source file
    :targetPFN: str, destination PFN
    :options: str, additional options for gfal-cp
    :checksums: dict, collect checksums according to the algorithms saved as keys
    :return: str, the bash script to be executed
    """
    copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums)
    copyCommand = self.copyCommand.format_map(copyCommandDict)

    banner = [
        'echo "-----------------------------------------------------------"',
        'echo "==========================================================="',
    ]

    # The script is assembled line by line, and only the lines that need a
    # value are formatted, so that bash syntax in the static lines never has
    # to be escaped for str.format.
    scriptLines = ['#!/bin/bash', '', 'echo', 'echo']
    scriptLines += banner
    scriptLines += [
        'echo',
        'echo "Debugging information on failing gfal-copy command"',
        'echo',
        'echo "Current date and time: $(date +"%Y-%m-%d %H:%M:%S")"',
        'echo "gfal-copy command which failed: {}"'.format(copyCommand),
        'echo "Hostname: $(hostname -f)"',
        'echo "OS: $(uname -r -s)"',
        'echo',
        'echo "GFAL environment variables:"',
        'env | grep ^GFAL',
        'echo',
        'echo "PYTHON environment variables:"',
        'env | grep ^PYTHON',
        'echo',
        'echo "LD_* environment variables:"',
        'env | grep ^LD_',
        'echo',
        'echo "gfal-copy location: $(which gfal-copy)"',
        'echo "Source PFN: {}"'.format(copyCommandDict['source']),
        'echo "Target PFN: {}"'.format(copyCommandDict['destination']),
        'echo',
        'echo',
        'echo "Information for credentials in the environment"',
        # The bearer token is a credential: report only whether it is set,
        # never its value, because this output ends up in the job logs.
        'if [ -n "$BEARER_TOKEN" ]; then',
        '    echo "Bearer token content: <set, value redacted>"',
        'else',
        '    echo "Bearer token content: <not set>"',
        'fi',
        'echo "Bearer token file: $BEARER_TOKEN_FILE"',
        'echo "httokendecode path: $(which httokendecode)"',
        # NOTE(review): this prints a shell variable named httokendecode, not
        # the output of the httokendecode tool - confirm which was intended.
        'echo "httokendecode: $httokendecode"',
        'echo',
        'echo "VOMS proxy info:"',
        'voms-proxy-info -all',
    ]
    # Closing banner mirrors the opening one
    scriptLines += banner[::-1]
    scriptLines.append('echo')

    return "\n".join(scriptLines) + "\n"

def removeFile(self, pfnToRemove):
Expand All @@ -140,9 +203,9 @@ def removeFile(self, pfnToRemove):
CleanUp pfn provided
"""
if os.path.isfile(pfnToRemove):
command = "/bin/rm -f %s" % os.path.abspath(pfnToRemove)
command = "/bin/rm -f {}".format(os.path.abspath(pfnToRemove))
else:
command = self.removeCommand % self.createFinalPFN(pfnToRemove)
command = self.removeCommand.format(self.createFinalPFN(pfnToRemove))
self.executeCommand(command)


Expand Down
58 changes: 33 additions & 25 deletions src/python/WMCore/Storage/StageOutImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"""
from __future__ import print_function

from builtins import range, object
from builtins import range

import logging
import os
Expand All @@ -18,7 +18,7 @@
from WMCore.Storage.StageOutError import StageOutError


class StageOutImpl(object):
class StageOutImpl:
"""
_StageOutImpl_
Expand Down Expand Up @@ -48,12 +48,12 @@ def splitPFN(pfn):
"""
protocol = pfn.split(':')[0]
host = pfn.split('/')[2]
thisList = pfn.replace('%s://%s/' % (protocol, host), '').split('?')
thisList = pfn.replace('{}://{}/'.format(protocol, host), '').split('?')
path = thisList[0]
opaque = ""
# If we have any opaque info keep it
if len(thisList) == 2:
opaque = "?%s" % thisList[1]
opaque = "?{}".format(thisList[1])

# check for the path to actually be in the opaque information
if opaque.startswith("?path="):
Expand Down Expand Up @@ -85,16 +85,16 @@ def executeCommand(self, command):
"""
try:
exitCode, output = runCommandWithOutput(command)
msg = "Command exited with status: %s\nOutput message: %s" % (exitCode, output)
msg = "Command exited with status: {}\nOutput message: {}".format(exitCode, output)
logging.info(msg)
except Exception as ex:
raise StageOutError(str(ex), Command=command, ExitCode=60311)
raise StageOutError(str(ex), Command=command, ExitCode=60311) from ex

if exitCode:
msg = "Command exited non-zero, ExitCode:%s\nOutput: %s " % (exitCode, output)
logging.error("Exception During Stage Out:\n%s", msg)
msg = "Command exited non-zero, ExitCode: {}\nOutput: {}".format(exitCode, output)
formatted_msg = "Exception During Stage Out:\n{}".format(msg)
logging.error(formatted_msg)
raise StageOutError(msg, Command=command, ExitCode=exitCode)
return

def createSourceName(self, protocol, pfn):
"""
Expand Down Expand Up @@ -131,7 +131,6 @@ def createOutputDirectory(self, targetPFN):
If no directory is required, do not implement this method
"""
pass

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
"""
Expand All @@ -143,6 +142,13 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No
"""
raise NotImplementedError("StageOutImpl.createStageOutCommand")

def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
    """
    Build a shell command that will report in the logs the details about
    failing stageOut commands.

    This default only echoes a notice. Stage out plugins that can provide
    useful diagnostics should override it. It deliberately does not raise:
    the command is requested after all stage out retries have failed, and an
    exception raised here would replace the original StageOutError.

    :sourcePFN: str, PFN of the source file (unused by the default)
    :targetPFN: str, destination PFN (unused by the default)
    :options: str, additional stage out options (unused by the default)
    :checksums: dict, checksums keyed by algorithm (unused by the default)
    :return: str, a harmless shell command
    """
    return "echo 'No debugging command is implemented for this stage out plugin'"

def removeFile(self, pfnToRemove):
"""
_removeFile_
Expand All @@ -160,9 +166,9 @@ def createRemoveFileCommand(self, pfn):
return the command to delete a file after a failed copy
"""
if pfn.startswith("/"):
return "/bin/rm -f %s" % pfn
return "/bin/rm -f {}".format(pfn)
elif os.path.isfile(pfn):
return "/bin/rm -f %s" % os.path.abspath(pfn)
return "/bin/rm -f {}".format(os.path.abspath(pfn))
else:
return ""

Expand All @@ -183,7 +189,6 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None):
# destination may also need PFN changed
# i.e. if we are staging in a file from an SE
targetPFN = self.createTargetName(protocol, targetPFN)

# //
# // Create the output directory if implemented
# //
Expand All @@ -193,9 +198,9 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None):
self.createOutputDirectory(targetPFN)
break
except StageOutError as ex:
msg = "Attempt %s to create a directory for stageout failed.\n" % retryCount
msg += "Automatically retrying in %s secs\n " % self.retryPause
msg += "Error details:\n%s\n" % str(ex)
msg = "Attempt {} to create a directory for stageout failed.\n".format(retryCount)
msg += "Automatically retrying stage out in {} secs\n ".format(self.retryPause)
msg += "Error details:\n{}\n".format(str(ex))
logging.error(msg)
if retryCount == self.numRetries:
# //
Expand All @@ -212,22 +217,25 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None):
# // Run the command
# //

stageOutEx = None # variable to store the possible StageOutError
for retryCount in range(self.numRetries + 1):
try:
logging.info("Running the stage out...")
self.executeCommand(command)
break
except StageOutError as ex:
msg = "Attempt %s to stage out failed.\n" % retryCount
msg += "Automatically retrying in %s secs\n " % self.retryPause
msg += "Error details:\n%s\n" % str(ex)
msg = "Attempt {} to stage out failed.\n".format(retryCount)
msg += "Error details:\n{}\n".format(str(ex))
logging.error(msg)
if retryCount == self.numRetries:
# //
# // last retry, propagate exception
# //
raise ex
# Last retry, propagate the information outside of the for loop
stageOutEx = ex
msg += "Automatically retrying in {} secs\n ".format(self.retryPause)
time.sleep(self.retryPause)

# should never reach this point
return
# This block will now always be executed after retries are exhausted
if stageOutEx is not None:
logging.error("Maximum number of retries exhausted. Further details on the failed command reported below.")
command = self.createDebuggingCommand(sourcePFN, targetPFN, options, checksums)
self.executeCommand(command)
raise stageOutEx from None
24 changes: 12 additions & 12 deletions test/python/WMCore_t/Storage_t/Backends_t/GFAL2Impl_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
class GFAL2ImplTest(unittest.TestCase):
def setUp(self):
self.GFAL2Impl = GFAL2Impl()
self.removeCommand = self.GFAL2Impl.removeCommand = "removeCommand %s"
self.copyCommand = self.GFAL2Impl.copyCommand = "copyCommand %(checksum)s %(options)s %(source)s %(destination)s"
self.removeCommand = self.GFAL2Impl.removeCommand = "removeCommand {}"
self.copyCommand = self.GFAL2Impl.copyCommand = "copyCommand {checksum} {options} {source} {destination}"

def testInit(self):
testGFAL2Impl = GFAL2Impl()
removeCommand = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c " \
"'. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 %s '"
"'. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 {}'"
copyCommand = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '" \
". $JOBSTARTDIR/startup_environment.sh; date; gfal-copy -t 2400 -T 2400 -p " \
"-v --abort-on-failure %(checksum)s %(options)s %(source)s %(destination)s'"
". $JOBSTARTDIR/startup_environment.sh; date; gfal-copy -t 2400 -T 2400 -p " \
"-v --abort-on-failure {checksum} {options} {source} {destination}'"
self.assertEqual(removeCommand, testGFAL2Impl.removeCommand)
self.assertEqual(copyCommand, testGFAL2Impl.copyCommand)

Expand Down Expand Up @@ -105,22 +105,22 @@ def getCopyCommandDict(self, checksum, options, source, destination):
def getStageOutCommandResult(self, copyCommandDict, createRemoveFileCommandResult):
result = "#!/bin/bash\n"

copyCommand = self.copyCommand % copyCommandDict
copyCommand = self.copyCommand.format_map(copyCommandDict)
result += copyCommand

result += """
EXIT_STATUS=$?
echo "gfal-copy exit status: $EXIT_STATUS"
if [[ $EXIT_STATUS != 0 ]]; then
echo "ERROR: gfal-copy exited with $EXIT_STATUS"
echo "Cleaning up failed file:"
%s
echo "ERROR: gfal-copy exited with $EXIT_STATUS"
echo "Cleaning up failed file:"
{remove_command}
fi
exit $EXIT_STATUS
""" % createRemoveFileCommandResult

""".format(remove_command=createRemoveFileCommandResult)
return result

@mock.patch('WMCore.Storage.Backends.GFAL2Impl.os.path')
@mock.patch('WMCore.Storage.StageOutImpl.StageOutImpl.executeCommand')
def testRemoveFile_isFile(self, mock_executeCommand, mock_path):
Expand Down

0 comments on commit 64c182f

Please sign in to comment.