From caf9b23db2c9d94d6c924c2daad0fc8e3a50d904 Mon Sep 17 00:00:00 2001
From: Lukasz Sojka
Date: Wed, 16 Oct 2024 15:43:13 +0200
Subject: [PATCH] improvement(cleanup): remove backend from argus client

Some cleanup work had to be done to give the argus client a better
structure and make it less problematic to manage:

* Removed the old db module, which caused confusion when searching for
  db/models-related code, along with the old migration and prod dump
  scripts.
* Moved the modules shared by the backend and the client to a `common`
  module next to the `backend` and `client` modules.
* Excluded the `backend` module from the client package. Including the
  backend could cause import errors due to packages that only the
  backend requires.
* Created unit tests for the argus client package covering build,
  install and a basic import (see the examples below).
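For code that consumes the client package, the cleanup amounts to
updating import paths from the old backend locations to the new
`argus.common` module, for example:

    # before
    from argus.backend.util.enums import TestStatus
    from argus.backend.plugins.sct.types import GeminiResultsRequest

    # after
    from argus.common.enums import TestStatus
    from argus.common.sct_types import GeminiResultsRequest

The new packaging tests live under `argus/client/tests/` and should be
runnable with a plain `pytest argus/client/tests` from the repository
root; they shell out to `poetry build` and `pip`, so both need to be
available on PATH.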
closes: https://github.com/scylladb/argus/issues/480
---
 argus/backend/controller/testrun_api.py       |   2 +-
 argus/backend/plugins/core.py                 |   6 +-
 .../plugins/driver_matrix_tests/model.py      |   2 +-
 argus/backend/plugins/generic/model.py        |   2 +-
 argus/backend/plugins/sct/service.py          |   7 +-
 argus/backend/plugins/sct/udt.py              |   2 +-
 argus/backend/plugins/sirenada/model.py       |   6 +-
 argus/backend/service/client_service.py       |   8 +-
 argus/backend/service/stats.py                |   2 +-
 argus/backend/service/testrun.py              |   3 +-
 argus/client/base.py                          |   2 +-
 argus/client/driver_matrix_tests/cli.py       |   4 +-
 argus/client/driver_matrix_tests/client.py    |   2 +-
 argus/client/generic/cli.py                   |   4 +-
 argus/client/sct/client.py                    |   6 +-
 argus/client/sirenada/client.py               |   2 +-
 .../{db/.gitkeep => client/tests/__init__.py} |   0
 argus/client/tests/conftest.py                |  19 +
 argus/client/tests/test_package.py            |  48 ++
 argus/common/__init__.py                      |   0
 argus/{backend/util => common}/enums.py       |   0
 .../sct/types.py => common/sct_types.py}      |   0
 .../types.py => common/sirenada_types.py}     |   0
 argus/db/argus_json.py                        |  14 -
 argus/db/cloud_types.py                       | 125 ---
 argus/db/config.py                            | 135 ----
 argus/db/db_types.py                          | 139 ----
 argus/db/interface.py                         | 370 ---------
 argus/db/testrun.py                           | 740 ------------------
 argus/db/utils.py                             |  15 -
 pyproject.toml                                |   9 +-
 scripts/download_runs_from_prod.py            |  38 -
 scripts/fix_run_metadata.py                   |  20 -
 .../migration/migrate_testruns_2022-06-12.py  |  35 -
 scripts/upload_runs_to_active_db.py           |  34 -
 35 files changed, 101 insertions(+), 1700 deletions(-)
 rename argus/{db/.gitkeep => client/tests/__init__.py} (100%)
 create mode 100644 argus/client/tests/conftest.py
 create mode 100644 argus/client/tests/test_package.py
 create mode 100644 argus/common/__init__.py
 rename argus/{backend/util => common}/enums.py (100%)
 rename argus/{backend/plugins/sct/types.py => common/sct_types.py} (100%)
 rename argus/{backend/plugins/sirenada/types.py => common/sirenada_types.py} (100%)
 delete mode 100644 argus/db/argus_json.py
 delete mode 100644 argus/db/cloud_types.py
 delete mode 100644 argus/db/config.py
 delete mode 100644 argus/db/db_types.py
 delete mode 100644 argus/db/interface.py
 delete mode 100644 argus/db/testrun.py
 delete mode 100644 argus/db/utils.py
 delete mode 100644 scripts/download_runs_from_prod.py
 delete mode 100644 scripts/fix_run_metadata.py
 delete mode 100644 scripts/migration/migrate_testruns_2022-06-12.py
 delete mode 100644 scripts/upload_runs_to_active_db.py

diff --git a/argus/backend/controller/testrun_api.py b/argus/backend/controller/testrun_api.py
index 1395908d..7607429f 100644
--- a/argus/backend/controller/testrun_api.py
+++ b/argus/backend/controller/testrun_api.py
@@ -12,7 +12,7 @@
 from argus.backend.service.testrun import TestRunService
 from argus.backend.service.user import api_login_required
 from argus.backend.util.common import get_payload
-from argus.backend.util.enums import TestInvestigationStatus, TestStatus
+from argus.common.enums import TestInvestigationStatus, TestStatus
 
 bp = Blueprint('testrun_api', __name__, 'testrun')
 bp.register_error_handler(Exception, handle_api_exception)
diff --git a/argus/backend/plugins/core.py b/argus/backend/plugins/core.py
index 3ed89063..1546ac47 100644
--- a/argus/backend/plugins/core.py
+++ b/argus/backend/plugins/core.py
@@ -16,12 +16,10 @@
     ArgusScheduleGroup,
     ArgusSchedule,
     ArgusScheduleTest,
-    ArgusScheduleAssignee,
-    ArgusUserView,
-    User
+    ArgusScheduleAssignee
 )
 from argus.backend.util.common import chunk
-from argus.backend.util.enums import TestInvestigationStatus, TestStatus
+from argus.common.enums import TestInvestigationStatus, TestStatus
 
 
 LOGGER = logging.getLogger(__name__)
diff --git a/argus/backend/plugins/driver_matrix_tests/model.py b/argus/backend/plugins/driver_matrix_tests/model.py
index 2c8ca0dd..a0d1f029 100644
--- a/argus/backend/plugins/driver_matrix_tests/model.py
+++ b/argus/backend/plugins/driver_matrix_tests/model.py
@@ -13,7 +13,7 @@
 from argus.backend.plugins.core import PluginModelBase
 from argus.backend.plugins.driver_matrix_tests.udt import TestCollection, TestSuite, TestCase, EnvironmentInfo
 from argus.backend.plugins.driver_matrix_tests.raw_types import RawMatrixTestResult
-from argus.backend.util.enums import TestStatus
+from argus.common.enums import TestStatus
 
 
 LOGGER = logging.getLogger(__name__)
diff --git a/argus/backend/plugins/generic/model.py b/argus/backend/plugins/generic/model.py
index 65649fd4..44bac604 100644
--- a/argus/backend/plugins/generic/model.py
+++ b/argus/backend/plugins/generic/model.py
@@ -7,7 +7,7 @@
 from argus.backend.models.web import ArgusRelease
 from argus.backend.plugins.core import PluginModelBase
 from argus.backend.plugins.generic.types import GenericRunFinishRequest, GenericRunSubmitRequest
-from argus.backend.util.enums import TestStatus
+from argus.common.enums import TestStatus
 
 
 class GenericPluginException(Exception):
diff --git a/argus/backend/plugins/sct/service.py b/argus/backend/plugins/sct/service.py
index 0dfa5095..e38ac6f9 100644
--- a/argus/backend/plugins/sct/service.py
+++ b/argus/backend/plugins/sct/service.py
@@ -1,7 +1,6 @@
 import base64
 from dataclasses import dataclass
 from datetime import datetime
-from functools import reduce
 import logging
 import math
 import re
@@ -10,7 +9,7 @@
 from flask import g
 from argus.backend.models.web import ArgusEventTypes
 from argus.backend.plugins.sct.testrun import SCTJunitReports, SCTTestRun, SubtestType
-from argus.backend.plugins.sct.types import GeminiResultsRequest, PerformanceResultsRequest, ResourceUpdateRequest
+from argus.common.sct_types import GeminiResultsRequest, PerformanceResultsRequest, ResourceUpdateRequest
 from argus.backend.plugins.sct.udt import (
     CloudInstanceDetails,
     CloudResource,
@@ -22,7 +21,7 @@
 )
 from argus.backend.service.event_service import EventService
 from argus.backend.util.common import get_build_number
-from argus.backend.util.enums import NemesisStatus, ResourceState, TestStatus
+from argus.common.enums import NemesisStatus, ResourceState, TestStatus
 
 
 LOGGER = logging.getLogger(__name__)
@@ -488,4 +487,4 @@ def junit_get_all(run_id: str) -> list[SCTJunitReports]:
 
     @staticmethod
     def junit_get_single(run_id: str, file_name: str) -> SCTJunitReports:
-        return SCTJunitReports.get(test_id=run_id, file_name=file_name)
\ No newline at end of file
+        return SCTJunitReports.get(test_id=run_id,
file_name=file_name) diff --git a/argus/backend/plugins/sct/udt.py b/argus/backend/plugins/sct/udt.py index 258d3cf8..c72097a6 100644 --- a/argus/backend/plugins/sct/udt.py +++ b/argus/backend/plugins/sct/udt.py @@ -2,7 +2,7 @@ from cassandra.cqlengine.usertype import UserType from cassandra.cqlengine import columns -from argus.backend.util.enums import ResourceState +from argus.common.enums import ResourceState class PackageVersion(UserType): diff --git a/argus/backend/plugins/sirenada/model.py b/argus/backend/plugins/sirenada/model.py index 1b03e08c..63170b6d 100644 --- a/argus/backend/plugins/sirenada/model.py +++ b/argus/backend/plugins/sirenada/model.py @@ -1,13 +1,13 @@ from datetime import datetime -from uuid import UUID, uuid4 +from uuid import UUID from cassandra.cqlengine import columns from cassandra.cqlengine.usertype import UserType from cassandra.cqlengine.models import Model from argus.backend.db import ScyllaCluster from argus.backend.models.web import ArgusRelease from argus.backend.plugins.core import PluginModelBase -from argus.backend.plugins.sirenada.types import RawSirenadaRequest, SirenadaPluginException -from argus.backend.util.enums import TestStatus +from argus.common.sirenada_types import RawSirenadaRequest, SirenadaPluginException +from argus.common.enums import TestStatus class SirenadaTest(UserType): diff --git a/argus/backend/service/client_service.py b/argus/backend/service/client_service.py index 0c00d087..9df90a2c 100644 --- a/argus/backend/service/client_service.py +++ b/argus/backend/service/client_service.py @@ -1,7 +1,5 @@ -import operator -from dataclasses import asdict, is_dataclass -from datetime import datetime, timezone -from functools import partial +from dataclasses import asdict +from datetime import datetime from uuid import UUID from argus.backend.db import ScyllaCluster @@ -9,7 +7,7 @@ from argus.backend.plugins.core import PluginModelBase from argus.backend.plugins.loader import AVAILABLE_PLUGINS from argus.backend.service.results_service import ResultsService, Cell -from argus.backend.util.enums import TestStatus +from argus.common.enums import TestStatus class ClientException(Exception): diff --git a/argus/backend/service/stats.py b/argus/backend/service/stats.py index 69896086..c272f9ec 100644 --- a/argus/backend/service/stats.py +++ b/argus/backend/service/stats.py @@ -9,7 +9,7 @@ from cassandra.cqlengine.models import Model from argus.backend.plugins.loader import all_plugin_models from argus.backend.util.common import chunk, get_build_number -from argus.backend.util.enums import TestStatus, TestInvestigationStatus +from argus.common.enums import TestStatus, TestInvestigationStatus from argus.backend.models.web import ArgusGithubIssue, ArgusRelease, ArgusGroup, ArgusTest,\ ArgusScheduleTest, ArgusTestRunComment, ArgusUserView from argus.backend.db import ScyllaCluster diff --git a/argus/backend/service/testrun.py b/argus/backend/service/testrun.py index 41c2d19e..8767df56 100644 --- a/argus/backend/service/testrun.py +++ b/argus/backend/service/testrun.py @@ -13,7 +13,6 @@ from cassandra.query import BatchStatement, ConsistencyLevel from cassandra.cqlengine.query import BatchQuery from argus.backend.db import ScyllaCluster -from argus.backend.models.result import ArgusGenericResultMetadata from argus.backend.models.web import ( ArgusEvent, @@ -38,7 +37,7 @@ from argus.backend.service.notification_manager import NotificationManagerService from argus.backend.service.stats import ComparableTestStatus from argus.backend.util.common import 
chunk, get_build_number, strip_html_tags -from argus.backend.util.enums import TestInvestigationStatus, TestStatus +from argus.common.enums import TestInvestigationStatus, TestStatus LOGGER = logging.getLogger(__name__) diff --git a/argus/client/base.py b/argus/client/base.py index d3f3cbbd..3e38031f 100644 --- a/argus/client/base.py +++ b/argus/client/base.py @@ -6,7 +6,7 @@ import requests -from argus.backend.util.enums import TestStatus +from argus.common.enums import TestStatus from argus.client.generic_result import GenericResultTable from argus.client.sct.types import LogLink diff --git a/argus/client/driver_matrix_tests/cli.py b/argus/client/driver_matrix_tests/cli.py index 1d343353..d02b0778 100644 --- a/argus/client/driver_matrix_tests/cli.py +++ b/argus/client/driver_matrix_tests/cli.py @@ -3,7 +3,7 @@ from pathlib import Path import click import logging -from argus.backend.util.enums import TestStatus +from argus.common.enums import TestStatus from argus.client.driver_matrix_tests.client import ArgusDriverMatrixClient @@ -107,4 +107,4 @@ def finish_driver_matrix_run(api_key: str, base_url: str, run_id: str, status: s if __name__ == "__main__": - cli() \ No newline at end of file + cli() diff --git a/argus/client/driver_matrix_tests/client.py b/argus/client/driver_matrix_tests/client.py index d4787cf0..2b5bb0e1 100644 --- a/argus/client/driver_matrix_tests/client.py +++ b/argus/client/driver_matrix_tests/client.py @@ -1,5 +1,5 @@ from uuid import UUID -from argus.backend.util.enums import TestStatus +from argus.common.enums import TestStatus from argus.client.base import ArgusClient diff --git a/argus/client/generic/cli.py b/argus/client/generic/cli.py index 43d83f9d..bcc55411 100644 --- a/argus/client/generic/cli.py +++ b/argus/client/generic/cli.py @@ -1,7 +1,7 @@ import click import logging -from argus.backend.util.enums import TestStatus +from argus.client.base import TestStatus from argus.client.generic.client import ArgusGenericClient LOGGER = logging.getLogger(__name__) @@ -44,4 +44,4 @@ def finish_run(api_key: str, base_url: str, id: str, status: str, scylla_version if __name__ == "__main__": - cli() \ No newline at end of file + cli() diff --git a/argus/client/sct/client.py b/argus/client/sct/client.py index 11c6f821..c1f2f557 100644 --- a/argus/client/sct/client.py +++ b/argus/client/sct/client.py @@ -2,8 +2,8 @@ from typing import Any from uuid import UUID from dataclasses import asdict -from argus.backend.plugins.sct.types import GeminiResultsRequest, PerformanceResultsRequest -from argus.backend.util.enums import ResourceState, TestStatus +from argus.common.sct_types import GeminiResultsRequest, PerformanceResultsRequest +from argus.common.enums import ResourceState, TestStatus from argus.client.base import ArgusClient from argus.client.sct.types import EventsInfo, LogLink, Package @@ -298,4 +298,4 @@ def sct_submit_junit_report(self, file_name: str, raw_content: str) -> None: "file_name": file_name, "content": str(base64.encodebytes(bytes(raw_content, encoding="utf-8")), encoding="utf-8") } - ) \ No newline at end of file + ) diff --git a/argus/client/sirenada/client.py b/argus/client/sirenada/client.py index b324eec6..3ba7ba96 100644 --- a/argus/client/sirenada/client.py +++ b/argus/client/sirenada/client.py @@ -5,7 +5,7 @@ from typing import TypedDict from xml.etree import ElementTree -from argus.backend.plugins.sirenada.types import RawSirenadaRequest, RawSirenadaTestCase +from argus.common.sirenada_types import RawSirenadaRequest, RawSirenadaTestCase from 
argus.client.base import ArgusClient
 
 
diff --git a/argus/db/.gitkeep b/argus/client/tests/__init__.py
similarity index 100%
rename from argus/db/.gitkeep
rename to argus/client/tests/__init__.py
diff --git a/argus/client/tests/conftest.py b/argus/client/tests/conftest.py
new file mode 100644
index 00000000..f6c8bdd3
--- /dev/null
+++ b/argus/client/tests/conftest.py
@@ -0,0 +1,19 @@
+import shutil
+from pathlib import Path
+
+import pytest
+
+
+@pytest.fixture(scope="module", autouse=True)
+def test_dir():
+    return Path(__file__).parent
+
+
+@pytest.fixture(scope="module", autouse=True)
+def env_dir(test_dir):
+    env_dir = test_dir / 'test_env'
+    if env_dir.exists():
+        shutil.rmtree(env_dir)
+    yield env_dir
+    if env_dir.exists():
+        shutil.rmtree(env_dir)
diff --git a/argus/client/tests/test_package.py b/argus/client/tests/test_package.py
new file mode 100644
index 00000000..8300c91d
--- /dev/null
+++ b/argus/client/tests/test_package.py
@@ -0,0 +1,48 @@
+import subprocess
+import venv
+from pathlib import Path
+
+import pytest
+
+
+def run_command(command: list[str], cwd: str = None, env=None):
+    result = subprocess.run(command, cwd=cwd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env)
+    print(result.stdout)
+    return result
+
+
+def create_virtualenv(env_dir: Path):
+    venv.create(env_dir, with_pip=True)
+    pip_path = env_dir / 'bin' / 'pip'
+    return pip_path
+
+
+def extract_version_from_pyproject(pyproject_file: Path) -> str:
+    with pyproject_file.open() as file:
+        for line in file:
+            if line.strip().startswith("version"):
+                return line.split('=')[1].strip().strip('"')
+
+
+def test_should_build_package():
+    run_command(['poetry', 'build'])
+
+
+def test_should_create_env_and_install(test_dir: Path, env_dir: Path) -> None:
+    pyproject_file = test_dir.parents[2] / 'pyproject.toml'
+    version = extract_version_from_pyproject(pyproject_file)
+
+    dist_dir = test_dir.parents[2] / 'dist'
+    package_path = dist_dir / f"argus_alm-{version}-py3-none-any.whl"
+
+    pip_path = create_virtualenv(env_dir)
+    run_command([pip_path, 'install', str(package_path)])
+
+
+def test_should_import_installed_package(env_dir):
+    python_path = env_dir / 'bin' / 'python'
+
+    run_command([python_path, '-c', 'import argus.client; import argus.common; '
+                                    'from argus.client.sct.client import ArgusSCTClient'], env={"PYTHONPATH": str(env_dir)})
+    with pytest.raises(subprocess.CalledProcessError):
+        run_command([python_path, '-c', 'import argus.client.tests.test_package'], env={"PYTHONPATH": str(env_dir)})
diff --git a/argus/common/__init__.py b/argus/common/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/argus/backend/util/enums.py b/argus/common/enums.py
similarity index 100%
rename from argus/backend/util/enums.py
rename to argus/common/enums.py
diff --git a/argus/backend/plugins/sct/types.py b/argus/common/sct_types.py
similarity index 100%
rename from argus/backend/plugins/sct/types.py
rename to argus/common/sct_types.py
diff --git a/argus/backend/plugins/sirenada/types.py b/argus/common/sirenada_types.py
similarity index 100%
rename from argus/backend/plugins/sirenada/types.py
rename to argus/common/sirenada_types.py
diff --git a/argus/db/argus_json.py b/argus/db/argus_json.py
deleted file mode 100644
index 3d1a18eb..00000000
--- a/argus/db/argus_json.py
+++ /dev/null
@@ -1,14 +0,0 @@
-from datetime import datetime
-import json
-from uuid import UUID
-
-
-class ArgusJSONEncoder(json.JSONEncoder):
-    def default(self, obj):
-        match obj:
-            case UUID():
-                return str(obj)
-
case datetime(): - return obj.isoformat(sep=' ', timespec='milliseconds') - case _: - return json.JSONEncoder.default(self, obj) diff --git a/argus/db/cloud_types.py b/argus/db/cloud_types.py deleted file mode 100644 index 11b991d2..00000000 --- a/argus/db/cloud_types.py +++ /dev/null @@ -1,125 +0,0 @@ -import ipaddress -import time -from enum import Enum -from typing import Optional -from pydantic.dataclasses import dataclass -from pydantic import ValidationError, validator -from argus.db.db_types import ArgusUDTBase - - -@dataclass(init=True, repr=True) -class CloudInstanceDetails(ArgusUDTBase): - provider: str = "" - region: str = "" - public_ip: str = "" - private_ip: str = "" - creation_time: int = 0 - termination_time: int = 0 - termination_reason: str = "" - shards_amount: Optional[int] = 0 - _typename = "CloudInstanceDetails_v3" - - @classmethod - def from_db_udt(cls, udt): - return cls(provider=udt.provider, region=udt.region, public_ip=udt.public_ip, private_ip=udt.private_ip, - creation_time=udt.creation_time, termination_time=udt.termination_time, - termination_reason=udt.termination_reason, shards_amount=udt.shards_amount) - - @validator("public_ip") - def valid_public_ip_address(cls, v): - try: - ipaddress.ip_address(v) - except ValueError: - raise ValidationError(f"Not a valid IPv4(v6) address: {v}") - - return v - - @validator("private_ip") - def valid_private_ip_address(cls, v): - try: - ipaddress.ip_address(v) - except ValueError: - raise ValidationError(f"Not a valid IPv4(v6) address: {v}") - - return v - - -@dataclass(init=True, repr=True) -class CloudNodesInfo(ArgusUDTBase): - image_id: str - instance_type: str - node_amount: int - post_behaviour: str - - @classmethod - def from_db_udt(cls, udt): - return cls(image_id=udt.image_id, instance_type=udt.instance_type, - node_amount=udt.node_amount, post_behaviour=udt.post_behaviour) - - -@dataclass(init=True, repr=True) -class BaseCloudSetupDetails(ArgusUDTBase): - db_node: CloudNodesInfo - loader_node: CloudNodesInfo - monitor_node: CloudNodesInfo - backend: str = None - _typename = "CloudSetupDetails" - - @classmethod - def from_db_udt(cls, udt): - db_node = CloudNodesInfo(*udt.db_node) - loader_node = CloudNodesInfo(*udt.loader_node) - monitor_node = CloudNodesInfo(*udt.monitor_node) - backend = udt.backend - return cls(db_node=db_node, loader_node=loader_node, monitor_node=monitor_node, backend=backend) - - -@dataclass(init=True, repr=True) -class AWSSetupDetails(BaseCloudSetupDetails): - backend: str = "aws" - - -@dataclass(init=True, repr=True) -class GCESetupDetails(BaseCloudSetupDetails): - backend: str = "gce" - - -class ResourceState(str, Enum): - RUNNING = "running" - STOPPED = "stopped" - TERMINATED = "terminated" - - -@dataclass(init=True, repr=True) -class CloudResource(ArgusUDTBase): - name: str - state: ResourceState - resource_type: str - instance_info: CloudInstanceDetails - _typename = "CloudResource_v3" - - def __eq__(self, other) -> bool: - if (isinstance(other, CloudResource)): - return self.name == other.name - return False - - @classmethod - def from_db_udt(cls, udt): - instance_info = CloudInstanceDetails.from_db_udt(udt.instance_info) - return cls(name=udt.name, state=udt.state, resource_type=udt.resource_type, instance_info=instance_info) - - @validator("state") - def valid_state(cls, v): - try: - ResourceState(v) - except ValueError: - raise ValidationError(f"Not a valid resource state: {v}") - return v - - def terminate(self, reason): - self.state = ResourceState.TERMINATED - 
self.instance_info.termination_time = int(time.time()) - self.instance_info.termination_reason = reason - - def stop(self): - self.state = ResourceState.STOPPED diff --git a/argus/db/config.py b/argus/db/config.py deleted file mode 100644 index bc2a899f..00000000 --- a/argus/db/config.py +++ /dev/null @@ -1,135 +0,0 @@ -from abc import ABC -from typing import Hashable, Any -from pathlib import Path -from os import getenv -import logging -import yaml - -LOGGER = logging.getLogger(__name__) - - -class ConfigLocationError(Exception): - pass - - -class BaseConfig(ABC): - def __init__(self): - pass - - @property - def as_dict(self) -> dict[Hashable, Any]: - raise NotImplementedError() - - @property - def username(self) -> str: - raise NotImplementedError() - - @property - def password(self) -> str: - raise NotImplementedError() - - @property - def contact_points(self) -> list[str]: - raise NotImplementedError() - - @property - def keyspace_name(self) -> str: - raise NotImplementedError() - - @property - def address_mapping(self) -> dict: - return NotImplementedError() - - -class FileConfig(BaseConfig): - @property - def username(self): - return self.as_dict.get("username") - - @property - def password(self) -> str: - return self.as_dict.get("password") - - @property - def contact_points(self) -> list[str]: - return self.as_dict.get("contact_points") - - @property - def keyspace_name(self) -> str: - return self.as_dict.get("keyspace_name") - - @property - def address_mapping(self) -> dict: - return self.as_dict.get("address_mapping") - - DEFAULT_CONFIG_PATHS = ( - "./config/argus.local.yaml", - "argus.local.yaml", - "argus.yaml", - getenv("HOME") + "/.argus.yaml" - ) - - def __init__(self, filepath: str = None): - super().__init__() - if not filepath: - for file in self.DEFAULT_CONFIG_PATHS: - LOGGER.info("Trying %s", file) - if Path(file).exists(): - filepath = file - break - - if not filepath: - LOGGER.error("All config locations were tried and no config file found") - raise ConfigLocationError("No config file supplied and no config exists at default location") - - self.filepath = filepath - self._credentials = None - - @property - def as_dict(self) -> dict[Hashable, Any]: - if self._credentials: - return self._credentials - path = Path(self.filepath) - if not path.exists(): - raise ConfigLocationError(f"File not found: {self.filepath}") - - with open(path.absolute(), "rt", encoding="utf-8") as file: - self._credentials = yaml.safe_load(file) - - return self._credentials - - -class Config(BaseConfig): - @property - def username(self) -> str: - return self.as_dict.get("username") - - @property - def password(self) -> str: - return self.as_dict.get("password") - - @property - def contact_points(self) -> list[str]: - return self.as_dict.get("contact_points") - - @property - def keyspace_name(self) -> str: - return self.as_dict.get("keyspace_name") - - @property - def address_mapping(self) -> dict: - return self.as_dict.get("address_mapping") - - def __init__(self, username: str, password: str, contact_points: list[str], keyspace_name: str, address_mapping: dict | None = None): - super().__init__() - self._config = { - "username": username, - "password": password, - "contact_points": contact_points, - "keyspace_name": keyspace_name, - "address_mapping": address_mapping, - } - - @property - def as_dict(self) -> dict[Hashable, Any]: - return self._config diff --git a/argus/db/db_types.py b/argus/db/db_types.py deleted file mode 100644 index 2514e42b..00000000 --- a/argus/db/db_types.py +++ /dev/null 
@@ -1,139 +0,0 @@ -import re -import time -from enum import Enum -from typing import Any, Union, Type, TypeVar, Optional -from pydantic.dataclasses import dataclass -from pydantic import validator, ValidationError - - -class ArgusUDTBase: - _typename = None - - @classmethod - def basename(cls): - return cls._typename if cls._typename else cls.__name__ - - -@dataclass(init=True, repr=True) -class NodeDescription(ArgusUDTBase): - name: str - ip: str - shards: int - - @classmethod - def from_db_udt(cls, udt): - return cls(name=udt.name, ip=udt.ip, shards=udt.shards) - - @validator("ip") - def valid_ip_address(cls, value): - ip_addr_re = r"(\d{1,3}\.){3}\d{1,3}" - if not re.match(ip_addr_re, value): - raise ValidationError(f"Not a valid ip address: {value}") - - ip_by_octets = [int(octet) for octet in value.split(".") if int(octet) <= 255] - if len(ip_by_octets) != 4: - raise ValidationError(f"Octets out of range (0, 255): {value}") - - return value - - -@dataclass(init=True, repr=True) -class PackageVersion(ArgusUDTBase): - name: str - version: str - date: str - revision_id: str - build_id: Optional[str] = "" - _typename = "PackageVersion_v2" - - @classmethod - def from_db_udt(cls, udt): - return cls(name=udt.name, version=udt.version, - date=udt.date, revision_id=udt.revision_id, build_id=udt.build_id) - - -class NemesisStatus(str, Enum): - STARTED = "started" - RUNNING = "running" - FAILED = "failed" - SKIPPED = "skipped" - SUCCEEDED = "succeeded" - TERMINATED = "terminated" - - -class TestStatus(str, Enum): - CREATED = "created" - RUNNING = "running" - FAILED = "failed" - PASSED = "passed" - ABORTED = "aborted" - NOT_PLANNED = "not_planned" - NOT_RUN = "not_run" - - -class TestInvestigationStatus(str, Enum): - NOT_INVESTIGATED = "not_investigated" - IN_PROGRESS = "in_progress" - INVESTIGATED = "investigated" - - -@dataclass(init=True, repr=True) -class NemesisRunInfo(ArgusUDTBase): - class_name: str - name: str - duration: int - target_node: NodeDescription - status: str - start_time: int - end_time: int = 0 - stack_trace: str = "" - - @property - def nemesis_status(self): - return NemesisStatus(self.status) - - @nemesis_status.setter - def nemesis_status(self, value: NemesisStatus): - self.status = NemesisStatus(value).value - - @classmethod - def from_db_udt(cls, udt): - target_node = NodeDescription.from_db_udt(udt.target_node) - return cls(class_name=udt.class_name, name=udt.name, duration=udt.duration, - target_node=target_node, status=udt.status, start_time=udt.start_time, end_time=udt.end_time, - stack_trace=udt.stack_trace) - - def complete(self, stack_trace=None): - self.end_time = int(time.time()) - if stack_trace: - self.nemesis_status = NemesisStatus.FAILED - self.stack_trace = stack_trace - else: - self.nemesis_status = NemesisStatus.SUCCEEDED - - -@dataclass(init=True, repr=True) -class EventsBySeverity(ArgusUDTBase): - severity: str - event_amount: int - last_events: list[str] - - @classmethod - def from_db_udt(cls, udt): - return cls(severity=udt.severity, event_amount=udt.event_amount, last_events=udt.last_events) - - -TypeHint = TypeVar("TypeHint") - - -@dataclass(init=True, repr=True) -class CollectionHint: - stored_type: TypeHint - - -@dataclass(init=True, repr=True) -class ColumnInfo: - name: str - type: Type[Union[CollectionHint, int, str, TypeHint]] - value: Any - constraints: list[str] diff --git a/argus/db/interface.py b/argus/db/interface.py deleted file mode 100644 index 3cf02197..00000000 --- a/argus/db/interface.py +++ /dev/null @@ -1,370 +0,0 @@ -# TODO: 
Deprecated, will be removed once REST API client is ready -from datetime import datetime -import re -import logging -import json -from uuid import UUID -from hashlib import sha1 -from dataclasses import fields as dataclass_fields -from typing import KeysView, Union, Optional, Any, get_args as get_type_args, get_origin as get_type_origin -from types import GenericAlias - -import cassandra.cluster -import cassandra.cqltypes -from cassandra import ConsistencyLevel -from cassandra.auth import PlainTextAuthProvider -from cassandra.query import named_tuple_factory -from cassandra.policies import WhiteListRoundRobinPolicy, AddressTranslator -from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT -from cassandra.cqlengine import connection -from cassandra.cqlengine import models -from cassandra.cqlengine import management -from argus.db.argus_json import ArgusJSONEncoder - -from argus.db.config import BaseConfig, FileConfig -from argus.db.db_types import ColumnInfo, CollectionHint, ArgusUDTBase -from argus.db.cloud_types import ResourceState -from argus.backend.models.web import ArgusSchedule, ArgusScheduleAssignee, \ - ArgusScheduleGroup, ArgusScheduleTest, ArgusRelease, ArgusGroup, ArgusTest - -LOGGER = logging.getLogger(__name__) - - -class ArgusInterfaceSingletonError(Exception): - pass - - -class ArgusInterfaceDatabaseConnectionError(Exception): - pass - - -class ArgusInterfaceSchemaError(Exception): - pass - - -class ArgusInterfaceNameError(Exception): - pass - - -class PrivateToPublicAddressTranslator(AddressTranslator): - def __init__(self, address_mapping): - self.address_mapping = address_mapping - - def translate(self, addr): - return self.address_mapping.get(addr, addr) - - -class ArgusDatabase: - # pylint: disable=too-many-instance-attributes - CQL_ENGINE_CONNECTION_NAME = 'argus_cql_engine_conn' - ARGUS_EXECUTION_PROFILE = "argus_named_tuple" - REQUIRED_CQL_ENGINE_MODELS = [ - ArgusSchedule, - ArgusScheduleGroup, - ArgusScheduleTest, - ArgusScheduleAssignee, - ArgusTest, - ArgusGroup, - ArgusRelease, - ] - PYTHON_SCYLLA_TYPE_MAPPING = { - int: cassandra.cqltypes.IntegerType.typename, - float: cassandra.cqltypes.FloatType.typename, - str: cassandra.cqltypes.VarcharType.typename, - UUID: cassandra.cqltypes.UUIDType.typename, - datetime: cassandra.cqltypes.DateType.typename, - ResourceState: cassandra.cqltypes.VarcharType.typename, - Optional[str]: cassandra.cqltypes.VarcharType.typename, - Optional[int]: cassandra.cqltypes.IntegerType.typename, - } - - _INSTANCE: Union['ArgusDatabase', Any] = None - - def __init__(self, config: BaseConfig = None): - if not config: - config = FileConfig() - self.config = config - self.execution_profile = ExecutionProfile( - load_balancing_policy=WhiteListRoundRobinPolicy(hosts=self.config.contact_points), - consistency_level=ConsistencyLevel.QUORUM, - ) - self.exec_profile_named_tuple = ExecutionProfile( - load_balancing_policy=WhiteListRoundRobinPolicy(hosts=self.config.contact_points), - consistency_level=ConsistencyLevel.QUORUM, - row_factory=named_tuple_factory - ) - address_translator = None - if self.config.address_mapping: - address_translator = PrivateToPublicAddressTranslator(self.config.address_mapping) - - self.cluster = cassandra.cluster.Cluster(contact_points=self.config.contact_points, - protocol_version=4, - auth_provider=PlainTextAuthProvider( - username=self.config.username, - password=self.config.password), - execution_profiles={ - EXEC_PROFILE_DEFAULT: self.execution_profile, - self.ARGUS_EXECUTION_PROFILE: 
self.exec_profile_named_tuple - }, - address_translator=address_translator, - ) - self.session = self.cluster.connect() - self._keyspace_initialized = False - self.prepared_statements = {} - self.initialized_tables = {} - self._table_keys = {} - self._mapped_udts = {} - self._current_keyspace = self.init_keyspace(name=self.config.keyspace_name) - connection.register_connection(self.CQL_ENGINE_CONNECTION_NAME, session=self.session) - for model in self.REQUIRED_CQL_ENGINE_MODELS: - management.sync_table(model, keyspaces=(self._current_keyspace,), - connections=(self.CQL_ENGINE_CONNECTION_NAME,)) - if not models.DEFAULT_KEYSPACE: - models.DEFAULT_KEYSPACE = self._current_keyspace - elif models.DEFAULT_KEYSPACE != self._current_keyspace: - LOGGER.warning( - "CQL Engine DEFAULT_KEYSPACE has been set already and differs from interface keyspace, this could cause issues") - - @classmethod - def get(cls, config: BaseConfig = None): - if cls._INSTANCE: - LOGGER.debug("Found valid db session.") - return cls._INSTANCE - - if not config: - config = FileConfig() - - LOGGER.debug("Initializing db session from default config") - cls._INSTANCE = cls(config=config) - return cls._INSTANCE - - @classmethod - def destroy(cls): - if not cls._INSTANCE: - LOGGER.warning("ArgusDatabase::destroy called with no valid session.") - return False - - LOGGER.info("Shutting down the cluster connection.") - cls._INSTANCE.cluster.shutdown() - cls._INSTANCE = None - return True - - @classmethod - def from_config(cls, config: BaseConfig = None): - return cls.get(config) - - def prepare_query_for_table(self, table_name, query_type, query): - prepared_statement = self.session.prepare(query=query) - self.prepared_statements[f"{table_name}_{query_type}"] = prepared_statement - - return prepared_statement - - @staticmethod - def _verify_keyspace_name(name: str): - incorrect_keyspace_name_re = r"\." 
- if match := re.search(incorrect_keyspace_name_re, name): - raise ArgusInterfaceNameError("Keyspace name does not conform to the " - f"keyspace naming rules: {name} (pos: {match.pos})") - return name - - @staticmethod - def _get_hash_from_keys(keys: Union[list[str], KeysView]): - key_string = ".".join(keys).encode(encoding="utf-8") - return sha1(key_string).hexdigest() - - def init_keyspace(self, name="argus", prefix="", suffix="") -> str: - keyspace_name = self._verify_keyspace_name(f"{prefix}{name}{suffix}") - query = f"CREATE KEYSPACE IF NOT EXISTS {keyspace_name} " \ - "WITH replication={'class': 'SimpleStrategy', 'replication_factor' : 3}" - LOGGER.debug("Running query: %s", query) - self.session.execute(query=query, execution_profile=self.ARGUS_EXECUTION_PROFILE) - self.session.set_keyspace(keyspace_name) - self._keyspace_initialized = True - return keyspace_name - - def is_native_type(self, object_type): - return self.PYTHON_SCYLLA_TYPE_MAPPING.get(object_type, False) - - def init_table(self, table_name: str, column_info: dict[str, ColumnInfo]): - # pylint: disable=too-many-locals - if not self._keyspace_initialized: - raise ArgusInterfaceDatabaseConnectionError("Uninitialized keyspace, cannot continue") - - if self.initialized_tables.get(table_name): - return True, f"Table {table_name} already initialized" - - primary_keys_info: dict = column_info.pop("$tablekeys$") - clustering_order_info: dict = column_info.pop("$clustering_order$") - indices_info: dict = column_info.pop("$indices$") - partition_keys = [key for key, (cls, pk_type) in primary_keys_info.items() if pk_type == "partition"] - partition_key_def = partition_keys[0] if len(partition_keys) == 1 else f"({', '.join(partition_keys)})" - clustering_columns = [key for key, (cls, pk_type) in primary_keys_info.items() if pk_type == "clustering"] - clustering_column_def = ", ".join(clustering_columns) - clustering_order = [f"{key} {clustering_order_info[key]}" for key in clustering_columns + - partition_keys if clustering_order_info.get(key)] - clustering_order_def = ", ".join(clustering_order) - clustering_statement = f"WITH CLUSTERING ORDER BY ({clustering_order_def})" if len(clustering_order) > 0 else "" - - self._table_keys[table_name] = primary_keys_info - primary_key_def = f"{partition_key_def}, {clustering_column_def}" if len( - clustering_column_def) > 0 else partition_key_def - query = "CREATE TABLE IF NOT EXISTS {table_name}({columns}, PRIMARY KEY ({pk})) {cs}" - columns_query = [] - for column in column_info.values(): - if mapped_type := self.is_native_type(column.type): - column_type = mapped_type - elif column.type is CollectionHint: - column_type = self.create_collection_declaration(column.value.stored_type) - else: - # UDT - column_type = f"frozen<{self._init_user_data_type(column.type)}>" - - constraints = " ".join(column.constraints) - column_query = f"{column.name} {column_type} {constraints}" - columns_query.append(column_query) - - columns_query = ", ".join(columns_query) - completed_query = query.format(table_name=table_name, columns=columns_query, - pk=primary_key_def, cs=clustering_statement) - LOGGER.debug("About to execute: \"%s\"", completed_query) - self.session.execute(query=completed_query, execution_profile=self.ARGUS_EXECUTION_PROFILE) - self.create_indices(table_name, indices=indices_info) - self.initialized_tables[table_name] = True - return True, "Initialization complete" - - def create_indices(self, table_name, indices): - for index in indices: - self.session.execute(f"CREATE INDEX IF NOT 
EXISTS ON {table_name}({index})") - return True - - def create_collection_declaration(self, hint: GenericAlias): - collection_type = get_type_origin(hint) - collection_types = get_type_args(hint) - - declaration_type = collection_type.__name__ - - declared_types = [] - for inner_hint in collection_types: - type_class = get_type_origin(inner_hint) if isinstance(inner_hint, GenericAlias) else inner_hint - - if type_class is tuple or type_class is list: - declaration = f"frozen<{self.create_collection_declaration(inner_hint)}>" - elif matched_type := self.PYTHON_SCYLLA_TYPE_MAPPING.get(type_class): - declaration = matched_type - else: - declaration = f"frozen<{self._init_user_data_type(type_class)}>" - - declared_types.append(declaration) - - declaration_query = ", ".join(declared_types) if collection_type is tuple else str(declared_types[0]) - - return f"{declaration_type}<{declaration_query}>" - - def _init_user_data_type(self, cls: ArgusUDTBase): - if not self._keyspace_initialized: - raise ArgusInterfaceDatabaseConnectionError("Uninitialized keyspace, cannot continue") - - udt_name = cls.basename() - - if cls in self._mapped_udts.get(self._current_keyspace, []): - return udt_name - - query = "CREATE TYPE IF NOT EXISTS {name} ({fields})" - fields = [] - for field in dataclass_fields(cls): - name = field.name - field_type = get_type_origin(field.type) if isinstance(field.type, GenericAlias) else field.type - if field_type is list or field_type is tuple: - field_declaration = self.create_collection_declaration(field.type) - elif matched_type := self.PYTHON_SCYLLA_TYPE_MAPPING.get(field_type): - field_declaration = matched_type - else: - field_declaration = f"frozen<{self._init_user_data_type(field.type)}>" - fields.append(f"{name} {field_declaration}") - - joined_fields = ", ".join(fields) - - completed_query = query.format(name=udt_name, fields=joined_fields) - LOGGER.debug("About to execute: \"%s\"", completed_query) - self.session.execute(query=completed_query, execution_profile=self.ARGUS_EXECUTION_PROFILE) - - existing_udts = self._mapped_udts.get(self._current_keyspace, []) - existing_udts.append(cls) - self._mapped_udts[self._current_keyspace] = existing_udts - - return udt_name - - def fetch(self, table_name: str, run_id: UUID, where_clause="WHERE id = ?"): - return self._fetch(table_name, (run_id,), where_clause) - - def _fetch(self, table_name: str, params: tuple | list, where_clause: str): - if not self._keyspace_initialized: - raise ArgusInterfaceDatabaseConnectionError("Uninitialized keyspace, cannot continue") - - query = self.prepared_statements.get(f"{table_name}_select_{where_clause.lower().replace(' ', '-')}", - self.prepare_query_for_table(table_name=table_name, query_type="insert", - query=f"SELECT * FROM {table_name} " - f"{where_clause}")) - - cursor = self.session.execute(query=query, parameters=params, execution_profile=self.ARGUS_EXECUTION_PROFILE) - - return cursor.one() - - def fetch_generic(self, table_name: str, params: tuple | list, where_clause: str): - return self._fetch(table_name, params, where_clause) - - def insert(self, table_name: str, run_data: dict): - if not self._keyspace_initialized: - raise ArgusInterfaceDatabaseConnectionError("Uninitialized keyspace, cannot continue") - - query = self.prepared_statements.get(f"{table_name}_insert", - self.prepare_query_for_table(table_name=table_name, query_type="insert", - query=f"INSERT INTO {table_name} JSON ?")) - - self.session.execute(query=query, parameters=(json.dumps(run_data, cls=ArgusJSONEncoder),), - 
execution_profile=self.ARGUS_EXECUTION_PROFILE) - - def update(self, table_name: str, run_data: dict): - # pylint: disable=too-many-locals - def _convert_data_to_sequence(data: dict) -> list: - data_list = list(data.values()) - for idx, value in enumerate(data_list): - if isinstance(value, dict): - data_list[idx] = _convert_data_to_sequence(value) - elif isinstance(value, list) and len(value) > 0 and isinstance(value[0], dict): - data_list[idx] = [_convert_data_to_sequence(d) for d in value] - return data_list - - primary_keys: dict = self._table_keys.get(table_name) - if not primary_keys: - raise ArgusInterfaceSchemaError(f"Table \"{table_name}\" is not initialized!") - - LOGGER.debug("Primary keys for table %s: %s", table_name, primary_keys) - where_clause = [] - where_params = [] - for key, (_, _) in primary_keys.items(): - LOGGER.debug("Ejecting %s from update set as it is a part of the primary key", key) - try: - data_value = run_data.pop(key) - except KeyError as exc: - raise ArgusInterfaceSchemaError("Missing key from update set", key) from exc - field = f"{key} = ?" - where_clause.append(field) - where_params.append(data_value) - - where_clause_joined = " AND ".join(where_clause) - - if not (prepared_statement := self.prepared_statements.get(f"update_{table_name}")): - field_parameters = [f"\"{field_name}\" = ?" for field_name in run_data.keys()] - fields_joined = ", ".join(field_parameters) - - query = f"UPDATE {table_name} SET {fields_joined} WHERE {where_clause_joined}" - LOGGER.debug("Formatted query: %s", query) - prepared_statement = self.session.prepare(query=query) - self.prepared_statements[f"update_{table_name}"] = prepared_statement - - LOGGER.debug("Bound query for update: %s", prepared_statement.query_string) - query_parameters = _convert_data_to_sequence(run_data) - parameters = [*query_parameters, *where_params] - LOGGER.debug("Parameters: %s", parameters) - self.session.execute(prepared_statement, parameters=parameters, execution_profile=self.ARGUS_EXECUTION_PROFILE) diff --git a/argus/db/testrun.py b/argus/db/testrun.py deleted file mode 100644 index 2dcf2542..00000000 --- a/argus/db/testrun.py +++ /dev/null @@ -1,740 +0,0 @@ -# TODO: Deprecated, will be removed once REST API client is ready -import logging -import datetime -import time -import traceback -import sys -import threading -from dataclasses import asdict, is_dataclass, fields, Field, dataclass -from typing import Any -from uuid import uuid4, UUID - -from argus.db.config import BaseConfig -from argus.db.utils import is_list_homogeneous -from argus.db.cloud_types import CloudResource, CloudInstanceDetails, BaseCloudSetupDetails -from argus.db.interface import ArgusDatabase -from argus.db.db_types import ColumnInfo, CollectionHint, NemesisRunInfo, TestStatus, TestInvestigationStatus, \ - EventsBySeverity, PackageVersion -from argus.backend.models.web import ArgusRelease, ArgusGroup, ArgusTest, ArgusSchedule, ArgusScheduleAssignee, ArgusScheduleGroup, \ - ArgusScheduleTest - -LOGGER = logging.getLogger(__name__) - - -class TestInfoSerializationError(Exception): - pass - - -class TestInfoSchemaError(Exception): - pass - - -class TestInfoValueError(Exception): - pass - - -class BaseTestInfo: - EXPOSED_ATTRIBUTES = {} - ATTRIBUTE_CONSTRAINTS = {} - COLLECTION_HINTS = {} - - def __init__(self, *args, **kwargs): - pass - - @classmethod - def create_skeleton(cls): - pass - - @classmethod - def schema(cls): - data = {} - for attr, column_type in cls.EXPOSED_ATTRIBUTES.items(): - value = None - if column_type is 
list or column_type is tuple: - value = cls.schema_process_collection(attr) - column_type = CollectionHint - constraints = cls.ATTRIBUTE_CONSTRAINTS.get(attr, []) - column_info = ColumnInfo( - name=attr, type=column_type, value=value, constraints=constraints) - data[attr] = column_info - - return data - - @classmethod - def schema_process_collection(cls, attr_name: str): - hint = cls.COLLECTION_HINTS.get(attr_name) - if not hint: - raise TestInfoSchemaError( - "Encountered a collection and no collection hint was found") - - return hint - - def serialize(self): - data = {} - for attr in self.EXPOSED_ATTRIBUTES: - attribute_value = getattr(self, attr) - if isinstance(attribute_value, list): - attribute_value = self._process_list(attribute_value) - elif is_dataclass(attribute_value): - attribute_value = asdict(attribute_value) - - data[attr] = attribute_value - - return data - - @staticmethod - def _process_list(list_to_check: list[Any]): - if len(list_to_check) == 0: - return list_to_check - - if not is_list_homogeneous(list_to_check): - raise TestInfoSerializationError("Detected a non-homogenous list") - - contains_dataclass = is_dataclass(list_to_check[0]) - - if contains_dataclass: - return [asdict(dc) for dc in list_to_check] - - return list_to_check - - @classmethod - def __get_validators__(cls): - yield cls.validate - - @classmethod - def validate(cls, value, field): # pylint: disable=unused-argument - return value - - -class TestDetails(BaseTestInfo): - # pylint: disable=too-many-instance-attributes - EXPOSED_ATTRIBUTES = {"scm_revision_id": str, "started_by": str, "build_job_url": str, - "start_time": datetime.datetime, "end_time": datetime.datetime, - "config_files": list, "packages": list, "scylla_version": str, - "yaml_test_duration": int, - } - COLLECTION_HINTS = { - "packages": CollectionHint(list[PackageVersion]), - "config_files": CollectionHint(list[str]), - } - - def __init__(self, scm_revision_id: str, - started_by: str, build_job_url: str, - yaml_test_duration: int, start_time: datetime, - config_files: list[str], packages: list[PackageVersion], - end_time: datetime.datetime = datetime.datetime.utcfromtimestamp(0), - scylla_version: str | None = None): - # pylint: disable=too-many-arguments - super().__init__() - self.scm_revision_id = scm_revision_id - self.started_by = started_by - self.build_job_url = build_job_url - self.start_time = start_time - self.yaml_test_duration = yaml_test_duration - if not (is_list_homogeneous(packages) or ( - len(packages) > 0 and isinstance(next(iter(packages)), PackageVersion))): - raise TestInfoValueError( - "Package list contains incorrect values", packages) - self.packages = packages - self.config_files = config_files - self.end_time = end_time - self.scylla_version = scylla_version - - @classmethod - def from_db_row(cls, row): - if row.packages: - packages = [PackageVersion.from_db_udt( - udt) for udt in row.packages] - else: - packages = [] - - config_files = row.config_files if row.config_files else [] - - return cls(scm_revision_id=row.scm_revision_id, started_by=row.started_by, build_job_url=row.build_job_url, - start_time=row.start_time, end_time=row.end_time, yaml_test_duration=row.yaml_test_duration, - config_files=config_files, scylla_version=row.scylla_version, - packages=packages) - - def set_test_end_time(self): - self.end_time = datetime.datetime.utcnow().replace(microsecond=0) - - -class TestResourcesSetup(BaseTestInfo): - EXPOSED_ATTRIBUTES = { - "sct_runner_host": CloudInstanceDetails, - "region_name": list, - 
"cloud_setup": BaseCloudSetupDetails - } - COLLECTION_HINTS = { - "region_name": CollectionHint(list[str]), - } - - def __init__(self, sct_runner_host: CloudInstanceDetails, - region_name: list[str], cloud_setup: BaseCloudSetupDetails): - super().__init__() - self.sct_runner_host = sct_runner_host - self.region_name = region_name - self.cloud_setup = cloud_setup - - @classmethod - def from_db_row(cls, row): - runner = CloudInstanceDetails.from_db_udt(row.sct_runner_host) - cloud_setup = BaseCloudSetupDetails.from_db_udt(row.cloud_setup) - - regions = row.region_name if row.region_name else [] - - return cls(sct_runner_host=runner, region_name=regions, - cloud_setup=cloud_setup) - - -class TestLogs(BaseTestInfo): - EXPOSED_ATTRIBUTES = {"logs": list} - COLLECTION_HINTS = { - "logs": CollectionHint(list[tuple[str, str]]) - } - - def __init__(self): - super().__init__() - self._log_collection = [] - - def add_log(self, log_type: str, log_url: str | list[str]) -> None: - if isinstance(log_url, str): - self._log_collection.append((log_type, log_url)) - elif isinstance(log_url, list): - for log in log_url: - self._log_collection.append((log_type, log)) - else: - LOGGER.warning("Unknown log type encountered: %s", log_url) - - @property - def logs(self) -> list[tuple[str, str]]: - return self._log_collection - - @classmethod - def from_db_row(cls, row): - logs = cls() - if row.logs: - for log_type, log_url in row.logs: - logs.add_log(log_type, log_url) - - return logs - - -class TestResources(BaseTestInfo): - EXPOSED_ATTRIBUTES = {"allocated_resources": list} - COLLECTION_HINTS = { - "allocated_resources": CollectionHint(list[CloudResource]), - } - - def __init__(self): - super().__init__() - self._allocated_resources = [] - - def attach_resource(self, resource: CloudResource): - self._allocated_resources.append(resource) - self._allocated_resources.sort(key=lambda v: v.name) - - def detach_resource(self, resource: CloudResource, reason: str = "unspecified reason"): - resource_to_detach = next( - r for r in self._allocated_resources if r == resource) - resource_to_detach.terminate(reason=reason) - - @property - def allocated_resources(self) -> list[CloudResource]: - return self._allocated_resources - - @classmethod - def from_db_row(cls, row): - resources = cls() - resource_row = row.allocated_resources if row.allocated_resources else [] - for resource in resource_row: - cloud_resource = CloudResource.from_db_udt(resource) - resources.allocated_resources.append(cloud_resource) - resources.allocated_resources.sort(key=lambda v: v.name) - return resources - - -class TestResults(BaseTestInfo): - # pylint: disable=too-many-arguments - EXPOSED_ATTRIBUTES = {"status": str, "events": list, - "nemesis_data": list, "screenshots": list} - COLLECTION_HINTS = { - "events": CollectionHint(list[EventsBySeverity]), - "nemesis_data": CollectionHint(list[NemesisRunInfo]), - "screenshots": CollectionHint(list[str]), - } - - def __init__(self, status: TestStatus, events: list[EventsBySeverity] = None, - nemesis_data: list[NemesisRunInfo] = None, screenshots: list[str] = None, - max_stored_events=25): - super().__init__() - if isinstance(status, TestStatus): - self._status = status.value - else: - self._status = TestStatus(status).value - self.events = events if events else [] - self.nemesis_data = nemesis_data if nemesis_data else [] - self.screenshots = screenshots if screenshots else [] - self.max_stored_events = max_stored_events - - @classmethod - def from_db_row(cls, row): - if row.events: - events = 
[EventsBySeverity.from_db_udt( - event) for event in row.events] - else: - events = [] - - if row.nemesis_data: - nemesis_data = [NemesisRunInfo.from_db_udt( - nemesis) for nemesis in row.nemesis_data] - else: - nemesis_data = [] - - if row.screenshots: - screenshots = row.screenshots - else: - screenshots = [] - - return cls(status=row.status, events=events, nemesis_data=nemesis_data, screenshots=screenshots) - - def _add_new_event_type(self, event: EventsBySeverity): - if len([v for v in self.events if v.severity == event.severity]) > 0: - raise TestInfoValueError( - f"Severity event collection {event.severity} already exists in TestResults") - - self.events.append(event) - - def _collect_event_message(self, event: EventsBySeverity, message: str): - if len(event.last_events) >= self.max_stored_events: - event.last_events = event.last_events[1:] - - event.event_amount += 1 - event.last_events.append(message) - - def add_nemesis(self, nemesis: NemesisRunInfo): - self.nemesis_data.append(nemesis) - - def add_event(self, event_severity: str, event_message: str): - try: - event = next(filter(lambda v: v.severity == - event_severity, self.events)) - except StopIteration: - event = EventsBySeverity( - severity=event_severity, event_amount=0, last_events=[]) - self._add_new_event_type(event) - - self._collect_event_message(event, event_message) - - def add_screenshot(self, screenshot_link: str): - self.screenshots.append(screenshot_link) - - @property - def status(self) -> TestStatus: - return TestStatus(self._status) - - @status.setter - def status(self, value: TestStatus): - self._status = TestStatus(value).value - - -@dataclass -class TestRunInfo: - details: TestDetails - setup: TestResourcesSetup - resources: TestResources - logs: TestLogs - results: TestResults - - -class TestRun: - # pylint: disable=too-many-instance-attributes - EXPOSED_ATTRIBUTES = {"id": UUID, "group_id": UUID, "release_id": UUID, - "build_id": str, "test_id": UUID, - "assignee": UUID, "heartbeat": int, "investigation_status": str} - ATTRIBUTE_CONSTRAINTS = { - } - PRIMARY_KEYS = { - "build_id": (str, "partition"), - "start_time": (datetime.datetime, "clustering"), - } - CLUSTERING_ORDER = { - "start_time": "DESC", - } - INDICES = ["release_id", "group_id", "test_id", "id", "assignee", "status"] - _USING_RUNINFO = TestRunInfo - _TABLE_NAME = "test_runs_v8" - _IS_TABLE_INITIALIZED = False - _ARGUS_DB_INTERFACE = None - - def __init__(self, test_id: UUID, assignee: UUID, build_id: str, - run_info: TestRunInfo, config: BaseConfig = None, argus_interface: ArgusDatabase = None, - investigation_status: str = TestInvestigationStatus.NOT_INVESTIGATED): - # pylint: disable=too-many-arguments - if not test_id: - test_id = uuid4() - self._save_lock = threading.Lock() - self._id = test_id - self._build_id = build_id - self._group_id = None - self._release_id = None - self._test_id = None - self._assignee = assignee - self._investigation_status = investigation_status - self._run_info = run_info - self._heartbeat = int(time.time()) - self._config = config - for field in fields(run_info): - setattr(self, field.name, getattr(run_info, field.name)) - - if argus_interface: - self.argus = argus_interface - - @classmethod - def table_name(cls) -> str: - return cls._TABLE_NAME - - @classmethod - def from_db_row(cls, row, config: BaseConfig = None): - if not cls._IS_TABLE_INITIALIZED: - cls.init_own_table(config) - nested_fields = {} - for field in fields(cls._USING_RUNINFO): - nested_fields[field.name] = field.type.from_db_row(row) - - 
run_info = cls._USING_RUNINFO(**nested_fields) - run = cls(test_id=row.id, assignee=row.assignee, build_id=row.build_id, - run_info=run_info, investigation_status=row.investigation_status) - run.heartbeat = row.heartbeat - run.group_id = row.group_id - run.release_id = row.release_id - run.test_id = row.test_id - return run - - @classmethod - def from_id(cls, test_id: UUID, config: BaseConfig = None): - if not cls._IS_TABLE_INITIALIZED: - cls.init_own_table(config) - database = cls.get_argus() - if row := database.fetch(cls._TABLE_NAME, test_id): - return cls.from_db_row(row) - - return None - - @classmethod - def from_pk(cls, pk: tuple[str, datetime.datetime], config: BaseConfig = None): - if not cls._IS_TABLE_INITIALIZED: - cls.init_own_table(config) - database = cls.get_argus() - if row := database.fetch_generic(cls._TABLE_NAME, pk, "WHERE build_id = ? and start_time = ?"): - return cls.from_db_row(row) - - return None - - @classmethod - def get_argus(cls, config: BaseConfig = None) -> ArgusDatabase: - if not cls._ARGUS_DB_INTERFACE: - cls._ARGUS_DB_INTERFACE = ArgusDatabase(config=config) - return cls._ARGUS_DB_INTERFACE - - @classmethod - def set_argus(cls, argus_interface: ArgusDatabase): - cls._ARGUS_DB_INTERFACE = argus_interface - cls._IS_TABLE_INITIALIZED = False - - @property - def argus(self) -> ArgusDatabase: - if not self._ARGUS_DB_INTERFACE: - self.get_argus(self._config) - return self._ARGUS_DB_INTERFACE - - @argus.setter - def argus(self, interface: ArgusDatabase | None): - self._ARGUS_DB_INTERFACE = interface # pylint: disable=invalid-name - self._IS_TABLE_INITIALIZED = False # pylint: disable=invalid-name - - @property - def heartbeat(self) -> int: - return self._heartbeat - - @heartbeat.setter - def heartbeat(self, value: int | float): - self._heartbeat = int(value) - - @property - def build_id(self) -> str: - return self._build_id - - @build_id.setter - def build_id(self, value: str) -> None: - self._build_id = str(value) - - @property - def id(self) -> UUID: # pylint: disable=invalid-name - return self._id - - @property - def group_id(self) -> UUID: - return self._group_id - - @property - def release_id(self) -> UUID: - return self._release_id - - @property - def test_id(self) -> UUID: - return self._test_id - - @release_id.setter - def release_id(self, value: UUID) -> None: - self._release_id = value - - @group_id.setter - def group_id(self, value: UUID) -> None: - self._group_id = value - - @test_id.setter - def test_id(self, value: UUID) -> None: - self._test_id = value - - @property - def assignee(self) -> UUID: - return self._assignee - - @assignee.setter - def assignee(self, value): - self._assignee = value - - @property - def investigation_status(self) -> str: - return self._investigation_status - - @investigation_status.setter - def investigation_status(self, value: TestInvestigationStatus | str): - self._investigation_status = TestInvestigationStatus(value) - - def serialize(self) -> dict[str, Any]: - LOGGER.debug("Serializing test run...") - nested_data = {} - for field in fields(self._USING_RUNINFO): - field: Field - value: BaseTestInfo = getattr(self, field.name) - nested_data = { - **nested_data, - **value.serialize() - } - - data = { - "build_id": self._build_id, - "id": self._id, - "group_id": self._group_id, - "release_id": self._release_id, - "test_id": self._test_id, - "assignee": self._assignee, - "heartbeat": self._heartbeat, - "investigation_status": self._investigation_status, - **nested_data - } - LOGGER.debug("Serialized Data: %s", data) - 
-
-    @classmethod
-    def init_own_table(cls, config: BaseConfig = None):
-        LOGGER.debug("Initializing TestRun table...")
-        cls.get_argus(config).init_table(
-            table_name=cls._TABLE_NAME, column_info=cls.schema())
-        cls._IS_TABLE_INITIALIZED = True
-
-    @classmethod
-    def set_table_name(cls, new_table_name: str):
-        cls._TABLE_NAME = new_table_name
-        cls._IS_TABLE_INITIALIZED = False
-
-    @classmethod
-    def schema(cls) -> dict[str, ColumnInfo]:
-        data = {}
-        LOGGER.debug("Dumping full schema...")
-        for attr, column_type in cls.EXPOSED_ATTRIBUTES.items():
-            value = None
-            constraints = cls.ATTRIBUTE_CONSTRAINTS.get(attr, [])
-            column_info = ColumnInfo(
-                name=attr, type=column_type, value=value, constraints=constraints)
-            data[attr] = column_info
-
-        schema_dump = {}
-        for field in fields(cls._USING_RUNINFO):
-            schema_dump = {
-                **schema_dump,
-                **field.type.schema(),
-            }
-
-        full_schema = dict(
-            **{"$tablekeys$": cls.PRIMARY_KEYS},
-            **{"$clustering_order$": cls.CLUSTERING_ORDER},
-            **{"$indices$": cls.INDICES},
-            **data,
-            **schema_dump
-        )
-        LOGGER.debug("Full Schema: %s", full_schema)
-        return full_schema
-
-    def save(self):
-        with self._save_lock:
-            if not self._IS_TABLE_INITIALIZED:
-                self.init_own_table(self._config)
-            if not self.exists():
-                self._assign_categories()
-                if not self.assignee:
-                    try:
-                        self.assignee = self._get_current_assignee_from_schedule()
-                    except Exception:  # pylint: disable=broad-except
-                        LOGGER.warning("Error getting assignee from database")
-                        LOGGER.debug("Details: ", exc_info=True)
-
-                LOGGER.debug("Inserting data for test run: %s", self.id)
-                self.argus.insert(table_name=self._TABLE_NAME,
-                                  run_data=self.serialize())
-            else:
-                LOGGER.debug("Updating data for test run: %s", self.id)
-                self.argus.update(table_name=self._TABLE_NAME,
-                                  run_data=self.serialize())
-
-    def exists(self) -> bool:
-        if not self._IS_TABLE_INITIALIZED:
-            self.init_own_table(self._config)
-
-        if self.argus.fetch(table_name=self._TABLE_NAME, run_id=self.id):
-            return True
-        return False
-
-    def _assign_categories(self):
-        key = self._build_id
-        try:
-            test: ArgusTest = ArgusTest.using(
-                connection=self.argus.CQL_ENGINE_CONNECTION_NAME
-            ).get(build_system_id=key)
-            self.release_id = test.release_id
-            self.group_id = test.group_id
-            self.test_id = test.id
-        except ArgusTest.DoesNotExist:
-            LOGGER.warning(
-                "Test entity missing for key \"%s\", run won't be visible until this is corrected", key)
-
-    def _get_current_assignee_from_schedule(self) -> UUID:
-        """
-        Iterate over all schedules (groups and tests) and return first available assignee
-        """
-        associated_test = ArgusTest.using(
-            connection=self.argus.CQL_ENGINE_CONNECTION_NAME
-        ).get(build_system_id=self.build_id)
-        associated_group = ArgusGroup.using(
-            connection=self.argus.CQL_ENGINE_CONNECTION_NAME
-        ).get(id=associated_test.group_id)
-        associated_release = ArgusRelease.using(
-            connection=self.argus.CQL_ENGINE_CONNECTION_NAME
-        ).get(id=associated_test.release_id)
-
-        scheduled_groups = ArgusScheduleGroup.filter(
-            release_id=associated_release.id, group_id=associated_group.id
-        ).all().using(
-            connection=self.argus.CQL_ENGINE_CONNECTION_NAME
-        )
-
-        scheduled_tests = ArgusScheduleTest.filter(
-            release_id=associated_release.id, test_id=associated_test.id
-        ).all().using(
-            connection=self.argus.CQL_ENGINE_CONNECTION_NAME
-        )
-
-        unique_schedule_ids = {scheduled_obj.schedule_id for scheduled_obj in [
-            *scheduled_tests, *scheduled_groups]}
-
-        schedules = ArgusSchedule.filter(
-            release_id=associated_release.id, id__in=tuple(
-                unique_schedule_ids)
-        ).all().using(
-            connection=self.argus.CQL_ENGINE_CONNECTION_NAME
-        )
-
-        today = datetime.datetime.utcnow()
-
-        valid_schedules = []
-        for schedule in schedules:
-            if schedule.period_start <= today <= schedule.period_end:
-                valid_schedules.append(schedule)
-
-        assignees_uuids = []
-        for schedule in valid_schedules:
-            assignees = ArgusScheduleAssignee.filter(
-                schedule_id=schedule.id
-            ).all().using(
-                connection=self.argus.CQL_ENGINE_CONNECTION_NAME
-            )
-            assignees_uuids.append(
-                *[assignee.assignee for assignee in assignees])
-
-        return assignees_uuids[0] if len(assignees_uuids) > 0 else None
-
-    def shutdown(self):
-        LOGGER.debug("Shutting down cluster connection...")
-        self.argus.cluster.shutdown()
-
-    @property
-    def run_info(self) -> TestRunInfo:
-        return self._run_info
-
-
-class TestRunWithHeartbeat(TestRun):
-    def __init__(self, test_id: UUID, assignee: UUID, build_id: str,
-                 run_info: TestRunInfo, heartbeat_interval=30, config: BaseConfig = None,
-                 argus_interface: ArgusDatabase = None, investigation_status: str = TestInvestigationStatus.NOT_INVESTIGATED,):
-        # pylint: disable=too-many-arguments
-        self._heartbeat_interval = heartbeat_interval
-        self._shutdown_event = threading.Event()
-        super().__init__(test_id=test_id, assignee=assignee, build_id=build_id,
-                         investigation_status=investigation_status, run_info=run_info,
-                         config=config, argus_interface=argus_interface)
-        self._thread = threading.Thread(target=self._heartbeat_entry,
-                                        name=f"{self.__class__.__name__}-{self.id}-heartbeat", daemon=True)
-        self._heartbeat_statement = self.argus.session.prepare(
-            f"UPDATE {TestRun.table_name()} SET heartbeat = ? WHERE build_id = ? AND start_time = ?")
-        self._thread.start()
-
-    @property
-    def heartbeat_interval(self) -> int:
-        return self._heartbeat_interval
-
-    @heartbeat_interval.setter
-    def heartbeat_interval(self, value: float | int):
-        self._heartbeat_interval = value
-
-    @property
-    def thread(self):
-        return self._thread
-
-    def _heartbeat_entry(self):
-        while True:
-            time.sleep(self.heartbeat_interval)
-            if self._shutdown_event.is_set():
-                break
-            LOGGER.debug("Sending heartbeat...")
-            self.heartbeat = time.time()
-            bound_statement = self._heartbeat_statement.bind(
-                (self.heartbeat, self.build_id, self.run_info.details.start_time))
-            self.argus.session.execute(bound_statement)
-        LOGGER.debug("Heartbeat exit")
-
-    def shutdown(self):
-        self._shutdown_event.set()
-        LOGGER.debug("Waiting for the heartbeat thread to exit...")
-        self._thread.join(timeout=self.heartbeat_interval + 10)
-        if self._thread.is_alive():
-            LOGGER.warning(
-                "Heartbeat thread was not able to shut down correctly. Stack trace:")
-            # pylint: disable=protected-access
-            current_threads = sys._current_frames()
-            stack_trace = traceback.extract_stack(
-                current_threads[self._thread.ident])
-            LOGGER.warning(
-                "\n".join([f'#{lineno:3} : {line:50}: {fname}' for fname, lineno, _, line in stack_trace]))
-        super().shutdown()
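
The TestRunWithHeartbeat class removed above boils down to a keep-alive loop: a daemon thread periodically writes a fresh timestamp until a shutdown event fires. A condensed, hypothetical sketch of that pattern (Heartbeat and beat are illustrative names, not project API), in which Event.wait() doubles as the sleep, so a shutdown request is observed immediately instead of after up to a full interval as with the removed sleep-then-check loop:

    import threading
    import time


    class Heartbeat:
        """Periodically invoke `beat` with the current epoch time until shut down."""

        def __init__(self, beat, interval: float = 30.0):
            self._beat = beat              # callable that persists the timestamp
            self._interval = interval
            self._stop = threading.Event()
            self._thread = threading.Thread(target=self._run, daemon=True)
            self._thread.start()

        def _run(self):
            # wait() returns False on timeout (keep beating) and True once set
            while not self._stop.wait(self._interval):
                self._beat(int(time.time()))

        def shutdown(self):
            self._stop.set()
            self._thread.join(timeout=self._interval + 10)
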
Stack trace:") - # pylint: disable=protected-access - current_threads = sys._current_frames() - stack_trace = traceback.extract_stack( - current_threads[self._thread.ident]) - LOGGER.warning( - "\n".join([f'#{lineno:3} : {line:50}: {fname}' for fname, lineno, _, line in stack_trace])) - super().shutdown() diff --git a/argus/db/utils.py b/argus/db/utils.py deleted file mode 100644 index 7be1daf8..00000000 --- a/argus/db/utils.py +++ /dev/null @@ -1,15 +0,0 @@ -from typing import TypeVar - -T = TypeVar("T") - - -def is_list_homogeneous(list_to_check: list[T]) -> bool: - if not len(list_to_check): - return True - - first, *_ = list_to_check - first_type = type(first) - - filtered_list = list(filter(lambda val: type(val) != first_type, list_to_check)) - - return len(filtered_list) == 0 diff --git a/pyproject.toml b/pyproject.toml index 09f9cb0e..8a8d8e91 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,12 +1,17 @@ [tool.poetry] name = "argus-alm" -version = "0.12.10" +version = "0.13.0" description = "Argus" authors = ["Alexey Kartashov ", "Łukasz Sójka "] license = "Apache-2.0" repository = "https://github.com/scylladb/argus" readme = "README.md" -packages = [{include = "argus"}] +packages = [ + { include = "argus/client" }, + { include = "argus/common" }, + { include = "argus/__init__.py" }, +] +exclude = ["argus/client/tests"] [tool.poetry.group.web-backend] optional = true diff --git a/scripts/download_runs_from_prod.py b/scripts/download_runs_from_prod.py deleted file mode 100644 index 728fc92c..00000000 --- a/scripts/download_runs_from_prod.py +++ /dev/null @@ -1,38 +0,0 @@ -import os -import json -import logging - -from pathlib import Path -from time import time -from argus.db.argus_json import ArgusJSONEncoder -from argus.db.testrun import TestRun -from argus.db.interface import ArgusDatabase -from argus.db.config import FileConfig - -logging.basicConfig() -LOGGER = logging.getLogger() -LOGGER.setLevel(logging.DEBUG) - -SAVE_DIR = Path(f"./migration/{TestRun.table_name()}-{int(time())}") -PROD_INTERFACE = ArgusDatabase(FileConfig("./argus.local.prod.yaml")) -PROD_INTERFACE.execution_profile.request_timeout = 600 - -ALL_ROWS = list(PROD_INTERFACE.session.execute("SELECT id FROM test_runs_v8").all()) -TestRun.set_argus(PROD_INTERFACE) - -if not (p := Path("./migration")).exists(): - os.mkdir(p.absolute()) - -if Path("./migration/latest").exists(): - os.unlink("./migration/latest") - -os.mkdir(SAVE_DIR.absolute()) -os.symlink(SAVE_DIR.absolute(), "./migration/latest", target_is_directory=True) - -total_rows = len(ALL_ROWS) -print(f"Total rows fetched: {total_rows}") -for idx, row in enumerate(ALL_ROWS): - LOGGER.info(f"[%s/%s] Processing id:%s", idx+1, total_rows, row["id"]) - tr = TestRun.from_id(row["id"]) - with open(SAVE_DIR / f"{row['id']}.json", "wt") as f: - json.dump(tr.serialize(), f, cls=ArgusJSONEncoder) diff --git a/scripts/fix_run_metadata.py b/scripts/fix_run_metadata.py deleted file mode 100644 index 0405354b..00000000 --- a/scripts/fix_run_metadata.py +++ /dev/null @@ -1,20 +0,0 @@ -import logging -from argus.backend.db import ScyllaCluster -from argus.db.testrun import TestRun - -logging.basicConfig() -LOGGER = logging.getLogger() -LOGGER.setLevel(logging.DEBUG) - - -DB = ScyllaCluster.get() -DB.execution_profile.request_timeout = 600 -ALL_RUNS = list(DB.session.execute(f"SELECT id FROM {TestRun.table_name()}").all()) -TOTAL_RUNS = len(ALL_RUNS) - -for idx, row in enumerate(ALL_RUNS): - LOGGER.info("[%s/%s] Fixing %s...", idx+1, TOTAL_RUNS, row["id"]) - tr = 
diff --git a/scripts/download_runs_from_prod.py b/scripts/download_runs_from_prod.py
deleted file mode 100644
index 728fc92c..00000000
--- a/scripts/download_runs_from_prod.py
+++ /dev/null
@@ -1,38 +0,0 @@
-import os
-import json
-import logging
-
-from pathlib import Path
-from time import time
-from argus.db.argus_json import ArgusJSONEncoder
-from argus.db.testrun import TestRun
-from argus.db.interface import ArgusDatabase
-from argus.db.config import FileConfig
-
-logging.basicConfig()
-LOGGER = logging.getLogger()
-LOGGER.setLevel(logging.DEBUG)
-
-SAVE_DIR = Path(f"./migration/{TestRun.table_name()}-{int(time())}")
-PROD_INTERFACE = ArgusDatabase(FileConfig("./argus.local.prod.yaml"))
-PROD_INTERFACE.execution_profile.request_timeout = 600
-
-ALL_ROWS = list(PROD_INTERFACE.session.execute("SELECT id FROM test_runs_v8").all())
-TestRun.set_argus(PROD_INTERFACE)
-
-if not (p := Path("./migration")).exists():
-    os.mkdir(p.absolute())
-
-if Path("./migration/latest").exists():
-    os.unlink("./migration/latest")
-
-os.mkdir(SAVE_DIR.absolute())
-os.symlink(SAVE_DIR.absolute(), "./migration/latest", target_is_directory=True)
-
-total_rows = len(ALL_ROWS)
-print(f"Total rows fetched: {total_rows}")
-for idx, row in enumerate(ALL_ROWS):
-    LOGGER.info(f"[%s/%s] Processing id:%s", idx+1, total_rows, row["id"])
-    tr = TestRun.from_id(row["id"])
-    with open(SAVE_DIR / f"{row['id']}.json", "wt") as f:
-        json.dump(tr.serialize(), f, cls=ArgusJSONEncoder)
diff --git a/scripts/fix_run_metadata.py b/scripts/fix_run_metadata.py
deleted file mode 100644
index 0405354b..00000000
--- a/scripts/fix_run_metadata.py
+++ /dev/null
@@ -1,20 +0,0 @@
-import logging
-from argus.backend.db import ScyllaCluster
-from argus.db.testrun import TestRun
-
-logging.basicConfig()
-LOGGER = logging.getLogger()
-LOGGER.setLevel(logging.DEBUG)
-
-
-DB = ScyllaCluster.get()
-DB.execution_profile.request_timeout = 600
-ALL_RUNS = list(DB.session.execute(f"SELECT id FROM {TestRun.table_name()}").all())
-TOTAL_RUNS = len(ALL_RUNS)
-
-for idx, row in enumerate(ALL_RUNS):
-    LOGGER.info("[%s/%s] Fixing %s...", idx+1, TOTAL_RUNS, row["id"])
-    tr = TestRun.from_id(test_id=row["id"])
-    LOGGER.info("Loaded %s:%s/%s/%s", tr.build_id, tr.test_id, tr.group_id, tr.release_id)
-    tr._assign_categories()
-    tr.save()
diff --git a/scripts/migration/migrate_testruns_2022-06-12.py b/scripts/migration/migrate_testruns_2022-06-12.py
deleted file mode 100644
index 40525f71..00000000
--- a/scripts/migration/migrate_testruns_2022-06-12.py
+++ /dev/null
@@ -1,35 +0,0 @@
-import logging
-from collections import namedtuple
-
-from cassandra.cluster import Session
-
-from argus.db.testrun import TestRun
-from argus.backend.db import ScyllaCluster
-
-logging.basicConfig()
-LOGGER = logging.getLogger()
-LOGGER.setLevel(logging.DEBUG)
-
-
-db = ScyllaCluster.get()
-
-session: Session = db.session
-OLD_TABLE_NAME = "test_runs_v7"
-stmt = db.prepare(f"INSERT INTO {TestRun.table_name()} JSON ?")
-
-rows: list[dict] = list(session.execute(f"SELECT * FROM {OLD_TABLE_NAME}").all())
-total_rows = len(rows)
-MigratedRow = namedtuple("MigratedRow", [*rows[0].keys(), "scylla_version"])
-
-for idx, row in enumerate(rows):
-    LOGGER.info("[%s/%s] Migrating %s to the new table...", idx + 1, total_rows, row["id"])
-    try:
-        scylla_package = next(p for p in row["packages"] if p.name == "scylla-server")
-        version = scylla_package.version.replace("~", ".")
-    except TypeError:
-        version = None
-    except StopIteration:
-        version = None
-    migrated_row = MigratedRow(**row, scylla_version=version)
-    tr = TestRun.from_db_row(migrated_row)
-    tr.save()
diff --git a/scripts/upload_runs_to_active_db.py b/scripts/upload_runs_to_active_db.py
deleted file mode 100644
index f89e1960..00000000
--- a/scripts/upload_runs_to_active_db.py
+++ /dev/null
@@ -1,34 +0,0 @@
-import logging
-import glob
-
-from pathlib import Path
-from argus.backend.db import ScyllaCluster
-from argus.db.testrun import TestRun
-
-logging.basicConfig()
-LOGGER = logging.getLogger()
-LOGGER.setLevel(logging.DEBUG)
-
-LOAD_DIR = Path("./migration/latest")
-DB = ScyllaCluster.get()
-
-if not LOAD_DIR.exists():
-    exit(1)
-
-ALL_RUNS = glob.glob(f"{LOAD_DIR.absolute()}/*.json")
-TOTAL_RUNS = len(ALL_RUNS)
-LOAD_STMT = DB.prepare(f"INSERT INTO {TestRun.table_name()} JSON ?")
-EXISTING_ROWS = list(DB.session.execute(f"SELECT id FROM {TestRun.table_name()}").all())
-
-if len(EXISTING_ROWS) > 0:
-    LOGGER.error("Found rows in the TestRun table, please truncate it manually")
-    exit(1)
-
-for idx, filepath in enumerate(ALL_RUNS):
-    LOGGER.info("[%s/%s] Loading %s...", idx+1, TOTAL_RUNS, filepath)
-    f = open(filepath, "rt", encoding="utf-8")
-    content = f.read()
-    try:
-        DB.session.execute(LOAD_STMT, parameters=(content,))
-    except Exception as e:
-        LOGGER.error("[%s] Failed to load! JSON: %s", e.args[0], content)
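
As a closing aside: the is_list_homogeneous helper deleted from argus/db/utils.py earlier in this patch reduces to a set comprehension over item types; a minimal equivalent, should the check ever be needed on the client side again:

    from typing import TypeVar

    T = TypeVar("T")


    def is_list_homogeneous(list_to_check: list[T]) -> bool:
        # an empty list counts as homogeneous, matching the removed helper
        return len({type(item) for item in list_to_check}) <= 1
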