diff --git a/sdk/python/v1alpha1/CHANGELOG.md b/sdk/python/v1alpha1/CHANGELOG.md index 061f2bc..ea48d02 100644 --- a/sdk/python/v1alpha1/CHANGELOG.md +++ b/sdk/python/v1alpha1/CHANGELOG.md @@ -14,6 +14,7 @@ and **Merged pull requests**. Critical items to know are: The versions coincide with releases on pip. Only major versions will be released as tags on Github. ## [0.0.x](https://github.com/converged-computing/metrics-operator/tree/main) (0.0.x) + - Allow getting raw logs for any metric (without parser) (0.0.19) - Refactor of structure of Operator and addition of metrics (0.0.18) - Add wait for delete function to python parser (0.0.17) - LAMMPS python parser (0.0.16) diff --git a/sdk/python/v1alpha1/metricsoperator/client.py b/sdk/python/v1alpha1/metricsoperator/client.py index 6fb482c..ac0af50 100644 --- a/sdk/python/v1alpha1/metricsoperator/client.py +++ b/sdk/python/v1alpha1/metricsoperator/client.py @@ -22,14 +22,22 @@ def __init__(self, yaml_file): self.spec = utils.read_yaml(self.yaml_file) config.load_kube_config() - def watch(self): + def watch(self, raw_logs=False, pod_prefix=None, container_name=None): """ Wait for (and yield parsed) metric logs. """ + if raw_logs and not pod_prefix: + raise ValueError("You must provide a pod_prefix to ask for raw logs.") + for metric in self.spec["spec"]["metrics"]: - parser = mutils.get_metric(metric["name"])(self.spec) + if raw_logs: + parser = mutils.get_metric()(self.spec, container_name=container_name) + else: + parser = mutils.get_metric(metric["name"])( + self.spec, container_name=container_name + ) print("Watching %s" % metric["name"]) - for pod, container in parser.logging_containers(): + for pod, container in parser.logging_containers(pod_prefix=pod_prefix): yield parser.parse(pod=pod, container=container) def create(self): @@ -65,7 +73,7 @@ def namespace(self): def name(self): return self.spec["metadata"]["name"] - def delete(self): + def delete(self, pod_prefix=None): """ Delete the associated YAML file. """ @@ -77,14 +85,14 @@ def delete(self): plural=self.plural, name=self.name, ) - self.wait_for_delete() + self.wait_for_delete(pod_prefix) return result - def wait_for_delete(self): + def wait_for_delete(self, pod_prefix=None): """ Wait for pods to be gone (deleted) """ for metric in self.spec["spec"]["metrics"]: parser = mutils.get_metric(metric["name"])(self.spec) print("Watching %s for deletion" % metric["name"]) - parser.wait_for_delete() + parser.wait_for_delete(pod_prefix=pod_prefix) diff --git a/sdk/python/v1alpha1/metricsoperator/metrics/__init__.py b/sdk/python/v1alpha1/metricsoperator/metrics/__init__.py index 4aa2532..aa1d63f 100644 --- a/sdk/python/v1alpha1/metricsoperator/metrics/__init__.py +++ b/sdk/python/v1alpha1/metricsoperator/metrics/__init__.py @@ -2,6 +2,7 @@ # (c.f. AUTHORS, NOTICE.LLNS, COPYING) import metricsoperator.metrics.app as apps +import metricsoperator.metrics.base as base import metricsoperator.metrics.network as network import metricsoperator.metrics.perf as perf import metricsoperator.metrics.storage as storage @@ -17,11 +18,12 @@ } -def get_metric(name): +def get_metric(name=None): """ Get a named metric parser. """ metric = metrics.get(name) + # If we don't have a matching metric, return base (for raw logs) if not metric: - raise ValueError(f"Metric {name} does not have a known parser") + return base.MetricBase return metric diff --git a/sdk/python/v1alpha1/metricsoperator/metrics/base.py b/sdk/python/v1alpha1/metricsoperator/metrics/base.py index 4162432..84f8b3e 100644 --- a/sdk/python/v1alpha1/metricsoperator/metrics/base.py +++ b/sdk/python/v1alpha1/metricsoperator/metrics/base.py @@ -13,6 +13,7 @@ class MetricBase: collection_end = "METRICS OPERATOR COLLECTION END" metadata_start = "METADATA START" metadata_end = "METADATA END" + container_name = None def __init__(self, spec=None, **kwargs): """ @@ -23,6 +24,10 @@ def __init__(self, spec=None, **kwargs): self.spec = spec self._core_v1 = kwargs.get("core_v1_api") + # If we don't have a default container name... + if not self.container_name: + self.container_name = kwargs.get("container_name") or "launcher" + # Load kubeconfig on Metricbase init only if self.spec is not None: config.load_kube_config() @@ -55,6 +60,14 @@ def parse(self, pod, container): ) return self.parse_log(lines) + def parse_log(self, lines): + """ + If the parser doesn't have anything, just return the lines + """ + # Get the log metadata, split lines by newline so not so hefty a log! + metadata = self.get_log_metadata(lines) + return {"data": lines.split("\n"), "metadata": metadata, "spec": self.spec} + @property def core_v1(self): """ @@ -69,12 +82,14 @@ def core_v1(self): self._core_v1 = core_v1_api.CoreV1Api() return self._core_v1 - def logging_containers(self, namespace=None, states=None, retry_seconds=5): + def logging_containers( + self, namespace=None, states=None, retry_seconds=5, pod_prefix=None + ): """ Return list of containers intended to get logs from """ containers = [] - pods = self.wait(namespace, states, retry_seconds) + pods = self.wait(namespace, states, retry_seconds, pod_prefix=pod_prefix) container_name = getattr(self, "container_name", self.container) print(f"Looking for container name {container_name}...") for pod in pods.items: @@ -90,7 +105,16 @@ def logging_containers(self, namespace=None, states=None, retry_seconds=5): ) return containers - def wait(self, namespace=None, states=None, retry_seconds=5): + def get_pod_prefix(self, pod_prefix=None): + """ + Return the default or a custom pod prefix. + """ + pod_prefix = pod_prefix or getattr(self, "pod_prefix", None) + if not pod_prefix: + raise ValueError("A pod prefix 'pod_prefix' is required to wait for pods.") + return pod_prefix + + def wait(self, namespace=None, states=None, retry_seconds=5, pod_prefix=None): """ Wait for one or more pods of interest to be done. @@ -98,9 +122,10 @@ def wait(self, namespace=None, states=None, retry_seconds=5): particular state. If looking for Termination -> gone, use wait_for_delete. """ + pod_prefix = self.get_pod_prefix(pod_prefix) namespace = namespace or self.namespace - print(f"Looking for prefix {self.pod_prefix} in namespace {namespace}") - pod_list = self.get_pods(namespace, self.pod_prefix) + print(f"Looking for prefix {pod_prefix} in namespace {namespace}") + pod_list = self.get_pods(namespace, pod_prefix) size = len(pod_list.items) # We only want logs when they are completed @@ -111,7 +136,7 @@ def wait(self, namespace=None, states=None, retry_seconds=5): ready = set() while len(ready) != size: print(f"{len(ready)} pods are ready, out of {size}") - pod_list = self.get_pods(name=self.pod_prefix, namespace=namespace) + pod_list = self.get_pods(name=pod_prefix, namespace=namespace) for pod in pod_list.items: print(f"{pod.metadata.name} is in phase {pod.status.phase}") @@ -126,16 +151,17 @@ def wait(self, namespace=None, states=None, retry_seconds=5): print(f'All pods are in states "{states}"') return pod_list - def wait_for_delete(self, namespace=None, retry_seconds=5): + def wait_for_delete(self, namespace=None, retry_seconds=5, pod_prefix=None): """ Wait for one or more pods of interest to be gone """ + pod_prefix = self.get_pod_prefix(pod_prefix) namespace = namespace or self.namespace - print(f"Looking for prefix {self.pod_prefix} in namespace {namespace}") - pod_list = self.get_pods(namespace, name=self.pod_prefix) + print(f"Looking for prefix {pod_prefix} in namespace {namespace}") + pod_list = self.get_pods(namespace, name=pod_prefix) while len(pod_list.items) != 0: print(f"{len(pod_list.items)} pods exist, waiting for termination.") - pod_list = self.get_pods(name=self.pod_prefix, namespace=namespace) + pod_list = self.get_pods(name=pod_prefix, namespace=namespace) time.sleep(retry_seconds) print("All pods are terminated.") diff --git a/sdk/python/v1alpha1/setup.py b/sdk/python/v1alpha1/setup.py index feedb54..6772e6e 100644 --- a/sdk/python/v1alpha1/setup.py +++ b/sdk/python/v1alpha1/setup.py @@ -30,7 +30,7 @@ if __name__ == "__main__": setup( name="metricsoperator", - version="0.0.18", + version="0.0.19", author="Vanessasaurus", author_email="vsoch@users.noreply.github.com", maintainer="Vanessasaurus",