Skip to content

Commit

Permalink
updated auth.py
Browse files Browse the repository at this point in the history
  • Loading branch information
darynaishchenko committed Dec 16, 2024
1 parent 067b024 commit 465a7bb
Showing 1 changed file with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@

import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.auth.oauth import DeclarativeSingleUseRefreshTokenOauth2Authenticator
from airbyte_cdk.sources.declarative.auth.token import BearerAuthenticator
from airbyte_cdk.sources.declarative.auth.token_provider import InterpolatedStringTokenProvider
from airbyte_cdk.sources.streams.http.requests_native_auth import (
BasicHttpAuthenticator,
SingleUseRefreshTokenOauth2Authenticator,
TokenAuthenticator,
)
from airbyte_cdk.utils import AirbyteTracedException


class AirtableOAuth(SingleUseRefreshTokenOauth2Authenticator):
class AirtableOAuth(DeclarativeSingleUseRefreshTokenOauth2Authenticator):
"""
https://airtable.com/developers/web/api/oauth-reference#token-expiry-refresh-tokens
"""

connector_config: dict
token_refresh_endpoint: str

def build_refresh_request_headers(self) -> Mapping[str, Any]:
"""
https://airtable.com/developers/web/api/oauth-reference#token-refresh-request-headers
Expand Down Expand Up @@ -56,13 +60,17 @@ def _get_refresh_access_token_response(self) -> Mapping[str, Any]:


class AirtableAuth:
def __new__(cls, config: dict) -> Union[TokenAuthenticator, AirtableOAuth]:
config: dict

def __new__(cls, config: dict) -> Union[BearerAuthenticator, AirtableOAuth]:
# for old configs with api_key provided
if "api_key" in config:
return TokenAuthenticator(token=(config or {}).get("api_key"))
token_provider = InterpolatedStringTokenProvider(api_token=config["api_key"], config=config, parameters={})
return BearerAuthenticator(token_provider=token_provider, config=config, parameters={})
# for new oauth configs
credentials = config["credentials"]
if credentials["auth_method"] == "oauth2.0":
return AirtableOAuth(config, "https://airtable.com/oauth2/v1/token")
elif credentials["auth_method"] == "api_key":
return TokenAuthenticator(token=credentials["api_key"])
token_provider = InterpolatedStringTokenProvider(api_token=credentials["api_key"], config=config, parameters={})
return BearerAuthenticator(token_provider=token_provider, config=config, parameters={})

0 comments on commit 465a7bb

Please sign in to comment.