feat(low-code): Add API Budget #314

Merged on Feb 12, 2025 (16 commits)
175 changes: 174 additions & 1 deletion airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -40,6 +40,11 @@ properties:
"$ref": "#/definitions/Spec"
concurrency_level:
"$ref": "#/definitions/ConcurrencyLevel"
api_budget:
title: API Budget
description: Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams.
anyOf:
- "$ref": "#/definitions/HTTPAPIBudget"
metadata:
type: object
description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.
@@ -794,7 +799,7 @@ definitions:
description: This option is used to adjust the upper and lower boundaries of each datetime window to beginning and end of the provided target period (day, week, month)
type: object
required:
- target
- target
properties:
target:
title: Target
@@ -1365,6 +1370,174 @@ definitions:
$parameters:
type: object
additional_properties: true
HTTPAPIBudget:
title: HTTP API Budget
description: >
An HTTP-specific API budget that extends APIBudget by updating rate limiting information based
on HTTP response headers. It extracts available calls and the next reset timestamp from the HTTP responses.
type: object
required:
- type
- policies
properties:
type:
type: string
enum: [HTTPAPIBudget]
policies:
title: Policies
description: List of call rate policies that define how many calls are allowed.
type: array
items:
anyOf:
- "$ref": "#/definitions/FixedWindowCallRatePolicy"
- "$ref": "#/definitions/MovingWindowCallRatePolicy"
- "$ref": "#/definitions/UnlimitedCallRatePolicy"
ratelimit_reset_header:
title: Rate Limit Reset Header
description: The HTTP response header name that indicates when the rate limit resets.
type: string
default: "ratelimit-reset"
ratelimit_remaining_header:
title: Rate Limit Remaining Header
description: The HTTP response header name that indicates the number of remaining allowed calls.
type: string
default: "ratelimit-remaining"
status_codes_for_ratelimit_hit:
title: Status Codes for Rate Limit Hit
description: List of HTTP status codes that indicate a rate limit has been hit.
type: array
items:
type: integer
default: [429]
maximum_attempts_to_acquire:
title: Maximum Attempts to Acquire
description: The maximum number of attempts to acquire a call before giving up.
type: integer
default: 100000
additionalProperties: true
FixedWindowCallRatePolicy:
title: Fixed Window Call Rate Policy
description: A policy that allows a fixed number of calls within a specific time window.
type: object
required:
- type
- period
- call_limit
- matchers
properties:
type:
type: string
enum: [FixedWindowCallRatePolicy]
period:
title: Period
description: The time interval for the rate limit window.
type: string
format: duration
call_limit:
title: Call Limit
description: The maximum number of calls allowed within the period.
type: integer
matchers:
title: Matchers
description: List of matchers that define which requests this policy applies to.
type: array
items:
"$ref": "#/definitions/HttpRequestRegexMatcher"
additionalProperties: true
MovingWindowCallRatePolicy:
title: Moving Window Call Rate Policy
description: A policy that allows a fixed number of calls within a moving time window.
type: object
required:
- type
- rates
- matchers
properties:
type:
type: string
enum: [MovingWindowCallRatePolicy]
rates:
title: Rates
description: List of rates that define the call limits for different time intervals.
type: array
items:
"$ref": "#/definitions/Rate"
matchers:
title: Matchers
description: List of matchers that define which requests this policy applies to.
type: array
items:
"$ref": "#/definitions/HttpRequestRegexMatcher"
additionalProperties: true
UnlimitedCallRatePolicy:
title: Unlimited Call Rate Policy
description: A policy that allows unlimited calls for specific requests.
type: object
required:
- type
- matchers
properties:
type:
type: string
enum: [UnlimitedCallRatePolicy]
matchers:
title: Matchers
description: List of matchers that define which requests this policy applies to.
type: array
items:
"$ref": "#/definitions/HttpRequestRegexMatcher"
additionalProperties: true
Rate:
title: Rate
description: Defines a rate limit with a specific number of calls allowed within a time interval.
type: object
required:
- limit
- interval
properties:
limit:
title: Limit
description: The maximum number of calls allowed within the interval.
type: integer
interval:
title: Interval
description: The time interval for the rate limit.
type: string
examples:
- "PT1H"
- "P1D"
additionalProperties: true
HttpRequestRegexMatcher:
title: HTTP Request Matcher
description: >
Matches HTTP requests based on method, base URL, URL path pattern, query parameters, and headers.
Use `url_base` to specify the scheme and host (without trailing slash) and
`url_path_pattern` to apply a regex to the request path.
type: object
properties:
method:
title: Method
description: The HTTP method to match (e.g., GET, POST).
type: string
url_base:
title: URL Base
description: The base URL (scheme and host, e.g. "https://api.example.com") to match.
type: string
url_path_pattern:
title: URL Path Pattern
description: A regular expression pattern to match the URL path.
type: string
params:
title: Parameters
description: The query parameters to match.
type: object
additionalProperties: true
headers:
title: Headers
description: The headers to match.
type: object
additionalProperties: true
additionalProperties: true
DefaultErrorHandler:
title: Default Error Handler
description: Component defining how to handle errors. Default behavior includes only retrying server errors (HTTP 5XX) and too many requests (HTTP 429) with an exponential backoff.
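
To make the `HttpRequestRegexMatcher` fields above more concrete, here is a minimal sketch of how such a matcher could decide whether a request falls under a policy. The `SimpleMatcher` class and its `matches` method are hypothetical and are not the CDK implementation. Treating unset fields as wildcards, using `re.search` on the path, and requiring exact equality for params and headers are all assumptions made for illustration.

```python
import re
from dataclasses import dataclass, field
from typing import Dict, Optional
from urllib.parse import parse_qsl, urlsplit


@dataclass
class SimpleMatcher:
    """Hypothetical stand-in for a request matcher; unset fields match anything."""

    method: Optional[str] = None
    url_base: Optional[str] = None
    url_path_pattern: Optional[str] = None
    params: Dict[str, str] = field(default_factory=dict)
    headers: Dict[str, str] = field(default_factory=dict)

    def matches(self, method: str, url: str, headers: Optional[Dict[str, str]] = None) -> bool:
        parts = urlsplit(url)
        if self.method and self.method.upper() != method.upper():
            return False
        # Compare scheme and host only, ignoring a trailing slash on url_base.
        if self.url_base and self.url_base.rstrip("/") != f"{parts.scheme}://{parts.netloc}":
            return False
        # The path pattern is a regex applied to the request path.
        if self.url_path_pattern and not re.search(self.url_path_pattern, parts.path):
            return False
        query = dict(parse_qsl(parts.query))
        if any(query.get(key) != value for key, value in self.params.items()):
            return False
        # Header names are compared case-insensitively.
        sent = {key.lower(): value for key, value in (headers or {}).items()}
        return all(sent.get(key.lower()) == value for key, value in self.headers.items())


matcher = SimpleMatcher(
    method="GET",
    url_base="https://api.example.com",
    url_path_pattern=r"^/v1/users",
)
```

With this sketch, `matcher.matches("GET", "https://api.example.com/v1/users?page=2")` is true, while a `POST` to the same URL or a `GET` to `/v2/users` is not matched.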
@@ -137,6 +137,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self._source_config, config
)

api_budget_model = self._source_config.get("api_budget")
if api_budget_model:
self._constructor.set_api_budget(api_budget_model, config)

source_streams = [
self._constructor.create_component(
DeclarativeStreamModel,
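
The hunk above only shows that `set_api_budget` is called before any stream is created. The sketch below illustrates why that ordering matters, assuming the factory stores one budget object and hands the same instance to every stream it builds afterwards. `SketchFactory`, `create_stream`, and `build_streams` are hypothetical names, and the internals are a guess rather than the CDK's code.

```python
from typing import Any, Dict, List, Mapping, Optional


class SketchFactory:
    """Hypothetical stand-in for the component factory; not the CDK class."""

    def __init__(self) -> None:
        self._api_budget: Optional[Dict[str, Any]] = None

    def set_api_budget(self, model: Mapping[str, Any], config: Mapping[str, Any]) -> None:
        # Build the budget once so every stream created afterwards shares it.
        self._api_budget = {
            "type": model["type"],
            "policies": list(model.get("policies", [])),
        }

    def create_stream(self, name: str) -> Dict[str, Any]:
        return {"name": name, "api_budget": self._api_budget}


def build_streams(manifest: Mapping[str, Any], config: Mapping[str, Any]) -> List[Dict[str, Any]]:
    factory = SketchFactory()
    api_budget_model = manifest.get("api_budget")
    if api_budget_model:  # an absent or empty block means no shared budget
        factory.set_api_budget(api_budget_model, config)
    return [factory.create_stream(stream["name"]) for stream in manifest.get("streams", [])]
```

Because the budget is set before the list comprehension runs, all streams see the same object, so calls made by one stream count against the limit seen by the others.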
140 changes: 140 additions & 0 deletions airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@@ -3,6 +3,7 @@

from __future__ import annotations

from datetime import timedelta
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Union

@@ -642,6 +643,48 @@ class OAuthAuthenticator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class Rate(BaseModel):
class Config:
extra = Extra.allow

limit: int = Field(
...,
description="The maximum number of calls allowed within the interval.",
title="Limit",
)
interval: str = Field(
...,
description="The time interval for the rate limit.",
examples=["PT1H", "P1D"],
title="Interval",
)


class HttpRequestRegexMatcher(BaseModel):
class Config:
extra = Extra.allow

method: Optional[str] = Field(
None, description="The HTTP method to match (e.g., GET, POST).", title="Method"
)
url_base: Optional[str] = Field(
None,
description='The base URL (scheme and host, e.g. "https://api.example.com") to match.',
title="URL Base",
)
url_path_pattern: Optional[str] = Field(
None,
description="A regular expression pattern to match the URL path.",
title="URL Path Pattern",
)
params: Optional[Dict[str, Any]] = Field(
None, description="The query parameters to match.", title="Parameters"
)
headers: Optional[Dict[str, Any]] = Field(
None, description="The headers to match.", title="Headers"
)


class DpathExtractor(BaseModel):
type: Literal["DpathExtractor"]
field_path: List[str] = Field(
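
`Rate.interval` is typed as a plain `str` with ISO 8601 examples (`PT1H`, `P1D`), so something downstream has to turn it into a duration. The following is a minimal sketch of such a conversion using only the standard library. `parse_interval` is a hypothetical helper, it handles only weeks, days, hours, minutes, and seconds, and it says nothing about how the CDK itself parses the field.

```python
import re
from datetime import timedelta

# Subset of ISO 8601 durations: P[nW][nD][T[nH][nM][nS]]
_DURATION = re.compile(
    r"^P(?:(?P<weeks>\d+)W)?(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_interval(value: str) -> timedelta:
    """Parse a limited ISO 8601 duration string into a timedelta."""
    match = _DURATION.match(value)
    parts = {}
    if match:
        parts = {name: int(num) for name, num in match.groupdict().items() if num}
    # Reject strings with no components ("P", "PT") or a dangling "T" ("P1DT").
    if not parts or value.endswith("T"):
        raise ValueError(f"unsupported duration: {value!r}")
    return timedelta(**parts)
```

Months and years are deliberately rejected because they have no fixed length, so `parse_interval("P1M")` raises `ValueError` in this sketch.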
@@ -1584,6 +1627,55 @@ class DatetimeBasedCursor(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class FixedWindowCallRatePolicy(BaseModel):
class Config:
extra = Extra.allow

type: Literal["FixedWindowCallRatePolicy"]
period: timedelta = Field(
..., description="The time interval for the rate limit window.", title="Period"
)
call_limit: int = Field(
...,
description="The maximum number of calls allowed within the period.",
title="Call Limit",
)
matchers: List[HttpRequestRegexMatcher] = Field(
...,
description="List of matchers that define which requests this policy applies to.",
title="Matchers",
)


class MovingWindowCallRatePolicy(BaseModel):
class Config:
extra = Extra.allow

type: Literal["MovingWindowCallRatePolicy"]
rates: List[Rate] = Field(
...,
description="List of rates that define the call limits for different time intervals.",
title="Rates",
)
matchers: List[HttpRequestRegexMatcher] = Field(
...,
description="List of matchers that define which requests this policy applies to.",
title="Matchers",
)


class UnlimitedCallRatePolicy(BaseModel):
class Config:
extra = Extra.allow

type: Literal["UnlimitedCallRatePolicy"]
matchers: List[HttpRequestRegexMatcher] = Field(
...,
description="List of matchers that define which requests this policy applies to.",
title="Matchers",
)


class DefaultErrorHandler(BaseModel):
type: Literal["DefaultErrorHandler"]
backoff_strategies: Optional[
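
The two policy models above differ only in how the time window is counted. As a rough illustration of that difference, the sketch below implements both ideas with an explicit `now` argument so the behaviour is easy to follow. `FixedWindow` and `MovingWindow` are hypothetical classes written for this note. They ignore matchers and multiple rates and are not the CDK's policy implementations.

```python
from collections import deque
from datetime import datetime, timedelta
from typing import Deque, Optional


class FixedWindow:
    """Hypothetical fixed-window limiter: the counter resets when the window ends."""

    def __init__(self, period: timedelta, call_limit: int) -> None:
        self.period = period
        self.call_limit = call_limit
        self._window_start: Optional[datetime] = None
        self._calls = 0

    def try_acquire(self, now: datetime) -> bool:
        if self._window_start is None or now >= self._window_start + self.period:
            self._window_start, self._calls = now, 0  # start a fresh window
        if self._calls >= self.call_limit:
            return False
        self._calls += 1
        return True


class MovingWindow:
    """Hypothetical moving-window limiter: only calls in the trailing interval count."""

    def __init__(self, interval: timedelta, limit: int) -> None:
        self.interval = interval
        self.limit = limit
        self._stamps: Deque[datetime] = deque()

    def try_acquire(self, now: datetime) -> bool:
        while self._stamps and now - self._stamps[0] >= self.interval:
            self._stamps.popleft()  # drop calls that slid out of the window
        if len(self._stamps) >= self.limit:
            return False
        self._stamps.append(now)
        return True
```

With a limit of 2 per minute, the fixed window refuses a third call at 20 seconds and accepts again at 60 seconds, while the moving window frees one slot at a time as each earlier call ages past one minute.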
@@ -1715,6 +1807,44 @@ class CompositeErrorHandler(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class HTTPAPIBudget(BaseModel):
class Config:
extra = Extra.allow

type: Literal["HTTPAPIBudget"]
policies: List[
Union[
FixedWindowCallRatePolicy,
MovingWindowCallRatePolicy,
UnlimitedCallRatePolicy,
]
] = Field(
...,
description="List of call rate policies that define how many calls are allowed.",
title="Policies",
)
ratelimit_reset_header: Optional[str] = Field(
"ratelimit-reset",
description="The HTTP response header name that indicates when the rate limit resets.",
title="Rate Limit Reset Header",
)
ratelimit_remaining_header: Optional[str] = Field(
"ratelimit-remaining",
description="The HTTP response header name that indicates the number of remaining allowed calls.",
title="Rate Limit Remaining Header",
)
status_codes_for_ratelimit_hit: Optional[List[int]] = Field(
[429],
description="List of HTTP status codes that indicate a rate limit has been hit.",
title="Status Codes for Rate Limit Hit",
)
maximum_attempts_to_acquire: Optional[int] = Field(
100000,
description="The maximum number of attempts to acquire a call before giving up.",
title="Maximum Attempts to Acquire",
)


class ZipfileDecoder(BaseModel):
class Config:
extra = Extra.allow
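
The `HTTPAPIBudget` description says it extracts available calls and the next reset timestamp from HTTP responses. A minimal sketch of that extraction follows, using the default header names and status codes from the model above. `read_rate_limit` is a hypothetical helper. It assumes the reset header carries a Unix timestamp in seconds and that a rate-limit status code means zero calls remain; neither assumption is confirmed by this diff.

```python
from datetime import datetime, timezone
from typing import Mapping, Optional, Sequence, Tuple


def read_rate_limit(
    status_code: int,
    headers: Mapping[str, str],
    remaining_header: str = "ratelimit-remaining",
    reset_header: str = "ratelimit-reset",
    hit_status_codes: Sequence[int] = (429,),
) -> Tuple[Optional[int], Optional[datetime]]:
    """Return (calls remaining, reset time); None where the response gives no hint."""
    # Header names are case-insensitive, so normalise before looking them up.
    lowered = {key.lower(): value for key, value in headers.items()}
    raw_remaining = lowered.get(remaining_header.lower())
    raw_reset = lowered.get(reset_header.lower())

    remaining = int(raw_remaining) if raw_remaining is not None else None
    # Assumption: the reset header is a Unix timestamp in seconds.
    reset = (
        datetime.fromtimestamp(int(raw_reset), tz=timezone.utc)
        if raw_reset is not None
        else None
    )
    if status_code in hit_status_codes:
        remaining = 0  # a rate-limit status overrides whatever the header says
    return remaining, reset
```

Some APIs send the reset value as seconds until reset rather than an absolute timestamp, in which case the conversion line would need to add the value to the current time instead.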
@@ -1748,6 +1878,11 @@ class Config:
definitions: Optional[Dict[str, Any]] = None
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = Field(
None,
description="Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams.",
title="API Budget",
)
metadata: Optional[Dict[str, Any]] = Field(
None,
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
@@ -1774,6 +1909,11 @@ class Config:
definitions: Optional[Dict[str, Any]] = None
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = Field(
None,
description="Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams.",
title="API Budget",
)
metadata: Optional[Dict[str, Any]] = Field(
None,
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
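
For reference, here is a sketch of what an `api_budget` block could look like once the manifest is loaded, written as a Python dict together with a small check of the `required` lists from the schema in this PR. The endpoint paths, the host, and the limits are made-up example values, and `missing_keys` is a hypothetical helper rather than the validation the CDK performs.

```python
from typing import Any, Dict, List

# Required keys per component type, copied from the `required` lists in the schema diff.
REQUIRED = {
    "HTTPAPIBudget": ["type", "policies"],
    "FixedWindowCallRatePolicy": ["type", "period", "call_limit", "matchers"],
    "MovingWindowCallRatePolicy": ["type", "rates", "matchers"],
    "UnlimitedCallRatePolicy": ["type", "matchers"],
}

# Example values only: the paths, host, and limits are invented for illustration.
api_budget: Dict[str, Any] = {
    "type": "HTTPAPIBudget",
    "policies": [
        {
            "type": "UnlimitedCallRatePolicy",
            "matchers": [{"url_path_pattern": "^/health"}],
        },
        {
            "type": "MovingWindowCallRatePolicy",
            "rates": [{"limit": 100, "interval": "PT1H"}],
            "matchers": [{"method": "GET", "url_base": "https://api.example.com"}],
        },
    ],
    "status_codes_for_ratelimit_hit": [429],
}


def missing_keys(component: Dict[str, Any]) -> List[str]:
    """List required keys absent from a component and from its nested policies."""
    component_type = component.get("type", "")
    problems = [
        f"{component_type}.{key}"
        for key in REQUIRED.get(component_type, [])
        if key not in component
    ]
    for policy in component.get("policies", []):
        problems.extend(missing_keys(policy))
    return problems
```

Calling `missing_keys(api_budget)` returns an empty list for the example above, and a `FixedWindowCallRatePolicy` without `call_limit` or `matchers` is reported with both keys named.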