From e1144024c0b0f61fde02e658e1404540f0219dac Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 14 Feb 2024 17:12:39 +0200 Subject: [PATCH] added source.py --- .../source-recharge/source_recharge/source.py | 70 +++---------------- 1 file changed, 10 insertions(+), 60 deletions(-) diff --git a/airbyte-integrations/connectors/source-recharge/source_recharge/source.py b/airbyte-integrations/connectors/source-recharge/source_recharge/source.py index 1d1ea875f3e3..2dd2f2794439 100644 --- a/airbyte-integrations/connectors/source-recharge/source_recharge/source.py +++ b/airbyte-integrations/connectors/source-recharge/source_recharge/source.py @@ -2,66 +2,16 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource -from typing import Any, List, Mapping, Tuple, Union +""" +This file provides the necessary constructs to interpret a provided declarative YAML configuration file into +source connector. +WARNING: Do not modify this file. +""" -from airbyte_cdk import AirbyteLogger -from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources import AbstractSource -from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator -from .api import ( - Addresses, - Charges, - Collections, - Customers, - Discounts, - Metafields, - Onetimes, - OrdersDeprecatedApi, - OrdersModernApi, - Products, - Shop, - Subscriptions, -) - - -class RechargeTokenAuthenticator(TokenAuthenticator): - def get_auth_header(self) -> Mapping[str, Any]: - return {"X-Recharge-Access-Token": self._token} - - -class SourceRecharge(AbstractSource): - def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]: - auth = RechargeTokenAuthenticator(token=config["access_token"]) - stream = Shop(config, authenticator=auth) - try: - result = list(stream.read_records(SyncMode.full_refresh))[0] - if stream.name in result.keys(): - return True, None - except Exception as error: - return False, f"Unable to connect to Recharge API with the provided credentials - {repr(error)}" - - def select_orders_stream(self, config: Mapping[str, Any], **kwargs) -> Union[OrdersDeprecatedApi, OrdersModernApi]: - if config.get("use_orders_deprecated_api"): - return OrdersDeprecatedApi(config, **kwargs) - else: - return OrdersModernApi(config, **kwargs) - - def streams(self, config: Mapping[str, Any]) -> List[Stream]: - auth = RechargeTokenAuthenticator(token=config["access_token"]) - return [ - Addresses(config, authenticator=auth), - Charges(config, authenticator=auth), - Collections(config, authenticator=auth), - Customers(config, authenticator=auth), - Discounts(config, authenticator=auth), - Metafields(config, authenticator=auth), - Onetimes(config, authenticator=auth), - # select the Orders stream class, based on the UI toggle "Use `Orders` Deprecated API" - self.select_orders_stream(config, authenticator=auth), - Products(config, authenticator=auth), - Shop(config, authenticator=auth), - Subscriptions(config, authenticator=auth), - ] +# Declarative Source +class SourceRecharge(YamlDeclarativeSource): + def __init__(self): + super().__init__(**{"path_to_yaml": "manifest.yaml"})