Refactor lxc_monitor.py: add type hints, route every `pct exec` call through
one async helper (get_container_metric), replace get_container_memory_usage
with parse_meminfo, and move the JSON export I/O into load_existing_data /
write_metrics_to_file.

Reviewer notes (free text ahead of the first "diff --git" line is skipped by
patch tools):

  * This patch was restored from a copy whose whitespace had been collapsed.
    Indentation, blank context lines and the spacing before inline comments
    were reconstructed so that every hunk matches the line counts in its
    "@@" header. In the last hunk one blank context line had to be placed by
    guesswork (before "async def monitor_and_export"). Confirm against the
    target file before applying.

  * Fixes made to the added ("+") lines; all are line-count neutral, so the
    hunk headers are unchanged:
      - collect_and_export_metrics: "async with ThreadPoolExecutor(...)" is
        now a plain "with". ThreadPoolExecutor only implements the
        synchronous context-manager protocol, so "async with" fails at
        runtime on the first collection cycle.
      - parse_meminfo: also catch IndexError, raised by result.split()[1]
        when grep returns a line with fewer than two tokens.
      - get_container_cpu_usage: return 0.0 instead of dividing by zero when
        the summed counters are 0.
    Because of these edits the post-image hash on the "index" line no longer
    describes the resulting blob.

  * Behaviour changes carried by the patch that are NOT altered here and
    deserve a follow-up:
      - parse_meminfo always queries SwapTotal/SwapFree; the removed
        get_container_memory_usage only did so under "if ENABLE_SWAP:".
      - get_container_filesystem_usage no longer retries with "df -BM" when
        "df -m" yields no output.
      - "import aiofiles" adds a third-party dependency.

diff --git a/lxc_autoscale_ml/monitor/lxc_monitor.py b/lxc_autoscale_ml/monitor/lxc_monitor.py
index d738758..9065d4e 100644
--- a/lxc_autoscale_ml/monitor/lxc_monitor.py
+++ b/lxc_autoscale_ml/monitor/lxc_monitor.py
@@ -8,6 +8,8 @@
 from subprocess import check_output, CalledProcessError
 from datetime import datetime
 from concurrent.futures import ThreadPoolExecutor
+from typing import Any, Dict, List, Optional, Tuple
+import aiofiles
 
 # Load configuration from YAML file
 with open("/etc/lxc_autoscale/lxc_monitor.yaml", 'r') as config_file:
@@ -31,7 +33,7 @@
 # Timed rotating file handler with log retention
 file_handler = TimedRotatingFileHandler(LOG_FILE, when="midnight", interval=1, backupCount=LOG_BACKUP_COUNT)
 file_handler.setFormatter(formatter)
-file_handler.suffix = "%Y-%m-%d" # Filename will be suffixed with the date
+file_handler.suffix = "%Y-%m-%d"
 
 # Add handlers to the logger
 logger.addHandler(console_handler)
@@ -49,25 +51,24 @@
 RETRY_LIMIT = config['monitoring'].get('retry_limit', 3) # Maximum retry attempts
 RETRY_DELAY = config['monitoring'].get('retry_delay', 2) # Delay between retries in seconds
 
-def get_running_lxc_containers():
+def get_running_lxc_containers() -> List[str]:
     """Retrieve a list of running LXC containers."""
     try:
         pct_output = check_output(['pct', 'list'], text=True).splitlines()
-        running_containers = [line.split()[0] for line in pct_output if 'running' in line]
-        return running_containers
+        return [line.split()[0] for line in pct_output if 'running' in line]
     except CalledProcessError as e:
         logger.error(f"Error retrieving LXC containers: {e}")
         return []
 
-def run_command(command):
+def run_command(command: List[str]) -> Optional[str]:
     """Run a shell command in a thread pool."""
     try:
         return check_output(command, text=True)
     except CalledProcessError as e:
-        logger.error(f"Command failed: {command}, error: {e}")
+        logger.error(f"Command failed: {' '.join(command)}, error: {e}")
         return None
 
