diff --git a/airbyte-integrations/connectors/source-airtable/source_airtable/auth.py b/airbyte-integrations/connectors/source-airtable/source_airtable/auth.py index 692e09ecae40..0dd6f75b5d42 100644 --- a/airbyte-integrations/connectors/source-airtable/source_airtable/auth.py +++ b/airbyte-integrations/connectors/source-airtable/source_airtable/auth.py @@ -6,19 +6,23 @@ import requests from airbyte_cdk.models import FailureType +from airbyte_cdk.sources.declarative.auth.oauth import DeclarativeSingleUseRefreshTokenOauth2Authenticator +from airbyte_cdk.sources.declarative.auth.token import BearerAuthenticator +from airbyte_cdk.sources.declarative.auth.token_provider import InterpolatedStringTokenProvider from airbyte_cdk.sources.streams.http.requests_native_auth import ( BasicHttpAuthenticator, - SingleUseRefreshTokenOauth2Authenticator, - TokenAuthenticator, ) from airbyte_cdk.utils import AirbyteTracedException -class AirtableOAuth(SingleUseRefreshTokenOauth2Authenticator): +class AirtableOAuth(DeclarativeSingleUseRefreshTokenOauth2Authenticator): """ https://airtable.com/developers/web/api/oauth-reference#token-expiry-refresh-tokens """ + connector_config: dict + token_refresh_endpoint: str + def build_refresh_request_headers(self) -> Mapping[str, Any]: """ https://airtable.com/developers/web/api/oauth-reference#token-refresh-request-headers @@ -56,13 +60,17 @@ def _get_refresh_access_token_response(self) -> Mapping[str, Any]: class AirtableAuth: - def __new__(cls, config: dict) -> Union[TokenAuthenticator, AirtableOAuth]: + config: dict + + def __new__(cls, config: dict) -> Union[BearerAuthenticator, AirtableOAuth]: # for old configs with api_key provided if "api_key" in config: - return TokenAuthenticator(token=(config or {}).get("api_key")) + token_provider = InterpolatedStringTokenProvider(api_token=config["api_key"], config=config, parameters={}) + return BearerAuthenticator(token_provider=token_provider, config=config, parameters={}) # for new oauth configs credentials = config["credentials"] if credentials["auth_method"] == "oauth2.0": return AirtableOAuth(config, "https://airtable.com/oauth2/v1/token") elif credentials["auth_method"] == "api_key": - return TokenAuthenticator(token=credentials["api_key"]) + token_provider = InterpolatedStringTokenProvider(api_token=credentials["api_key"], config=config, parameters={}) + return BearerAuthenticator(token_provider=token_provider, config=config, parameters={})