Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code): Add API Budget #314

Merged
merged 16 commits into from
Feb 12, 2025
166 changes: 166 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,168 @@ definitions:
$parameters:
type: object
additional_properties: true
APIBudget:
title: API Budget
description: Component that defines how many requests can be made to the API in a given time frame.
type: object
required:
- type
properties:
type:
type: string
enum: [APIBudget]
policies:
title: Policies
description: List of policies that define the rate limits for different types of requests.
type: array
items:
anyOf:
- "$ref": "#/definitions/FixedWindowCallRatePolicy"
- "$ref": "#/definitions/MovingWindowCallRatePolicy"
- "$ref": "#/definitions/UnlimitedCallRatePolicy"
ratelimit_reset_header:
title: Rate Limit Reset Header
description: The name of the header that contains the timestamp for when the rate limit will reset.
type: string
default: "ratelimit-reset"
ratelimit_remaining_header:
title: Rate Limit Remaining Header
description: The name of the header that contains the number of remaining requests.
type: string
default: "ratelimit-remaining"
status_codes_for_ratelimit_hit:
title: Status Codes for Rate Limit Hit
description: List of HTTP status codes that indicate a rate limit has been hit.
type: array
items:
type: integer
default: [429]
maximum_attempts_to_acquire:
tolik0 marked this conversation as resolved.
Show resolved Hide resolved
title: Maximum Attempts to Acquire
description: The maximum number of attempts to acquire a call before giving up.
type: integer
default: 100000
additionalProperties: true
FixedWindowCallRatePolicy:
title: Fixed Window Call Rate Policy
description: A policy that allows a fixed number of calls within a specific time window.
type: object
required:
- type
- next_reset_ts
- period
- call_limit
- matchers
properties:
type:
type: string
enum: [FixedWindowCallRatePolicy]
next_reset_ts:
tolik0 marked this conversation as resolved.
Show resolved Hide resolved
title: Next Reset Timestamp
description: The timestamp when the rate limit will reset.
type: string
format: date-time
tolik0 marked this conversation as resolved.
Show resolved Hide resolved
period:
title: Period
description: The time interval for the rate limit window.
type: string
format: duration
tolik0 marked this conversation as resolved.
Show resolved Hide resolved
call_limit:
title: Call Limit
description: The maximum number of calls allowed within the period.
type: integer
matchers:
title: Matchers
description: List of matchers that define which requests this policy applies to.
type: array
items:
"$ref": "#/definitions/HttpRequestMatcher"
additionalProperties: true
MovingWindowCallRatePolicy:
title: Moving Window Call Rate Policy
description: A policy that allows a fixed number of calls within a moving time window.
type: object
required:
- type
- rates
- matchers
properties:
type:
type: string
enum: [MovingWindowCallRatePolicy]
rates:
title: Rates
description: List of rates that define the call limits for different time intervals.
type: array
items:
"$ref": "#/definitions/Rate"
matchers:
title: Matchers
description: List of matchers that define which requests this policy applies to.
type: array
items:
"$ref": "#/definitions/HttpRequestMatcher"
additionalProperties: true
UnlimitedCallRatePolicy:
title: Unlimited Call Rate Policy
description: A policy that allows unlimited calls for specific requests.
type: object
required:
- type
- matchers
properties:
type:
type: string
enum: [UnlimitedCallRatePolicy]
matchers:
title: Matchers
description: List of matchers that define which requests this policy applies to.
type: array
items:
"$ref": "#/definitions/HttpRequestMatcher"
additionalProperties: true
Rate:
title: Rate
description: Defines a rate limit with a specific number of calls allowed within a time interval.
type: object
required:
- limit
- interval
properties:
limit:
title: Limit
description: The maximum number of calls allowed within the interval.
type: integer
interval:
title: Interval
description: The time interval for the rate limit.
type: string
format: duration
additionalProperties: true
HttpRequestMatcher:
title: HTTP Request Matcher
description: Matches HTTP requests based on method, URL, parameters, and headers.
type: object
properties:
method:
title: Method
description: The HTTP method to match (e.g., GET, POST).
type: string
url:
title: URL
description: The URL to match.
type: string
params:
title: Parameters
description: The query parameters to match.
type: object
additionalProperties: true
headers:
title: Headers
description: The headers to match.
type: object
additionalProperties: true
additionalProperties: true
DefaultErrorHandler:
title: Default Error Handler
description: Component defining how to handle errors. Default behavior includes only retrying server errors (HTTP 5XX) and too many requests (HTTP 429) with an exponential backoff.
Expand Down Expand Up @@ -1637,6 +1799,10 @@ definitions:
- "$ref": "#/definitions/DefaultErrorHandler"
- "$ref": "#/definitions/CustomErrorHandler"
- "$ref": "#/definitions/CompositeErrorHandler"
api_budget:
title: API Budget
description: Component that defines how many requests can be made to the API in a given time frame.
"$ref": "#/definitions/APIBudget"
http_method:
title: HTTP Method
description: The HTTP method used to fetch data from the source (can be GET or POST).
Expand Down
130 changes: 130 additions & 0 deletions airbyte_cdk/sources/declarative/models/declarative_component_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Union

