Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Addition of a new method to StageOutImpl to log details about failing gfal commands #12081

Merged
merged 2 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 94 additions & 31 deletions src/python/WMCore/Storage/Backends/GFAL2Impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ def __init__(self, stagein=False):
# Next commands after separation are executed without env -i and this leads us with
# mixed environment with COMP and system python.
# GFAL2 is not build under COMP environment and it had failures with mixed environment.
self.setups = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '%s'"
self.removeCommand = self.setups % '. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 %s '
self.copyCommand = self.setups % '. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy -t 2400 -T 2400 -p %(checksum)s %(options)s %(source)s %(destination)s'
anpicci marked this conversation as resolved.
Show resolved Hide resolved
self.copyOpts = '-t 2400 -T 2400 -p -v --abort-on-failure %(checksum)s %(options)s %(source)s %(destination)s'
self.copyCommand = self.setups % ('. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy ' + self.copyOpts)
self.setups = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'"
self.removeCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 {}')
self.copyOpts = '-t 2400 -T 2400 -p -v --abort-on-failure {checksum} {options} {source} {destination}'
self.copyCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy ' + self.copyOpts)

def createFinalPFN(self, pfn):
"""
Expand All @@ -38,9 +37,9 @@ def createFinalPFN(self, pfn):
if pfn.startswith('file:'):
return pfn
elif os.path.isfile(pfn):
return "file://%s" % os.path.abspath(pfn)
return "file://{}".format(os.path.abspath(pfn))
elif pfn.startswith('/'):
return "file://%s" % os.path.abspath(pfn)
return "file://{}".format(os.path.abspath(pfn))
return pfn

def createSourceName(self, protocol, pfn):
Expand Down Expand Up @@ -75,24 +74,20 @@ def createRemoveFileCommand(self, pfn):
Command is interrupted if time expires before it finishes
"""
if os.path.isfile(pfn):
return "/bin/rm -f %s" % os.path.abspath(pfn)
return "/bin/rm -f {}".format(os.path.abspath(pfn))
else:
return self.removeCommand % self.createFinalPFN(pfn)
return self.removeCommand.format(self.createFinalPFN(pfn))

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):

def buildCopyCommandDict(self, sourcePFN, targetPFN, options=None, checksums=None):
"""
_createStageOutCommand_
Build a gfal-copy command
Build the gfal-cp command for stageOut

gfal-copy options used:
-t maximum time for the operation to terminate
-T global timeout for the transfer operation
-p if the destination directory does not exist, create it
-K checksum algorithm to use, or algorithm:value
-v enable the verbose mode (-v for warning level)
--abort-on-failure abort the whole copy as soon as one failure is encountered
:sourcePFN: str, PFN of the source file
:targetPFN: str, destination PFN
:options: str, additional options for gfal-cp
:checksums: dict, collect checksums according to the algorithms saved as keys
"""
result = "#!/bin/bash\n"

copyCommandDict = {'checksum': '', 'options': '', 'source': '', 'destination': ''}

Expand All @@ -107,31 +102,99 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No

if not args.nochecksum:
if useChecksum:
checksums['adler32'] = "%08x" % int(checksums['adler32'], 16)
copyCommandDict['checksum'] = "-K adler32:%s" % checksums['adler32']
checksums['adler32'] = "{:08x}".format(int(checksums['adler32'], 16))
copyCommandDict['checksum'] = "-K adler32:{}".format(checksums['adler32'])
else:
copyCommandDict['checksum'] = "-K adler32"

copyCommandDict['options'] = ' '.join(unknown)

copyCommandDict['source'] = self.createFinalPFN(sourcePFN)
copyCommandDict['destination'] = self.createFinalPFN(targetPFN)

copyCommand = self.copyCommand % copyCommandDict
result += copyCommand
return copyCommandDict

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
"""
Create gfal-cp command for stageOut

:sourcePFN: str, PFN of the source file
:targetPFN: str, destination PFN
:options: str, additional options for gfal-cp
:checksums: dict, collect checksums according to the algorithms saved as keys
"""

copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums)
copyCommand = self.copyCommand.format_map(copyCommandDict)
result = "#!/bin/bash\n" + copyCommand

if _CheckExitCodeOption:
result += """
EXIT_STATUS=$?
echo "gfal-copy exit status: $EXIT_STATUS"
if [[ $EXIT_STATUS != 0 ]]; then
echo "ERROR: gfal-copy exited with $EXIT_STATUS"
echo "Cleaning up failed file:"
%s
echo "ERROR: gfal-copy exited with $EXIT_STATUS"
echo "Cleaning up failed file:"
{remove_command}
fi
exit $EXIT_STATUS
""" % self.createRemoveFileCommand(targetPFN)
""".format(remove_command=self.createRemoveFileCommand(targetPFN))

return result

def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
"""
Debug a failed gfal-cp command for stageOut, without re-running it,
providing information on the environment and the certifications

