From d3bc9f078c23db473706d5ef7c39fa65b477c902 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Mon, 29 Jan 2024 08:44:58 +0100 Subject: [PATCH] Add a first version of `system_status` module (#22) --- pyproject.toml | 1 + src/crawlee/__init__.py | 1 - src/crawlee/autoscaling/__init__.py | 2 + src/crawlee/autoscaling/py.typed | 0 src/crawlee/autoscaling/snapshotter.py | 131 +++++++++++++ src/crawlee/autoscaling/system_status.py | 240 +++++++++++++++++++++++ src/crawlee/main.py | 2 - src/crawlee/utils.py | 23 +++ 8 files changed, 397 insertions(+), 3 deletions(-) create mode 100644 src/crawlee/autoscaling/__init__.py create mode 100644 src/crawlee/autoscaling/py.typed create mode 100644 src/crawlee/autoscaling/snapshotter.py create mode 100644 src/crawlee/autoscaling/system_status.py delete mode 100644 src/crawlee/main.py create mode 100644 src/crawlee/utils.py diff --git a/pyproject.toml b/pyproject.toml index 08a4e6d73..70206b8d3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ requires-python = ">=3.8" # https://github.com/apify/apify-sdk-python/pull/154 dependencies = [ "colorama >= 0.4.6", + "more_itertools >= 10.2.0", "typing-extensions >= 4.1.0", ] diff --git a/src/crawlee/__init__.py b/src/crawlee/__init__.py index f4b384961..e69de29bb 100644 --- a/src/crawlee/__init__.py +++ b/src/crawlee/__init__.py @@ -1 +0,0 @@ -from .main import BasicCrawler diff --git a/src/crawlee/autoscaling/__init__.py b/src/crawlee/autoscaling/__init__.py new file mode 100644 index 000000000..47496851a --- /dev/null +++ b/src/crawlee/autoscaling/__init__.py @@ -0,0 +1,2 @@ +from .snapshotter import Snapshotter +from .system_status import SystemStatus diff --git a/src/crawlee/autoscaling/py.typed b/src/crawlee/autoscaling/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/src/crawlee/autoscaling/snapshotter.py b/src/crawlee/autoscaling/snapshotter.py new file mode 100644 index 000000000..f70b19023 --- /dev/null +++ b/src/crawlee/autoscaling/snapshotter.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +from dataclasses import dataclass +from logging import getLogger +from typing import TYPE_CHECKING, Sequence, Union + +if TYPE_CHECKING: + from datetime import datetime + +logger = getLogger(__name__) + + +@dataclass +class MemorySnapshot: + """A snapshot of memory usage.""" + + created_at: datetime + is_overloaded: bool + used_bytes: int | None + + +@dataclass +class CpuSnapshot: + """A snapshot of CPU usage.""" + + created_at: datetime + is_overloaded: bool + used_ratio: float + ticks: dict | None = None + + +@dataclass +class EventLoopSnapshot: + """A snapshot of event loop usage.""" + + created_at: datetime + is_overloaded: bool + exceeded_millis: float + + +@dataclass +class ClientSnapshot: + """A snapshot of client usage.""" + + created_at: datetime + is_overloaded: bool + rate_limit_error_count: int + + +Snapshot = Union[MemorySnapshot, CpuSnapshot, EventLoopSnapshot, ClientSnapshot] + + +class Snapshotter: + """Creates snapshots of system resources at given intervals. + + Creates snapshots of system resources at given intervals and marks the resource as either overloaded or not during + the last interval. Keeps a history of the snapshots. It tracks the following resources: Memory, EventLoop, API + and CPU. The class is used by the `AutoscaledPool` class. + + When running on the Apify platform, the CPU and memory statistics are provided by the platform, as collected from + the running Docker container. When running locally, `Snapshotter` makes its own statistics by querying the OS. + + CPU becomes overloaded locally when its current use exceeds the `maxUsedCpuRatio` option or when Apify platform + marks it as overloaded. + + Memory becomes overloaded if its current use exceeds the `max_used_memory_ratio` option. It's computed using + the total memory available to the container when running on the Apify platform and a quarter of total system + memory when running locally. Max total memory when running locally may be overridden by using + the `CRAWLEE_MEMORY_MBYTES` environment variable. + + Event loop becomes overloaded if it slows down by more than the `max_blocked_millis` option. + + Client becomes overloaded when rate limit errors (429 - Too Many Requests) exceeds the `max_client_errors` option, + typically received from the request queue, exceed the set limit within the set interval. + """ + + def get_memory_sample( + self: Snapshotter, + sample_duration_millis: float | None = None, + ) -> Sequence[MemorySnapshot]: + """Returns a sample of the latest memory snapshots. + + Args: + sample_duration_millis: The size of the sample in milliseconds. Defaults to None. + If omitted, it returns a full snapshot history. + + Returns: + A sample of memory snapshots. + """ + + def get_event_loop_sample( + self: Snapshotter, + sample_duration_millis: float | None = None, + ) -> Sequence[EventLoopSnapshot]: + """Returns a sample of the latest event loop snapshots. + + Args: + sample_duration_millis: The size of the sample in milliseconds. Defaults to None. + If omitted, it returns a full snapshot history. + + Returns: + A sample of event loop snapshots. + """ + + def get_cpu_sample( + self: Snapshotter, + sample_duration_millis: float | None = None, + ) -> Sequence[CpuSnapshot]: + """Returns a sample of the latest CPU snapshots. + + Args: + sample_duration_millis: The size of the sample in milliseconds. Defaults to None. + If omitted, it returns a full snapshot history. + + Returns: + A sample of CPU snapshots. + """ + + def get_client_sample( + self: Snapshotter, + sample_duration_millis: float | None = None, + ) -> Sequence[ClientSnapshot]: + """Returns a sample of the latest client snapshots. + + Args: + sample_duration_millis: The size of the sample in milliseconds. Defaults to None. + If omitted, it returns a full snapshot history. + + Returns: + A sample of client snapshots. + """ diff --git a/src/crawlee/autoscaling/system_status.py b/src/crawlee/autoscaling/system_status.py new file mode 100644 index 000000000..022b108cd --- /dev/null +++ b/src/crawlee/autoscaling/system_status.py @@ -0,0 +1,240 @@ +from __future__ import annotations + +from dataclasses import dataclass +from logging import getLogger +from typing import TYPE_CHECKING, Sequence + +from more_itertools import pairwise + +from crawlee.utils import weighted_avg + +if TYPE_CHECKING: + from datetime import datetime + + from crawlee.autoscaling.snapshotter import Snapshot, Snapshotter + +logger = getLogger(__name__) + + +@dataclass +class LoadRatioInfo: + """Represents the load ratio of a resource.""" + + is_overloaded: bool + limit_ratio: float + actual_ratio: float + + +@dataclass +class SystemInfo: + """Represents the current status of the system.""" + + is_system_idle: bool # Indicates whether the system is currently idle or overloaded + mem_info: LoadRatioInfo + event_loop_info: LoadRatioInfo + cpu_info: LoadRatioInfo + client_info: LoadRatioInfo + mem_current_bytes: int | None = None # Platform only property + cpu_current_usage: int | None = None # Platform only property + is_cpu_overloaded: bool | None = None # Platform only property + created_at: datetime | None = None + + +@dataclass +class FinalStatistics: + """Represents final statistics.""" + + requests_finished: int + requests_failed: int + retry_histogram: list[int] + request_avg_failed_duration_millis: float + request_avg_finished_duration_millis: float + requests_finished_per_minute: float + requests_failed_per_minute: float + request_total_duration_millis: float + requests_total: int + crawler_runtime_millis: float + + +class SystemStatus: + """Provides a simple interface to reading system status from a `Snapshotter` instance. + + It only exposes two functions `SystemStatus.get_current_status` and `SystemStatus.get_historical_status`. + The system status is calculated using a weighted average of overloaded messages in the snapshots, with the weights + being the time intervals between the snapshots. Each resource is calculated separately, and the system + is overloaded whenever at least one resource is overloaded. The class is used by the `AutoscaledPool` class. + + `SystemStatus.get_current_status` returns a boolean that represents the current status of the system. The length + of the current timeframe in seconds is configurable by the `currentHistorySecs` option and represents the max age + of snapshots to be considered for the calculation. + + `SystemStatus.get_historical_status` returns a boolean that represents the long-term status of the system. It + considers the full snapshot history available in the `Snapshotter` instance. + """ + + def __init__( + self: SystemStatus, + snapshotter: Snapshotter, + current_history_secs: int = 5, + max_memory_overloaded_ratio: float = 0.2, + max_event_loop_overloaded_ratio: float = 0.6, + max_cpu_overloaded_ratio: float = 0.4, + max_client_overloaded_ratio: float = 0.3, + ) -> None: + """Create a new instance. + + Args: + snapshotter: The `Snapshotter` instance to be queried for `SystemStatus`. + + current_history_secs: Defines max age of snapshots used in the `SystemStatus.get_current_status` + measurement. Defaults to 5. + + max_memory_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a memory sample. + If the sample exceeds this ratio, the system will be overloaded. Defaults to 0.2. + + max_event_loop_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in an event loop sample. + If the sample exceeds this ratio, the system will be overloaded. Defaults to 0.6. + + max_cpu_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a CPU sample. If the sample + exceeds this ratio, the system will be overloaded. Defaults to 0.4. + + max_client_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a Client sample. + If the sample exceeds this ratio, the system will be overloaded. Defaults to 0.3. + """ + self.snapshotter = snapshotter + self.current_history_secs = current_history_secs + self.max_memory_overloaded_ratio = max_memory_overloaded_ratio + self.max_event_loop_overloaded_ratio = max_event_loop_overloaded_ratio + self.max_cpu_overloaded_ratio = max_cpu_overloaded_ratio + self.max_client_overloaded_ratio = max_client_overloaded_ratio + + def get_current_status(self: SystemStatus) -> SystemInfo: + """Get the current system status. + + Returns a `SystemInfo` object where the `is_system_idle` property is `False` if the system has been overloaded + in the last `current_history_secs` seconds, and `True` otherwise. + + Returns: + An object representing the current system status. + """ + return self._is_system_idle(self.current_history_secs) + + def get_historical_status(self: SystemStatus) -> SystemInfo: + """Get the historical system status. + + Returns a `SystemInfo` where the `is_system_idle` property is set to `False` if the system has been overloaded + in the full history of the `Snapshotter` (which is configurable in the `Snapshotter`, and `True` otherwise. + + Returns: + An object representing the historical system status. + """ + return self._is_system_idle() + + def _is_system_idle(self: SystemStatus, sample_duration_millis: int | None = None) -> SystemInfo: + """Determine if the system is currently idle or overloaded. + + Args: + sample_duration_millis: The duration (in milliseconds) within which to analyze system status. + + Returns: + An object representing the system status with an `is_system_idle` property set to `True` if the system + has not been overloaded within the specified time duration, and `False` otherwise. + """ + mem_info = self._is_memory_overloaded(sample_duration_millis) + event_loop_info = self._is_event_loop_overloaded(sample_duration_millis) + cpu_info = self._is_cpu_overloaded(sample_duration_millis) + client_info = self._is_client_overloaded(sample_duration_millis) + + return SystemInfo( + is_system_idle=( + not mem_info.is_overloaded + and not event_loop_info.is_overloaded + and not cpu_info.is_overloaded + and not client_info.is_overloaded + ), + mem_info=mem_info, + event_loop_info=event_loop_info, + cpu_info=cpu_info, + client_info=client_info, + ) + + def _is_memory_overloaded(self: SystemStatus, sample_duration_millis: int | None = None) -> LoadRatioInfo: + """Determine if memory has been overloaded within a specified time duration. + + Args: + sample_duration_millis: The duration (in milliseconds) within which to analyze memory snapshots. + + Returns: + An object with an `is_overloaded` property set to `True` if memory has been overloaded within the specified + time duration. Otherwise, `is_overloaded` is set to `False`. + """ + sample = self.snapshotter.get_memory_sample(sample_duration_millis) + return self._is_sample_overloaded(sample, self.max_memory_overloaded_ratio) + + def _is_event_loop_overloaded(self: SystemStatus, sample_duration_millis: int | None = None) -> LoadRatioInfo: + """Determine if the event loop has been overloaded within a specified time duration. + + Args: + sample_duration_millis: The duration (in milliseconds) within which to analyze event loop snapshots. + + Returns: + An object with an `is_overloaded` property set to `True` if the event loop has been overloaded within + the specified time duration. Otherwise, `is_overloaded` is set to `False`. + """ + sample = self.snapshotter.get_event_loop_sample(sample_duration_millis) + return self._is_sample_overloaded(sample, self.max_event_loop_overloaded_ratio) + + def _is_cpu_overloaded(self: SystemStatus, sample_duration_millis: int | None = None) -> LoadRatioInfo: + """Determine if the CPU has been overloaded within a specified time duration. + + Args: + sample_duration_millis: The duration (in milliseconds) within which to analyze CPU snapshots. + + Returns: + An object with an `is_overloaded` property set to `True` if the CPU has been overloaded within + the specified time duration. Otherwise, `is_overloaded` is set to `False`. + """ + sample = self.snapshotter.get_cpu_sample(sample_duration_millis) + return self._is_sample_overloaded(sample, self.max_cpu_overloaded_ratio) + + def _is_client_overloaded(self: SystemStatus, sample_duration_millis: int | None = None) -> LoadRatioInfo: + """Determine if the client has been overloaded within a specified time duration. + + Args: + sample_duration_millis: The duration (in milliseconds) within which to analyze client snapshots. + + Returns: + An object with an `is_overloaded` property set to `True` if the client has been overloaded within + the specified time duration. Otherwise, `is_overloaded` is set to `False`. + """ + sample = self.snapshotter.get_client_sample(sample_duration_millis) + return self._is_sample_overloaded(sample, self.max_client_overloaded_ratio) + + def _is_sample_overloaded(self: SystemStatus, sample: Sequence[Snapshot], ratio: float) -> LoadRatioInfo: + """Determine if a sample of snapshot data is overloaded based on a specified ratio. + + Args: + sample: A sequence of snapshot data to analyze. + ratio: The ratio threshold to consider the sample as overloaded. + + Returns: + An object with an `is_overloaded` property set to `True` if the sample is considered overloaded based + on the specified ratio. Otherwise, `is_overloaded` is set to `False`. + """ + if not sample: + return LoadRatioInfo(is_overloaded=False, limit_ratio=ratio, actual_ratio=0) + + weights, values = [], [] + + for previous, current in pairwise(sample): + weight = (current.created_at - previous.created_at).total_seconds() or 0.001 # Avoid zero + weights.append(weight) + values.append(float(current.is_overloaded)) + + w_avg = values[0] if len(sample) == 1 else weighted_avg(values, weights) + + return LoadRatioInfo( + is_overloaded=w_avg > ratio, + limit_ratio=ratio, + actual_ratio=round(w_avg, 3), + ) diff --git a/src/crawlee/main.py b/src/crawlee/main.py deleted file mode 100644 index 72fbf7bbb..000000000 --- a/src/crawlee/main.py +++ /dev/null @@ -1,2 +0,0 @@ -class BasicCrawler: - """BasicCrawler class.""" diff --git a/src/crawlee/utils.py b/src/crawlee/utils.py new file mode 100644 index 000000000..ef845b2cb --- /dev/null +++ b/src/crawlee/utils.py @@ -0,0 +1,23 @@ +from __future__ import annotations + + +def weighted_avg(values: list[float], weights: list[float]) -> float: + """Computes a weighted average of an array of numbers, complemented by an array of weights. + + Args: + values: List of values. + weights: List of weights. + + Raises: + ValueError: If total weight is zero. + + Returns: + float: Weighted average. + """ + result = sum(value * weight for value, weight in zip(values, weights, strict=True)) + total_weight = sum(weights) + + if total_weight == 0: + raise ValueError('Total weight cannot be zero') + + return result / total_weight