From 65583126aead9c3b42fc569c8777bcec5fce2ee5 Mon Sep 17 00:00:00 2001 From: Ludovic <54670129+lbr38@users.noreply.github.com> Date: Fri, 27 Sep 2024 17:25:33 +0200 Subject: [PATCH] 3.4.0 --- src/controllers/Module/Reposerver/Agent.py | 403 +++++++++++++++------ src/controllers/Package/Apt.py | 31 +- version | 2 +- 3 files changed, 316 insertions(+), 120 deletions(-) diff --git a/src/controllers/Module/Reposerver/Agent.py b/src/controllers/Module/Reposerver/Agent.py index 56cb7bb..2af7f2e 100644 --- a/src/controllers/Module/Reposerver/Agent.py +++ b/src/controllers/Module/Reposerver/Agent.py @@ -8,6 +8,8 @@ import json import sys from pathlib import Path +from shutil import rmtree +import os # Import classes from src.controllers.Log import Log @@ -24,6 +26,17 @@ def __init__(self): self.reposerverStatusController = Status() self.packageController = Package() + # Set default values + self.authenticated = False + + # Root directory for the requests logs + self.request_dir = '/opt/linupdate/tmp/reposerver/requests' + + # Create the directories to store the logs if they do not exist + if not Path(self.request_dir).is_dir(): + Path(self.request_dir).mkdir(parents=True, exist_ok=True) + + #----------------------------------------------------------------------------------------------- # # General checks for running the agent @@ -139,20 +152,123 @@ def set_request_status(self, request_id, status): self.websocket.send(json.dumps(json_data)) + #----------------------------------------------------------------------------------------------- + # + # Send remaining requests logs to the reposerver + # + #----------------------------------------------------------------------------------------------- + def send_remaining_requests_logs(self): + try: + # Quit if not authenticated to the reposerver + if not self.authenticated: + return + + # Get all requests logs directories + requests_dirs = Path(self.request_dir).iterdir() + + # Then order them by name + requests_dirs = sorted(requests_dirs, key=lambda x: x.name) + + # For each directory, get its name (request id) + for request_dir in requests_dirs: + try: + status = 'unknown' + summary = None + error = None + logcontent = None + + # Check if it is a directory + if not request_dir.is_dir(): + continue + + # Only process directories with a creation time older than 5 minutes + if time.time() - os.path.getmtime(request_dir) < 300: + continue + + # If the directory is empty, then delete it + if not any(request_dir.iterdir()): + rmtree(request_dir) + continue + + # Initialize json + json_response = { + 'response-to-request': {} + } + + # Get request id and set it in the json + request_id = request_dir.name + json_response['response-to-request']['request-id'] = request_id + + # Get general log, if log file exists, and set it in the json + if Path(self.request_dir + '/' + request_id + '/log').is_file(): + with open(self.request_dir + '/' + request_id + '/log', 'r') as file: + # Get log content and remove ANSI escape codes + logcontent = Utils().remove_ansi(file.read()).strip() + + # Get status, if log file exists, and set it in the json + if Path(self.request_dir + '/' + request_id + '/status').is_file(): + with open(self.request_dir + '/' + request_id + '/status', 'r') as file: + status = file.read().strip() + + # Get summary, if log file exists, and set it in the json + if Path(self.request_dir + '/' + request_id + '/summary').is_file(): + with open(self.request_dir + '/' + request_id + '/summary', 'r') as file: + summary = file.read().strip() + # Convert the string to a json + summary = json.loads(summary) + + # Get error, if log file exists, and set it in the json + if Path(self.request_dir + '/' + request_id + '/error').is_file(): + with open(self.request_dir + '/' + request_id + '/error', 'r') as file: + error = file.read().strip() + + if status: + json_response['response-to-request']['status'] = status + + if summary: + json_response['response-to-request']['summary'] = summary + + if error: + json_response['response-to-request']['error'] = error + + if logcontent: + json_response['response-to-request']['log'] = logcontent + + # Send the response + print('[reposerver-agent] Sending remaining requests logs for request id #' + request_id + ' with status: ' + status) + self.websocket.send(json.dumps(json_response)) + + # Delete the directory and all its content + if Path(self.request_dir + '/' + request_id).is_dir(): + rmtree(self.request_dir + '/' + request_id) + except Exception as e: + raise Exception('could not send remaining requests logs for request id #' + request_id + ': ' + str(e)) + except Exception as e: + print('[reposerver-agent] Error: ' + str(e)) + + #----------------------------------------------------------------------------------------------- # # On message received from the websocket # #----------------------------------------------------------------------------------------------- def websocket_on_message(self, ws, message): - # Decode JSON message - message = json.loads(message) + # Default values request_id = None summary = None - log = '/tmp/linupdate.reposerver.request.log' + status = None + error = None + + # Decode JSON message + message = json.loads(message) + + # Default log file path, could be overwritten if request id is present + log = '/opt/linupdate/tmp/reposerver/requests/log' + # Lock to prevent service restart while processing the request lock = '/tmp/linupdate.reposerver.request.lock' - error = None + + # Default json response json_response = { 'response-to-request': { 'request-id': '', @@ -162,134 +278,181 @@ def websocket_on_message(self, ws, message): } } - # Create a lock file to prevent service restart while processing the request - if not Path(lock).is_file(): - Path(lock).touch() - - # If the message contains 'request' - if 'request' in message: - try: - # Retrieve request Id if any (authenticate request does not have an id) - if 'request-id' in message: - request_id = message['request-id'] - - # Case the request is 'authenticate', then authenticate to the reposerver - if message['request'] == 'authenticate': - print('[reposerver-agent] Authenticating to the reposerver') - - id = self.configuration['client']['auth']['id'] - token = self.configuration['client']['auth']['token'] - - # Send a response to authenticate to the reposerver, with id and token - self.websocket.send(json.dumps({'response-to-request': {'request': 'authenticate', 'auth-id': id, 'token': token}})) - - # Case the request is 'request-general-infos', then send general informations to the reposerver - elif message['request'] == 'request-general-infos': - print('[reposerver-agent] Reposerver requested general informations') - with Log(log): - self.reposerverStatusController.send_general_info() - - # Case the request is 'request-packages-infos', then send packages informations to the reposerver - elif message['request'] == 'request-packages-infos': - print('[reposerver-agent] Reposerver requested packages informations') - - # Send a response to the reposerver to make the request as running - self.set_request_status(request_id, 'running') - - # Log everything to the log file - with Log(log): - self.reposerverStatusController.send_packages_info() - - # Case the request is 'update-all-packages', then update all packages - elif message['request'] == 'update-all-packages': - print('[reposerver-agent] Reposerver requested all packages update') + try: + # Create a lock file to prevent service restart while processing the request + if not Path(lock).is_file(): + Path(lock).touch() - # Send a response to the reposerver to make the request as running - self.set_request_status(request_id, 'running') + # If the message contains 'request' + if 'request' in message: + try: + # Retrieve request Id if any (authenticate request does not have an id) + if 'request-id' in message: + request_id = str(message['request-id']) - # Log everything to the log file - with Log(log): - self.packageController.update([], True) + # Create a new directory for the request id + if not Path(self.request_dir + '/' + request_id).is_dir(): + Path(self.request_dir + '/' + request_id).mkdir(parents=True, exist_ok=True) - # Send a summary to the reposerver, with the summary of the update (number of packages updated or failed) - summary = self.packageController.summary + # Set new log files path for the request id + log = self.request_dir + '/' + request_id + '/log' + log_status = self.request_dir + '/' + request_id + '/status' + log_error = self.request_dir + '/' + request_id + '/error' + log_summary = self.request_dir + '/' + request_id + '/summary' - # Case the request is 'request-specific-packages-installation', then update a list of packages - # A list of packages must be provided in the message - elif message['request'] == 'request-specific-packages-installation': - if 'data' in message: - if 'packages' in message['data'] and len(message['data']['packages']) > 0: - print('[reposerver-agent] Reposerver requested to update a list of packages') + # Case the request is 'authenticate', then authenticate to the reposerver + if message['request'] == 'authenticate': + print('[reposerver-agent] Authenticating to the reposerver') - # Send a response to the reposerver to make the request as running - self.set_request_status(request_id, 'running') + id = self.configuration['client']['auth']['id'] + token = self.configuration['client']['auth']['token'] - # Log everything to the log file - with Log(log): - self.packageController.update(message['data']['packages'], True) + # Send a response to authenticate to the reposerver, with id and token + self.websocket.send(json.dumps({'response-to-request': {'request': 'authenticate', 'auth-id': id, 'token': token}})) - # Send a summary to the reposerver, with the summary of the installation (number of packages installed or failed) - summary = self.packageController.summary + # Case the request is 'request-general-infos', then send general informations to the reposerver + elif message['request'] == 'request-general-infos': + print('[reposerver-agent] Reposerver requested general informations') + with Log(log): + self.reposerverStatusController.send_general_info() - else: - raise Exception('unknown request sent by reposerver: ' + message['request']) + # Case the request is 'request-packages-infos', then send packages informations to the reposerver + elif message['request'] == 'request-packages-infos': + print('[reposerver-agent] Reposerver requested packages informations') - # If request was successful - status = 'completed' + # Send a response to the reposerver to make the request as running + self.set_request_status(request_id, 'running') - # If request failed - except Exception as e: - print('[reposerver-agent] Error: ' + str(e)) - status = 'failed' - error = str(e) + # Log everything to the log file + with Log(log): + self.reposerverStatusController.send_packages_info() - finally: - # If there was a request id, then send a response to reposerver to make the request as completed - if request_id: - # Set request id - json_response['response-to-request']['request-id'] = request_id + # Case the request is 'update-all-packages', then update all packages + elif message['request'] == 'update-all-packages': + print('[reposerver-agent] Reposerver requested all packages update') - # Set status - json_response['response-to-request']['status'] = status + # Send a response to the reposerver to make the request as running + self.set_request_status(request_id, 'running') - # If there was an error - if error: - json_response['response-to-request']['error'] = error + # Log everything to the log file + with Log(log): + self.packageController.update([], True) - # If there is a summary - if summary: - json_response['response-to-request']['summary'] = summary + # Send a summary to the reposerver, with the summary of the update (number of packages updated or failed) + summary = self.packageController.summary - # If there is a log file - if log and Path(log).is_file(): - # Get log file content - try: - with open(log, 'r') as file: - # Get log content and remove ANSI escape codes - logcontent = Utils().remove_ansi(file.read()) + # Case the request is 'request-specific-packages-installation', then update a list of packages + # A list of packages must be provided in the message + elif message['request'] == 'request-specific-packages-installation': + if 'data' in message: + if 'packages' in message['data'] and len(message['data']['packages']) > 0: + print('[reposerver-agent] Reposerver requested to update a list of packages') - # Delete the log file - Path(log).unlink() - except Exception as e: - # If content could not be read, then generate an error message - logcontent = 'Error: could not read log file: ' + str(e) + # Send a response to the reposerver to make the request as running + self.set_request_status(request_id, 'running') - json_response['response-to-request']['log'] = logcontent + # Log everything to the log file + with Log(log): + self.packageController.update(message['data']['packages'], True) - # Send the response - self.websocket.send(json.dumps(json_response)) + # Send a summary to the reposerver, with the summary of the installation (number of packages installed or failed) + summary = self.packageController.summary - # If the message contains 'info' - if 'info' in message: - print('[reposerver-agent] Received info message from reposerver: ' + message['info']) + else: + raise Exception('unknown request sent by reposerver: ' + message['request']) - # If the message contains 'error' - if 'error' in message: - print('[reposerver-agent] Received error message from reposerver: ' + message['error']) + # If request was successful + status = 'completed' - # Delete the lock file - if Path(lock).is_file(): - Path(lock).unlink() + # If request failed + except Exception as e: + print('[reposerver-agent] Error: ' + str(e)) + status = 'failed' + error = str(e) + + finally: + # If there was a request id, then send a response to reposerver to make the request as completed + if request_id: + # First, save the log, status, summary and error to a file, in case the message cannot be sent + # It will be sent later when the agent is running again + if status: + with open(log_status, 'w') as file: + file.write(status) + if summary: + with open(log_summary, 'w') as file: + # Convert summary to string to write it to the file + json.dump(summary, file) + if error: + with open(log_error, 'w') as file: + file.write(error) + + # Then try to send the response + + # Set request id + json_response['response-to-request']['request-id'] = request_id + + # Set status + json_response['response-to-request']['status'] = status + + # If there was an error + if error: + json_response['response-to-request']['error'] = error + + # If there is a summary + if summary: + json_response['response-to-request']['summary'] = summary + + # If there is a log file + if log and Path(log).is_file(): + # Get log file content + try: + with open(log, 'r') as file: + # Get log content and remove ANSI escape codes + logcontent = Utils().remove_ansi(file.read()) + except Exception as e: + # If content could not be read, then generate an error message + logcontent = 'Error: could not read log file: ' + str(e) + + json_response['response-to-request']['log'] = logcontent + + # Try to send the response to the reposerver + # Note: impossible to use try/except here, because no exception is raised directly, + # if there is an error then it is the on_error function that is called + self.websocket.send(json.dumps(json_response)) + + # If the message contains 'info' + if 'info' in message: + print('[reposerver-agent] Received info message from reposerver: ' + message['info']) + + # If the message is 'Authentication successful', then set authenticated to True + if message['info'] == 'Authentication successful': + self.authenticated = True + + # If the message is 'Request response received', then delete the remaining logs + if message['info'] == 'Request response received': + # First retrieve the request id + if 'request-id' in message: + request_id = message['request-id'] + + # If the server has tell what kinf of data it has received, then delete the corresponding files if they exist + if 'data' in message: + for data in message['data']: + if Path(self.request_dir + '/' + request_id + '/' + data).is_file(): + Path(self.request_dir + '/' + request_id + '/' + data).unlink() + + # Then delete the logs directory + # if Path(self.request_dir + '/' + request_id).is_dir(): + # rmtree(self.request_dir + '/' + request_id) + + # If the message contains 'error' + if 'error' in message: + print('[reposerver-agent] Received error message from reposerver: ' + message['error']) + + # If all goes well or if an exception is raised, then delete the lock file + finally: + # Delete the lock file + if Path(lock).is_file(): + Path(lock).unlink() #----------------------------------------------------------------------------------------------- @@ -308,6 +471,7 @@ def websocket_on_error(self, ws, error): #----------------------------------------------------------------------------------------------- def websocket_on_close(self, ws, close_status_code, close_msg): print('[reposerver-agent] Reposerver websocket connection closed with status code: ' + str(close_status_code) + ' and message: ' + close_msg) + self.authenticated = False raise Exception('reposerver websocket connection closed') @@ -356,12 +520,14 @@ def websocket_client(self): # self.websocket.on_open = self.websocket_on_open self.websocket.run_forever() - except KeyboardInterrupt: + except KeyboardInterrupt as e: self.websocket_is_running = False self.websocket_exception = str(e) + self.authenticated = False except Exception as e: self.websocket_is_running = False self.websocket_exception = str(e) + self.authenticated = False #----------------------------------------------------------------------------------------------- @@ -378,9 +544,6 @@ def main(self): self.websocket_is_running = False self.websocket_exception = None - # Checking that all the necessary elements are present for the agent execution - self.run_general_checks() - # Executing regular tasks while True: # Checking that all the necessary elements are present for the agent execution. @@ -391,9 +554,10 @@ def main(self): # 3600 / 5sec (sleep 5) = 720 if counter == 0 or counter == 720: # Sending full status - print('[reposerver-agent] Periodically sending informations about this host to the repomanager server') - self.reposerverStatusController.send_general_info() - self.reposerverStatusController.send_packages_info() + # TODO debug + # print('[reposerver-agent] Periodically sending informations about this host to the repomanager server') + # self.reposerverStatusController.send_general_info() + # self.reposerverStatusController.send_packages_info() # Reset counter counter = 0 @@ -424,6 +588,9 @@ def main(self): thread.start() except Exception as e: raise Exception('reposerver websocket connection failed: ' + str(e)) + + # If some requests logs were not sent (because the program crashed or the reposerver was unavailable for eg), then send them now + self.send_remaining_requests_logs() time.sleep(5) diff --git a/src/controllers/Package/Apt.py b/src/controllers/Package/Apt.py index 04eedf7..b92da5d 100644 --- a/src/controllers/Package/Apt.py +++ b/src/controllers/Package/Apt.py @@ -139,6 +139,29 @@ def get_available_packages(self, dist_upgrade: bool = False): return list + #----------------------------------------------------------------------------------------------- + # + # Wait for dpkg lock to be released + # Default timeout is 30 seconds + # + #----------------------------------------------------------------------------------------------- + def wait_for_dpkg_lock(self, timeout: int = 30): + import fcntl + from time import sleep + + while timeout > 0: + with open('/var/lib/dpkg/lock', 'w') as handle: + try: + fcntl.lockf(handle, fcntl.LOCK_EX | fcntl.LOCK_NB) + return + except IOError: + pass + + timeout -= 1 + sleep(1) + + raise Exception('could not acquire dpkg lock (timeout ' + str(timeout) + 's)') + #----------------------------------------------------------------------------------------------- # # Clear apt cache @@ -146,6 +169,9 @@ def get_available_packages(self, dist_upgrade: bool = False): #----------------------------------------------------------------------------------------------- def clear_cache(self): try: + # Wait for the lock to be released + self.wait_for_dpkg_lock() + self.aptcache.clear() except Exception as e: raise Exception('could not clear apt cache: ' + str(e)) @@ -159,12 +185,15 @@ def clear_cache(self): def update_cache(self): try: # Clear cache - self.aptcache.clear() + self.wait_for_dpkg_lock() + self.clear_cache() # Update cache + self.wait_for_dpkg_lock() self.aptcache.update() # Reopen cache + self.wait_for_dpkg_lock() self.aptcache.open(None) except Exception as e: diff --git a/version b/version index 0fa4ae4..fbcbf73 100644 --- a/version +++ b/version @@ -1 +1 @@ -3.3.0 \ No newline at end of file +3.4.0 \ No newline at end of file