collect_and_parse + methods and classes deprecation
Signed-off-by: Tullio Sebastiani <[email protected]>
tsebastiani committed Sep 4, 2024
1 parent 98f721d commit d175548
Showing 11 changed files with 228 additions and 33 deletions.
16 changes: 15 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "krkn-lib"
version = "0.0.0"
version = "1.0.0"
description = "Foundation library for Kraken"
authors = ["Red Hat Chaos Team"]
license = "Apache-2.0"
@@ -26,6 +26,7 @@ elasticsearch-dsl = "7.4.1"
wheel = "^0.42.0"
cython = "3.0"
numpy= "1.26.4"
deprecation="2.1.0"

[tool.poetry.group.test.dependencies]
jinja2 = "^3.1.2"
106 changes: 105 additions & 1 deletion src/krkn_lib/k8s/krkn_kubernetes.py
@@ -12,8 +12,10 @@
from functools import partial
from queue import Queue
from typing import Any, Dict, List, Optional
from krkn_lib.version import __version__

import arcaflow_lib_kubernetes
import deprecation
import kubernetes
import urllib3
import yaml
@@ -40,7 +42,7 @@
Volume,
VolumeMount,
)
-from krkn_lib.models.telemetry import NodeInfo, Taint
+from krkn_lib.models.telemetry import NodeInfo, Taint, ClusterEvent
from krkn_lib.utils import filter_dictionary, get_random_string
from krkn_lib.utils.safe_logger import SafeLogger

@@ -1535,6 +1537,12 @@ def get_pod_info(self, name: str, namespace: str = "default") -> Pod:
)
return None

@deprecation.deprecated(
deprecated_in="3.1.0",
removed_in="3.2.0",
current_version=__version__,
details="litmus support dropped from krkn",
)
def get_litmus_chaos_object(
self, kind: str, name: str, namespace: str = "default"
) -> LitmusChaosObject:
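For context, the `deprecation` package (pinned above in pyproject.toml) wraps the decorated callable so that invoking it emits a DeprecationWarning through the warnings machinery while still running the original body; further down the same decorator is applied to `__init__` methods to deprecate whole classes. A minimal standalone sketch, with hypothetical names:

import deprecation

__version__ = "3.1.0"  # stand-in for krkn_lib.version.__version__


@deprecation.deprecated(
    deprecated_in="3.1.0",
    removed_in="3.2.0",
    current_version=__version__,
    details="example only",
)
def legacy_helper():
    return "still works, but warns"


# executes the original body as usual while emitting a
# DeprecationWarning (visible when warning filters allow it)
legacy_helper()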
@@ -2480,6 +2488,82 @@ def is_pod_terminating(self, pod_name: str, namespace: str) -> bool:
except Exception:
return False

def collect_and_parse_cluster_events(
self,
start_timestamp: int,
end_timestamp: int,
local_timezone: str,
cluster_timezone: str = "UTC",
limit: int = 500,
namespace: str = None,
) -> list[ClusterEvent]:
"""
Collects cluster events querying `/api/v1/events`
filtered in a given time interval and writes them in
a temporary file in json format.
:param start_timestamp: timestamp of the minimum date
after that the event is relevant
:param end_timestamp: timestamp of the maximum date
before that the event is relevant
:param local_timezone: timezone of the local system
:param cluster_timezone: timezone of the remote cluster
:param limit: limit of the number of events to be fetched
from the cluster
:param namespace: Namespace from which the events must be
collected, if None all-namespaces will be selected
:return: Returns a list of parsed ClusterEvents
"""
events = []
try:
path_params: Dict[str, str] = {}
query_params = {"limit": limit}
header_params: Dict[str, str] = {}
auth_settings = ["BearerToken"]
header_params["Accept"] = self.api_client.select_header_accept(
["application/json"]
)

path = "/api/v1/events"
if namespace:
path = f"/api/v1/namespaces/{namespace}/events"

(data) = self.api_client.call_api(
path,
"GET",
path_params,
query_params,
header_params,
response_type="str",
auth_settings=auth_settings,
)

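            # call_api is invoked with default return settings, so it
            # yields a (data, status_code, headers) tuple; the first
            # element holds the raw events payload parsed below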
json_obj = ast.literal_eval(data[0])
events_list = reversed(json_obj["items"])
for obj in events_list:
filtered_obj = filter_dictionary(
obj,
"firstTimestamp",
start_timestamp,
end_timestamp,
cluster_timezone,
local_timezone,
)
if filtered_obj:
events.append(ClusterEvent(k8s_json_dict=obj))

except Exception as e:
logging.error(str(e))

return events

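For illustration, a minimal usage sketch of the new method; values are hypothetical and it assumes a `KrknKubernetes` client built from a local kubeconfig:

import time

from krkn_lib.k8s import KrknKubernetes

kubecli = KrknKubernetes(kubeconfig_path="~/.kube/config")  # hypothetical path
now = int(time.time())
events = kubecli.collect_and_parse_cluster_events(
    start_timestamp=now - 3600,  # events from the last hour
    end_timestamp=now,
    local_timezone="UTC",
    cluster_timezone="UTC",
    namespace="default",  # omit to collect from all namespaces
)
print(f"parsed {len(events)} cluster events")
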
@deprecation.deprecated(
deprecated_in="3.1.0",
removed_in="3.2.0",
current_version=__version__,
details="replaced by `collect_and_parse_cluster_events`",
)
def collect_cluster_events(
self,
start_timestamp: int,
@@ -2557,6 +2641,26 @@ def collect_cluster_events(
logging.error(str(e))
return None

def parse_events_from_file(
self, events_filename: str
) -> Optional[list[ClusterEvent]]:
if not events_filename or not os.path.exists(events_filename):
logging.error(f"events file do not exist {events_filename}")
return

events = []
with open(events_filename, "r") as jstream:
try:
json_obj = json.load(jstream)
for event in json_obj:
events.append(ClusterEvent(k8s_json_dict=event))
except Exception:
logging.error(
f"failed to parse events file: {events_filename}"
)

return events

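A round-trip sketch for the new parser, reusing the hypothetical `kubecli` from the example above; it assumes `ClusterEvent` tolerates a minimal raw event dict passed as `k8s_json_dict`:

import json
import tempfile

raw_events = [
    {"kind": "Event", "reason": "Created", "message": "example event"}
]
# write the raw event dicts to a temporary json file, then parse it back
with tempfile.NamedTemporaryFile(
    mode="w", suffix=".json", delete=False
) as stream:
    json.dump(raw_events, stream)

events = kubecli.parse_events_from_file(stream.name)
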
def create_token_for_sa(
self, namespace: str, service_account: str, token_expiration=43200
) -> Optional[str]:
15 changes: 12 additions & 3 deletions src/krkn_lib/models/k8s/models.py
@@ -1,6 +1,9 @@
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Optional
from krkn_lib.version import __version__

import deprecation


@dataclass(frozen=True, order=False)
@@ -116,13 +119,21 @@ class Pod:
"""


-@dataclass(frozen=True, order=False)
class LitmusChaosObject:
    """
    Data class to hold information regarding
    a custom object of the Litmus project
    """

+    @deprecation.deprecated(
+        deprecated_in="3.1.0",
+        removed_in="3.2.0",
+        current_version=__version__,
+        details="litmus support removed from krkn",
+    )
+    def __init__(self):
+        pass

kind: str
"""
Litmus Object Kind
@@ -149,7 +160,6 @@
"""


-@dataclass(frozen=True, order=False)
class ChaosEngine(LitmusChaosObject):
"""
Data class to hold information
@@ -166,7 +176,6 @@
"""


-@dataclass(frozen=True, order=False)
class ChaosResult(LitmusChaosObject):
"""
Data class to hold information
38 changes: 27 additions & 11 deletions src/krkn_lib/ocp/krkn_openshift.py
@@ -46,7 +46,7 @@ def get_clusterversion_string(self) -> str:
for cv in cvs["items"]:
for condition in cv["status"]["conditions"]:
if condition["type"] == "Available":
-                    return condition["message"].split(' ')[-1]
+                    return condition["message"].split(" ")[-1]
return ""
except client.exceptions.ApiException as e:
if e.status == 404:
@@ -292,6 +292,7 @@ def collect_filter_archive_ocp_logs(
log_filter_patterns: list[str],
threads: int,
safe_logger: SafeLogger,
namespace: str = None,
oc_path: str = None,
) -> str:
"""
@@ -314,6 +315,8 @@
(it supports several formats by default but not every date format)
:param safe_logger: thread safe logger used to log
the output on a file stream
        :param namespace: if set, the logs will refer only to the provided
            namespace
:param oc_path: the path of the `oc` CLI, if None will
be searched in the PATH
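A hypothetical invocation of the namespace-scoped collection; the leading parameters are collapsed in this diff, so their names and order below are assumptions:

from krkn_lib.ocp import KrknOpenshift
from krkn_lib.utils.safe_logger import SafeLogger

ocpcli = KrknOpenshift(kubeconfig_path="~/.kube/config")  # hypothetical
archive_path = ocpcli.collect_filter_archive_ocp_logs(
    "/tmp/ocp-logs",           # src_dir (assumed name)
    "/tmp/ocp-logs-filtered",  # dst_dir (assumed name)
    "~/.kube/config",          # kubeconfig_path
    None,                      # start_timestamp (assumed optional)
    None,                      # end_timestamp (assumed optional)
    [r"kube-apiserver.*"],     # log_filter_patterns
    5,                         # threads
    SafeLogger(),              # assumes a no-arg SafeLogger constructor
    namespace="openshift-etcd",
)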
@@ -368,16 +371,29 @@
os.chdir(src_dir)
safe_logger.info(f"collecting openshift logs in {src_dir}...")
try:
-            subprocess.Popen(
-                [
-                    oc,
-                    "adm",
-                    "must-gather",
-                    "--kubeconfig",
-                    kubeconfig_path,
-                ],
-                stdout=subprocess.DEVNULL,
-            ).wait()
+            if namespace:
+                subprocess.Popen(
+                    [
+                        oc,
+                        "adm",
+                        "inspect",
+                        f"ns/{namespace}",
+                        "--kubeconfig",
+                        kubeconfig_path,
+                    ],
+                    stdout=subprocess.DEVNULL,
+                ).wait()
+            else:
+                subprocess.Popen(
+                    [
+                        oc,
+                        "adm",
+                        "must-gather",
+                        "--kubeconfig",
+                        kubeconfig_path,
+                    ],
+                    stdout=subprocess.DEVNULL,
+                ).wait()
os.chdir(cur_dir)
except Exception as e:
safe_logger.error(
37 changes: 28 additions & 9 deletions src/krkn_lib/telemetry/k8s/krkn_telemetry_kubernetes.py
@@ -4,18 +4,21 @@
import threading
import time
import warnings
import deprecation
from queue import Queue
from typing import Optional

import requests
import urllib3
import yaml
from tzlocal.unix import get_localzone

import krkn_lib.utils as utils
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.krkn import ChaosRunAlertSummary
from krkn_lib.models.telemetry import ChaosRunTelemetry, ScenarioTelemetry
from krkn_lib.utils.safe_logger import SafeLogger
from krkn_lib.version import __version__


class KrknTelemetryKubernetes:
@@ -522,23 +525,32 @@ def set_parameters_base64(
raise Exception("telemetry: {0}".format(str(e)))
scenario_telemetry.parameters_base64 = input_file_base64

@deprecation.deprecated(
deprecated_in="3.1.0",
removed_in="3.2.0",
current_version=__version__,
details="Cluster events has been added to the telemetry json,"
"so won't be uploaded as separated file",
)
def put_cluster_events(
self,
request_id: str,
telemetry_config: dict,
-        events_filename: str,
-        namespace: str = None,
+        start_timestamp: int,
+        end_timestamp: int,
+        cluster_timezone: str = "UTC",
):
"""
Collects and puts cluster events on telemetry S3 bucket
:param request_id: uuid of the session that will represent the
S3 folder on which the prometheus files will be stored
:param telemetry_config: telemetry section of kraken config.yaml
-        :param events_filename: the filename of the events
-            collected by collect_cluster_events
-        :param namespace: if set the namespace name will be appended to
-            file on S3
+        :param start_timestamp: timestamp after which an event
+            is considered relevant
+        :param end_timestamp: timestamp before which an event
+            is considered relevant
+        :param cluster_timezone: timezone of the remote cluster
"""

queue = Queue()
@@ -570,7 +582,14 @@ def put_cluster_events(
if len(exceptions) > 0:
raise Exception(", ".join(exceptions))

-        if not events_filename or not os.path.exists(events_filename):
+        events_file = self.kubecli.collect_cluster_events(
+            start_timestamp,
+            end_timestamp,
+            str(get_localzone()),
+            cluster_timezone,
+            limit=500,
+        )
+        if not events_file:
self.safe_logger.warning(
"no cluster events found in the specified time interval"
)
Expand All @@ -579,7 +598,7 @@ def put_cluster_events(
        # this parameter doesn't have any utility in this context,
        # used to match the method signature and reuse it (Poor design?)
uploaded_files = list[str]()
-        queue.put((0, events_filename, 0))
+        queue.put((0, events_file, 0))
queue_size = queue.qsize()
self.safe_logger.info("uploading cluster events...")

@@ -596,7 +615,7 @@
0,
uploaded_files,
max_retries,
"events-" if not namespace else f"events-{namespace}-",
"events-",
".json",
),
)
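Since the method is now decorated, callers can surface the warning explicitly; a sketch with hypothetical `telemetry` (a KrknTelemetryKubernetes instance) and `config` objects:

import time
import warnings

now = int(time.time())
# record warnings so the deprecation notice can be inspected
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    telemetry.put_cluster_events("request-id", config, now - 3600, now)
for w in caught:
    print(w.category.__name__, str(w.message))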