diff --git a/airbyte-integrations/connectors/source-recharge/source_recharge/components/datetime_based_cursor.py b/airbyte-integrations/connectors/source-recharge/source_recharge/components/datetime_based_cursor.py new file mode 100644 index 000000000000..0bf213b881bf --- /dev/null +++ b/airbyte-integrations/connectors/source-recharge/source_recharge/components/datetime_based_cursor.py @@ -0,0 +1,39 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +from dataclasses import dataclass +from typing import Any, Mapping, Optional + +from airbyte_cdk.sources.declarative.types import Record, StreamSlice +from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor + + +@dataclass +class RechargeDateTimeBasedCursor(DatetimeBasedCursor): + """ + Override for the default `DatetimeBasedCursor` to make self.close_slice() produce the `min` value instead of the `max` value. + + This is the ONLY CHANGE MADE HERE, to make the SOURCE STATE processed correctly: + The `min` value should be determined, in the first place, since we would skip the records, + if they are updated manually, by the Customer, and the range is not AFTER the STATE value, but before. 
+ """ + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + super().__post_init__(parameters=parameters) + + def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None: + last_record_cursor_value = most_recent_record.get(self.cursor_field.eval(self.config)) if most_recent_record else None + stream_slice_value_end = stream_slice.get(self.partition_field_end.eval(self.config)) + cursor_value_str_by_cursor_value_datetime = dict( + map( + lambda datetime_str: (self.parse_date(datetime_str), datetime_str), + filter(lambda item: item, [self._cursor, last_record_cursor_value, stream_slice_value_end]), + ) + ) + self._cursor = ( + cursor_value_str_by_cursor_value_datetime[min(cursor_value_str_by_cursor_value_datetime.keys())] + if cursor_value_str_by_cursor_value_datetime + else None + ) diff --git a/airbyte-integrations/connectors/source-recharge/source_recharge/manifest.yaml b/airbyte-integrations/connectors/source-recharge/source_recharge/manifest.yaml index f826d3bb4939..f4bf72154dde 100644 --- a/airbyte-integrations/connectors/source-recharge/source_recharge/manifest.yaml +++ b/airbyte-integrations/connectors/source-recharge/source_recharge/manifest.yaml @@ -1,14 +1,12 @@ version: 0.58.4 -# COMMON DEFINITIONS definitions: - # API VERSION REFERENCE - modern_api_version: - x-recharge-version: "2021-11" deprecated_api_version: x-recharge-version: "2021-01" - + modern_api_version: + x-recharge-version: "2021-11" + # COMMON PARTS schema_loader: type: JsonFileSchemaLoader @@ -22,6 +20,7 @@ definitions: field_path: ["{{ parameters.get('data_path')}}"] # apply default schema normalization schema_normalization: Default + authenticator: type: ApiKeyAuthenticator api_token: "{{ config['access_token'] }}" @@ -29,7 +28,9 @@ definitions: type: RequestOption inject_into: header field_name: X-Recharge-Access-Token - paginator: + + # PAGINATORS + paginator_deprecated_api: type: DefaultPaginator page_token_option: type: RequestOption 
@@ -44,42 +45,80 @@ definitions: start_from_page: 1 page_size: 250 inject_on_first_request: false + paginator_modern_api: + type: DefaultPaginator + page_token_option: + type: RequestOption + inject_into: request_parameter + field_name: cursor + page_size_option: + inject_into: request_parameter + type: RequestOption + field_name: limit + pagination_strategy: + type: CursorPagination + page_size: 1 + cursor_value: '{{ response.get("next_cursor", {}) }}' + stop_condition: '{{ not response.get("next_cursor", {}) }}' # REQUESTERS - default_requester: + requester_deprecated_api: + description: >- + Default Base Requester for Full Refresh streams + type: HttpRequester + url_base: https://api.rechargeapps.com/ + path: "{{ parameters['name'] }}" + http_method: GET + authenticator: + $ref: "#/definitions/authenticator" + request_headers: + # for deprecated retriever we should use `2021-01` api version + $ref: "#/definitions/deprecated_api_version" + error_handler: + type: "DefaultErrorHandler" + + requester_modern_api: description: >- Default Base Requester for Full Refresh streams + # TODO: + # WHEN the default HttpRequester is used, there is no option to omit passing additional + # query params along with `next_page_token`, the fix should probably be made at the CDK level. type: HttpRequester url_base: https://api.rechargeapps.com/ path: "{{ parameters['name'] }}" http_method: GET authenticator: $ref: "#/definitions/authenticator" + request_headers: + # for modern retriever we should use >= `2021-11` api version + $ref: "#/definitions/modern_api_version" error_handler: type: "DefaultErrorHandler" - # DEFAULT REQUESTER FOR MODERN API - default_retriever: + request_parameters: + updated_at_min: '{{ stream_state.get(''updated_at'') if stream_state else config[''start_date''] }}' + + # RETRIEVER FOR `DEPRECATED API` + retriever_api_deprecated: description: >- - Base Deprecated Recharge API Retriever for Full Refresh streams. 
- Doc: https://developer.rechargepayments.com/2021-01/versioning + Default Retriever for Deprecated API `2021-01` Full Refresh streams. record_selector: $ref: "#/definitions/selector" requester: - $ref: "#/definitions/default_requester" - request_headers: - $ref: "#/definitions/modern_api_version" + $ref: "#/definitions/requester_deprecated_api" paginator: - $ref: "#/definitions/paginator" + $ref: "#/definitions/paginator_deprecated_api" - # RETRIEVER FOR `DEPRECATED API` - retriever_api_deprecated: - $ref: "#/definitions/default_retriever" - # Override to have deprecated api version calls + # RETRIEVER FOR `MODERN API` + retriever_api_modern: + description: >- + Default Retriever for Modern API `2021-11` Full Refresh streams. + record_selector: + $ref: "#/definitions/selector" requester: - $ref: "#/definitions/default_requester" - request_headers: - # for deprecated retriever we should use `2021-01` api version - $ref: "#/definitions/deprecated_api_version" + $ref: "#/definitions/requester_modern_api" + paginator: + $ref: "#/definitions/paginator_modern_api" + # RETRIEVER FOR `SHOP` STREAM retriever_shop_stream: $ref: "#/definitions/retriever_api_deprecated" @@ -97,11 +136,56 @@ definitions: $ref: "#/definitions/schema_loader" retriever: $ref: "#/definitions/retriever_api_deprecated" - $parameters: - start_date: "{{ config['start_date'] }}" - raise_on_http_errors: true + # $parameters: + # start_date: "{{ config['start_date'] }}" + # raise_on_http_errors: true + + base_modern_api_stream: + primary_key: "id" + schema_loader: + $ref: "#/definitions/schema_loader" + retriever: + $ref: "#/definitions/retriever_api_modern" + + # BASE INCREMENTAL STREAMS + base_deprecated_api_incremental_stream: + $ref: "#/definitions/base_deprecated_api_stream" + # incremental_sync: + # type: CustomIncrementalSync + # class_name: source_recharge.components.RechargeDateTimeBasedCursor + # cursor_field: "updated_at" + # cursor_datetime_formats: + # - '%Y-%m-%dT%H:%M:%S%z' + # 
datetime_format: '%Y-%m-%dT%H:%M:%S%z' + # start_datetime: + # type: MinMaxDatetime + # datetime: '{{ config[''start_date''] }}' + # datetime_format: '%Y-%m-%dT%H:%M:%SZ' + # start_time_option: + # inject_into: request_parameter + # type: RequestOption + # field_name: 'updated_at_min' + + base_modern_api_incremental_stream: + $ref: "#/definitions/base_modern_api_stream" + incremental_sync: + # type: DatetimeBasedCursor + # The custom incremental sync was applied because, + # the cursor for the close_slice() method is determined as NOW(), + # instead of the real cursor field (updated_at) value from the record. + type: CustomIncrementalSync + class_name: source_recharge.components.datetime_based_cursor.RechargeDateTimeBasedCursor + cursor_field: "updated_at" + cursor_datetime_formats: + - '%Y-%m-%dT%H:%M:%S%z' + datetime_format: '%Y-%m-%dT%H:%M:%S%z' + start_datetime: + type: MinMaxDatetime + datetime: '{{ config[''start_date''] }}' + datetime_format: '%Y-%m-%dT%H:%M:%SZ' # FULL-REFRESH STREAMS + # SHOP shop_stream: $ref: "#/definitions/base_deprecated_api_stream" retriever: @@ -109,16 +193,25 @@ definitions: primary_key: ["shop", "store"] $parameters: name: "shop" - + # PRODUCTS products_stream: $ref: "#/definitions/base_deprecated_api_stream" $parameters: name: "products" data_path: "products" + + # INCREMENTAL STREAMS + # ORDERS + orders_stream: + $ref: "#/definitions/base_modern_api_incremental_stream" + $parameters: + name: "orders" + data_path: "orders" streams: - "#/definitions/shop_stream" - "#/definitions/products_stream" + - "#/definitions/orders_stream" check: type: CheckStream