Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/basic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
strategy:
matrix:
python:
- 2.7.18
# - 2.7.18
- 3.6.15
- 3.9.17

Expand Down
24 changes: 16 additions & 8 deletions Pilot/dirac-pilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,30 @@
# print the buffer, so we have a "classic' logger back in sync.
sys.stdout.write(bufContent)
# now the remote logger.
remote = pilotParams.pilotLogging and (pilotParams.loggerURL is not None)
if remote:
if pilotParams.pilotLogging:
# In a remote logger enabled Dirac version we would have some classic logger content from a wrapper,
# which we passed in:
receivedContent = ""
if not sys.stdin.isatty():
receivedContent = sys.stdin.read()
log = RemoteLogger(
pilotParams.loggerURL,
"Pilot",
useServerCertificate=pilotParams.useServerCertificate,
name="Pilot",
bufsize=pilotParams.loggerBufsize,
pilotUUID=pilotParams.pilotUUID,
debugFlag=pilotParams.debugFlag,
wnVO=pilotParams.wnVO,
)
log.info("Remote logger activated")
log.buffer.write(receivedContent)
log.buffer.write(log.format_to_json(
"INFO",
receivedContent,
))
log.buffer.flush()
log.buffer.write(bufContent)
log.buffer.write(log.format_to_json(
"INFO",
bufContent,
))
else:
log = Logger("Pilot", debugFlag=pilotParams.debugFlag)

Expand All @@ -103,7 +108,7 @@
log.info("Requested command extensions: %s" % str(pilotParams.commandExtensions))

log.info("Executing commands: %s" % str(pilotParams.commands))

remote = pilotParams.pilotLogging
if remote:
# It's safer to cancel the timer here. Each command has got its own logger object with a timer cancelled by the
# finaliser. No need for a timer in the "else" code segment below.
Expand All @@ -122,5 +127,8 @@
log.error("Command %s could not be instantiated" % commandName)
# send the last message and abandon ship.
if remote:
log.buffer.flush()
log.buffer.flush(force=True)
sys.exit(-1)

log.info("Pilot tasks finished.")
log.buffer.flush(force=True)
25 changes: 17 additions & 8 deletions Pilot/pilotCommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,29 @@ def wrapper(self):

except SystemExit as exCode: # or Exception ?
# controlled exit
if self.pp.pilotLogging:
try:
self.log.error(str(exCode))
self.log.error(traceback.format_exc())
self.log.buffer.flush(force=True)
except Exception as exc:
self.log.error("Remote logger couldn't be finalised %s " % str(exc))
raise

# If we don't have a remote logger
pRef = self.pp.pilotReference
self.log.info(
"Flushing the remote logger buffer for pilot on sys.exit(): %s (exit code:%s)" % (pRef, str(exCode))
"Flushing the logger buffer for pilot on sys.exit(): %s (exit code:%s)" % (pRef, str(exCode))
)
self.log.buffer.flush() # flush the buffer unconditionally (on sys.exit()).
try:
sendMessage(self.log.url, self.log.pilotUUID, self.log.wnVO, "finaliseLogs", {"retCode": str(exCode)})
except Exception as exc:
self.log.error("Remote logger couldn't be finalised %s " % str(exc))
raise

except Exception as exc:
# unexpected exit: document it and bail out.
self.log.error(str(exc))
self.log.error(traceback.format_exc())
if self.pp.pilotLogging:
# Force flush if it's a remote logger
self.log.buffer.flush(force=True)
else:
self.log.buffer.flush()
raise
finally:
self.log.buffer.cancelTimer()
Expand Down
149 changes: 76 additions & 73 deletions Pilot/pilotTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,15 +518,15 @@ class RemoteLogger(Logger):