-async def retry_on_failure(func, *args, **kwargs):
+async def retry_on_failure(func: Any, *args, **kwargs) -> Any:
     """Retry logic wrapper for transient failures."""
     for attempt in range(RETRY_LIMIT):
         try:
@@ -80,10 +81,31 @@ async def retry_on_failure(func, *args, **kwargs):
                 logger.error(f"All {RETRY_LIMIT} attempts failed for {func.__name__}.")
                 raise
 
-async def get_container_cpu_usage(container_id, executor):
+async def get_container_metric(command: List[str], executor: ThreadPoolExecutor) -> Optional[str]:
+    """Helper function to execute commands asynchronously in containers."""
+    return await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
+
+async def parse_meminfo(container_id: str, executor: ThreadPoolExecutor) -> Dict[str, float]:
+    """Retrieve memory and swap usage inside the container."""
+    mem_info = {}
+    for metric, key in [('MemTotal', 'memory_usage_mb'), ('MemAvailable', 'memory_free_mb'),
+                        ('SwapTotal', 'swap_total_mb'), ('SwapFree', 'swap_free_mb')]:
+        command = ['pct', 'exec', container_id, '--', 'grep', metric, '/proc/meminfo']
+        result = await get_container_metric(command, executor)
+        if result:
+            try:
+                mem_info[key] = int(result.split()[1]) / 1024 # Convert to MB
+            except (ValueError, IndexError):
+                logger.warning(f"Unexpected memory info format for container {container_id}: {result}")
+    # Calculate used memory and swap usage
+    mem_info['memory_usage_mb'] = mem_info.get('memory_usage_mb', 0.0) - mem_info.get('memory_free_mb', 0.0)
+    mem_info['swap_usage_mb'] = mem_info.get('swap_total_mb', 0.0) - mem_info.get('swap_free_mb', 0.0)
+    return mem_info
+
+async def get_container_cpu_usage(container_id: str, executor: ThreadPoolExecutor) -> float:
     """Use pct exec to retrieve CPU usage inside the container."""
     command = ['pct', 'exec', container_id, '--', 'grep', 'cpu ', '/proc/stat']
-    result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
+    result = await get_container_metric(command, executor)
     if result:
         fields = result.split()
         if len(fields) < 5:
@@ -91,47 +113,13 @@ async def get_container_cpu_usage(container_id, executor):
             return 0.0
         idle_time = int(fields[4])
         total_time = sum(int(field) for field in fields[1:])
-        cpu_usage = 100 * (1 - (idle_time / total_time))
-        return cpu_usage
+        return 100 * (1 - (idle_time / total_time)) if total_time else 0.0
     return 0.0
 
-async def get_container_memory_usage(container_id, executor):
-    """Use pct exec to retrieve memory and swap usage inside the container."""
-    try:
-        command = ['pct', 'exec', container_id, '--', 'grep', 'MemTotal', '/proc/meminfo']
-        result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
-        mem_total_kb = int(result.split()[1])
-
-        command = ['pct', 'exec', container_id, '--', 'grep', 'MemAvailable', '/proc/meminfo']
-        result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
-        mem_available_kb = int(result.split()[1])
-
-        mem_used_kb = mem_total_kb - mem_available_kb
-
-        swap_used_kb = swap_total_kb = 0
-        if ENABLE_SWAP:
-            command = ['pct', 'exec', container_id, '--', 'grep', 'SwapTotal', '/proc/meminfo']
-            result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
-            swap_total_kb = int(result.split()[1])
-
-            command = ['pct', 'exec', container_id, '--', 'grep', 'SwapFree', '/proc/meminfo']
-            result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
-            swap_free_kb = int(result.split()[1])
-            swap_used_kb = swap_total_kb - swap_free_kb
-
-        return {
-            "memory_usage_mb": mem_used_kb / 1024, # Convert to MB
-            "swap_usage_mb": swap_used_kb / 1024, # Convert to MB
-            "swap_total_mb": swap_total_kb / 1024 # Convert to MB
-        }
-    except (CalledProcessError, ValueError) as e:
-        logger.warning(f"Failed to get memory/swap usage for container {container_id}: {e}")
-        return {"memory_usage_mb": 0.0, "swap_usage_mb": 0.0, "swap_total_mb": 0.0}
-
-async def get_container_io_stats(container_id, executor):
-    """Use pct exec to retrieve I/O statistics inside the container."""
+async def get_container_io_stats(container_id: str, executor: ThreadPoolExecutor) -> Dict[str, int]:
+    """Retrieve I/O statistics inside the container."""
     command = ['pct', 'exec', container_id, '--', 'grep', '', '/proc/diskstats']
-    result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
+    result = await get_container_metric(command, executor)
     if result:
         io_stats_lines = result.splitlines()
         io_stats = {"reads": 0, "writes": 0}
@@ -143,20 +131,18 @@ async def get_container_io_stats(container_id, executor):
             device = fields[2]
             if any(device.startswith(exclude) for exclude in EXCLUDED_DEVICES):
                 continue
-            reads = int(fields[5])
-            writes = int(fields[9])
-            io_stats["reads"] += reads
-            io_stats["writes"] += writes
+            io_stats["reads"] += int(fields[5])
+            io_stats["writes"] += int(fields[9])
         return io_stats
     return {}
 
-async def get_container_network_usage(container_id, executor):
-    """Use pct exec to retrieve network usage inside the container."""
+async def get_container_network_usage(container_id: str, executor: ThreadPoolExecutor) -> Dict[str, int]:
+    """Retrieve network usage inside the container."""
     if not ENABLE_NETWORK:
         return {"rx_bytes": 0, "tx_bytes": 0}
 
     command = ['pct', 'exec', container_id, '--', 'cat', '/proc/net/dev']
-    result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
+    result = await get_container_metric(command, executor)
     if result:
         net_stats_lines = result.splitlines()[2:] # Skip headers
         rx_bytes, tx_bytes = 0, 0
@@ -172,18 +158,14 @@ async def get_container_network_usage(container_id, executor):
         return {"rx_bytes": rx_bytes, "tx_bytes": tx_bytes}
     return {"rx_bytes": 0, "tx_bytes": 0}
 
-async def get_container_filesystem_usage(container_id, executor):
-    """Use pct exec to retrieve filesystem usage inside the container."""
+async def get_container_filesystem_usage(container_id: str, executor: ThreadPoolExecutor) -> Dict[str, float]:
+    """Retrieve filesystem usage inside the container."""
     if not ENABLE_FILESYSTEM:
         return {"filesystem_usage_gb": 0, "filesystem_total_gb": 0, "filesystem_free_gb": 0}
 
-    try:
-        command = ['pct', 'exec', container_id, '--', 'df', '-m', '/']
-        result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
-        if not result:
-            command = ['pct', 'exec', container_id, '--', 'df', '-BM', '/']
-            result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
-
+    command = ['pct', 'exec', container_id, '--', 'df', '-m', '/']
+    result = await get_container_metric(command, executor)
+    if result:
         lines = result.splitlines()
         if len(lines) < 2:
             logger.warning(f"Unexpected filesystem stats format for container {container_id}: {lines}")
@@ -203,27 +185,26 @@ async def get_container_filesystem_usage(container_id, executor):
             "filesystem_total_gb": filesystem_total_gb,
             "filesystem_free_gb": filesystem_free_gb
         }
-    except CalledProcessError as e:
-        logger.warning(f"Failed to get filesystem usage for container {container_id}: {e}")
-        return {"filesystem_usage_gb": 0, "filesystem_total_gb": 0, "filesystem_free_gb": 0}
+    return {"filesystem_usage_gb": 0, "filesystem_total_gb": 0, "filesystem_free_gb": 0}
 
