RHINENG-13973 execute pcp commands (#576)
* RHINENG-13973 execute pcp commands

* fix place for Kafka commit
r14chandra authored Feb 14, 2025
1 parent 0614d2e commit 6543bfd
Showing 3 changed files with 257 additions and 61 deletions.
10 changes: 10 additions & 0 deletions ros/lib/pcp_extract_config
@@ -0,0 +1,10 @@
disk.dev.total
hinv.ncpu
kernel.all.cpu.idle
kernel.all.pressure.cpu.some.avg
kernel.all.pressure.io.full.avg
kernel.all.pressure.io.some.avg
kernel.all.pressure.memory.full.avg
kernel.all.pressure.memory.some.avg
mem.physmem
mem.util.available
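
The file above lists the PCP metrics the engine pulls out of each uploaded archive (CPU count and idle time, PSI pressure averages, physical and available memory, and disk totals). A small, self-contained sketch — not part of this commit, and assuming the PCP client tools are installed and pmcd is running — that checks each listed metric name is known locally:

# Hypothetical helper, not part of this commit: sanity-check the metric names
# in pcp_extract_config against the local pmcd via pminfo (assumes the PCP
# client tools are installed and pmcd is running).
import subprocess

CONFIG_PATH = "ros/lib/pcp_extract_config"


def unknown_metrics(config_path=CONFIG_PATH):
    with open(config_path) as fh:
        metrics = [line.strip() for line in fh if line.strip()]
    missing = []
    for metric in metrics:
        # Assumption: pminfo exits non-zero for an unknown metric name.
        result = subprocess.run(["pminfo", metric], capture_output=True, text=True)
        if result.returncode != 0:
            missing.append(metric)
    return missing


if __name__ == "__main__":
    print("unknown metrics:", unknown_metrics() or "none")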
198 changes: 161 additions & 37 deletions ros/processor/suggestions_engine.py
@@ -1,15 +1,21 @@
import os
import json
-import requests
+import shutil
+import subprocess
from http import HTTPStatus
-from ros.lib import consume
-from insights import extract
-from ros.lib.config import get_logger
+from contextlib import contextmanager
from tempfile import NamedTemporaryFile
+
+import requests
+from insights import extract
from prometheus_client import start_http_server
+
+from ros.lib import consume
from ros.lib.config import (
-    INVENTORY_EVENTS_TOPIC,
+    get_logger,
    METRICS_PORT,
-    GROUP_ID_SUGGESTIONS_ENGINE
+    INVENTORY_EVENTS_TOPIC,
+    GROUP_ID_SUGGESTIONS_ENGINE,
)


@@ -22,6 +28,144 @@ def __init__(self):
self.service = 'SUGGESTIONS_ENGINE'
self.event = None

def run_pmlogextract(self, host, index_file_path, output_dir):
"""Run the pmlogextract command."""

pmlogextract_command = [
"pmlogextract",
"-c",
"ros/lib/pcp_extract_config",
index_file_path,
output_dir
]

logging.debug(f"{self.service} - {self.event} - Running pmlogextract command for system {host.get('id')}.")
try:
subprocess.run(pmlogextract_command, check=True)
logging.debug(
f"{self.service} - {self.event} - Successfully ran pmlogextract command for system {host.get('id')}."
)
except subprocess.CalledProcessError as error:
logging.error(
f"{self.service} - {self.event} - Error running pmlogextract command for system {host.get('id')}:"
f" {error.stdout}"
)
raise
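
One detail worth noting: run_pmlogextract invokes subprocess.run without capture_output=True, so in the except branch error.stdout is always None and the log line above ends in "None". A self-contained illustration of that behaviour (not part of this commit):

# Illustration only, not part of this commit: CalledProcessError.stdout is
# None unless output was captured.
import subprocess

try:
    subprocess.run(["false"], check=True)
except subprocess.CalledProcessError as error:
    print(error.stdout)        # None - nothing was captured

try:
    subprocess.run(["false"], check=True, text=True, capture_output=True)
except subprocess.CalledProcessError as error:
    print(repr(error.stdout))  # '' - captured, but `false` prints nothing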

def run_pmlogsummary(self, host, output_dir):
"""Run the pmlogsummary command."""

pmlogsummary_command = [
"pmlogsummary",
"-F",
os.path.join(output_dir, ".index")
]

logging.debug(f"{self.service} - {self.event} - Running pmlogsummary command for system {host.get('id')}.")
try:
subprocess.run(pmlogsummary_command, check=True, text=True, capture_output=True)
logging.debug(
f"{self.service} - {self.event} - Successfully ran pmlogsummary command for system {host.get('id')}."
)
except subprocess.CalledProcessError as error:
logging.error(
f"{self.service} - {self.event} - Error running pmlogsummary command for system {host.get('id')}:"
f" {error.stdout}"
)
raise
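
run_pmlogsummary captures the command's output but currently discards it; the commit only verifies that the command runs. If the averaged values were needed later, they could be parsed along these lines — a hedged sketch, assuming pmlogsummary -F's usual "<metric> <average> [<units>]" line layout and skipping anything that does not parse as a number:

# Hedged sketch, not part of this commit: turn pmlogsummary -F output into a
# {metric_name: average} dict.
def parse_pmlogsummary(output: str) -> dict:
    averages = {}
    for line in output.splitlines():
        parts = line.split()
        if len(parts) < 2:
            continue
        metric, value = parts[0], parts[1]
        try:
            averages[metric] = float(value)
        except ValueError:
            continue  # header lines, instance names, non-numeric columns
    return averages

To use it, the result of subprocess.run in run_pmlogsummary would need to be assigned (result = subprocess.run(...)) so result.stdout could be handed to the parser.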

def create_output_dir(self, request_id, host):
output_dir = f"/var/tmp/pmlogextract-output-{request_id}/"
if not os.path.exists(output_dir):
os.makedirs(output_dir)

logging.debug(f"Successfully created output_dir for system {host.get('id')}: {output_dir}")

return output_dir

def run_pcp_commands(self, host, index_file_path, request_id):
sanitized_request_id = request_id.replace("/", "_")
output_dir = self.create_output_dir(sanitized_request_id, host)

self.run_pmlogextract(host, index_file_path, output_dir)

self.run_pmlogsummary(host, output_dir)

logging.info(f"{self.service} - {self.event} - Successfully ran pcp commands for system {host.get('id')}.")

try:
if os.path.exists(output_dir):
shutil.rmtree(output_dir)
logging.debug(
f"{self.service} - {self.event} - Cleaned up output directory for system {host.get('id')}."
)
except Exception as error:
logging.error(
f"{self.service} - {self.event} - Error cleaning up the output directory for system {host.get('id')}:"
f" {error}"
)
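
Because run_pmlogextract and run_pmlogsummary re-raise on failure, the cleanup at the end of run_pcp_commands is only reached when both commands succeed; a failed run leaves the /var/tmp/pmlogextract-output-<request_id>/ directory behind. A minimal alternative sketch (not the committed code) that guarantees cleanup with try/finally:

# Hedged alternative, not the committed code: always remove the scratch
# directory, even when one of the pcp commands raises.
def run_pcp_commands(self, host, index_file_path, request_id):
    sanitized_request_id = request_id.replace("/", "_")
    output_dir = self.create_output_dir(sanitized_request_id, host)
    try:
        self.run_pmlogextract(host, index_file_path, output_dir)
        self.run_pmlogsummary(host, output_dir)
        logging.info(f"{self.service} - {self.event} - Successfully ran pcp commands for system {host.get('id')}.")
    finally:
        shutil.rmtree(output_dir, ignore_errors=True)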

def find_root_directory(self, directory, target_file):
"""
Recursively search for the target file in the given directory - to get root directory of an archive.
"""
for dirpath, _, filenames in os.walk(directory):
if target_file in filenames:
return dirpath

return None

def get_index_file_path(self, host, extracted_dir):
extracted_dir_root = self.find_root_directory(extracted_dir, "insights_archive.txt")

if not extracted_dir_root:
logging.error(
f"{self.service} - {self.event} -"
f" insights_archive.txt not found in the extracted dir for system {host.get('id')}"
)
return None

pmlogger_dir = os.path.join(extracted_dir_root, "data/var/log/pcp/pmlogger/")

index_files = [
file for file in os.listdir(pmlogger_dir) if file.endswith(".index")
]
index_file_path = os.path.join(pmlogger_dir, index_files[0])

return index_file_path
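
get_index_file_path assumes data/var/log/pcp/pmlogger/ exists in the extracted archive and contains at least one .index file; otherwise os.listdir raises FileNotFoundError or index_files[0] raises IndexError, which bubbles up to the generic message handler in run(). A hedged, more defensive variant of the tail of this method (not the committed code):

# Hedged defensive variant, not the committed code: return None instead of
# raising when the pmlogger directory or the .index file is missing.
pmlogger_dir = os.path.join(extracted_dir_root, "data/var/log/pcp/pmlogger/")
if not os.path.isdir(pmlogger_dir):
    logging.error(f"{self.service} - {self.event} - pmlogger dir not found for system {host.get('id')}")
    return None

index_files = [file for file in os.listdir(pmlogger_dir) if file.endswith(".index")]
if not index_files:
    logging.error(f"{self.service} - {self.event} - No .index file found for system {host.get('id')}")
    return None

return os.path.join(pmlogger_dir, index_files[0])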

@contextmanager
def download_and_extract(self, archive_URL, host, org_id):
logging.debug(f"{self.service} - {self.event} - Report downloading for system {host.get('id')}.")

try:
response = requests.get(archive_URL, timeout=10)
self.consumer.commit()

if response.status_code != HTTPStatus.OK:
logging.error(
f"{self.service} - {self.event} - Unable to download the report for system {host.get('id')}. "
f"ERROR - {response.reason}"
)
yield None
else:
with NamedTemporaryFile(delete=True) as tempfile:
tempfile.write(response.content)
logging.info(
f"{self.service} - {self.event} - Report downloaded successfully for system {host.get('id')}"
)
tempfile.flush()
with extract(tempfile.name) as extract_dir:
yield extract_dir
except Exception as error:
logging.error(f"{self.service} - {self.event} - Error occurred during download and extraction: {error}")

def is_pcp_collected(self, platform_metadata):
return (
platform_metadata.get('is_ros_v2') and
platform_metadata.get('is_pcp_raw_data_collected')
)
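
For reference, a payload only reaches the pcp path when both flags below are truthy; an illustrative platform_metadata shape (field names are the ones read in this commit, values are made up):

# Illustrative only - field names taken from the code above
# (is_ros_v2, is_pcp_raw_data_collected, url, request_id); values are made up.
platform_metadata = {
    "is_ros_v2": True,
    "is_pcp_raw_data_collected": True,
    "url": "https://storage.example.com/archives/abc123.tar.gz",
    "request_id": "abc123",
}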

def handle_create_update(self, payload):
self.event = "Update event" if payload.get('type') == 'updated' else "Create event"

@@ -32,11 +176,19 @@ def handle_create_update(self, payload):
logging.info(f"{self.service} - {self.event} - Missing host or/and platform_metadata field(s).")
return

-if not is_pcp_collected(platform_metadata):
+if not self.is_pcp_collected(platform_metadata):
    return

archive_URL = platform_metadata.get('url')
-download_and_extract(self.service, self.event, archive_URL, host, org_id=host.get('org_id'))
+with self.download_and_extract(
+    archive_URL,
+    host,
+    org_id=host.get('org_id')
+) as ext_dir:
+    extracted_dir = ext_dir.tmp_dir
+    index_file_path = self.get_index_file_path(host, extracted_dir)
+    if index_file_path is not None:
+        self.run_pcp_commands(host, index_file_path, platform_metadata.get('request_id'))
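
Two edge cases in this path are worth noting: on a non-200 response download_and_extract yields None, so ext_dir.tmp_dir would raise AttributeError; and if the download itself raises, the generator never yields, which makes contextlib raise "generator didn't yield". A hedged caller-side guard for the first case (not the committed code):

# Hedged guard, not the committed code: skip the pcp commands when the
# download failed and the context manager yielded None.
with self.download_and_extract(
    archive_URL,
    host,
    org_id=host.get('org_id')
) as ext_dir:
    if ext_dir is None:
        return
    index_file_path = self.get_index_file_path(host, ext_dir.tmp_dir)
    if index_file_path is not None:
        self.run_pcp_commands(host, index_file_path, platform_metadata.get('request_id'))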

def run(self):
logging.info(f"{self.service} - Engine is running. Awaiting msgs.")
@@ -52,46 +204,18 @@ def run(self):

if 'created' == event_type or 'updated' == event_type:
self.handle_create_update(payload)
-self.consumer.commit()

except json.JSONDecodeError as error:
logging.error(f"{self.service} - {self.event} - Failed to decode message: {error}")
except Exception as error:
logging.error(f"{self.service} - {self.event} - Error processing message: {error}")

except Exception as error:
logging.error(f"{self.service} - {self.event} - error: {error}")
finally:
self.consumer.close()


-def download_and_extract(service, event, archive_URL, host, org_id):
-    logging.info(f"{service} - {event} - Downloading the report for system {host.get('id')}.")
-
-    response = requests.get(archive_URL, timeout=10)
-
-    if response.status_code != HTTPStatus.OK:
-        logging.error(
-            f"{service} - {event} - Unable to download the report for system {host.get('id')}. "
-            f"ERROR - {response.reason}"
-        )
-    else:
-        with NamedTemporaryFile(delete=True) as tempfile:
-            tempfile.write(response.content)
-            logging.info(
-                f"{service} - {event} - Downloaded the report successfully for system {host.get('id')}"
-            )
-            tempfile.flush()
-            with extract(tempfile.name) as extract_dir:
-                return extract_dir.tmp_dir
-
-
-def is_pcp_collected(platform_metadata):
-    return (
-        platform_metadata.get('is_ros_v2') and
-        platform_metadata.get('is_pcp_raw_data_collected')
-    )


if __name__ == "__main__":
start_http_server(int(METRICS_PORT))
processor = SuggestionsEngine()
