From c4261fe1c51fd852689ba8864c566553874deb61 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Tue, 26 Sep 2023 17:45:32 +0200 Subject: [PATCH 1/3] Issue #74 prime-caches tool: add initial CLI for config loading and failure handling --- .../background/prime_caches.py | 78 +++++++++++++++---- 1 file changed, 64 insertions(+), 14 deletions(-) diff --git a/src/openeo_aggregator/background/prime_caches.py b/src/openeo_aggregator/background/prime_caches.py index d2a269c4..4ba21990 100644 --- a/src/openeo_aggregator/background/prime_caches.py +++ b/src/openeo_aggregator/background/prime_caches.py @@ -1,3 +1,6 @@ +import argparse +import contextlib +import functools import logging from pathlib import Path from typing import Any, Optional, Sequence, Union @@ -8,7 +11,11 @@ from openeo_aggregator.app import get_aggregator_logging_config from openeo_aggregator.backend import AggregatorBackendImplementation -from openeo_aggregator.config import AggregatorConfig, get_config +from openeo_aggregator.config import ( + OPENEO_AGGREGATOR_CONFIG, + AggregatorConfig, + get_config, +) from openeo_aggregator.connection import MultiBackendConnection _log = logging.getLogger(__name__) @@ -34,13 +41,45 @@ def __getattr__(self, name): return getattr(self.target, name) -def main(config: Union[str, Path, AggregatorConfig, None] = None): - """CLI entrypoint""" - setup_logging(config=get_aggregator_logging_config(context="background-task")) - prime_caches(config=config) +FAIL_MODE_FAILFAST = "failfast" +FAIL_MODE_WARN = "warn" + +def main(): + """CLI entrypoint""" + cli = argparse.ArgumentParser() + cli.add_argument( + "--config", + default=None, + help=f"Optional: aggregator config to load (instead of env var {OPENEO_AGGREGATOR_CONFIG} based resolution).", + ) + cli.add_argument( + "--require-zookeeper-writes", + action="store_true", + help="Fail if no ZooKeeper writes were done", + ) + cli.add_argument( + "--fail-mode", + choices=[FAIL_MODE_FAILFAST, FAIL_MODE_WARN], + default=FAIL_MODE_FAILFAST, + help="Fail mode: fail fast or warn (just log failures but keep going if possible).", + ) + + arguments = cli.parse_args() -def prime_caches(config: Union[str, Path, AggregatorConfig, None] = None): + setup_logging(config=get_aggregator_logging_config(context="background-task")) + prime_caches( + config=arguments.config, + require_zookeeper_writes=arguments.require_zookeeper_writes, + fail_mode=arguments.fail_mode, + ) + + +def prime_caches( + config: Union[str, Path, AggregatorConfig, None] = None, + require_zookeeper_writes: bool = False, + fail_mode: str = FAIL_MODE_FAILFAST, +): with TimingLogger(title="Prime caches", logger=_log): config: AggregatorConfig = get_config(config) _log.info(f"Using config: {config.get('config_source')=}") @@ -53,15 +92,23 @@ def prime_caches(config: Union[str, Path, AggregatorConfig, None] = None): backends = MultiBackendConnection.from_config(config) backend_implementation = AggregatorBackendImplementation(backends=backends, config=config) + if fail_mode == FAIL_MODE_FAILFAST: + # Do not intercept any exceptions. + fail_handler = contextlib.nullcontext + elif fail_mode == FAIL_MODE_WARN: + fail_handler = functools.partial(just_log_exceptions, log=_log) + else: + raise ValueError(fail_mode) + with TimingLogger(title="General capabilities", logger=_log): - with just_log_exceptions(log=_log): + with fail_handler(): backends.get_api_versions() - with just_log_exceptions(log=_log): + with fail_handler(): backend_implementation.file_formats() - with just_log_exceptions(log=_log): + with fail_handler(): backend_implementation.secondary_services.service_types() - with just_log_exceptions(log=_log): + with fail_handler(): with TimingLogger(title="Get full collection listing", logger=_log): collections_metadata = backend_implementation.catalog.get_all_metadata() @@ -69,14 +116,17 @@ def prime_caches(config: Union[str, Path, AggregatorConfig, None] = None): collection_ids = [m["id"] for m in collections_metadata] for c, collection_id in enumerate(collection_ids): _log.info(f"get collection {c+1}/{len(collection_ids)} {collection_id}") - with just_log_exceptions(log=_log): + with fail_handler(): backend_implementation.catalog.get_collection_metadata(collection_id=collection_id) with TimingLogger(title="Get merged processes", logger=_log): - with just_log_exceptions(log=_log): + with fail_handler(): backend_implementation.processing.get_merged_process_metadata() - _log.info(f"Zookeeper stats: {kazoo_stats}") + zk_writes = sum(kazoo_stats.get(k, 0) for k in ["create", "set"]) + _log.info(f"ZooKeeper stats: {kazoo_stats=} {zk_writes=}") + if require_zookeeper_writes and zk_writes == 0: + raise RuntimeError("No Zookeeper writes.") def _patch_config_for_kazoo_client_stats(config: AggregatorConfig, stats: dict): @@ -95,4 +145,4 @@ def kazoo_client_factory(**kwargs): if __name__ == "__main__": - main(config=Path(__file__).parent.parent.parent.parent / "conf/aggregator.dev.py") + main() From f20e67a9d14fc8dc685c7fbc77d3a484d7ea9d40 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Wed, 27 Sep 2023 15:43:41 +0200 Subject: [PATCH 2/3] Issue #74 add cli options for logging e.g. with rotating file expanded test coverage quite a bit too --- .gitignore | 2 + setup.py | 2 +- src/openeo_aggregator/app.py | 10 +- .../background/prime_caches.py | 39 ++++- src/openeo_aggregator/testing.py | 32 ++-- tests/background/test_prime_caches.py | 151 ++++++++++++++++-- tests/conftest.py | 3 + tests/test_caching.py | 5 +- 8 files changed, 211 insertions(+), 33 deletions(-) diff --git a/.gitignore b/.gitignore index f02be8bb..5a90346a 100644 --- a/.gitignore +++ b/.gitignore @@ -116,3 +116,5 @@ temp* # pytest basetemp root (e.g. for tmp_path fixture). pytest-tmp + +openeo_python.log* diff --git a/setup.py b/setup.py index 7faa9f88..fc5e6b09 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ "requests", "attrs", "openeo>=0.17.0", - "openeo_driver>=0.67.0.dev", + "openeo_driver>=0.68.0.dev", "flask~=2.0", "gunicorn~=20.0", "python-json-logger>=2.0.0", diff --git a/src/openeo_aggregator/app.py b/src/openeo_aggregator/app.py index 5dbf9a90..4383525f 100644 --- a/src/openeo_aggregator/app.py +++ b/src/openeo_aggregator/app.py @@ -3,12 +3,14 @@ """ import logging import os -from typing import Any +from pathlib import Path +from typing import Any, List, Optional, Union import flask import openeo_driver.views from openeo_driver.config.load import ConfigGetter from openeo_driver.util.logging import ( + LOG_HANDLER_STDERR_JSON, LOGGING_CONTEXT_FLASK, get_logging_config, setup_logging, @@ -74,10 +76,13 @@ def agg_backends(): def get_aggregator_logging_config( + *, context: str = LOGGING_CONTEXT_FLASK, handler_default_level: str = "DEBUG", + root_handlers: Optional[List[str]] = None, + log_file: Optional[Union[str, Path]] = None, ) -> dict: - root_handlers = ["stderr_json"] + root_handlers = root_handlers or [LOG_HANDLER_STDERR_JSON] if smart_bool(os.environ.get("OPENEO_AGGREGATOR_SIMPLE_LOGGING")): root_handlers = None @@ -94,6 +99,7 @@ def get_aggregator_logging_config( "kazoo": {"level": "WARN"}, }, context=context, + log_file=log_file, ) diff --git a/src/openeo_aggregator/background/prime_caches.py b/src/openeo_aggregator/background/prime_caches.py index 4ba21990..88531980 100644 --- a/src/openeo_aggregator/background/prime_caches.py +++ b/src/openeo_aggregator/background/prime_caches.py @@ -3,11 +3,18 @@ import functools import logging from pathlib import Path -from typing import Any, Optional, Sequence, Union +from typing import Any, List, Optional, Sequence, Union from kazoo.client import KazooClient from openeo.util import TimingLogger -from openeo_driver.util.logging import just_log_exceptions, setup_logging +from openeo_driver.util.logging import ( + LOG_HANDLER_FILE_JSON, + LOG_HANDLER_ROTATING_FILE_JSON, + LOG_HANDLER_STDERR_BASIC, + LOG_HANDLER_STDERR_JSON, + just_log_exceptions, + setup_logging, +) from openeo_aggregator.app import get_aggregator_logging_config from openeo_aggregator.backend import AggregatorBackendImplementation @@ -45,7 +52,7 @@ def __getattr__(self, name): FAIL_MODE_WARN = "warn" -def main(): +def main(args: Optional[List[str]] = None): """CLI entrypoint""" cli = argparse.ArgumentParser() cli.add_argument( @@ -64,10 +71,29 @@ def main(): default=FAIL_MODE_FAILFAST, help="Fail mode: fail fast or warn (just log failures but keep going if possible).", ) + cli.add_argument( + "--log-handler", + dest="log_handlers", + choices=[ + LOG_HANDLER_STDERR_BASIC, + LOG_HANDLER_STDERR_JSON, + LOG_HANDLER_FILE_JSON, + LOG_HANDLER_ROTATING_FILE_JSON, + ], + action="append", + help="Log handlers.", + ) + cli.add_argument( + "--log-file", + help="Log file to use for (rotating) file log handlers.", + ) - arguments = cli.parse_args() + arguments = cli.parse_args(args=args) - setup_logging(config=get_aggregator_logging_config(context="background-task")) + logging_config = get_aggregator_logging_config( + context="background-task", root_handlers=arguments.log_handlers, log_file=arguments.log_file + ) + setup_logging(config=logging_config) prime_caches( config=arguments.config, require_zookeeper_writes=arguments.require_zookeeper_writes, @@ -130,9 +156,10 @@ def prime_caches( def _patch_config_for_kazoo_client_stats(config: AggregatorConfig, stats: dict): + orig_kazoo_client_factory = config.kazoo_client_factory or KazooClient def kazoo_client_factory(**kwargs): _log.info(f"AttrStatsProxy-wrapping KazooClient with {kwargs=}") - zk = KazooClient(**kwargs) + zk = orig_kazoo_client_factory(**kwargs) return AttrStatsProxy( target=zk, to_track=["start", "stop", "create", "get", "set"], diff --git a/src/openeo_aggregator/testing.py b/src/openeo_aggregator/testing.py index 54db1bfb..d0ba9c3f 100644 --- a/src/openeo_aggregator/testing.py +++ b/src/openeo_aggregator/testing.py @@ -1,9 +1,11 @@ +import collections +import dataclasses import datetime import itertools import json import pathlib import time -from typing import Any, List, Optional, Union +from typing import Any, Dict, List, Optional, Tuple, Union from unittest import mock import kazoo @@ -15,6 +17,11 @@ from openeo_aggregator.utils import Clock +@dataclasses.dataclass +class DummyZnodeStat: + last_modified: float = dataclasses.field(default_factory=Clock.time) + + class DummyKazooClient: """ Stand-in object for KazooClient for testing. @@ -22,9 +29,9 @@ class DummyKazooClient: def __init__(self): self.state = "closed" - self.data = {} + self.data: Dict[str, Tuple[bytes, DummyZnodeStat]] = {} - def start(self): + def start(self, timeout: float = 15): assert self.state == "closed" self.state = "open" @@ -32,6 +39,10 @@ def stop(self): assert self.state == "open" self.state = "closed" + @property + def connected(self): + return self.state == "open" + def _assert_open(self): if not self.state == "open": raise kazoo.exceptions.ConnectionClosedError("Connection has been closed") @@ -46,7 +57,7 @@ def create(self, path: str, value, makepath: bool = False): self.create(parent, b"", makepath=makepath) else: raise kazoo.exceptions.NoNodeError - self.data[path] = value + self.data[path] = value, DummyZnodeStat() def exists(self, path): self._assert_open() @@ -56,7 +67,7 @@ def get(self, path): self._assert_open() if path not in self.data: raise kazoo.exceptions.NoNodeError() - return (self.data[path], None) + return self.data[path] def get_children(self, path): self._assert_open() @@ -69,22 +80,19 @@ def set(self, path, value, version=-1): self._assert_open() if path not in self.data: raise kazoo.exceptions.NoNodeError() - self.data[path] = value + self.data[path] = (value, DummyZnodeStat()) def get_data_deserialized(self, drop_empty=False) -> dict: """Dump whole db as a dict, but deserialize values""" def deserialize(b: bytes): - if b[:2] == b'{"' and b[-1:] == b"}": + # simple JSON format sniffing + if (b[:1], b[-1:]) in {(b"{", b"}"), (b"[", b"]")}: return json.loads(b.decode("utf8")) else: return b.decode("utf8") - return { - k: deserialize(v) - for (k, v) in self.data.items() - if v or not drop_empty - } + return {k: deserialize(v) for (k, (v, stats)) in self.data.items() if v or not drop_empty} def approx_now(abs=10): diff --git a/tests/background/test_prime_caches.py b/tests/background/test_prime_caches.py index dc945d8a..68de568b 100644 --- a/tests/background/test_prime_caches.py +++ b/tests/background/test_prime_caches.py @@ -1,4 +1,38 @@ -from openeo_aggregator.background.prime_caches import AttrStatsProxy, prime_caches +import json +import textwrap +from pathlib import Path +from typing import Any + +import pytest +from openeo_driver.testing import DictSubSet + +from openeo_aggregator.background.prime_caches import AttrStatsProxy, main, prime_caches +from openeo_aggregator.config import AggregatorConfig +from openeo_aggregator.testing import DummyKazooClient + +FILE_FORMATS_JUST_GEOTIFF = { + "input": {"GTiff": {"gis_data_types": ["raster"], "parameters": {}, "title": "GeoTiff"}}, + "output": {"GTiff": {"gis_data_types": ["raster"], "parameters": {}, "title": "GeoTiff"}}, +} + + +@pytest.fixture +def config(backend1, backend2, backend1_id, backend2_id, zk_client) -> AggregatorConfig: + conf = AggregatorConfig() + conf.aggregator_backends = { + backend1_id: backend1, + backend2_id: backend2, + } + conf.kazoo_client_factory = lambda **kwargs: zk_client + conf.zookeeper_prefix = "/oa/" + conf.memoizer = { + "type": "zookeeper", + "config": { + "zk_hosts": "localhost:2181", + "default_ttl": 24 * 60 * 60, + }, + } + return conf class TestAttrStatsProxy: @@ -18,16 +52,11 @@ def meh(self, x): assert foo.stats == {"bar": 1} -def test_prime_caches_basic(config, backend1, backend2, requests_mock, mbldr, caplog): +def test_prime_caches_basic(config, backend1, backend2, requests_mock, mbldr, caplog, zk_client): """Just check that bare basics of `prime_caches` work.""" - # TODO: check that (zookeeper) caches are actually updated/written. - just_geotiff = { - "input": {"GTiff": {"gis_data_types": ["raster"], "parameters": {}, "title": "GeoTiff"}}, - "output": {"GTiff": {"gis_data_types": ["raster"], "parameters": {}, "title": "GeoTiff"}}, - } mocks = [ - requests_mock.get(backend1 + "/file_formats", json=just_geotiff), - requests_mock.get(backend2 + "/file_formats", json=just_geotiff), + requests_mock.get(backend1 + "/file_formats", json=FILE_FORMATS_JUST_GEOTIFF), + requests_mock.get(backend2 + "/file_formats", json=FILE_FORMATS_JUST_GEOTIFF), requests_mock.get(backend1 + "/collections", json=mbldr.collections("S2")), requests_mock.get(backend1 + "/collections/S2", json=mbldr.collection("S2")), requests_mock.get(backend2 + "/collections", json=mbldr.collections("S2")), @@ -37,3 +66,107 @@ def test_prime_caches_basic(config, backend1, backend2, requests_mock, mbldr, ca prime_caches(config=config) assert all([m.call_count == 1 for m in mocks]) + + assert zk_client.get_data_deserialized() == DictSubSet( + { + "/oa/cache/CollectionCatalog/all": [ + [DictSubSet({"id": "S2"})], + DictSubSet({"_jsonserde": DictSubSet()}), + ], + "/oa/cache/CollectionCatalog/collection/S2": DictSubSet({"id": "S2"}), + "/oa/cache/Processing/all/1.1.0": DictSubSet({"load_collection": DictSubSet({"id": "load_collection"})}), + "/oa/cache/general/file_formats": FILE_FORMATS_JUST_GEOTIFF, + "/oa/cache/mbcon/api_versions": ["1.1.0"], + "/oa/cache/SecondaryServices/service_types": { + "service_types": {}, + "supporting_backend_ids": [], + }, + } + ) + + +def _is_primitive_construct(data: Any) -> bool: + """Consists only of Python primitives int, float, dict, list, str, ...?""" + if isinstance(data, dict): + return all(_is_primitive_construct(k) and _is_primitive_construct(v) for k, v in data.items()) + elif isinstance(data, (list, tuple, set)): + return all(_is_primitive_construct(x) for x in data) + else: + return isinstance(data, (bool, int, float, str, bytes)) or data is None + + +def _get_primitive_config(config: AggregatorConfig) -> dict: + return {k: v for k, v in config.items() if _is_primitive_construct(v)} + + +def _build_config_file(config: AggregatorConfig, path: Path): + """Best effort AggregatorConfig to config file conversion.""" + path.write_text( + textwrap.dedent( + f""" + from openeo_aggregator.config import AggregatorConfig + config = AggregatorConfig({_get_primitive_config(config)}) + """ + ) + ) + + +def test_prime_caches_main_basic(backend1, backend2, requests_mock, mbldr, caplog, tmp_path, backend1_id, backend2_id): + """Just check that bare basics of `prime_caches` main work.""" + mocks = [ + requests_mock.get(backend1 + "/file_formats", json=FILE_FORMATS_JUST_GEOTIFF), + requests_mock.get(backend2 + "/file_formats", json=FILE_FORMATS_JUST_GEOTIFF), + requests_mock.get(backend1 + "/collections", json=mbldr.collections("S2")), + requests_mock.get(backend1 + "/collections/S2", json=mbldr.collection("S2")), + requests_mock.get(backend2 + "/collections", json=mbldr.collections("S2")), + requests_mock.get(backend2 + "/collections/S2", json=mbldr.collection("S2")), + ] + + # Construct config file + config = AggregatorConfig() + config.aggregator_backends = { + backend1_id: backend1, + backend2_id: backend2, + } + config_file = tmp_path / "conf.py" + _build_config_file(config, config_file) + + main(args=["--config", str(config_file)]) + + assert all([m.call_count == 1 for m in mocks]) + + +def test_prime_caches_main_logging(backend1, backend2, mbldr, caplog, tmp_path, backend1_id, backend2_id, pytester): + """Run main in subprocess (so no request mocks, and probably a lot of failures) to see if logging setup works.""" + + config = AggregatorConfig() + config.aggregator_backends = { + backend1_id: backend1, + backend2_id: backend2, + } + config_file = tmp_path / "conf.py" + _build_config_file(config, config_file) + + main(args=["--config", str(config_file)]) + log_file = tmp_path / "agg.log" + + result = pytester.run( + "openeo-aggregator-prime-caches", + "--config", + str(config_file), + "--log-handler", + "rotating_file_json", + "--log-file", + str(log_file), + ) + + assert result.outlines == [] + assert result.errlines == [] + + with log_file.open("r") as f: + log_entries = [json.loads(line) for line in f] + + assert any(log["message"] == f"Loading config from Python file {config_file}" for log in log_entries) + assert any(log["message"].startswith("Prime caches: start") for log in log_entries) + assert any(log["message"].startswith("Prime caches: fail") for log in log_entries) + assert any("cache miss" in log["message"] for log in log_entries) diff --git a/tests/conftest.py b/tests/conftest.py index 11ff84cc..ce2dda76 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,6 +14,9 @@ from openeo_aggregator.config import AggregatorConfig from openeo_aggregator.testing import DummyKazooClient, MetadataBuilder +pytest_plugins = "pytester" + + _DEFAULT_PROCESSES = [ "load_collection", "load_result", diff --git a/tests/test_caching.py b/tests/test_caching.py index 7c6eb88b..84778ac3 100644 --- a/tests/test_caching.py +++ b/tests/test_caching.py @@ -22,7 +22,7 @@ memoizer_from_config, ) from openeo_aggregator.config import AggregatorConfig -from openeo_aggregator.testing import clock_mock +from openeo_aggregator.testing import DummyZnodeStat, clock_mock from openeo_aggregator.utils import Clock @@ -410,14 +410,13 @@ def test_failing_callback(self, caplog): assert dm2.get_or_call(key="count", callback=callback) == 1003 -DummyZnodeStat = collections.namedtuple("DummyZnodeStat", ["last_modified"]) - class TestZkMemoizer(_TestMemoizer): @pytest.fixture def zk_client(self) -> mock.Mock: """Simple ad-hoc ZooKeeper client fixture using a dictionary for storage.""" + # TODO: unify with DummyKazooClient? zk_client = mock.Mock() db = {} zk_client.connected = False From f4fd5ed0bb013fd28443c809874758632df03c96 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Wed, 27 Sep 2023 16:08:02 +0200 Subject: [PATCH 3/3] Issue #74 bump to 0.10.6 --- src/openeo_aggregator/about.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/openeo_aggregator/about.py b/src/openeo_aggregator/about.py index 669cf4e7..b53d6017 100644 --- a/src/openeo_aggregator/about.py +++ b/src/openeo_aggregator/about.py @@ -1 +1 @@ -__version__ = "0.10.5a1" +__version__ = "0.10.6a1"