-async def get_container_process_count(container_id, executor):
-    """Use pct exec to retrieve the number of processes running inside the container."""
+async def get_container_process_count(container_id: str, executor: ThreadPoolExecutor) -> int:
+    """Retrieve the number of processes running inside the container."""
     command = ['pct', 'exec', container_id, '--', 'ps', '-e']
-    result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
+    result = await get_container_metric(command, executor)
     if result:
         lines = result.splitlines()
-        # Check if the first line contains a header (e.g., "PID TTY TIME CMD") and remove it
         if lines and any(header in lines[0] for header in ["PID", "TTY", "TIME", "CMD"]):
-            lines = lines[1:]
+            lines = lines[1:] # Remove header line
         return len(lines)
     return 0
 
-async def collect_metrics_for_container(container_id, executor):
+async def collect_metrics_for_container(container_id: str, executor: ThreadPoolExecutor) -> Tuple[str, Dict[str, Any]]:
+    """Collect all metrics for a given container."""
     logger.info(f"Collecting metrics for container: {container_id}")
 
+    # Use retry logic for each metric collection
     cpu_usage = await retry_on_failure(get_container_cpu_usage, container_id, executor)
-    memory_swap_usage = await retry_on_failure(get_container_memory_usage, container_id, executor)
+    memory_swap_usage = await retry_on_failure(parse_meminfo, container_id, executor)
     io_stats = await retry_on_failure(get_container_io_stats, container_id, executor)
     network_usage = await retry_on_failure(get_container_network_usage, container_id, executor)
     filesystem_usage = await retry_on_failure(get_container_filesystem_usage, container_id, executor)
@@ -232,9 +213,9 @@ async def collect_metrics_for_container(container_id, executor):
     container_metrics = {
         "timestamp": datetime.now().isoformat(),
         "cpu_usage_percent": cpu_usage,
-        "memory_usage_mb": memory_swap_usage["memory_usage_mb"],
-        "swap_usage_mb": memory_swap_usage["swap_usage_mb"],
-        "swap_total_mb": memory_swap_usage["swap_total_mb"],
+        "memory_usage_mb": memory_swap_usage.get("memory_usage_mb", 0.0),
+        "swap_usage_mb": memory_swap_usage.get("swap_usage_mb", 0.0),
+        "swap_total_mb": memory_swap_usage.get("swap_total_mb", 0.0),
         "process_count": process_count,
         "io_stats": io_stats,
         "network_usage": network_usage,
@@ -243,15 +224,12 @@ async def collect_metrics_for_container(container_id, executor):
         "filesystem_free_gb": filesystem_usage["filesystem_free_gb"]
     }
 
-    logger.info(f"Metrics for {container_id}: "
-                f"CPU: {cpu_usage:.2f}%, Memory: {memory_swap_usage['memory_usage_mb']:.2f} MB, "
-                f"Swap Used: {memory_swap_usage['swap_usage_mb']:.2f} MB, Processes: {process_count}, "
-                f"Network: {network_usage}, Filesystem Used: {filesystem_usage['filesystem_usage_gb']} GB, "
-                f"Filesystem Free: {filesystem_usage['filesystem_free_gb']} GB, I/O: {io_stats}")
+    logger.info(f"Metrics for {container_id}: {container_metrics}")
 
     return container_id, container_metrics
 
 async def collect_and_export_metrics():
+    """Collect and export metrics for all running LXC containers."""
     start_time = datetime.now()
     metrics = {}
     containers = get_running_lxc_containers()
@@ -262,16 +240,15 @@ async def collect_and_export_metrics():
 
     logger.debug(f"Found {len(containers)} running containers.")
 
