From aae9f0ecea37e8cbd7d37bbeac705e474645ea06 Mon Sep 17 00:00:00 2001 From: Andrea Piccinelli Date: Fri, 23 Aug 2024 18:02:56 +0200 Subject: [PATCH 1/2] Augmentation of gfal-cp debuggin for StageOutImpl and GFAL2Impl scripts --- .../WMCore/Storage/Backends/GFAL2Impl.py | 125 +++++++++++++----- src/python/WMCore/Storage/StageOutImpl.py | 58 ++++---- 2 files changed, 127 insertions(+), 56 deletions(-) diff --git a/src/python/WMCore/Storage/Backends/GFAL2Impl.py b/src/python/WMCore/Storage/Backends/GFAL2Impl.py index 6e1d5f2b13..4b00d16591 100644 --- a/src/python/WMCore/Storage/Backends/GFAL2Impl.py +++ b/src/python/WMCore/Storage/Backends/GFAL2Impl.py @@ -24,11 +24,10 @@ def __init__(self, stagein=False): # Next commands after separation are executed without env -i and this leads us with # mixed environment with COMP and system python. # GFAL2 is not build under COMP environment and it had failures with mixed environment. - self.setups = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '%s'" - self.removeCommand = self.setups % '. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 %s ' - self.copyCommand = self.setups % '. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy -t 2400 -T 2400 -p %(checksum)s %(options)s %(source)s %(destination)s' - self.copyOpts = '-t 2400 -T 2400 -p -v --abort-on-failure %(checksum)s %(options)s %(source)s %(destination)s' - self.copyCommand = self.setups % ('. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy ' + self.copyOpts) + self.setups = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'" + self.removeCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 {}') + self.copyOpts = '-t 2400 -T 2400 -p -v --abort-on-failure {checksum} {options} {source} {destination}' + self.copyCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy ' + self.copyOpts) def createFinalPFN(self, pfn): """ @@ -38,9 +37,9 @@ def createFinalPFN(self, pfn): if pfn.startswith('file:'): return pfn elif os.path.isfile(pfn): - return "file://%s" % os.path.abspath(pfn) + return "file://{}".format(os.path.abspath(pfn)) elif pfn.startswith('/'): - return "file://%s" % os.path.abspath(pfn) + return "file://{}".format(os.path.abspath(pfn)) return pfn def createSourceName(self, protocol, pfn): @@ -75,24 +74,20 @@ def createRemoveFileCommand(self, pfn): Command is interrupted if time expires before it finishes """ if os.path.isfile(pfn): - return "/bin/rm -f %s" % os.path.abspath(pfn) + return "/bin/rm -f {}".format(os.path.abspath(pfn)) else: - return self.removeCommand % self.createFinalPFN(pfn) + return self.removeCommand.format(self.createFinalPFN(pfn)) - def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None): + + def buildCopyCommandDict(self, sourcePFN, targetPFN, options=None, checksums=None): """ - _createStageOutCommand_ - Build a gfal-copy command + Build the gfal-cp command for stageOut - gfal-copy options used: - -t maximum time for the operation to terminate - -T global timeout for the transfer operation - -p if the destination directory does not exist, create it - -K checksum algorithm to use, or algorithm:value - -v enable the verbose mode (-v for warning level) - --abort-on-failure abort the whole copy as soon as one failure is encountered + :sourcePFN: str, PFN of the source file + :targetPFN: str, destination PFN + :options: str, additional options for gfal-cp + :checksums: dict, collect checksums according to the algorithms saved as keys """ - result = "#!/bin/bash\n" copyCommandDict = {'checksum': '', 'options': '', 'source': '', 'destination': ''} @@ -107,31 +102,99 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No if not args.nochecksum: if useChecksum: - checksums['adler32'] = "%08x" % int(checksums['adler32'], 16) - copyCommandDict['checksum'] = "-K adler32:%s" % checksums['adler32'] + checksums['adler32'] = "{:08x}".format(int(checksums['adler32'], 16)) + copyCommandDict['checksum'] = "-K adler32:{}".format(checksums['adler32']) else: copyCommandDict['checksum'] = "-K adler32" copyCommandDict['options'] = ' '.join(unknown) - copyCommandDict['source'] = self.createFinalPFN(sourcePFN) copyCommandDict['destination'] = self.createFinalPFN(targetPFN) - copyCommand = self.copyCommand % copyCommandDict - result += copyCommand + return copyCommandDict + + def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None): + """ + Create gfal-cp command for stageOut + + :sourcePFN: str, PFN of the source file + :targetPFN: str, destination PFN + :options: str, additional options for gfal-cp + :checksums: dict, collect checksums according to the algorithms saved as keys + """ + + copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums) + copyCommand = self.copyCommand.format_map(copyCommandDict) + result = "#!/bin/bash\n" + copyCommand if _CheckExitCodeOption: result += """ EXIT_STATUS=$? echo "gfal-copy exit status: $EXIT_STATUS" if [[ $EXIT_STATUS != 0 ]]; then - echo "ERROR: gfal-copy exited with $EXIT_STATUS" - echo "Cleaning up failed file:" - %s + echo "ERROR: gfal-copy exited with $EXIT_STATUS" + echo "Cleaning up failed file:" + {remove_command} fi exit $EXIT_STATUS - """ % self.createRemoveFileCommand(targetPFN) + """.format(remove_command=self.createRemoveFileCommand(targetPFN)) + + return result + + def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None): + """ + Debug a failed gfal-cp command for stageOut, without re-running it, + providing information on the environment and the certifications + :sourcePFN: str, PFN of the source file + :targetPFN: str, destination PFN + :options: str, additional options for gfal-cp + :checksums: dict, collect checksums according to the algorithms saved as keys + """ + + copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums) + copyCommand = self.copyCommand.format_map(copyCommandDict) + + result = "#!/bin/bash\n" + result += """ + echo + echo + echo "-----------------------------------------------------------" + echo "===========================================================" + echo + echo "Debugging information on failing gfal-copy command" + echo + echo "Current date and time: $(date +"%Y-%m-%d %H:%M:%S")" + echo "gfal-copy command which failed: {copy_command}" + echo "Hostname: $(hostname -f)" + echo "OS: $(uname -r -s)" + echo + echo "GFAL environment variables:" + env | grep ^GFAL + echo + echo "PYTHON environment variables:" + env | grep ^PYTHON + echo + echo "LD_* environment variables:" + env | grep ^LD_ + echo + echo "gfal-copy location: $(which gfal-copy)" + echo "Source PFN: {source}" + echo "Target PFN: {destination}" + echo + echo + echo "Information for credentials in the environment" + echo "Bearer token content: $BEARER_TOKEN" + echo "Bearer token file: $BEARER_TOKEN_FILE" + echo "httokendecode path: $(which httokendecode)" + echo "httokendecode: $httokendecode" + echo + echo "VOMS proxy info:" + voms-proxy-info -all + echo "===========================================================" + echo "-----------------------------------------------------------" + echo + """.format(copy_command=copyCommand, source=copyCommandDict['source'], destination=copyCommandDict['destination']) return result def removeFile(self, pfnToRemove): @@ -140,9 +203,9 @@ def removeFile(self, pfnToRemove): CleanUp pfn provided """ if os.path.isfile(pfnToRemove): - command = "/bin/rm -f %s" % os.path.abspath(pfnToRemove) + command = "/bin/rm -f {}".format(os.path.abspath(pfnToRemove)) else: - command = self.removeCommand % self.createFinalPFN(pfnToRemove) + command = self.removeCommand.format(self.createFinalPFN(pfnToRemove)) self.executeCommand(command) diff --git a/src/python/WMCore/Storage/StageOutImpl.py b/src/python/WMCore/Storage/StageOutImpl.py index 24a8f029a9..46573aec4b 100644 --- a/src/python/WMCore/Storage/StageOutImpl.py +++ b/src/python/WMCore/Storage/StageOutImpl.py @@ -8,7 +8,7 @@ """ from __future__ import print_function -from builtins import range, object +from builtins import range import logging import os @@ -18,7 +18,7 @@ from WMCore.Storage.StageOutError import StageOutError -class StageOutImpl(object): +class StageOutImpl: """ _StageOutImpl_ @@ -48,12 +48,12 @@ def splitPFN(pfn): """ protocol = pfn.split(':')[0] host = pfn.split('/')[2] - thisList = pfn.replace('%s://%s/' % (protocol, host), '').split('?') + thisList = pfn.replace('{}://{}/'.format(protocol, host), '').split('?') path = thisList[0] opaque = "" # If we have any opaque info keep it if len(thisList) == 2: - opaque = "?%s" % thisList[1] + opaque = "?{}".format(thisList[1]) # check for the path to actually be in the opaque information if opaque.startswith("?path="): @@ -85,16 +85,16 @@ def executeCommand(self, command): """ try: exitCode, output = runCommandWithOutput(command) - msg = "Command exited with status: %s\nOutput message: %s" % (exitCode, output) + msg = "Command exited with status: {}\nOutput message: {}".format(exitCode, output) logging.info(msg) except Exception as ex: - raise StageOutError(str(ex), Command=command, ExitCode=60311) + raise StageOutError(str(ex), Command=command, ExitCode=60311) from ex if exitCode: - msg = "Command exited non-zero, ExitCode:%s\nOutput: %s " % (exitCode, output) - logging.error("Exception During Stage Out:\n%s", msg) + msg = "Command exited non-zero, ExitCode: {}\nOutput: {}".format(exitCode, output) + formatted_msg = "Exception During Stage Out:\n{}".format(msg) + logging.error(formatted_msg) raise StageOutError(msg, Command=command, ExitCode=exitCode) - return def createSourceName(self, protocol, pfn): """ @@ -131,7 +131,6 @@ def createOutputDirectory(self, targetPFN): If no directory is required, do not implement this method """ - pass def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None): """ @@ -143,6 +142,13 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No """ raise NotImplementedError("StageOutImpl.createStageOutCommand") + def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None): + """ + Build a shell command that will report in the logs the details about + failing stageOut commands + """ + raise NotImplementedError("StageOutImpl.createDebuggingCommand") + def removeFile(self, pfnToRemove): """ _removeFile_ @@ -160,9 +166,9 @@ def createRemoveFileCommand(self, pfn): return the command to delete a file after a failed copy """ if pfn.startswith("/"): - return "/bin/rm -f %s" % pfn + return "/bin/rm -f {}".format(pfn) elif os.path.isfile(pfn): - return "/bin/rm -f %s" % os.path.abspath(pfn) + return "/bin/rm -f {}".format(os.path.abspath(pfn)) else: return "" @@ -183,7 +189,6 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None): # destination may also need PFN changed # i.e. if we are staging in a file from an SE targetPFN = self.createTargetName(protocol, targetPFN) - # // # // Create the output directory if implemented # // @@ -193,9 +198,9 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None): self.createOutputDirectory(targetPFN) break except StageOutError as ex: - msg = "Attempt %s to create a directory for stageout failed.\n" % retryCount - msg += "Automatically retrying in %s secs\n " % self.retryPause - msg += "Error details:\n%s\n" % str(ex) + msg = "Attempt {} to create a directory for stageout failed.\n".format(retryCount) + msg += "Automatically retrying stage out in {} secs\n ".format(self.retryPause) + msg += "Error details:\n{}\n".format(str(ex)) logging.error(msg) if retryCount == self.numRetries: # // @@ -212,22 +217,25 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None): # // Run the command # // + stageOutEx = None # variable to store the possible StageOutError for retryCount in range(self.numRetries + 1): try: logging.info("Running the stage out...") self.executeCommand(command) break except StageOutError as ex: - msg = "Attempt %s to stage out failed.\n" % retryCount - msg += "Automatically retrying in %s secs\n " % self.retryPause - msg += "Error details:\n%s\n" % str(ex) + msg = "Attempt {} to stage out failed.\n".format(retryCount) + msg += "Error details:\n{}\n".format(str(ex)) logging.error(msg) if retryCount == self.numRetries: - # // - # // last retry, propagate exception - # // - raise ex + # Last retry, propagate the information outside of the for loop + stageOutEx = ex + msg += "Automatically retrying in {} secs\n ".format(self.retryPause) time.sleep(self.retryPause) - # should never reach this point - return + # This block will now always be executed after retries are exhausted + if stageOutEx is not None: + logging.error("Maximum number of retries exhausted. Further details on the failed command reported below.") + command = self.createDebuggingCommand(sourcePFN, targetPFN, options, checksums) + self.executeCommand(command) + raise stageOutEx from None From b8e502b1a0b2504a0002f869e15b1d299cd191f4 Mon Sep 17 00:00:00 2001 From: Andrea Piccinelli Date: Wed, 11 Sep 2024 14:49:30 +0200 Subject: [PATCH 2/2] Adjusting the unit test scripts --- .../Storage_t/Backends_t/GFAL2Impl_t.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/test/python/WMCore_t/Storage_t/Backends_t/GFAL2Impl_t.py b/test/python/WMCore_t/Storage_t/Backends_t/GFAL2Impl_t.py index 9c856611f7..d82a504416 100644 --- a/test/python/WMCore_t/Storage_t/Backends_t/GFAL2Impl_t.py +++ b/test/python/WMCore_t/Storage_t/Backends_t/GFAL2Impl_t.py @@ -9,16 +9,16 @@ class GFAL2ImplTest(unittest.TestCase): def setUp(self): self.GFAL2Impl = GFAL2Impl() - self.removeCommand = self.GFAL2Impl.removeCommand = "removeCommand %s" - self.copyCommand = self.GFAL2Impl.copyCommand = "copyCommand %(checksum)s %(options)s %(source)s %(destination)s" + self.removeCommand = self.GFAL2Impl.removeCommand = "removeCommand {}" + self.copyCommand = self.GFAL2Impl.copyCommand = "copyCommand {checksum} {options} {source} {destination}" def testInit(self): testGFAL2Impl = GFAL2Impl() removeCommand = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c " \ - "'. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 %s '" + "'. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 {}'" copyCommand = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '" \ - ". $JOBSTARTDIR/startup_environment.sh; date; gfal-copy -t 2400 -T 2400 -p " \ - "-v --abort-on-failure %(checksum)s %(options)s %(source)s %(destination)s'" + ". $JOBSTARTDIR/startup_environment.sh; date; gfal-copy -t 2400 -T 2400 -p " \ + "-v --abort-on-failure {checksum} {options} {source} {destination}'" self.assertEqual(removeCommand, testGFAL2Impl.removeCommand) self.assertEqual(copyCommand, testGFAL2Impl.copyCommand) @@ -105,22 +105,22 @@ def getCopyCommandDict(self, checksum, options, source, destination): def getStageOutCommandResult(self, copyCommandDict, createRemoveFileCommandResult): result = "#!/bin/bash\n" - copyCommand = self.copyCommand % copyCommandDict + copyCommand = self.copyCommand.format_map(copyCommandDict) result += copyCommand result += """ EXIT_STATUS=$? echo "gfal-copy exit status: $EXIT_STATUS" if [[ $EXIT_STATUS != 0 ]]; then - echo "ERROR: gfal-copy exited with $EXIT_STATUS" - echo "Cleaning up failed file:" - %s + echo "ERROR: gfal-copy exited with $EXIT_STATUS" + echo "Cleaning up failed file:" + {remove_command} fi exit $EXIT_STATUS - """ % createRemoveFileCommandResult - + """.format(remove_command=createRemoveFileCommandResult) + return result - + @mock.patch('WMCore.Storage.Backends.GFAL2Impl.os.path') @mock.patch('WMCore.Storage.StageOutImpl.StageOutImpl.executeCommand') def testRemoveFile_isFile(self, mock_executeCommand, mock_path):