def __init__(
self,
url,
url, # Not used yet
useServerCertificate,
name="Pilot",
debugFlag=False,
pilotOutput="pilot.out",
isPilotLoggerOn=True,
pilotUUID="unknown",
flushInterval=10,
bufsize=1000,
wnVO="unknown",
):
"""
c'tor
Expand All @@ -536,36 +536,49 @@ def __init__(
super(RemoteLogger, self).__init__(name, debugFlag, pilotOutput)
self.url = url
self.pilotUUID = pilotUUID
self.wnVO = wnVO
self.isPilotLoggerOn = isPilotLoggerOn
sendToURL = partial(sendMessage, url, pilotUUID, wnVO, "sendMessage")
sendToURL = partial(sendMessage, useServerCertificate, pilotUUID)
self.buffer = FixedSizeBuffer(sendToURL, bufsize=bufsize, autoflush=flushInterval)

def debug(self, msg, header=True, _sendPilotLog=False):
# TODO: Send pilot log remotely?
def format_to_json(self, level, message):

escaped = json.dumps(message)[1:-1] # remove outer quotes

# Split on escaped newlines
splitted_message = escaped.split("\\n")

output = []
for mess in splitted_message:
if mess:
output.append({
"timestamp": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"severity": level,
"message": mess,
"scope": self.name
})
return output

def debug(self, msg, header=True):
super(RemoteLogger, self).debug(msg, header)
if (
self.isPilotLoggerOn and self.debugFlag
): # the -d flag activates this debug flag in CommandBase via PilotParams
self.sendMessage(self.messageTemplate.format(level="DEBUG", message=msg))
self.sendMessage(self.format_to_json(level="DEBUG", message=msg))

def error(self, msg, header=True, _sendPilotLog=False):
# TODO: Send pilot log remotely?
def error(self, msg, header=True):
super(RemoteLogger, self).error(msg, header)
if self.isPilotLoggerOn:
self.sendMessage(self.messageTemplate.format(level="ERROR", message=msg))
self.sendMessage(self.format_to_json(level="ERROR", message=msg))

def warn(self, msg, header=True, _sendPilotLog=False):
# TODO: Send pilot log remotely?
def warn(self, msg, header=True):
super(RemoteLogger, self).warn(msg, header)
if self.isPilotLoggerOn:
self.sendMessage(self.messageTemplate.format(level="WARNING", message=msg))
self.sendMessage(self.format_to_json(level="WARNING", message=msg))

def info(self, msg, header=True, _sendPilotLog=False):
# TODO: Send pilot log remotely?
def info(self, msg, header=True):
super(RemoteLogger, self).info(msg, header)
if self.isPilotLoggerOn:
self.sendMessage(self.messageTemplate.format(level="INFO", message=msg))
self.sendMessage(self.format_to_json(level="INFO", message=msg))

def sendMessage(self, msg):
"""
Expand All @@ -577,7 +590,7 @@ def sendMessage(self, msg):
:rtype: None
"""
try:
self.buffer.write(msg + "\n")
self.buffer.write(msg)
except Exception as err:
super(RemoteLogger, self).error("Message not sent")
super(RemoteLogger, self).error(str(err))
Expand Down Expand Up @@ -622,34 +635,31 @@ def __init__(self, senderFunc, bufsize=1000, autoflush=10):
self._timer.start()
else:
self._timer = None
self.output = StringIO()
self.output = []
self.bufsize = bufsize
self._nlines = 0
self.senderFunc = senderFunc

@synchronized
def write(self, text):
def write(self, content_json):
"""
Write text to a string buffer. Newline characters are counted and number of lines in the buffer
is increased accordingly.

:param text: text string to write
:type text: str
:param content_json: Json to send, format following format_to_json
:type content_json: list[dict]
:return: None
:rtype: None
"""
# reopen the buffer in a case we had to flush a partially filled buffer
if self.output.closed:
self.output = StringIO()
self.output.write(text)
self._nlines += max(1, text.count("\n"))

self.output.extend(content_json)

try:
self._nlines += max(1, len(content_json))
except Exception:
raise ValueError(content_json)
self.sendFullBuffer()

@synchronized
def getValue(self):
content = self.output.getvalue()
return content

@synchronized
def sendFullBuffer(self):
"""
Expand All @@ -659,22 +669,19 @@ def sendFullBuffer(self):