:sourcePFN: str, PFN of the source file
:targetPFN: str, destination PFN
:options: str, additional options for gfal-cp
:checksums: dict, collect checksums according to the algorithms saved as keys
"""

copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums)
copyCommand = self.copyCommand.format_map(copyCommandDict)

result = "#!/bin/bash\n"
result += """
anpicci marked this conversation as resolved.
Show resolved Hide resolved
echo
echo
echo "-----------------------------------------------------------"
echo "==========================================================="
echo
echo "Debugging information on failing gfal-copy command"
echo
echo "Current date and time: $(date +"%Y-%m-%d %H:%M:%S")"
echo "gfal-copy command which failed: {copy_command}"
echo "Hostname: $(hostname -f)"
echo "OS: $(uname -r -s)"
echo
echo "GFAL environment variables:"
env | grep ^GFAL
echo
echo "PYTHON environment variables:"
env | grep ^PYTHON
echo
echo "LD_* environment variables:"
env | grep ^LD_
echo
echo "gfal-copy location: $(which gfal-copy)"
echo "Source PFN: {source}"
echo "Target PFN: {destination}"
echo
echo
echo "Information for credentials in the environment"
echo "Bearer token content: $BEARER_TOKEN"
echo "Bearer token file: $BEARER_TOKEN_FILE"
echo "httokendecode path: $(which httokendecode)"
echo "httokendecode: $httokendecode"
echo
echo "VOMS proxy info:"
voms-proxy-info -all
echo "==========================================================="
echo "-----------------------------------------------------------"
echo
""".format(copy_command=copyCommand, source=copyCommandDict['source'], destination=copyCommandDict['destination'])
return result

def removeFile(self, pfnToRemove):
Expand All @@ -140,9 +203,9 @@ def removeFile(self, pfnToRemove):
CleanUp pfn provided
"""
if os.path.isfile(pfnToRemove):
command = "/bin/rm -f %s" % os.path.abspath(pfnToRemove)
command = "/bin/rm -f {}".format(os.path.abspath(pfnToRemove))
else:
command = self.removeCommand % self.createFinalPFN(pfnToRemove)
command = self.removeCommand.format(self.createFinalPFN(pfnToRemove))
self.executeCommand(command)


Expand Down
58 changes: 33 additions & 25 deletions src/python/WMCore/Storage/StageOutImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"""
from __future__ import print_function

from builtins import range, object
from builtins import range

import logging
import os
Expand All @@ -18,7 +18,7 @@
from WMCore.Storage.StageOutError import StageOutError


class StageOutImpl(object):
class StageOutImpl:
"""
_StageOutImpl_

Expand Down Expand Up @@ -48,12 +48,12 @@ def splitPFN(pfn):
"""
protocol = pfn.split(':')[0]
host = pfn.split('/')[2]
thisList = pfn.replace('%s://%s/' % (protocol, host), '').split('?')
thisList = pfn.replace('{}://{}/'.format(protocol, host), '').split('?')
path = thisList[0]
opaque = ""
# If we have any opaque info keep it
if len(thisList) == 2:
opaque = "?%s" % thisList[1]
opaque = "?{}".format(thisList[1])

# check for the path to actually be in the opaque information
if opaque.startswith("?path="):
Expand Down Expand Up @@ -85,16 +85,16 @@ def executeCommand(self, command):
"""
try:
exitCode, output = runCommandWithOutput(command)
msg = "Command exited with status: %s\nOutput message: %s" % (exitCode, output)
msg = "Command exited with status: {}\nOutput message: {}".format(exitCode, output)
logging.info(msg)
except Exception as ex:
raise StageOutError(str(ex), Command=command, ExitCode=60311)
raise StageOutError(str(ex), Command=command, ExitCode=60311) from ex

if exitCode:
msg = "Command exited non-zero, ExitCode:%s\nOutput: %s " % (exitCode, output)
logging.error("Exception During Stage Out:\n%s", msg)
msg = "Command exited non-zero, ExitCode: {}\nOutput: {}".format(exitCode, output)
formatted_msg = "Exception During Stage Out:\n{}".format(msg)
logging.error(formatted_msg)
raise StageOutError(msg, Command=command, ExitCode=exitCode)
return

def createSourceName(self, protocol, pfn):
"""
Expand Down Expand Up @@ -131,7 +131,6 @@ def createOutputDirectory(self, targetPFN):

If no directory is required, do not implement this method
"""
pass

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
"""
Expand All @@ -143,6 +142,13 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No
"""
raise NotImplementedError("StageOutImpl.createStageOutCommand")

def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
"""
Build a shell command that will report in the logs the details about
failing stageOut commands
"""
raise NotImplementedError("StageOutImpl.createDebuggingCommand")
anpicci marked this conversation as resolved.
Show resolved Hide resolved

def removeFile(self, pfnToRemove):
"""
_removeFile_
Expand All @@ -160,9 +166,9 @@ def createRemoveFileCommand(self, pfn):
return the command to delete a file after a failed copy
"""
if pfn.startswith("/"):
return "/bin/rm -f %s" % pfn
return "/bin/rm -f {}".format(pfn)
elif os.path.isfile(pfn):
return "/bin/rm -f %s" % os.path.abspath(pfn)
return "/bin/rm -f {}".format(os.path.abspath(pfn))
else:
return ""

Expand All @@ -183,7 +189,6 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None):
# destination may also need PFN changed
# i.e. if we are staging in a file from an SE
targetPFN = self.createTargetName(protocol, targetPFN)

# //
# // Create the output directory if implemented
# //
Expand All @@ -193,9 +198,9 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None):
self.createOutputDirectory(targetPFN)
break
except StageOutError as ex:
msg = "Attempt %s to create a directory for stageout failed.\n" % retryCount
msg += "Automatically retrying in %s secs\n " % self.retryPause
msg += "Error details:\n%s\n" % str(ex)
msg = "Attempt {} to create a directory for stageout failed.\n".format(retryCount)
msg += "Automatically retrying stage out in {} secs\n ".format(self.retryPause)
msg += "Error details:\n{}\n".format(str(ex))
logging.error(msg)
if retryCount == self.numRetries:
# //
Expand All @@ -212,22 +217,25 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None):
# // Run the command
# //

stageOutEx = None # variable to store the possible StageOutError
for retryCount in range(self.numRetries + 1):
try:
logging.info("Running the stage out...")
self.executeCommand(command)
break
except StageOutError as ex:
msg = "Attempt %s to stage out failed.\n" % retryCount
msg += "Automatically retrying in %s secs\n " % self.retryPause
msg += "Error details:\n%s\n" % str(ex)
msg = "Attempt {} to stage out failed.\n".format(retryCount)
msg += "Error details:\n{}\n".format(str(ex))
logging.error(msg)
if retryCount == self.numRetries:
# //
# // last retry, propagate exception
# //
raise ex
# Last retry, propagate the information outside of the for loop
stageOutEx = ex
msg += "Automatically retrying in {} secs\n ".format(self.retryPause)
time.sleep(self.retryPause)

# should never reach this point
return
# This block will now always be executed after retries are exhausted
if stageOutEx is not None:
anpicci marked this conversation as resolved.
Show resolved Hide resolved
logging.error("Maximum number of retries exhausted. Further details on the failed command reported below.")
command = self.createDebuggingCommand(sourcePFN, targetPFN, options, checksums)
self.executeCommand(command)
raise stageOutEx from None
24 changes: 12 additions & 12 deletions test/python/WMCore_t/Storage_t/Backends_t/GFAL2Impl_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
class GFAL2ImplTest(unittest.TestCase):
def setUp(self):
self.GFAL2Impl = GFAL2Impl()
self.removeCommand = self.GFAL2Impl.removeCommand = "removeCommand %s"
self.copyCommand = self.GFAL2Impl.copyCommand = "copyCommand %(checksum)s %(options)s %(source)s %(destination)s"
self.removeCommand = self.GFAL2Impl.removeCommand = "removeCommand {}"
self.copyCommand = self.GFAL2Impl.copyCommand = "copyCommand {checksum} {options} {source} {destination}"

def testInit(self):
testGFAL2Impl = GFAL2Impl()
removeCommand = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c " \
"'. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 %s '"
"'. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 {}'"
copyCommand = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '" \
". $JOBSTARTDIR/startup_environment.sh; date; gfal-copy -t 2400 -T 2400 -p " \
"-v --abort-on-failure %(checksum)s %(options)s %(source)s %(destination)s'"
". $JOBSTARTDIR/startup_environment.sh; date; gfal-copy -t 2400 -T 2400 -p " \
"-v --abort-on-failure {checksum} {options} {source} {destination}'"
self.assertEqual(removeCommand, testGFAL2Impl.removeCommand)
self.assertEqual(copyCommand, testGFAL2Impl.copyCommand)

Expand Down Expand Up @@ -105,22 +105,22 @@ def getCopyCommandDict(self, checksum, options, source, destination):
def getStageOutCommandResult(self, copyCommandDict, createRemoveFileCommandResult):
result = "#!/bin/bash\n"

copyCommand = self.copyCommand % copyCommandDict
copyCommand = self.copyCommand.format_map(copyCommandDict)
result += copyCommand

result += """
EXIT_STATUS=$?
echo "gfal-copy exit status: $EXIT_STATUS"
if [[ $EXIT_STATUS != 0 ]]; then
echo "ERROR: gfal-copy exited with $EXIT_STATUS"
echo "Cleaning up failed file:"
%s
echo "ERROR: gfal-copy exited with $EXIT_STATUS"
echo "Cleaning up failed file:"
{remove_command}
fi
exit $EXIT_STATUS
""" % createRemoveFileCommandResult

""".format(remove_command=createRemoveFileCommandResult)
return result

@mock.patch('WMCore.Storage.Backends.GFAL2Impl.os.path')
@mock.patch('WMCore.Storage.StageOutImpl.StageOutImpl.executeCommand')
def testRemoveFile_isFile(self, mock_executeCommand, mock_path):
Expand Down