Skip to content

Commit

Permalink
Add a first version of system_status module (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdusek authored Jan 29, 2024
1 parent 324b93f commit d3bc9f0
Show file tree
Hide file tree
Showing 8 changed files with 397 additions and 3 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ requires-python = ">=3.8"
# https://github.com/apify/apify-sdk-python/pull/154
dependencies = [
"colorama >= 0.4.6",
"more_itertools >= 10.2.0",
"typing-extensions >= 4.1.0",
]

Expand Down
1 change: 0 additions & 1 deletion src/crawlee/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
from .main import BasicCrawler
2 changes: 2 additions & 0 deletions src/crawlee/autoscaling/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .snapshotter import Snapshotter
from .system_status import SystemStatus
Empty file.
131 changes: 131 additions & 0 deletions src/crawlee/autoscaling/snapshotter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
from __future__ import annotations

from dataclasses import dataclass
from logging import getLogger
from typing import TYPE_CHECKING, Sequence, Union

if TYPE_CHECKING:
from datetime import datetime

logger = getLogger(__name__)


@dataclass
class MemorySnapshot:
"""A snapshot of memory usage."""

created_at: datetime
is_overloaded: bool
used_bytes: int | None


@dataclass
class CpuSnapshot:
"""A snapshot of CPU usage."""

created_at: datetime
is_overloaded: bool
used_ratio: float
ticks: dict | None = None


@dataclass
class EventLoopSnapshot:
"""A snapshot of event loop usage."""

created_at: datetime
is_overloaded: bool
exceeded_millis: float


@dataclass
class ClientSnapshot:
"""A snapshot of client usage."""

created_at: datetime
is_overloaded: bool
rate_limit_error_count: int


Snapshot = Union[MemorySnapshot, CpuSnapshot, EventLoopSnapshot, ClientSnapshot]


class Snapshotter:
"""Creates snapshots of system resources at given intervals.
Creates snapshots of system resources at given intervals and marks the resource as either overloaded or not during
the last interval. Keeps a history of the snapshots. It tracks the following resources: Memory, EventLoop, API
and CPU. The class is used by the `AutoscaledPool` class.
When running on the Apify platform, the CPU and memory statistics are provided by the platform, as collected from
the running Docker container. When running locally, `Snapshotter` makes its own statistics by querying the OS.
CPU becomes overloaded locally when its current use exceeds the `maxUsedCpuRatio` option or when Apify platform
marks it as overloaded.
Memory becomes overloaded if its current use exceeds the `max_used_memory_ratio` option. It's computed using
the total memory available to the container when running on the Apify platform and a quarter of total system
memory when running locally. Max total memory when running locally may be overridden by using
the `CRAWLEE_MEMORY_MBYTES` environment variable.
Event loop becomes overloaded if it slows down by more than the `max_blocked_millis` option.
Client becomes overloaded when rate limit errors (429 - Too Many Requests) exceeds the `max_client_errors` option,
typically received from the request queue, exceed the set limit within the set interval.
"""

def get_memory_sample(
self: Snapshotter,
sample_duration_millis: float | None = None,
) -> Sequence[MemorySnapshot]:
"""Returns a sample of the latest memory snapshots.
Args:
sample_duration_millis: The size of the sample in milliseconds. Defaults to None.
If omitted, it returns a full snapshot history.
Returns:
A sample of memory snapshots.
"""

def get_event_loop_sample(
self: Snapshotter,
sample_duration_millis: float | None = None,
) -> Sequence[EventLoopSnapshot]:
"""Returns a sample of the latest event loop snapshots.
Args:
sample_duration_millis: The size of the sample in milliseconds. Defaults to None.
If omitted, it returns a full snapshot history.
Returns:
A sample of event loop snapshots.
"""

def get_cpu_sample(
self: Snapshotter,
sample_duration_millis: float | None = None,
) -> Sequence[CpuSnapshot]:
"""Returns a sample of the latest CPU snapshots.
Args:
sample_duration_millis: The size of the sample in milliseconds. Defaults to None.
If omitted, it returns a full snapshot history.
Returns:
A sample of CPU snapshots.
"""

def get_client_sample(
self: Snapshotter,
sample_duration_millis: float | None = None,
) -> Sequence[ClientSnapshot]:
"""Returns a sample of the latest client snapshots.
Args:
sample_duration_millis: The size of the sample in milliseconds. Defaults to None.
If omitted, it returns a full snapshot history.
Returns:
A sample of client snapshots.
"""
240 changes: 240 additions & 0 deletions src/crawlee/autoscaling/system_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
from __future__ import annotations

from dataclasses import dataclass
from logging import getLogger
from typing import TYPE_CHECKING, Sequence

from more_itertools import pairwise

from crawlee.utils import weighted_avg

if TYPE_CHECKING:
from datetime import datetime

from crawlee.autoscaling.snapshotter import Snapshot, Snapshotter

logger = getLogger(__name__)


@dataclass
class LoadRatioInfo:
"""Represents the load ratio of a resource."""

is_overloaded: bool
limit_ratio: float
actual_ratio: float


@dataclass
class SystemInfo:
"""Represents the current status of the system."""

is_system_idle: bool # Indicates whether the system is currently idle or overloaded
mem_info: LoadRatioInfo
event_loop_info: LoadRatioInfo
cpu_info: LoadRatioInfo
client_info: LoadRatioInfo
mem_current_bytes: int | None = None # Platform only property
cpu_current_usage: int | None = None # Platform only property
is_cpu_overloaded: bool | None = None # Platform only property
created_at: datetime | None = None


@dataclass
class FinalStatistics:
"""Represents final statistics."""

requests_finished: int
requests_failed: int
retry_histogram: list[int]
request_avg_failed_duration_millis: float
request_avg_finished_duration_millis: float
requests_finished_per_minute: float
requests_failed_per_minute: float
request_total_duration_millis: float
requests_total: int
crawler_runtime_millis: float


class SystemStatus:
"""Provides a simple interface to reading system status from a `Snapshotter` instance.
It only exposes two functions `SystemStatus.get_current_status` and `SystemStatus.get_historical_status`.
The system status is calculated using a weighted average of overloaded messages in the snapshots, with the weights
being the time intervals between the snapshots. Each resource is calculated separately, and the system
is overloaded whenever at least one resource is overloaded. The class is used by the `AutoscaledPool` class.
`SystemStatus.get_current_status` returns a boolean that represents the current status of the system. The length
of the current timeframe in seconds is configurable by the `currentHistorySecs` option and represents the max age
of snapshots to be considered for the calculation.
`SystemStatus.get_historical_status` returns a boolean that represents the long-term status of the system. It
considers the full snapshot history available in the `Snapshotter` instance.
"""

def __init__(
self: SystemStatus,
snapshotter: Snapshotter,
current_history_secs: int = 5,
max_memory_overloaded_ratio: float = 0.2,
max_event_loop_overloaded_ratio: float = 0.6,
max_cpu_overloaded_ratio: float = 0.4,
max_client_overloaded_ratio: float = 0.3,
) -> None:
"""Create a new instance.
Args:
snapshotter: The `Snapshotter` instance to be queried for `SystemStatus`.
current_history_secs: Defines max age of snapshots used in the `SystemStatus.get_current_status`
measurement. Defaults to 5.
max_memory_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a memory sample.
If the sample exceeds this ratio, the system will be overloaded. Defaults to 0.2.
max_event_loop_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in an event loop sample.
If the sample exceeds this ratio, the system will be overloaded. Defaults to 0.6.
max_cpu_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a CPU sample. If the sample
exceeds this ratio, the system will be overloaded. Defaults to 0.4.
max_client_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a Client sample.
If the sample exceeds this ratio, the system will be overloaded. Defaults to 0.3.
"""
self.snapshotter = snapshotter
self.current_history_secs = current_history_secs
self.max_memory_overloaded_ratio = max_memory_overloaded_ratio
self.max_event_loop_overloaded_ratio = max_event_loop_overloaded_ratio
self.max_cpu_overloaded_ratio = max_cpu_overloaded_ratio
self.max_client_overloaded_ratio = max_client_overloaded_ratio

def get_current_status(self: SystemStatus) -> SystemInfo:
"""Get the current system status.
Returns a `SystemInfo` object where the `is_system_idle` property is `False` if the system has been overloaded
in the last `current_history_secs` seconds, and `True` otherwise.
Returns:
An object representing the current system status.
"""
return self._is_system_idle(self.current_history_secs)

def get_historical_status(self: SystemStatus) -> SystemInfo:
"""Get the historical system status.
Returns a `SystemInfo` where the `is_system_idle` property is set to `False` if the system has been overloaded
in the full history of the `Snapshotter` (which is configurable in the `Snapshotter`, and `True` otherwise.
Returns:
An object representing the historical system status.
"""
return self._is_system_idle()

def _is_system_idle(self: SystemStatus, sample_duration_millis: int | None = None) -> SystemInfo:
"""Determine if the system is currently idle or overloaded.
Args:
sample_duration_millis: The duration (in milliseconds) within which to analyze system status.
Returns:
An object representing the system status with an `is_system_idle` property set to `True` if the system
has not been overloaded within the specified time duration, and `False` otherwise.
"""
mem_info = self._is_memory_overloaded(sample_duration_millis)
event_loop_info = self._is_event_loop_overloaded(sample_duration_millis)
cpu_info = self._is_cpu_overloaded(sample_duration_millis)
client_info = self._is_client_overloaded(sample_duration_millis)

return SystemInfo(
is_system_idle=(
not mem_info.is_overloaded
and not event_loop_info.is_overloaded
and not cpu_info.is_overloaded
and not client_info.is_overloaded
),
mem_info=mem_info,
event_loop_info=event_loop_info,
cpu_info=cpu_info,
client_info=client_info,
)

def _is_memory_overloaded(self: SystemStatus, sample_duration_millis: int | None = None) -> LoadRatioInfo:
"""Determine if memory has been overloaded within a specified time duration.
Args:
sample_duration_millis: The duration (in milliseconds) within which to analyze memory snapshots.
Returns:
An object with an `is_overloaded` property set to `True` if memory has been overloaded within the specified
time duration. Otherwise, `is_overloaded` is set to `False`.
"""
sample = self.snapshotter.get_memory_sample(sample_duration_millis)
return self._is_sample_overloaded(sample, self.max_memory_overloaded_ratio)

def _is_event_loop_overloaded(self: SystemStatus, sample_duration_millis: int | None = None) -> LoadRatioInfo:
"""Determine if the event loop has been overloaded within a specified time duration.
Args:
sample_duration_millis: The duration (in milliseconds) within which to analyze event loop snapshots.
Returns:
An object with an `is_overloaded` property set to `True` if the event loop has been overloaded within
the specified time duration. Otherwise, `is_overloaded` is set to `False`.
"""
sample = self.snapshotter.get_event_loop_sample(sample_duration_millis)
return self._is_sample_overloaded(sample, self.max_event_loop_overloaded_ratio)

def _is_cpu_overloaded(self: SystemStatus, sample_duration_millis: int | None = None) -> LoadRatioInfo:
"""Determine if the CPU has been overloaded within a specified time duration.
Args:
sample_duration_millis: The duration (in milliseconds) within which to analyze CPU snapshots.
Returns:
An object with an `is_overloaded` property set to `True` if the CPU has been overloaded within
the specified time duration. Otherwise, `is_overloaded` is set to `False`.
"""
sample = self.snapshotter.get_cpu_sample(sample_duration_millis)
return self._is_sample_overloaded(sample, self.max_cpu_overloaded_ratio)

def _is_client_overloaded(self: SystemStatus, sample_duration_millis: int | None = None) -> LoadRatioInfo:
"""Determine if the client has been overloaded within a specified time duration.
Args:
sample_duration_millis: The duration (in milliseconds) within which to analyze client snapshots.
Returns:
An object with an `is_overloaded` property set to `True` if the client has been overloaded within
the specified time duration. Otherwise, `is_overloaded` is set to `False`.
"""
sample = self.snapshotter.get_client_sample(sample_duration_millis)
return self._is_sample_overloaded(sample, self.max_client_overloaded_ratio)

def _is_sample_overloaded(self: SystemStatus, sample: Sequence[Snapshot], ratio: float) -> LoadRatioInfo:
"""Determine if a sample of snapshot data is overloaded based on a specified ratio.
Args:
sample: A sequence of snapshot data to analyze.
ratio: The ratio threshold to consider the sample as overloaded.
Returns:
An object with an `is_overloaded` property set to `True` if the sample is considered overloaded based
on the specified ratio. Otherwise, `is_overloaded` is set to `False`.
"""
if not sample:
return LoadRatioInfo(is_overloaded=False, limit_ratio=ratio, actual_ratio=0)

weights, values = [], []

for previous, current in pairwise(sample):
weight = (current.created_at - previous.created_at).total_seconds() or 0.001 # Avoid zero
weights.append(weight)
values.append(float(current.is_overloaded))

w_avg = values[0] if len(sample) == 1 else weighted_avg(values, weights)

return LoadRatioInfo(
is_overloaded=w_avg > ratio,
limit_ratio=ratio,
actual_ratio=round(w_avg, 3),
)
2 changes: 0 additions & 2 deletions src/crawlee/main.py

This file was deleted.

Loading

0 comments on commit d3bc9f0

Please sign in to comment.