-    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
-
-    if PARALLEL_PROCESSING:
-        tasks = [collect_metrics_for_container(container_id, executor) for container_id in containers]
-        results = await asyncio.gather(*tasks)
-    else:
-        results = []
-        for container_id in containers:
-            result = await collect_metrics_for_container(container_id, executor)
-            results.append(result)
+    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        if PARALLEL_PROCESSING:
+            tasks = [collect_metrics_for_container(container_id, executor) for container_id in containers]
+            results = await asyncio.gather(*tasks)
+        else:
+            results = []
+            for container_id in containers:
+                result = await collect_metrics_for_container(container_id, executor)
+                results.append(result)
 
     # Ensure results are processed correctly
     for container_id, container_metrics in results:
@@ -292,44 +269,54 @@ async def collect_and_export_metrics():
     metrics["summary"] = summary
 
     # Load existing data if the file exists
-    existing_data = []
-    if os.path.exists(EXPORT_FILE):
-        try:
-            with open(EXPORT_FILE, "r") as json_file:
-                existing_data = json.load(json_file)
-                if not isinstance(existing_data, list):
-                    logger.error(f"Data in {EXPORT_FILE} is not a list. Resetting data to an empty list.")
-                    existing_data = []
-            logger.debug(f"Loaded existing metrics from {EXPORT_FILE}.")
-        except (IOError, json.JSONDecodeError) as e:
-            logger.warning(f"Failed to read existing data from {EXPORT_FILE}: {e}")
-            existing_data = []
+    existing_data = await load_existing_data(EXPORT_FILE)
 
     # Append the new metrics
     existing_data.append(metrics)
     logger.debug(f"Appending new metrics: {metrics}")
 
-    # Write the updated data to a temporary file first
-    temp_file = f"{EXPORT_FILE}.tmp"
+    # Write the updated data to the file
+    await write_metrics_to_file(EXPORT_FILE, existing_data)
+
+async def load_existing_data(file_path: str) -> List[Dict[str, Any]]:
+    """Load existing data from the JSON file."""
+    if not os.path.exists(file_path):
+        return []
+
+    try:
+        async with aiofiles.open(file_path, mode='r') as json_file:
+            content = await json_file.read()
+            data = json.loads(content)
+            if not isinstance(data, list):
+                logger.error(f"Data in {file_path} is not a list. Resetting to an empty list.")
+                return []
+            logger.debug(f"Loaded existing metrics from {file_path}.")
+            return data
+    except (IOError, json.JSONDecodeError) as e:
+        logger.warning(f"Failed to read existing data from {file_path}: {e}")
+        return []
+
+async def write_metrics_to_file(file_path: str, data: List[Dict[str, Any]]):
+    """Write metrics data to a JSON file asynchronously."""
+    temp_file = f"{file_path}.tmp"
     try:
-        with open(temp_file, "w") as json_file:
-            json.dump(existing_data, json_file, indent=4)
-        # Rename the temp file to the original file, ensuring atomic operation
-        os.replace(temp_file, EXPORT_FILE)
-        logger.info(f"Metrics successfully exported to {EXPORT_FILE}")
+        async with aiofiles.open(temp_file, mode='w') as json_file:
+            await json_file.write(json.dumps(data, indent=4))
+        os.replace(temp_file, file_path)
+        logger.info(f"Metrics successfully exported to {file_path}")
     except IOError as e:
-        logger.error(f"Failed to write metrics to {EXPORT_FILE}: {e}")
-        # If there's an error, remove the temporary file to avoid partial writes
+        logger.error(f"Failed to write metrics to {file_path}: {e}")
         if os.path.exists(temp_file):
             os.remove(temp_file)
 
 
 async def monitor_and_export():
+    """Continuously monitor and export metrics at the defined intervals."""
     try:
         while True:
             logger.info("Starting new metrics collection cycle.")
             await collect_and_export_metrics()
-            logger.info(f"Waiting for {CHECK_INTERVAL} seconds before next cycle.")
+            logger.info(f"Waiting for {CHECK_INTERVAL} seconds before the next cycle.")
             await asyncio.sleep(CHECK_INTERVAL)
     except KeyboardInterrupt:
         logger.info("Shutting down metrics collector due to KeyboardInterrupt.")