Expand Down Expand Up @@ -642,6 +643,36 @@ class OAuthAuthenticator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class Rate(BaseModel):
class Config:
extra = Extra.allow

limit: int = Field(
...,
description="The maximum number of calls allowed within the interval.",
title="Limit",
)
interval: timedelta = Field(
..., description="The time interval for the rate limit.", title="Interval"
)


class HttpRequestMatcher(BaseModel):
class Config:
extra = Extra.allow

method: Optional[str] = Field(
None, description="The HTTP method to match (e.g., GET, POST).", title="Method"
)
url: Optional[str] = Field(None, description="The URL to match.", title="URL")
params: Optional[Dict[str, Any]] = Field(
None, description="The query parameters to match.", title="Parameters"
)
headers: Optional[Dict[str, Any]] = Field(
None, description="The headers to match.", title="Headers"
)


class DpathExtractor(BaseModel):
type: Literal["DpathExtractor"]
field_path: List[str] = Field(
Expand Down Expand Up @@ -1578,6 +1609,60 @@ class DatetimeBasedCursor(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class FixedWindowCallRatePolicy(BaseModel):
class Config:
extra = Extra.allow

type: Literal["FixedWindowCallRatePolicy"]
next_reset_ts: datetime = Field(
...,
description="The timestamp when the rate limit will reset.",
title="Next Reset Timestamp",
)
period: timedelta = Field(
..., description="The time interval for the rate limit window.", title="Period"
)
call_limit: int = Field(
...,
description="The maximum number of calls allowed within the period.",
title="Call Limit",
)
matchers: List[HttpRequestMatcher] = Field(
...,
description="List of matchers that define which requests this policy applies to.",
title="Matchers",
)


class MovingWindowCallRatePolicy(BaseModel):
class Config:
extra = Extra.allow

type: Literal["MovingWindowCallRatePolicy"]
rates: List[Rate] = Field(
...,
description="List of rates that define the call limits for different time intervals.",
title="Rates",
)
matchers: List[HttpRequestMatcher] = Field(
...,
description="List of matchers that define which requests this policy applies to.",
title="Matchers",
)


class UnlimitedCallRatePolicy(BaseModel):
class Config:
extra = Extra.allow

type: Literal["UnlimitedCallRatePolicy"]
matchers: List[HttpRequestMatcher] = Field(
...,
description="List of matchers that define which requests this policy applies to.",
title="Matchers",
)


class DefaultErrorHandler(BaseModel):
type: Literal["DefaultErrorHandler"]
backoff_strategies: Optional[
Expand Down Expand Up @@ -1709,6 +1794,46 @@ class CompositeErrorHandler(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class APIBudget(BaseModel):
class Config:
extra = Extra.allow

type: Literal["APIBudget"]
policies: Optional[
List[
Union[
FixedWindowCallRatePolicy,
MovingWindowCallRatePolicy,
UnlimitedCallRatePolicy,
]
]
] = Field(
None,
description="List of policies that define the rate limits for different types of requests.",
title="Policies",
)
ratelimit_reset_header: Optional[str] = Field(
"ratelimit-reset",
description="The name of the header that contains the timestamp for when the rate limit will reset.",
title="Rate Limit Reset Header",
)
ratelimit_remaining_header: Optional[str] = Field(
"ratelimit-remaining",
description="The name of the header that contains the number of remaining requests.",
title="Rate Limit Remaining Header",
)
status_codes_for_ratelimit_hit: Optional[List[int]] = Field(
[429],
description="List of HTTP status codes that indicate a rate limit has been hit.",
title="Status Codes for Rate Limit Hit",
)
maximum_attempts_to_acquire: Optional[int] = Field(
100000,
description="The maximum number of attempts to acquire a call before giving up.",
title="Maximum Attempts to Acquire",
)


class ZipfileDecoder(BaseModel):
class Config:
extra = Extra.allow
Expand Down Expand Up @@ -1979,6 +2104,11 @@ class HttpRequester(BaseModel):
description="Error handler component that defines how to handle errors.",
title="Error Handler",
)
api_budget: Optional[APIBudget] = Field(
None,
description="Component that defines how many requests can be made to the API in a given time frame.",
title="API Budget",
)
http_method: Optional[HttpMethod] = Field(
HttpMethod.GET,
description="The HTTP method used to fetch data from the source (can be GET or POST).",
Expand Down
Loading
Loading