Merge branch 'issue74-cache-priming'
soxofaan committed Sep 27, 2023
2 parents fbf7e65 + f4fd5ed commit 3b220b9
Showing 9 changed files with 273 additions and 45 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -116,3 +116,5 @@ temp*
 
 # pytest basetemp root (e.g. for tmp_path fixture).
 pytest-tmp
+
+openeo_python.log*
2 changes: 1 addition & 1 deletion setup.py
@@ -28,7 +28,7 @@
         "requests",
         "attrs",
         "openeo>=0.17.0",
-        "openeo_driver>=0.67.0.dev",
+        "openeo_driver>=0.68.0.dev",
         "flask~=2.0",
         "gunicorn~=20.0",
         "python-json-logger>=2.0.0",
2 changes: 1 addition & 1 deletion src/openeo_aggregator/about.py
@@ -1 +1 @@
-__version__ = "0.10.5a1"
+__version__ = "0.10.6a1"
10 changes: 8 additions & 2 deletions src/openeo_aggregator/app.py
@@ -3,12 +3,14 @@
 """
 import logging
 import os
-from typing import Any
+from pathlib import Path
+from typing import Any, List, Optional, Union
 
 import flask
 import openeo_driver.views
 from openeo_driver.config.load import ConfigGetter
 from openeo_driver.util.logging import (
+    LOG_HANDLER_STDERR_JSON,
     LOGGING_CONTEXT_FLASK,
     get_logging_config,
     setup_logging,
@@ -74,10 +76,13 @@ def agg_backends():
 
 
 def get_aggregator_logging_config(
+    *,
     context: str = LOGGING_CONTEXT_FLASK,
     handler_default_level: str = "DEBUG",
+    root_handlers: Optional[List[str]] = None,
+    log_file: Optional[Union[str, Path]] = None,
 ) -> dict:
-    root_handlers = ["stderr_json"]
+    root_handlers = root_handlers or [LOG_HANDLER_STDERR_JSON]
     if smart_bool(os.environ.get("OPENEO_AGGREGATOR_SIMPLE_LOGGING")):
         root_handlers = None
 
@@ -94,6 +99,7 @@ def get_aggregator_logging_config(
             "kazoo": {"level": "WARN"},
         },
         context=context,
+        log_file=log_file,
     )


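The reworked get_aggregator_logging_config lets callers pick the root log handlers and point the (rotating) file handlers at a log file, instead of the previously hard-coded "stderr_json". A minimal sketch of the new call style, mirroring how the reworked prime_caches entrypoint below uses it (the log file path is purely illustrative):

    from openeo_driver.util.logging import LOG_HANDLER_ROTATING_FILE_JSON, setup_logging

    from openeo_aggregator.app import get_aggregator_logging_config

    # Send root logging to a rotating JSON log file instead of the stderr default.
    logging_config = get_aggregator_logging_config(
        context="background-task",
        root_handlers=[LOG_HANDLER_ROTATING_FILE_JSON],
        log_file="/var/log/openeo/aggregator.log",  # illustrative path
    )
    setup_logging(config=logging_config)
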
111 changes: 94 additions & 17 deletions src/openeo_aggregator/background/prime_caches.py
@@ -1,14 +1,28 @@
+import argparse
+import contextlib
+import functools
 import logging
 from pathlib import Path
-from typing import Any, Optional, Sequence, Union
+from typing import Any, List, Optional, Sequence, Union
 
 from kazoo.client import KazooClient
 from openeo.util import TimingLogger
-from openeo_driver.util.logging import just_log_exceptions, setup_logging
+from openeo_driver.util.logging import (
+    LOG_HANDLER_FILE_JSON,
+    LOG_HANDLER_ROTATING_FILE_JSON,
+    LOG_HANDLER_STDERR_BASIC,
+    LOG_HANDLER_STDERR_JSON,
+    just_log_exceptions,
+    setup_logging,
+)
 
 from openeo_aggregator.app import get_aggregator_logging_config
 from openeo_aggregator.backend import AggregatorBackendImplementation
-from openeo_aggregator.config import AggregatorConfig, get_config
+from openeo_aggregator.config import (
+    OPENEO_AGGREGATOR_CONFIG,
+    AggregatorConfig,
+    get_config,
+)
 from openeo_aggregator.connection import MultiBackendConnection
 
 _log = logging.getLogger(__name__)
@@ -34,13 +48,64 @@ def __getattr__(self, name):
         return getattr(self.target, name)
 
 
-def main(config: Union[str, Path, AggregatorConfig, None] = None):
-    """CLI entrypoint"""
-    setup_logging(config=get_aggregator_logging_config(context="background-task"))
-    prime_caches(config=config)
+FAIL_MODE_FAILFAST = "failfast"
+FAIL_MODE_WARN = "warn"
 
 
-def prime_caches(config: Union[str, Path, AggregatorConfig, None] = None):
+def main(args: Optional[List[str]] = None):
+    """CLI entrypoint"""
+    cli = argparse.ArgumentParser()
+    cli.add_argument(
+        "--config",
+        default=None,
+        help=f"Optional: aggregator config to load (instead of env var {OPENEO_AGGREGATOR_CONFIG} based resolution).",
+    )
+    cli.add_argument(
+        "--require-zookeeper-writes",
+        action="store_true",
+        help="Fail if no ZooKeeper writes were done",
+    )
+    cli.add_argument(
+        "--fail-mode",
+        choices=[FAIL_MODE_FAILFAST, FAIL_MODE_WARN],
+        default=FAIL_MODE_FAILFAST,
+        help="Fail mode: fail fast or warn (just log failures but keep going if possible).",
+    )
+    cli.add_argument(
+        "--log-handler",
+        dest="log_handlers",
+        choices=[
+            LOG_HANDLER_STDERR_BASIC,
+            LOG_HANDLER_STDERR_JSON,
+            LOG_HANDLER_FILE_JSON,
+            LOG_HANDLER_ROTATING_FILE_JSON,
+        ],
+        action="append",
+        help="Log handlers.",
+    )
+    cli.add_argument(
+        "--log-file",
+        help="Log file to use for (rotating) file log handlers.",
+    )
+
+    arguments = cli.parse_args(args=args)
+
+    logging_config = get_aggregator_logging_config(
+        context="background-task", root_handlers=arguments.log_handlers, log_file=arguments.log_file
+    )
+    setup_logging(config=logging_config)
+    prime_caches(
+        config=arguments.config,
+        require_zookeeper_writes=arguments.require_zookeeper_writes,
+        fail_mode=arguments.fail_mode,
+    )
+
+
+def prime_caches(
+    config: Union[str, Path, AggregatorConfig, None] = None,
+    require_zookeeper_writes: bool = False,
+    fail_mode: str = FAIL_MODE_FAILFAST,
+):
     with TimingLogger(title="Prime caches", logger=_log):
         config: AggregatorConfig = get_config(config)
         _log.info(f"Using config: {config.get('config_source')=}")
@@ -53,36 +118,48 @@ def prime_caches(config: Union[str, Path, AggregatorConfig, None] = None):
         backends = MultiBackendConnection.from_config(config)
         backend_implementation = AggregatorBackendImplementation(backends=backends, config=config)
 
+        if fail_mode == FAIL_MODE_FAILFAST:
+            # Do not intercept any exceptions.
+            fail_handler = contextlib.nullcontext
+        elif fail_mode == FAIL_MODE_WARN:
+            fail_handler = functools.partial(just_log_exceptions, log=_log)
+        else:
+            raise ValueError(fail_mode)
+
         with TimingLogger(title="General capabilities", logger=_log):
-            with just_log_exceptions(log=_log):
+            with fail_handler():
                 backends.get_api_versions()
-            with just_log_exceptions(log=_log):
+            with fail_handler():
                 backend_implementation.file_formats()
-            with just_log_exceptions(log=_log):
+            with fail_handler():
                 backend_implementation.secondary_services.service_types()
 
-        with just_log_exceptions(log=_log):
+        with fail_handler():
             with TimingLogger(title="Get full collection listing", logger=_log):
                 collections_metadata = backend_implementation.catalog.get_all_metadata()
 
         with TimingLogger(title="Get per collection metadata", logger=_log):
             collection_ids = [m["id"] for m in collections_metadata]
             for c, collection_id in enumerate(collection_ids):
                 _log.info(f"get collection {c+1}/{len(collection_ids)} {collection_id}")
-                with just_log_exceptions(log=_log):
+                with fail_handler():
                     backend_implementation.catalog.get_collection_metadata(collection_id=collection_id)
 
         with TimingLogger(title="Get merged processes", logger=_log):
-            with just_log_exceptions(log=_log):
+            with fail_handler():
                 backend_implementation.processing.get_merged_process_metadata()
 
-    _log.info(f"Zookeeper stats: {kazoo_stats}")
+    zk_writes = sum(kazoo_stats.get(k, 0) for k in ["create", "set"])
+    _log.info(f"ZooKeeper stats: {kazoo_stats=} {zk_writes=}")
+    if require_zookeeper_writes and zk_writes == 0:
+        raise RuntimeError("No Zookeeper writes.")
 
 
 def _patch_config_for_kazoo_client_stats(config: AggregatorConfig, stats: dict):
+    orig_kazoo_client_factory = config.kazoo_client_factory or KazooClient
     def kazoo_client_factory(**kwargs):
         _log.info(f"AttrStatsProxy-wrapping KazooClient with {kwargs=}")
-        zk = KazooClient(**kwargs)
+        zk = orig_kazoo_client_factory(**kwargs)
         return AttrStatsProxy(
             target=zk,
             to_track=["start", "stop", "create", "get", "set"],
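The ZooKeeper write check relies on AttrStatsProxy (its __getattr__ is visible at the top of this file's first hunk) wrapping whatever kazoo client factory is configured, so that accesses to the tracked methods are tallied into the shared stats dict; zk_writes above then sums the "create" and "set" counters. A rough stand-in for the idea (the proxy's full constructor is collapsed in this diff, so treat this as an assumption-level sketch, not the exact class):

    class StatsProxySketch:
        """Tally accesses to selected attributes, then delegate to the wrapped target."""

        def __init__(self, target, to_track, stats=None):
            self.target = target
            self.to_track = set(to_track)
            self.stats = stats if stats is not None else {}

        def __getattr__(self, name):
            if name in self.to_track:
                self.stats[name] = self.stats.get(name, 0) + 1
            return getattr(self.target, name)


    class FakeZk:
        def create(self, path, value): pass
        def set(self, path, value): pass
        def get(self, path): return b""

    stats = {}
    zk = StatsProxySketch(target=FakeZk(), to_track=["create", "set"], stats=stats)
    zk.create("/a", b"x")
    zk.set("/a", b"y")
    zk.get("/a")  # "get" is not tracked in this sketch
    assert sum(stats.get(k, 0) for k in ["create", "set"]) == 2
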
@@ -95,4 +172,4 @@ def kazoo_client_factory(**kwargs):
 
 
 if __name__ == "__main__":
-    main(config=Path(__file__).parent.parent.parent.parent / "conf/aggregator.dev.py")
+    main()
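The --fail-mode switch boils down to selecting a context-manager factory once, up front: contextlib.nullcontext lets exceptions propagate (fail fast), while functools.partial(just_log_exceptions, log=_log) logs them and carries on. A self-contained sketch of the pattern, with a stand-in for openeo_driver's just_log_exceptions:

    import contextlib
    import functools
    import logging

    logging.basicConfig()
    _log = logging.getLogger("fail_mode_demo")

    @contextlib.contextmanager
    def just_log_exceptions(log):
        # Stand-in for openeo_driver.util.logging.just_log_exceptions.
        try:
            yield
        except Exception as e:
            log.error(f"Ignoring exception: {e!r}")

    def get_fail_handler(fail_mode: str):
        if fail_mode == "failfast":
            return contextlib.nullcontext  # exceptions propagate unchanged
        elif fail_mode == "warn":
            return functools.partial(just_log_exceptions, log=_log)
        raise ValueError(fail_mode)

    fail_handler = get_fail_handler("warn")
    with fail_handler():
        raise RuntimeError("backend unreachable")  # logged, not raised
    print("still priming the remaining caches")
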
32 changes: 20 additions & 12 deletions src/openeo_aggregator/testing.py
@@ -1,9 +1,11 @@
+import collections
+import dataclasses
 import datetime
 import itertools
 import json
 import pathlib
 import time
-from typing import Any, List, Optional, Union
+from typing import Any, Dict, List, Optional, Tuple, Union
 from unittest import mock
 
 import kazoo
@@ -15,23 +17,32 @@
 from openeo_aggregator.utils import Clock
 
 
+@dataclasses.dataclass
+class DummyZnodeStat:
+    last_modified: float = dataclasses.field(default_factory=Clock.time)
+
+
 class DummyKazooClient:
     """
     Stand-in object for KazooClient for testing.
     """
 
     def __init__(self):
         self.state = "closed"
-        self.data = {}
+        self.data: Dict[str, Tuple[bytes, DummyZnodeStat]] = {}
 
-    def start(self):
+    def start(self, timeout: float = 15):
         assert self.state == "closed"
         self.state = "open"
 
     def stop(self):
         assert self.state == "open"
         self.state = "closed"
 
+    @property
+    def connected(self):
+        return self.state == "open"
+
     def _assert_open(self):
         if not self.state == "open":
             raise kazoo.exceptions.ConnectionClosedError("Connection has been closed")
@@ -46,7 +57,7 @@ def create(self, path: str, value, makepath: bool = False):
                 self.create(parent, b"", makepath=makepath)
             else:
                 raise kazoo.exceptions.NoNodeError
-        self.data[path] = value
+        self.data[path] = value, DummyZnodeStat()
 
     def exists(self, path):
         self._assert_open()
@@ -56,7 +67,7 @@ def get(self, path):
         self._assert_open()
         if path not in self.data:
             raise kazoo.exceptions.NoNodeError()
-        return (self.data[path], None)
+        return self.data[path]
 
     def get_children(self, path):
         self._assert_open()
@@ -69,22 +80,19 @@ def set(self, path, value, version=-1):
         self._assert_open()
         if path not in self.data:
             raise kazoo.exceptions.NoNodeError()
-        self.data[path] = value
+        self.data[path] = (value, DummyZnodeStat())
 
     def get_data_deserialized(self, drop_empty=False) -> dict:
         """Dump whole db as a dict, but deserialize values"""
 
         def deserialize(b: bytes):
-            if b[:2] == b'{"' and b[-1:] == b"}":
+            # simple JSON format sniffing
+            if (b[:1], b[-1:]) in {(b"{", b"}"), (b"[", b"]")}:
                 return json.loads(b.decode("utf8"))
             else:
                 return b.decode("utf8")
 
-        return {
-            k: deserialize(v)
-            for (k, v) in self.data.items()
-            if v or not drop_empty
-        }
+        return {k: deserialize(v) for (k, (v, stats)) in self.data.items() if v or not drop_empty}
 
 
 def approx_now(abs=10):
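A short sketch of how a test might exercise the updated dummy client; the znode path and payloads are illustrative, not taken from the commit:

    from openeo_aggregator.testing import DummyKazooClient

    zk = DummyKazooClient()
    zk.start()
    assert zk.connected  # new property

    # get() now returns a (bytes, DummyZnodeStat) pair, like the real KazooClient.
    zk.create("/foo", b'{"answer": 42}', makepath=True)
    value, stat = zk.get("/foo")
    assert value == b'{"answer": 42}'
    assert isinstance(stat.last_modified, float)

    # The broadened JSON sniffing now also recognizes top-level lists.
    zk.set("/foo", b"[1, 2, 3]")
    assert zk.get_data_deserialized(drop_empty=True) == {"/foo": [1, 2, 3]}
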
[Diffs for the remaining 3 changed files did not load.]