if self._nlines >= self.bufsize:
self.flush()
self.output = StringIO()
self.output = []

@synchronized
def flush(self):
def flush(self, force=False):
"""
Flush the buffer and send log records to a remote server. The buffer is closed as well.

:return: None
:rtype: None
"""
if not self.output.closed and self._nlines > 0:
self.output.flush()
buf = self.getValue()
self.senderFunc(buf)
if force or (self.output and self._nlines > 0):
self.senderFunc(self.output)
self._nlines = 0
self.output.close()

def cancelTimer(self):
"""
Expand All @@ -687,40 +694,32 @@ def cancelTimer(self):
self._timer.cancel()


def sendMessage(url, pilotUUID, wnVO, method, rawMessage):
"""
Invoke a remote method on a Tornado server and pass a JSON message to it.

:param str url: Server URL
:param str pilotUUID: pilot unique ID
:param str wnVO: VO name, relevant only if not contained in a proxy
:param str method: a method to be invoked
:param str rawMessage: a message to be sent, in JSON format
:return: None.
"""
caPath = os.getenv("X509_CERT_DIR")
cert = os.getenv("X509_USER_PROXY")

context = ssl.create_default_context()
context.load_verify_locations(capath=caPath)
def sendMessage(useServerCertificate, pilotUUID, rawMessage = []):
cfg = []
if useServerCertificate:
cfg.append("-o /DIRAC/Security/UseServerCertificate=yes")

message = json.dumps((json.dumps(rawMessage), pilotUUID, wnVO))
formatted_logs = json.dumps(rawMessage)

# Escape single quotes in JSON string for safe shell quoting
safe_logs = formatted_logs.replace("'", "'\\''")

try:
context.load_cert_chain(cert) # this is a proxy
raw_data = {"method": method, "args": message}
except IsADirectoryError: # assuming it'a dir containing cert and key
context.load_cert_chain(os.path.join(cert, "hostcert.pem"), os.path.join(cert, "hostkey.pem"))
raw_data = {"method": method, "args": message, "extraCredentials": '"hosts"'}

if sys.version_info.major == 3:
data = urlencode(raw_data).encode("utf-8") # encode to bytes ! for python3
else:
# Python2
data = urlencode(raw_data)
cmd = "dirac-admin-send-pilot-logs %s '%s' %s -d" % (
pilotUUID,
safe_logs,
" ".join(cfg),
)

res = urlopen(url, data, context=context)
res.close()
FNULL = open(os.devnull, 'w')
_p = subprocess.Popen(
cmd,
shell=True,
stdout=FNULL,
stderr=FNULL,
close_fds=False
)
_p.wait()
FNULL.close()


class CommandBase(object):
Expand Down Expand Up @@ -750,12 +749,12 @@ def __init__(self, pilotParams):
# remote logger
self.log = RemoteLogger(
loggerURL,
self.__class__.__name__,
useServerCertificate=pilotParams.useServerCertificate,
name=self.__class__.__name__,
pilotUUID=pilotParams.pilotUUID,
debugFlag=self.debugFlag,
flushInterval=interval,
bufsize=bufsize,
wnVO=pilotParams.wnVO,
)

self.log.isPilotLoggerOn = isPilotLoggerOn
Expand Down Expand Up @@ -805,8 +804,12 @@ def executeAndGetOutput(self, cmd, environDict=None):
else:
sys.stdout.write(outChunk)
sys.stdout.flush()
if hasattr(self.log, "buffer") and self.log.isPilotLoggerOn:
self.log.buffer.write(outChunk)
if hasattr(self.log, "url"):
# It's a remote logger
self.log.buffer.write(self.log.format_to_json( # type: ignore
"COMMAND",
outChunk
))
outData += outChunk
# If no data was read on any of the pipes then the process has finished
if not dataWasRead:
Expand Down