
Merge pull request #2 from tpietruszka/more-flexible-billing
Support different ways of "billing" resource usage
tpietruszka authored Aug 17, 2023
2 parents 6871857 + b2ef469 commit 442ba31
Showing 7 changed files with 78 additions and 26 deletions.
68 changes: 59 additions & 9 deletions README.md
@@ -23,24 +23,33 @@ those of Large Language Models (LLMs).
- handles timeouts (requests will not hang forever)
- raises an exception if the request fails (or the server returns an error / an "invalid" response)
- requests are independent from each other, do not rely on order, can be retried if failed
- the server verifies rate limits when it receives the request, and records resource usage upon
  completion - right before returning the results (a pessimistic assumption).
- we want a standard, simple interface for the user - working the same way in a script and in a
notebook (+ most data scientists do not want to deal with asyncio). Therefore, `Runner.run()` is
a blocking call, with the same behavior regardless of the context.
- async use is also supported - via `run_coro()` - but it will not support `KeyboardInterrupt`
handling.

## Installation
```shell
pip install rate_limited
```

## Usage
In short:
- wrap your function with a `Runner()`, describing the rate limits you have
- call `Runner.schedule()` to schedule a request
- call `Runner.schedule()` to schedule a request, passing the same arguments you would pass to the
  original function
- call `Runner.run()` to run the scheduled requests, get the results and any exceptions raised

### Installation
```shell
pip install rate_limited
```
### Creating a Runner
The following arguments are required:
- `function` - the function to be called
- `resources` - a list of `Resource` objects, describing the rate limits you have (see examples below)
- `max_concurrent` - the maximum number of requests to be executed in parallel
Important optional arguments:
- `max_retries` - the maximum number of retries for a single request (default: 5)
- `validation_function` - a function that validates the response and returns `True` if it is valid
(e.g. conforms to the schema you expect). If not valid, the request will be retried.

### OpenAI example
```python
@@ -107,6 +116,46 @@ resources = [
]
```

### More complex resource descriptions

Overall, the core of an API description is a list of `Resource` objects, each describing a single
resource and its limit - e.g. "requests per minute", "tokens per minute", "images per hour", etc.

Each resource has:
- a name (just for logging/debugging)
- a quota (e.g. 100)
- a time window (e.g. 60 seconds)
- functions that extract the "billing" information:
- `arguments_usage_extractor`
- `results_usage_extractor`
- `max_results_usage_estimator`

Two distinct "billing" models are supported:
- Before the call - we register usage before making the call, based on the arguments of the call.

In these cases, just `arguments_usage_extractor` is needed.
- After the call - we register usage after making the call, based on the results of the call,
and possibly the arguments (if needed for some complex cases).

To avoid sending a flood of requests before the first ones complete (and before any usage is
registered), we need to estimate the maximum usage of each call, based on its arguments. We
"pre-allocate" this usage, then register the actual usage after the call completes.

In these cases, `results_usage_extractor` and `max_results_usage_estimator` are needed.

Both approaches can be used in the same Resource description if needed.

Note: it is assumed that resource usage "expires" fully once the time window elapses, rather than
declining gradually. Some APIs (e.g. OpenAI) might use the "gradual" approach, and differ in other
details, but this assumption seems sufficient to get "close enough" to the actual rate limits,
without running into them.

See [apis.openai.chat](./rate_limited/apis/openai/chat.py) for an example of a more complex API
description with multiple resources.
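The pre-allocation flow described above can be sketched in plain Python. This illustrates the accounting logic only, not the library's internal implementation; all names here are hypothetical:

```python
class WindowBudget:
    """Tracks used and pre-allocated amounts of one resource within a time window."""

    def __init__(self, quota: float):
        self.quota = quota
        self.used = 0.0      # registered actual usage
        self.reserved = 0.0  # pre-allocated upper bounds for in-flight calls

    def has_room_for(self, estimate: float) -> bool:
        return self.used + self.reserved + estimate <= self.quota

    def pre_allocate(self, estimate: float) -> None:
        self.reserved += estimate

    def register_result(self, estimate: float, actual: float) -> None:
        # replace the pre-allocation with the actual usage from the results
        self.reserved -= estimate
        self.used += actual


budget = WindowBudget(quota=100)

estimate = 40                        # upper bound, as from max_results_usage_estimator
budget.pre_allocate(estimate)
assert not budget.has_room_for(70)   # a second large call must wait...

budget.register_result(estimate, actual=25)  # actual usage, as from results_usage_extractor
assert budget.has_room_for(70)       # ...until the smaller actual usage replaces the estimate
```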




### Limiting the number of concurrent requests

The `max_concurrent` argument of `Runner` controls the number of concurrent requests.
@@ -159,14 +208,15 @@ flake8 && black --check . && mypy .
```

## TODOs:
- improved README and usage examples
- more ready-made API descriptions - incl. batched ones?
- fix the "interrupt and resume" test in Python 3.11
### Nice to have:
- (optional) slow start feature - pace the initial requests, instead of sending them all at once
- text-based logging if tqdm is not installed
- if/where possible, detect RateLimitExceeded - notify the user, slow down
- should runner.schedule return futures and enable "streaming" of results?
- support "streaming" and/or continuous operation:
- enable scheduling calls while running and/or getting inputs from generators
- support "streaming" results - perhaps similar to "as_completed" in asyncio?
- add timeouts option? (for now, the user is responsible for handling timeouts)
- OpenAI shares information about rate limits in http response headers - could it be used without
coupling too tightly with their API?
4 changes: 2 additions & 2 deletions rate_limited/apis/openai/chat.py
@@ -2,7 +2,7 @@
from typing import List

from rate_limited.apis.common import get_requests_per_minute
from rate_limited.calls import Call
from rate_limited.calls import Call, Result
from rate_limited.resources import Resource


@@ -28,7 +28,7 @@ def get_tokens_per_minute(quota: int, model_max_len: int) -> Resource:
)


def get_used_tokens(results: dict) -> int:
def get_used_tokens(call: Call, results: Result) -> int:
total_tokens = results.get("usage", {}).get("total_tokens", None)
if total_tokens is None:
raise ValueError("Could not find total_tokens in results")
4 changes: 3 additions & 1 deletion rate_limited/calls.py
@@ -3,14 +3,16 @@
from inspect import signature
from typing import Any, Callable, List

Result = Any


@dataclass
class Call:
function: Callable
args: tuple
kwargs: dict
num_retries: int = 0
result: Any = None
result: Result = None
exceptions: List[Exception] = field(default_factory=list)

@cached_property
6 changes: 3 additions & 3 deletions rate_limited/resource_manager.py
@@ -4,7 +4,7 @@
from logging import getLogger
from typing import Collection, Optional

from rate_limited.calls import Call
from rate_limited.calls import Call, Result
from rate_limited.resources import Resource, Unit


@@ -51,10 +51,10 @@ def pre_allocate(self, call: Call):
if resource.max_results_usage_estimator:
resource.reserve_amount(resource.max_results_usage_estimator(call))

def register_result(self, result):
def register_result(self, call: Call, result: Result):
for resource in self.resources:
if resource.results_usage_extractor:
resource.add_usage(resource.results_usage_extractor(result))
resource.add_usage(resource.results_usage_extractor(call, result))

def remove_pre_allocation(self, call: Call):
# Right now assuming that pre-allocation is only based on the call, this could change
12 changes: 6 additions & 6 deletions rate_limited/resources.py
@@ -1,9 +1,9 @@
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Optional
from typing import Callable, Optional

from rate_limited.calls import Call
from rate_limited.calls import Call, Result

Unit = float

@@ -21,7 +21,7 @@ def __init__(
quota: Unit,
time_window_seconds: float,
arguments_usage_extractor: Optional[Callable[[Call], Unit]] = None,
results_usage_extractor: Optional[Callable[[Any], Unit]] = None,
results_usage_extractor: Optional[Callable[[Call, Result], Unit]] = None,
max_results_usage_estimator: Optional[Callable[[Call], Unit]] = None,
):
"""
@@ -32,9 +32,9 @@ quota: maximum amount of the resource that can be used in the time window
quota: maximum amount of the resource that can be used in the time window
time_window_seconds: time window in seconds
arguments_usage_extractor: function that extracts the amount of resource used from
the arguments
the arguments, "billed" before the call is made
results_usage_extractor: function that extracts the amount of resource used from
the results
the results (and the arguments), "billed" after the call is made
max_results_usage_estimator: function that extracts an upper bound on the amount of
resource that might be used when results are returned, based on the arguments
(this is used to pre-allocate usage, pre-allocation is then replaced with the
@@ -50,7 +50,7 @@

self.arguments_usage_extractor = arguments_usage_extractor
self.results_usage_extractor = results_usage_extractor
self.max_results_usage_estimator = max_results_usage_estimator # TODO: consider renaming
self.max_results_usage_estimator = max_results_usage_estimator

if self.max_results_usage_estimator and not self.results_usage_extractor:
raise ValueError(
8 changes: 4 additions & 4 deletions rate_limited/runner.py
@@ -5,9 +5,9 @@
from concurrent.futures import ThreadPoolExecutor
from inspect import signature
from logging import getLogger
from typing import Any, Callable, Collection, List, Optional, Tuple
from typing import Callable, Collection, List, Optional, Tuple

from rate_limited.calls import Call
from rate_limited.calls import Call, Result
from rate_limited.exceptions import ValidationError
from rate_limited.progress_bar import ProgressBar
from rate_limited.queue import CompletionTrackingQueue
@@ -23,7 +23,7 @@ def __init__(
resources: Collection[Resource],
max_concurrent: int,
max_retries: int = 5,
validation_function: Optional[Callable[[Any], bool]] = None,
validation_function: Optional[Callable[[Result], bool]] = None,
progress_interval: float = 1.0,
long_wait_warning_seconds: Optional[float] = 2.0,
):
@@ -177,7 +177,7 @@ async def worker(self):
self.requests_executor_pool, self.function, *call.args, **call.kwargs
)
# TODO: are there cases where we need to register result-based usage on error?
self.resource_manager.register_result(result)
self.resource_manager.register_result(call, result)
if self.validation_function is not None:
if not self.validation_function(result):
raise ValidationError(
2 changes: 1 addition & 1 deletion tests/test_runner.py
@@ -38,7 +38,7 @@ def dummy_resources(
name="points",
quota=num_points,
time_window_seconds=time_window_seconds,
results_usage_extractor=lambda x: x["used_points"],
results_usage_extractor=lambda _, result: result["used_points"],
max_results_usage_estimator=estimator,
),
]
