diff --git a/.github/workflows/basic.yml b/.github/workflows/basic.yml index d42df061..c305fcb2 100644 --- a/.github/workflows/basic.yml +++ b/.github/workflows/basic.yml @@ -53,7 +53,7 @@ jobs: strategy: matrix: python: - - 2.7.18 + # - 2.7.18 - 3.6.15 - 3.9.17 diff --git a/Pilot/dirac-pilot.py b/Pilot/dirac-pilot.py index 9c434c97..c0ffc379 100644 --- a/Pilot/dirac-pilot.py +++ b/Pilot/dirac-pilot.py @@ -63,8 +63,7 @@ # print the buffer, so we have a "classic' logger back in sync. sys.stdout.write(bufContent) # now the remote logger. - remote = pilotParams.pilotLogging and (pilotParams.loggerURL is not None) - if remote: + if pilotParams.pilotLogging: # In a remote logger enabled Dirac version we would have some classic logger content from a wrapper, # which we passed in: receivedContent = "" @@ -72,16 +71,22 @@ receivedContent = sys.stdin.read() log = RemoteLogger( pilotParams.loggerURL, - "Pilot", + useServerCertificate=pilotParams.useServerCertificate, + name="Pilot", bufsize=pilotParams.loggerBufsize, pilotUUID=pilotParams.pilotUUID, debugFlag=pilotParams.debugFlag, - wnVO=pilotParams.wnVO, ) log.info("Remote logger activated") - log.buffer.write(receivedContent) + log.buffer.write(log.format_to_json( + "INFO", + receivedContent, + )) log.buffer.flush() - log.buffer.write(bufContent) + log.buffer.write(log.format_to_json( + "INFO", + bufContent, + )) else: log = Logger("Pilot", debugFlag=pilotParams.debugFlag) @@ -103,7 +108,7 @@ log.info("Requested command extensions: %s" % str(pilotParams.commandExtensions)) log.info("Executing commands: %s" % str(pilotParams.commands)) - + remote = pilotParams.pilotLogging if remote: # It's safer to cancel the timer here. Each command has got its own logger object with a timer cancelled by the # finaliser. No need for a timer in the "else" code segment below. @@ -122,5 +127,8 @@ log.error("Command %s could not be instantiated" % commandName) # send the last message and abandon ship. if remote: - log.buffer.flush() + log.buffer.flush(force=True) sys.exit(-1) + + log.info("Pilot tasks finished.") + log.buffer.flush(force=True) \ No newline at end of file diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index 945a6b78..09d2fea5 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -88,20 +88,29 @@ def wrapper(self): except SystemExit as exCode: # or Exception ? # controlled exit + if self.pp.pilotLogging: + try: + self.log.error(str(exCode)) + self.log.error(traceback.format_exc()) + self.log.buffer.flush(force=True) + except Exception as exc: + self.log.error("Remote logger couldn't be finalised %s " % str(exc)) + raise + + # If we don't have a remote logger pRef = self.pp.pilotReference self.log.info( - "Flushing the remote logger buffer for pilot on sys.exit(): %s (exit code:%s)" % (pRef, str(exCode)) + "Flushing the logger buffer for pilot on sys.exit(): %s (exit code:%s)" % (pRef, str(exCode)) ) self.log.buffer.flush() # flush the buffer unconditionally (on sys.exit()). - try: - sendMessage(self.log.url, self.log.pilotUUID, self.log.wnVO, "finaliseLogs", {"retCode": str(exCode)}) - except Exception as exc: - self.log.error("Remote logger couldn't be finalised %s " % str(exc)) - raise + except Exception as exc: # unexpected exit: document it and bail out. - self.log.error(str(exc)) - self.log.error(traceback.format_exc()) + if self.pp.pilotLogging: + # Force flush if it's a remote logger + self.log.buffer.flush(force=True) + else: + self.log.buffer.flush() raise finally: self.log.buffer.cancelTimer() diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 8afe0f62..3f390d9a 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -518,7 +518,8 @@ class RemoteLogger(Logger): def __init__( self, - url, + url, # Not used yet + useServerCertificate, name="Pilot", debugFlag=False, pilotOutput="pilot.out", @@ -526,7 +527,6 @@ def __init__( pilotUUID="unknown", flushInterval=10, bufsize=1000, - wnVO="unknown", ): """ c'tor @@ -536,36 +536,49 @@ def __init__( super(RemoteLogger, self).__init__(name, debugFlag, pilotOutput) self.url = url self.pilotUUID = pilotUUID - self.wnVO = wnVO self.isPilotLoggerOn = isPilotLoggerOn - sendToURL = partial(sendMessage, url, pilotUUID, wnVO, "sendMessage") + sendToURL = partial(sendMessage, useServerCertificate, pilotUUID) self.buffer = FixedSizeBuffer(sendToURL, bufsize=bufsize, autoflush=flushInterval) - def debug(self, msg, header=True, _sendPilotLog=False): - # TODO: Send pilot log remotely? + def format_to_json(self, level, message): + + escaped = json.dumps(message)[1:-1] # remove outer quotes + + # Split on escaped newlines + splitted_message = escaped.split("\\n") + + output = [] + for mess in splitted_message: + if mess: + output.append({ + "timestamp": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "severity": level, + "message": mess, + "scope": self.name + }) + return output + + def debug(self, msg, header=True): super(RemoteLogger, self).debug(msg, header) if ( self.isPilotLoggerOn and self.debugFlag ): # the -d flag activates this debug flag in CommandBase via PilotParams - self.sendMessage(self.messageTemplate.format(level="DEBUG", message=msg)) + self.sendMessage(self.format_to_json(level="DEBUG", message=msg)) - def error(self, msg, header=True, _sendPilotLog=False): - # TODO: Send pilot log remotely? + def error(self, msg, header=True): super(RemoteLogger, self).error(msg, header) if self.isPilotLoggerOn: - self.sendMessage(self.messageTemplate.format(level="ERROR", message=msg)) + self.sendMessage(self.format_to_json(level="ERROR", message=msg)) - def warn(self, msg, header=True, _sendPilotLog=False): - # TODO: Send pilot log remotely? + def warn(self, msg, header=True): super(RemoteLogger, self).warn(msg, header) if self.isPilotLoggerOn: - self.sendMessage(self.messageTemplate.format(level="WARNING", message=msg)) + self.sendMessage(self.format_to_json(level="WARNING", message=msg)) - def info(self, msg, header=True, _sendPilotLog=False): - # TODO: Send pilot log remotely? + def info(self, msg, header=True): super(RemoteLogger, self).info(msg, header) if self.isPilotLoggerOn: - self.sendMessage(self.messageTemplate.format(level="INFO", message=msg)) + self.sendMessage(self.format_to_json(level="INFO", message=msg)) def sendMessage(self, msg): """ @@ -577,7 +590,7 @@ def sendMessage(self, msg): :rtype: None """ try: - self.buffer.write(msg + "\n") + self.buffer.write(msg) except Exception as err: super(RemoteLogger, self).error("Message not sent") super(RemoteLogger, self).error(str(err)) @@ -622,34 +635,31 @@ def __init__(self, senderFunc, bufsize=1000, autoflush=10): self._timer.start() else: self._timer = None - self.output = StringIO() + self.output = [] self.bufsize = bufsize self._nlines = 0 self.senderFunc = senderFunc @synchronized - def write(self, text): + def write(self, content_json): """ Write text to a string buffer. Newline characters are counted and number of lines in the buffer is increased accordingly. - :param text: text string to write - :type text: str + :param content_json: Json to send, format following format_to_json + :type content_json: list[dict] :return: None :rtype: None """ - # reopen the buffer in a case we had to flush a partially filled buffer - if self.output.closed: - self.output = StringIO() - self.output.write(text) - self._nlines += max(1, text.count("\n")) + + self.output.extend(content_json) + + try: + self._nlines += max(1, len(content_json)) + except Exception: + raise ValueError(content_json) self.sendFullBuffer() - @synchronized - def getValue(self): - content = self.output.getvalue() - return content - @synchronized def sendFullBuffer(self): """ @@ -659,22 +669,19 @@ def sendFullBuffer(self): if self._nlines >= self.bufsize: self.flush() - self.output = StringIO() + self.output = [] @synchronized - def flush(self): + def flush(self, force=False): """ Flush the buffer and send log records to a remote server. The buffer is closed as well. :return: None :rtype: None """ - if not self.output.closed and self._nlines > 0: - self.output.flush() - buf = self.getValue() - self.senderFunc(buf) + if force or (self.output and self._nlines > 0): + self.senderFunc(self.output) self._nlines = 0 - self.output.close() def cancelTimer(self): """ @@ -687,40 +694,32 @@ def cancelTimer(self): self._timer.cancel() -def sendMessage(url, pilotUUID, wnVO, method, rawMessage): - """ - Invoke a remote method on a Tornado server and pass a JSON message to it. - - :param str url: Server URL - :param str pilotUUID: pilot unique ID - :param str wnVO: VO name, relevant only if not contained in a proxy - :param str method: a method to be invoked - :param str rawMessage: a message to be sent, in JSON format - :return: None. - """ - caPath = os.getenv("X509_CERT_DIR") - cert = os.getenv("X509_USER_PROXY") - - context = ssl.create_default_context() - context.load_verify_locations(capath=caPath) +def sendMessage(useServerCertificate, pilotUUID, rawMessage = []): + cfg = [] + if useServerCertificate: + cfg.append("-o /DIRAC/Security/UseServerCertificate=yes") - message = json.dumps((json.dumps(rawMessage), pilotUUID, wnVO)) + formatted_logs = json.dumps(rawMessage) + + # Escape single quotes in JSON string for safe shell quoting + safe_logs = formatted_logs.replace("'", "'\\''") - try: - context.load_cert_chain(cert) # this is a proxy - raw_data = {"method": method, "args": message} - except IsADirectoryError: # assuming it'a dir containing cert and key - context.load_cert_chain(os.path.join(cert, "hostcert.pem"), os.path.join(cert, "hostkey.pem")) - raw_data = {"method": method, "args": message, "extraCredentials": '"hosts"'} - - if sys.version_info.major == 3: - data = urlencode(raw_data).encode("utf-8") # encode to bytes ! for python3 - else: - # Python2 - data = urlencode(raw_data) + cmd = "dirac-admin-send-pilot-logs %s '%s' %s -d" % ( + pilotUUID, + safe_logs, + " ".join(cfg), + ) - res = urlopen(url, data, context=context) - res.close() + FNULL = open(os.devnull, 'w') + _p = subprocess.Popen( + cmd, + shell=True, + stdout=FNULL, + stderr=FNULL, + close_fds=False + ) + _p.wait() + FNULL.close() class CommandBase(object): @@ -750,12 +749,12 @@ def __init__(self, pilotParams): # remote logger self.log = RemoteLogger( loggerURL, - self.__class__.__name__, + useServerCertificate=pilotParams.useServerCertificate, + name=self.__class__.__name__, pilotUUID=pilotParams.pilotUUID, debugFlag=self.debugFlag, flushInterval=interval, bufsize=bufsize, - wnVO=pilotParams.wnVO, ) self.log.isPilotLoggerOn = isPilotLoggerOn @@ -805,8 +804,12 @@ def executeAndGetOutput(self, cmd, environDict=None): else: sys.stdout.write(outChunk) sys.stdout.flush() - if hasattr(self.log, "buffer") and self.log.isPilotLoggerOn: - self.log.buffer.write(outChunk) + if hasattr(self.log, "url"): + # It's a remote logger + self.log.buffer.write(self.log.format_to_json( # type: ignore + "COMMAND", + outChunk + )) outData += outChunk # If no data was read on any of the pipes then the process has finished if not dataWasRead: