diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..6769e21
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,160 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+#   For a library or package, you might want to ignore these files since the code is
+#   intended to run in multiple environments; otherwise, check them in:
+# .python-version
+
+# pipenv
+#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+#   However, in case of collaboration, if having platform-specific dependencies or dependencies
+#   having no cross-platform support, pipenv may install dependencies that don't work, or not
+#   install all needed dependencies.
+#Pipfile.lock
+
+# poetry
+#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
+#   This is especially recommended for binary packages to ensure reproducibility, and is more
+#   commonly ignored for libraries.
+#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
+#poetry.lock
+
+# pdm
+#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
+#pdm.lock
+#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
+#   in version control.
+#   https://pdm.fming.dev/#use-with-ide
+.pdm.toml
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+
+# PyCharm
+#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
+#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
+#  and can be added to the global gitignore or merged into this file.  For a more nuclear
+#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
+#.idea/
\ No newline at end of file
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..08a6335
--- /dev/null
+++ b/main.py
@@ -0,0 +1,112 @@
+#!/usr/bin/env python3
+
+# standard imports
+import json
+
+from datetime import datetime as dt
+
+# third party imports
+import click
+
+# first party imports
+from utilities import DataCleaner
+from utilities import QueryManager
+from utilities import Query
+from utilities import QuerySplitter
+from utilities.semaphore import ThreadManager
+
+
+@click.command()
+@click.option("-a", "--api-endpoint", required=True, help="api endpoint to query against")
+@click.option(
+    "-c",
+    "--cert",
+    default=None,
+    help="relative path to the certificate which is used to create the request",
+)
+@click.option(
+    "-t",
+    "--timeout",
+    default=30,
+    help="number of seconds the client will wait for the server to send a response",
+    show_default=True,
+    type=int,
+)
+@click.option(
+    "-k",
+    "--kwargs",
+    default=None,
+    help="JSON-encoded parameters for the query; supported keys: target, params\ntarget > specifies a target behind the api endpoint\nparams > sets specific parameters for the query\n\tsupported parameters are:\n\t - 'query'\n\t - 'dedup'\n\t - 'partial_response'\n\t - 'step'\n\t - 'max_source_resolution'\n\t - 'engine'\n\t - 'analyze'",
+)
+@click.option("-p", "--storage-path", default=None, help="directory path in which the query results are stored")
+@click.option(
+    "-b",
+    "--threshold",
+    default=None,
+    help="Threshold in days after which the data are downsampled by Thanos\nThis helps splitting the queries for efficiency and resource optimization",
+    type=int,
+)
+# option for max long term storage
+def main(
+    api_endpoint: str = None,
+    cert: str = None,
+    timeout: int = 30,
+    kwargs: str = None,
+    storage_path: str = None,
+    threshold: int = None,
+):
+    start = dt.now()
+    # click passes -k/--kwargs as a string; parse it into the dict Query expects
+    kwargs = {} if kwargs is None else json.loads(kwargs)
+
+    tm = ThreadManager(12)
+    qm = QueryManager(cert=cert, timeout=timeout, storage_path=storage_path, threshold=threshold, thread_manager=tm)
+
+    query = Query(base_url=api_endpoint, kwargs=kwargs)
+
+    queries = QuerySplitter().split_by_threshold(query=query, threshold=threshold)
+
+    query_uuids = [qm.add_query_queue() for _ in range(len(queries))]
+
+    if queries[0] is not None:
+        qm.create_query_objects(query_queue_uuid=query_uuids[0], query=queries[0], separator=60 * 60 * 24)
+
+    if queries[1] is not None:
+        qm.create_query_objects(query_queue_uuid=query_uuids[1], query=queries[1], separator=60 * 60 * 24 * 90)
+
+    qm.create_environments()
+
+    for query_uuid in query_uuids:
+        if len(qm.queues[query_uuid].query_objects) == 0:
+            continue
+        qm.queues[query_uuid].schedule_queries()
+
+    tm.execute_all_threads()
+    end = dt.now()
+    print(f"Downloading data took {end - start}.")
+
+    start = dt.now()
+
+    max_index = 2
+    paths = [None] * max_index
+    dc = DataCleaner()
+
+    for index in range(max_index):
+        query_uuid = query_uuids[index]
+
+        if len(qm.queues[query_uuid].query_objects) != 0:
+            queue = qm.queues[query_uuid]
+            paths[index] = queue.path
+
+        if paths[index] is not None:
+            if index == 1:
+                dc.clear_query_results(path=paths[index], step=3600)
+                continue
+
+            dc.clear_query_results(path=paths[index], step=60)
+
+    end = dt.now()
+    print(f"Cleaning data took {end - start}.")
+
+
+if __name__ == "__main__":
+    main()
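+
+# Example invocation (hypothetical endpoint and certificate path; -k assumes a
+# JSON object string, matching the parsing above):
+#   python main.py -a https://thanos.example.com/api/v1/ -c certs/client.pem \
+#       -t 60 -b 90 -p data -k '{"params": {"step": "300"}}'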
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..c5d76e9
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+click>=8,<9
+requests>=2,<3
+setuptools>=75
\ No newline at end of file
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..b025501
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,8 @@
+from setuptools import setup
+
+setup(
+    name="main",
+    version="0.1.0",
+    py_modules=["main"],
+    packages=["utilities"],
+    install_requires=["click", "requests"],
+)
diff --git a/tests/Test_Filter.py b/tests/Test_Filter.py
new file mode 100644
index 0000000..b1492d9
--- /dev/null
+++ b/tests/Test_Filter.py
@@ -0,0 +1,102 @@
+import unittest
+
+from utilities.data_filter import create_time_ranges, remove_state_from_timestamp_value
+
+# TODO consider outsourcing these tests into the corresponding modules
+
+
+class TestFilter(unittest.TestCase):
+    def test_remove_state_from_timestamp_value_with_normal_input(self):
+        data = [[0.0, "1"], [13.0, "1"]]
+
+        expected_result = [0.0, 13.0]
+        result = remove_state_from_timestamp_value(data=data)
+
+        self.assertEqual(result, expected_result)
+
+    def test_remove_state_from_timestamp_value_with_empty_input(self):
+        data = []
+
+        expected_result = []
+        result = remove_state_from_timestamp_value(data=data)
+
+        self.assertEqual(result, expected_result)
+
+    def test_remove_state_from_timestamp_value_with_wrong_inputs_1(self):
+        data = "ABAP"
+
+        with self.assertRaises(TypeError):
+            remove_state_from_timestamp_value(data=data)
+
+    def test_remove_state_from_timestamp_value_with_wrong_inputs_2(self):
+        data = ["ABAP"]
+
+        with self.assertRaises(TypeError):
+            remove_state_from_timestamp_value(data=data)
+
+    def test_remove_state_from_timestamp_value_with_wrong_inputs_3(self):
+        data = [["ABAP"]]
+
+        with self.assertRaises(TypeError):
+            remove_state_from_timestamp_value(data=data)
+
+    def test_create_time_ranges_with_normal_input(self):
+        data = [0, 5, 10, 15, 35, 50, 55, 60, 65, 67, 68, 69, 73, 78, 83, 88, 90]
+        step = 5
+
+        expected_result = [(0, 15), (35, 0), (50, 15), (67, 0), (68, 0), (69, 0), (73, 15), (90, 0)]
+
+        result = create_time_ranges(data=data, step=step)
+
+        self.assertEqual(result, expected_result)
+
+    def test_create_time_ranges_with_empty_input(self):
+        data = []
+        step = 5
+
+        expected_result = []
+
+        result = create_time_ranges(data=data, step=step)
+
+        self.assertEqual(result, expected_result)
+
+    def test_create_time_ranges_with_continuous_input(self):
+        data = [0, 5, 10, 15, 20, 25]
+        step = 5
+
+        expected_result = [(0, 25)]
+
+        result = create_time_ranges(data=data, step=step)
+
+        self.assertEqual(result, expected_result)
+
+    def test_create_time_ranges_with_continuous_double_input(self):
+        data = [0, 5, 10, 15, 20, 20, 25]
+        step = 5
+
+        expected_result = [(0, 25)]
+
+        result = create_time_ranges(data=data, step=step)
+
+        self.assertEqual(result, expected_result)
+
+    def test_create_time_ranges_with_one_input(self):
+        data = [77]
+        step = 5
+
+        expected_result = [(77, 0)]
+
+        result = create_time_ranges(data=data, step=step)
+
+        self.assertEqual(result, expected_result)
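+
+    # Added test (behavior assumed from the create_time_ranges implementation):
+    # a gap right before the last timestamp closes the open range and emits the
+    # final point as a zero-length range.
+    def test_create_time_ranges_with_gap_before_last_input(self):
+        data = [0, 5, 20]
+        step = 5
+
+        expected_result = [(0, 5), (20, 0)]
+
+        result = create_time_ranges(data=data, step=step)
+
+        self.assertEqual(result, expected_result)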
+    def test_create_time_ranges_with_wrong_input(self):
+        data = "ABAP"
+        step = 5
+
+        with self.assertRaises(TypeError):
+            create_time_ranges(data=data, step=step)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/Test_Query.py b/tests/Test_Query.py
new file mode 100644
index 0000000..2f2cc15
--- /dev/null
+++ b/tests/Test_Query.py
@@ -0,0 +1,41 @@
+import unittest
+
+from datetime import datetime as dt
+from datetime import timedelta as td
+
+from utilities import Query
+
+API_ENDPOINT = "https://metrics-internal.qa-de-1.cloud.sap/api/v1/"
+
+
+class TestQuery(unittest.TestCase):
+    def test_query_args_parsing(self):
+        base_url = API_ENDPOINT
+
+        now = dt.now()
+        start = str((now - td(hours=5)).timestamp())
+        end = str(now.timestamp())
+        kwargs = {"params": {"step": "3600"}}
+
+        query = Query(base_url=base_url, start=start, end=end, kwargs=kwargs)
+
+        target = "query_range"
+
+        params = {
+            "query": "ALERTS",
+            "dedup": "true",
+            "partial_response": "false",
+            "start": start,
+            "end": end,
+            "step": "3600",
+            "max_source_resolution": "0s",
+            "engine": "thanos",
+            "analyze": "false",
+        }
+
+        self.assertEqual(query.params, params)
+        self.assertEqual(query.target, target)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/utilities/__init__.py b/utilities/__init__.py
new file mode 100644
index 0000000..ce21054
--- /dev/null
+++ b/utilities/__init__.py
@@ -0,0 +1,2 @@
+from utilities.query_management import Query, QueryExecutor, QueryManager, QuerySplitter
+from utilities.data_cleaner import DataCleaner
diff --git a/utilities/calc.py b/utilities/calc.py
new file mode 100644
index 0000000..5dcecf9
--- /dev/null
+++ b/utilities/calc.py
@@ -0,0 +1,7 @@
+from datetime import datetime as dt
+from datetime import timedelta as td
+
+
+def calculate_past_five_years_timestamp(start: dt) -> float:
+    past = start - td(weeks=52 * 5)  # assume that one year has 52 weeks
+    return past.timestamp()
diff --git a/utilities/data_cleaner.py b/utilities/data_cleaner.py
new file mode 100644
index 0000000..de8d3f7
--- /dev/null
+++ b/utilities/data_cleaner.py
@@ -0,0 +1,75 @@
+"""Merges chunked query results and compacts them into per-metric time ranges."""
+
+import json
+import os
+
+from utilities.data_filter import create_time_ranges
+
+
+class DataCleaner(object):
+    def __init__(self):
+        self.data = None
+        self.metric_index_map = {}
+
+    def __reset(self):
+        self.data = None
+        self.metric_index_map = {}
+
+    def clear_query_results(self, path: str, step: int):
+        groups = os.listdir(path)
+
+        print("Scanning files")
+        files = [
+            os.path.join(path, group, file)
+            for group in groups
+            if group.startswith("group")
+            for file in os.listdir(os.path.join(path, group))
+        ]
+
+        if not files:  # nothing to clean
+            return
+
+        print("Staging files")
+        with open(file=files[0], mode="r", encoding="utf-8") as f:
+            data = json.load(f)
+            self.data = data["data"]["result"]
+
+        for index, result in enumerate(self.data):
+            metric = result["metric"]
+            flattened_key = str(metric)
+            self.metric_index_map[flattened_key] = index
+
+        for file_nr, file in enumerate(files[1:], start=2):
+            print(f"file {file_nr} of {len(files)}", end="\r")
+            with open(file=file, mode="r", encoding="utf-8") as f:
+                sub_data = json.load(f)
+
+            if sub_data["status"] == "error":
+                continue
+
+            self.__assign_index_to_metrics(results=sub_data["data"]["result"])
+
+        for result in self.data:
+            result["values"] = sorted(set(result["values"]))
+
+        for result in self.data:
+            result["values"] = create_time_ranges(data=result["values"], step=step)
+
+        print("Writing files")
+        with open(file=os.path.join(path, "finalData.json"), mode="w", encoding="utf-8") as f:
+            f.write(json.dumps(self.data, indent=4))
+
+        self.__reset()
+
+    def __assign_index_to_metrics(self, results) -> None:
+        for result in results:
+            metric = result["metric"]
+            flattened_key = str(metric)
+
+            metric_index = self.metric_index_map.get(flattened_key, -1)
+
+            if metric_index == -1:
+                length = len(self.metric_index_map)
+                self.metric_index_map[flattened_key] = length
+                self.data.append(result)
+            else:
+                self.data[metric_index]["values"].extend(result["values"])
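+
+# Sketch of the merge above (hypothetical inputs): if group0/data0.json and
+# group1/data0.json both contain a result for {"metric": {"alertname": "X"}},
+# the second file's values are appended under the same str(metric) key, then
+# de-duplicated, sorted, and collapsed into (start, duration) ranges.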
diff --git a/utilities/data_filter.py b/utilities/data_filter.py
new file mode 100644
index 0000000..ad3b71d
--- /dev/null
+++ b/utilities/data_filter.py
@@ -0,0 +1,98 @@
+def remove_state_from_timestamp_value(data: list[list]) -> list[float]:
+    """
+    Extracts the first element (assumed to be a timestamp) from each sublist in the input list.
+
+    Args:
+        data (list[list]): A list of lists, where each sublist contains at least one element,
+            and the first element is expected to be a float representing a timestamp.
+
+    Returns:
+        list[float]: A list of the first elements (timestamps) from each sublist in the input list.
+
+    Raises:
+        TypeError: If the input data is not a list of lists, or if the first element of any sublist is not a float.
+    """
+
+    out = []
+
+    if not isinstance(data, list):
+        raise TypeError(f"Invalid input format: isinstance(data, list) = {isinstance(data, list)}; type: {type(data)}")
+
+    for value in data:
+        if not isinstance(value, list):
+            raise TypeError(
+                f"Invalid input format: isinstance(value, list) = {isinstance(value, list)}; type: {type(value)}"
+            )
+
+        if not isinstance(value[0], float):
+            raise TypeError(
+                f"Invalid input format: isinstance(value[0], float) = {isinstance(value[0], float)}; type: {type(value[0])}"
+            )
+
+        out.append(value[0])
+
+    return out
+
+
+def create_time_ranges(data: list[float], step: int) -> list[tuple]:
+    """
+    Create time ranges from a list of float values.
+
+    This function takes a list of float values and a step value, and returns a list of tuples.
+    Each tuple represents a time range, where the first element is the start value and the second
+    element is the duration (difference between the start and end values).
+
+    Args:
+        data (list[float]): A list of float values representing time points.
+        step (int): The step value to determine the continuity of the time ranges.
+
+    Returns:
+        list[tuple]: A list of tuples, where each tuple contains the start value and the duration
+            of a time range.
+
+    Raises:
+        TypeError: If the input data is not a list.
+
+    Example:
+        >>> create_time_ranges([1.0, 2.0, 3.0, 5.0, 6.0], 1)
+        [(1.0, 2.0), (5.0, 1.0)]
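+
+        A worked example with a duplicate timestamp (duplicates are skipped,
+        so they do not break an open range):
+        >>> create_time_ranges([0.0, 5.0, 5.0, 10.0], 5)
+        [(0.0, 10.0)]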
+    """
+
+    if not isinstance(data, list):
+        raise TypeError(f"Invalid input format: isinstance(data, list) = {isinstance(data, list)}; type: {type(data)}")
+
+    out = []
+
+    start = None
+    prev = None
+
+    if len(data) == 0:
+        return out
+
+    for value in data[:-1]:
+        if start is None:
+            start = prev = value
+            continue
+
+        if value == prev:
+            continue
+
+        if value != prev + step:  # consider using >
+            out.append((start, prev - start))
+            start = prev = value
+            continue
+
+        prev = value
+
+    value = data[-1]
+    if start is None:
+        out.append((value, 0))
+        return out
+
+    if value != prev + step:  # consider using >
+        out.append((start, prev - start))
+        out.append((value, 0))
+    else:
+        out.append((start, value - start))
+
+    return out
diff --git a/utilities/errors.py b/utilities/errors.py
new file mode 100644
index 0000000..c2f7e5c
--- /dev/null
+++ b/utilities/errors.py
@@ -0,0 +1,3 @@
+class InvalidQueryQueueError(Exception):
+    """Raised when a queue UUID is not registered with a QueryManager."""
diff --git a/utilities/helper.py b/utilities/helper.py
new file mode 100644
index 0000000..8b43319
--- /dev/null
+++ b/utilities/helper.py
@@ -0,0 +1,19 @@
+import requests
+
+
+class ResponseDummy(requests.Response):
+    """A stand-in response whose json() and text return a canned payload."""
+
+    def __init__(self, value):
+        self.value = value
+        self._text = value
+        super().__init__()
+
+    def json(self, **kwargs):
+        return self.value
+
+    @property
+    def text(self):
+        return self._text
+
+    @text.setter
+    def text(self, value):
+        self._text = value
diff --git a/utilities/query_management.py b/utilities/query_management.py
new file mode 100644
index 0000000..df11e21
--- /dev/null
+++ b/utilities/query_management.py
@@ -0,0 +1,747 @@
+"""
+Module: query_management
+
+This module provides classes and methods for managing and executing queries, handling query results, and managing query queues. It includes functionality for splitting queries, executing them, and storing the results.
+
+Classes:
+    QueryExecutor:
+        Executes a given query and handles the result. Can split the query if the result exceeds a maximum threshold.
+
+    Query:
+        Represents a query with parameters such as base URL, start and end times, and additional keyword arguments. Provides methods to initialize, execute, and set request parameters.
+
+    QueryManager:
+        Manages multiple query queues, creates query objects, and sets up environments for query execution.
+
+    QueryObject:
+        Represents a single query object within a query queue. Responsible for creating its environment and executing the query.
+
+    QueryQueue:
+        Manages a queue of query objects. Creates the environment for the queue and schedules the execution of queries.
+
+    QuerySplitter:
+        Provides methods to split queries based on a threshold or a separator.
+
+Usage:
+    This module is intended to be used for managing and executing large sets of queries, particularly in environments where queries need to be split and executed in parallel. It handles the complexities of query execution, result handling, and error management.
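+
+Example (sketch, mirroring main.py; the endpoint is hypothetical):
+    tm = semaphore.ThreadManager(12)
+    qm = QueryManager(thread_manager=tm, storage_path="data")
+    query = Query(base_url="https://thanos.example.com/api/v1/")
+    recent, downsampled = QuerySplitter().split_by_threshold(query=query, threshold=90)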
+
+Dependencies:
+    - copy
+    - json
+    - os
+    - uuid
+    - datetime
+    - requests
+    - utilities.calc
+    - utilities.errors
+    - utilities.data_filter
+    - utilities.response_messages
+    - utilities.semaphore
+    - utilities.helper.ResponseDummy
+"""
+
+from __future__ import annotations
+
+import copy
+import json
+import os
+import uuid
+
+from datetime import datetime as dt
+from datetime import timedelta as td
+from datetime import timezone as tz
+
+import requests
+
+from utilities import calc
+from utilities import errors
+from utilities import data_filter
+from utilities import response_messages
+from utilities import semaphore
+from utilities.helper import ResponseDummy

+# TODO add META information to query -> QueryGroup
+
+
+class QueryExecutor:
+    """
+    A class to manage and execute queries, handle their results, and split queries if necessary.
+
+    Attributes:
+        path (str): The file path where query results will be saved.
+        query (Query): The current query being executed.
+        chunk (int): The chunk number for splitting query results.
+
+    Methods:
+        __init__(path: str):
+            Initializes the QueryExecutor with a given file path.
+
+        execute_query(query: Query):
+            Executes a given query and handles the result.
+
+        reset():
+            Resets the query and chunk attributes to their initial state.
+    """
+
+    def __init__(self, path: str):
+        self.query = None
+        self.path = path
+        self.chunk = 0
+
+    def execute_query(self, query: Query):
+        """
+        Executes the given query and handles the result.
+
+        Args:
+            query (Query): The query object to be executed.
+
+        Returns:
+            None
+        """
+        self.query = query
+        result = query.execute()
+        self.__handle_query_result(result=result)
+
+    def reset(self):
+        """
+        Resets the query and chunk attributes to their initial states.
+
+        This method sets the `query` attribute to None and the `chunk` attribute to 0.
+        """
+        self.query = None
+        self.chunk = 0
+
+    def __handle_query_result(self, result: dict):
+        if result["status"] == "success":
+            for index, value in enumerate(result["data"]["result"]):
+                value["values"] = data_filter.remove_state_from_timestamp_value(value["values"])
+                result["data"]["result"][index] = value
+
+            filename = os.path.join(self.path, f"data{self.chunk}.json")
+
+            with open(file=filename, mode="w", encoding="utf-8") as f:
+                f.write(json.dumps(result, indent=4))
+        elif result == response_messages.MESSAGE_EXCEEDED_MAXIMUM:
+            query1, query2 = self.__split_request_by_half(self.query)
+
+            self.execute_query(query=query1)
+            self.chunk += 1
+            self.execute_query(query=query2)
+            self.chunk += 1
+
+        return
+
+    def __split_request_by_half(self, query: Query) -> tuple[Query, Query]:
+        start_tt = float(query.global_start)
+        end_tt = float(query.global_end)
+
+        time_difference = end_tt - start_tt
+        half = time_difference / 2
+        mid = end_tt - half
+
+        query1 = copy.deepcopy(query)
+        query2 = copy.deepcopy(query)
+
+        # go through the setters so the request params stay in sync with the new bounds
+        query1.set_start(str(mid))
+        query2.set_end(str(mid))
+
+        return (query1, query2)
+
+
+class Query(object):
+    """
+    A class to manage and execute queries with specified parameters.
+
+    Attributes:
+        base_url (str): The base URL for the query.
+        global_start (str): The start time for the query.
+        global_end (str): The end time for the query.
+        kwargs (dict): Additional keyword arguments for the query.
+        path (str): The path for the query.
+        cert (str): The certificate for the query.
+        params (dict): The parameters for the query.
+        target (str): The target for the query.
+        timeout (int): The timeout for the query.
+ + Methods: + initialize(): + Initializes the query parameters. + execute(): + Executes the query and returns the result. + set_request_parameters(cert: str = None, timeout: int = None): + Sets the request parameters for the query. + set_start(start: str): + Sets the start time for the query. + set_end(end: str): + Sets the end time for the query. + """ + + def __init__( + self, + base_url: str = None, + start: str = None, + end: str = None, + kwargs: dict = None, + ): # TODO set standard format for start / end -> str | float + self.path = None + + self.base_url = base_url + self.global_start = start + self.global_end = end + + # maybe apply kwargs as kwargs + self.kwargs = {} if kwargs is None else kwargs + + self.cert = None + self.params = None + self.target = None + self.timeout = None + + self.initialize() + + def initialize(self): + """ + Initializes the query management by setting the global start and end timestamps. + + This method ensures that the `global_end` and `global_start` attributes have valid values. + If `global_end` is None, it sets it to the current timestamp. + If `global_start` is None, it calculates the timestamp for five years ago and sets it. + Finally, it applies additional request data through the `__parse_request_data` method. + """ + now = dt.now(tz.utc) + + self.global_end = str(now.timestamp()) if self.global_end is None else self.global_end # ensures end has value + + if self.global_start is None: # ensures start has value + self.global_start = calc.calculate_past_five_years_timestamp(now) + self.global_start = str(self.global_start) + + # apply kwargs values + self.__parse_request_data() + + def execute(self): + """ + Executes a request and parses the result. + + This method sends a request using the __execute_request method, + then parses the response using the __parse_request_result method, + and returns the parsed result. + + Returns: + The parsed result of the request. + """ + response = self.__execute_request() + result = self.__parse_request_result(response=response) + + return result + + def set_request_parameters(self, cert: str = None, timeout: int = None): + """ + Sets the request parameters for the query management. + + Args: + cert (str, optional): The certificate to be used for the request. Defaults to None. + timeout (int, optional): The timeout duration for the request in seconds. Defaults to None. + """ + self.cert = cert + self.timeout = timeout + + # TODO set property + def set_start(self, start: str): + """ + Sets the start parameter for the query. + + Args: + start (str): The start time or date to be set. + + Returns: + None + """ + self.global_start = start + + if self.params is None: + return + + self.params["start"] = start + + # TODO set property + def set_end(self, end: str): + """ + Sets the end parameter for the query. + + Args: + end (str): The end time or date to set for the query. 
+
+        Returns:
+            None
+        """
+        self.global_end = end
+
+        if self.params is None:
+            return
+
+        self.params["end"] = end
+
+    def __execute_request(self) -> requests.Response:
+        base_url = self.base_url
+        cert = self.cert
+        params = self.params
+        target = self.target
+        timeout = self.timeout
+
+        url = base_url + target
+
+        for _ in range(3):
+            try:
+                print(f"starting request… [{dt.fromtimestamp(float(params['start']))}]")  # TODO remove print statements
+                res = requests.get(url=url, cert=cert, params=params, timeout=timeout)
+                print(f"request finished [{dt.fromtimestamp(float(params['start']))}]")
+            except requests.ConnectTimeout:
+                print("requests.ConnectTimeout")
+                continue
+            except requests.exceptions.ReadTimeout:
+                print("requests.exceptions.ReadTimeout")
+                return ResponseDummy(response_messages.MESSAGE_EXCEEDED_MAXIMUM)
+            except requests.exceptions.SSLError:
+                print("requests.exceptions.SSLError")
+                continue
+            except requests.exceptions.ConnectionError:
+                print("requests.exceptions.ConnectionError")
+                continue
+            except requests.exceptions.ChunkedEncodingError:
+                print("requests.exceptions.ChunkedEncodingError")
+                return ResponseDummy(response_messages.MESSAGE_EXCEEDED_MAXIMUM)
+            else:
+                return res
+
+        print("safety catch invoked")
+        return ResponseDummy(response_messages.EMPTY_RESULTS)
+
+    def __parse_request_result(self, response: requests.Response):
+        if response is None:
+            return response_messages.EMPTY_RESULTS
+
+        try:
+            data = response.json()
+        except requests.exceptions.JSONDecodeError:
+            # LOGGING
+            print("=" * 50)
+            print(response.text)
+            print("=" * 50)
+            return response_messages.EMPTY_RESULTS
+        except Exception as e:
+            print(f"Exception occurred: {e!r}")
+            return response_messages.EMPTY_RESULTS
+
+        try:
+            if data["status"] == "error" and data["errorType"] != "bad_data":
+                return response_messages.EMPTY_RESULTS
+        except KeyError:
+            return response_messages.EMPTY_RESULTS
+        else:
+            return data
+
+    def __parse_request_data(self):
+        start = self.global_start
+        end = self.global_end
+
+        valid_query_parameters = (
+            "query",
+            "dedup",
+            "partial_response",
+            "step",
+            "max_source_resolution",
+            "engine",
+            "analyze",
+        )
+
+        if "target" in self.kwargs:
+            target = self.kwargs["target"]
+        else:
+            target = "query_range"
+
+        params = {
+            "query": "ALERTS",
+            "dedup": "true",
+            "partial_response": "false",
+            "start": start,
+            "end": end,
+            "step": "60",
+            "max_source_resolution": "0s",
+            "engine": "thanos",
+            "analyze": "false",
+        }
+
+        if "params" in self.kwargs:  # TODO consider using filters
+            for parameter_key in self.kwargs["params"].keys():
+                if parameter_key in valid_query_parameters:
+                    params[parameter_key] = self.kwargs["params"][parameter_key]
+
+        self.params = params
+        self.target = target
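+
+    # The defaults above translate into a request of this shape (illustrative):
+    #   GET {base_url}query_range?query=ALERTS&dedup=true&partial_response=false
+    #       &start=...&end=...&step=60&max_source_resolution=0s&engine=thanos&analyze=false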
+
+
+class QueryManager:
+    """
+    Manages query queues and their associated operations.
+
+    Attributes:
+        cert (str): Certificate for authentication.
+        timeout (int): Timeout duration for queries.
+        storage_path (str): Path to store query data.
+        threshold (int): Threshold value for query processing.
+        thread_manager (semaphore.ThreadManager): Manager for handling threads.
+        queues (dict[str, QueryQueue]): Dictionary of query queues managed by this instance.
+
+    Methods:
+        add_query_queue() -> str:
+            Adds a new query queue and returns its UUID.
+
+        create_query_objects(query_queue_uuid: str, query: Query, separator: int):
+            Splits a query into multiple query objects and adds them to the specified query queue.
+
+        create_environments():
+            Creates environments for all query queues with query objects.
+    """
+
+    def __init__(
+        self,
+        cert: str = None,
+        timeout: int = None,
+        storage_path: str = None,
+        threshold: int = None,
+        thread_manager: semaphore.ThreadManager = None,
+    ):
+        self.cert = cert
+        self.timeout = timeout
+        self.threshold = threshold
+        self.thread_manager = thread_manager
+
+        self.queues: dict[str, QueryQueue] = {}
+        self.storage_path = "data" if storage_path is None else storage_path
+
+    def add_query_queue(self) -> str:
+        """
+        Adds a new query queue to the query manager.
+
+        This method generates a new UUID for the query queue, creates a new
+        QueryQueue instance, and adds it to the manager's queue dictionary.
+
+        Returns:
+            str: The UUID (hex) of the newly created query queue.
+        """
+        query_queue_uuid = uuid.uuid4().hex
+        query_queue = QueryQueue(query_manager=self)
+        self.queues[query_queue_uuid] = query_queue
+
+        return query_queue_uuid
+
+    def create_query_objects(self, query_queue_uuid: str, query: Query, separator: int):
+        """
+        Creates query objects from a given query and adds them to the specified query queue.
+
+        Args:
+            query_queue_uuid (str): The UUID of the query queue to which the query objects will be added.
+            query (Query): The query to be split into query objects.
+            separator (int): The separator used to split the query into multiple query objects.
+
+        Raises:
+            errors.InvalidQueryQueueError: If the provided query queue UUID does not exist in the QueryManager.
+
+        """
+        query_objects = QuerySplitter().split_by_separator(query=query, separator=separator)
+        query_queue = self.queues.get(query_queue_uuid, None)
+
+        if query_queue is None:
+            raise errors.InvalidQueryQueueError(
+                f"The provided QueryQueue object doesn't exist inside this QueryManager.\nquery_queue_uuid: {query_queue_uuid}"
+            )
+
+        for key, value in query_objects.items():
+            query_object = QueryObject(query_queue=query_queue, query=value, nr=key)
+            query_queue.add_query_object(query_object=query_object)
+
+    def create_environments(self):
+        """
+        Creates environments for each queue in the queues dictionary.
+
+        This method iterates over the values in the `queues` dictionary. For each queue,
+        if the queue has query objects, it calls the `create_query_queue_environment`
+        method on the queue, passing the `storage_path` as an argument.
+
+        Returns:
+            None
+        """
+        for queue in self.queues.values():
+            if len(queue.query_objects) == 0:
+                continue
+            queue.create_query_queue_environment(self.storage_path)
+
+
+# TODO add visual feedback
+
+
+class QueryObject(object):
+    """
+    A class to manage and execute queries within a specified environment.
+
+    Attributes:
+        query_queue (QueryQueue): The queue that holds the query.
+        query (Query): The query to be executed.
+        object_nr (int): The identifier number for the query object.
+        path (str): The file path where the query object environment is created.
+
+    Methods:
+        create_query_object_environment(path: str):
+            Creates a directory for the query object environment at the specified path.
+
+        execute_query():
+            Executes the query using the parameters from the query queue's manager.
+    """
+
+    def __init__(self, query_queue: QueryQueue, query: Query, nr: int):
+        self.query_queue = query_queue
+        self.query = query
+        self.object_nr = nr
+        self.path = None
+
+    def create_query_object_environment(self, path: str):
+        """
+        Creates a query object environment by ensuring the specified path exists and
+        creating a subdirectory for the query object.
+
+        Args:
+            path (str): The base directory path where the query object environment
+                should be created.
+
+        Raises:
+            OSError: If the directory creation fails.
+        """
+        if not os.path.exists(path=path):
+            os.makedirs(name=path)
+
+        self.path = os.path.join(path, f"group{self.object_nr}")
+        os.mkdir(self.path)
+
+    def execute_query(self):
+        """
+        Executes the query with the specified parameters.
+
+        This method retrieves the certificate and timeout from the query manager,
+        sets the request parameters for the query, and then executes the query
+        using a QueryExecutor instance.
+
+        Raises:
+            Exception: If the query execution fails.
+        """
+        cert = self.query_queue.query_manager.cert
+        path = self.path
+        timeout = self.query_queue.query_manager.timeout
+        self.query.set_request_parameters(cert=cert, timeout=timeout)
+        qe = QueryExecutor(path=path)
+        qe.execute_query(self.query)
+
+
+class QueryQueue(object):
+    """
+    A class to manage a queue of query objects and their execution.
+
+    Attributes:
+        query_manager (QueryManager): An instance of QueryManager to manage query execution.
+        query_objects (list[QueryObject]): A list to store query objects.
+        path (str): The path where the query queue environment is created.
+
+    Methods:
+        create_query_queue_environment(path: str):
+            Creates a directory environment for the query queue at the specified path.
+
+        add_query_object(query_object: QueryObject):
+            Adds a query object to the queue.
+
+        schedule_queries() -> list[str]:
+            Schedules the execution of all query objects in the queue and returns a list of thread UUIDs.
+    """
+
+    def __init__(self, query_manager: QueryManager):
+        self.query_manager = query_manager
+        self.query_objects: list[QueryObject] = []
+        self.path = None
+
+    def create_query_queue_environment(self, path: str):
+        """
+        Creates a query queue environment at the specified path.
+
+        This method performs the following steps:
+        1. Checks if the specified path exists. If not, it creates the directory.
+        2. Generates a unique identifier (UUID) for the query queue.
+        3. Constructs the full path for the query queue using the base path and the UUID.
+        4. Creates the directory for the query queue.
+        5. Iterates over the query objects and calls their `create_query_object_environment` method to set up their environments.
+
+        Args:
+            path (str): The base directory path where the query queue environment will be created.
+        """
+        if not os.path.exists(path=path):
+            os.makedirs(name=path)
+
+        queue_uuid = uuid.uuid4().hex
+
+        self.path = os.path.join(path, queue_uuid)
+
+        os.mkdir(self.path)
+
+        for query_object in self.query_objects:
+            query_object.create_query_object_environment(self.path)
+
+    def add_query_object(self, query_object: QueryObject):
+        """
+        Adds a QueryObject to the list of query objects.
+
+        Args:
+            query_object (QueryObject): The query object to be added to the list.
+        """
+        self.query_objects.append(query_object)
+
+    def schedule_queries(self) -> list[str]:
+        """
+        Schedules the execution of queries by creating a new thread for each query object.
+
+        Returns:
+            list[str]: A list of thread UUIDs corresponding to the scheduled queries.
+        """
+ """ + out = [] + + for query_object in self.query_objects: + thread_uuid = self.query_manager.thread_manager.add_thread(query_object.execute_query) + out.append(thread_uuid) + + return out + + +class QuerySplitter(object): + """ + A utility class for splitting queries based on specified thresholds or separators. + + Methods + ------- + split_by_treshold(query: Query, threshold: int = None) -> list[Query | None, Query | None]: + Splits a query into two parts based on a given threshold in days. If no threshold is provided, returns the original query and None. + + split_by_separator(query: Query, separator: int): + Splits a query into multiple parts based on a given time separator in seconds. + """ + + def __init__(self): + pass + + def split_by_treshold(self, query: Query, threshold: int = None) -> list[Query | None, Query | None]: + """ + Splits a given query into two separate queries based on a specified threshold. + + Args: + query (Query): The original query to be split. + threshold (int, optional): The threshold in days to split the query. If not provided, the query will not be split. + + Returns: + list[Query | None, Query | None]: A list containing two queries. The first query covers the period from the threshold to the end date, + and the second query covers the period from the start date to the threshold. If the threshold is not + provided or the split is not possible, one of the queries will be None. + """ + queries = [] + + if not threshold: + queries.extend([query, None]) + return queries + + now = dt.now(tz.utc) + + base_url = query.base_url + start = query.global_start + end = query.global_end + kwargs = query.kwargs + + if start is None: + start = calc.calculate_past_five_years_timestamp(now) + start = str(start) + + end = str(now.timestamp()) if end is None else end + + split = str((now - td(days=threshold)).timestamp()) + + if end > split > start: + queries.append(Query(base_url=base_url, start=split, end=end, kwargs=kwargs)) + + params = { + "step": "3600", + "max_source_resolution": "1h", + } # for 2nd query + + kwargs = copy.deepcopy(kwargs) + if "params" in kwargs.keys(): + kwargs["params"].update(params) + else: + kwargs["params"] = params + + queries.append(Query(base_url=base_url, start=start, end=split, kwargs=kwargs)) + else: + if split > end: + queries.extend([None, query]) + elif start > split: + queries.extend([query, None]) + else: + print(f"Unexpected split: start {start}, split {split}, end {end}") + + return queries + + def split_by_separator(self, query: Query, separator: int): + """ + Splits a given query into multiple sub-queries based on a specified time separator. + + Args: + query (Query): The query object to be split. + separator (int): The time interval in seconds to split the query by. + + Returns: + dict: A dictionary where the keys are integers representing the sub-query index, + and the values are the sub-query objects. 
+ """ + query_objects = {} + + start = dt.fromtimestamp(float(query.global_start)) + global_end = dt.fromtimestamp(float(query.global_end)) + step = td(seconds=separator) + objects_counter = 0 + + end = start + step + while end < global_end: + query_copy = self.__create_query_copy(query=query, start=start, end=end) + query_objects[objects_counter] = query_copy + + start = end + end = start + step + objects_counter += 1 + + diff = global_end - end + end = end + diff + + query_copy = self.__create_query_copy(query=query, start=start, end=end) + query_objects[objects_counter] = query_copy + + return query_objects + + def __create_query_copy(self, query: Query, start: dt, end: dt) -> Query: + query_copy = copy.deepcopy(query) + query_copy.set_start(start=start.timestamp()) + query_copy.set_end(end=end.timestamp()) + + return query_copy diff --git a/utilities/response_messages.py b/utilities/response_messages.py new file mode 100644 index 0000000..5d6cc8b --- /dev/null +++ b/utilities/response_messages.py @@ -0,0 +1,9 @@ +"""This module contains some default messages for possible responses.""" + +MESSAGE_EXCEEDED_MAXIMUM = { + "status": "error", + "errorType": "bad_data", + "error": "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)", +} + +EMPTY_RESULTS = {"status": "success", "data": {"resultType": "matrix", "result": []}} diff --git a/utilities/semaphore.py b/utilities/semaphore.py new file mode 100644 index 0000000..e50e81c --- /dev/null +++ b/utilities/semaphore.py @@ -0,0 +1,47 @@ +import os +import time +import uuid + +from threading import Thread, BoundedSemaphore + + +class ThreadManager(object): + def __init__(self, semaphore_count: int | None = None): + if semaphore_count is None: + semaphore_count = os.cpu_count() - 2 + + self.semaphore = BoundedSemaphore(semaphore_count) + self.threads: dict[str] = {} + + def add_thread(self, func) -> str: + thread_uuid = uuid.uuid4().hex + self.threads[thread_uuid] = func + + return thread_uuid + + def wrapper_thread(self, func): + with self.semaphore: + func() + + def start_thread(self, thread_uuid: str = None) -> Thread: + if not thread_uuid: + return + + func = self.threads[thread_uuid] + + thread = Thread(target=self.wrapper_thread, args=(func,)) + thread.start() + + # TODO pop the used thread uuids from the list + + return thread + + def execute_all_threads(self): + threads: list[Thread] = [] + for key in self.threads: + thread = self.start_thread(thread_uuid=key) + threads.append(thread) + time.sleep(1) + + for thread in threads: + thread.join()