Skip to content

Commit

Permalink
updated draft
Browse files Browse the repository at this point in the history
  • Loading branch information
bazarnov committed Feb 20, 2024
1 parent dc9b4ee commit 0b8030c
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from dataclasses import dataclass
from typing import Any, Mapping, Optional

from airbyte_cdk.sources.declarative.types import Record, StreamSlice
from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor


@dataclass
class RechargeDateTimeBasedCursor(DatetimeBasedCursor):
    """
    Variant of the default `DatetimeBasedCursor` whose `close_slice()` keeps the `min` value instead of the `max` value.

    This is the ONLY behaviour changed here, so that the SOURCE STATE is processed correctly:
    the `min` value has to be picked first, because otherwise records updated manually by the
    Customer would be skipped whenever their range is not AFTER the STATE value, but before it.
    """

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Plain pass-through to the parent initialisation; nothing is added here.
        super().__post_init__(parameters=parameters)

    def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
        """
        Update `self._cursor` once a slice has been read.

        The candidates are the current cursor, the cursor-field value of `most_recent_record`
        (when a record is given) and the end value of `stream_slice`. Falsy candidates are
        ignored; the candidate whose parsed value is the smallest is stored back in its
        original form. If no candidate is left, the cursor is set to `None`.
        """
        record_cursor_value = None
        if most_recent_record:
            record_cursor_value = most_recent_record.get(self.cursor_field.eval(self.config))
        slice_end_value = stream_slice.get(self.partition_field_end.eval(self.config))

        # Keyed by whatever `parse_date` returns so that candidates are compared on the parsed
        # value, while the original representation is what ends up stored as the cursor.
        original_by_parsed = {
            self.parse_date(candidate): candidate
            for candidate in (self._cursor, record_cursor_value, slice_end_value)
            if candidate
        }

        if not original_by_parsed:
            self._cursor = None
            return
        self._cursor = original_by_parsed[min(original_by_parsed)]
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
version: 0.58.4

# COMMON DEFINITIONS
definitions:

# API VERSION REFERENCE
modern_api_version:
x-recharge-version: "2021-11"
deprecated_api_version:
x-recharge-version: "2021-01"

modern_api_version:
x-recharge-version: "2021-11"

# COMMON PARTS
schema_loader:
type: JsonFileSchemaLoader
Expand All @@ -22,14 +20,17 @@ definitions:
field_path: ["{{ parameters.get('data_path')}}"]
# apply default schema normalization
schema_normalization: Default

authenticator:
type: ApiKeyAuthenticator
api_token: "{{ config['access_token'] }}"
inject_into:
type: RequestOption
inject_into: header
field_name: X-Recharge-Access-Token
paginator:

# PAGINATORS
paginator_deprecated_api:
type: DefaultPaginator
page_token_option:
type: RequestOption
Expand All @@ -44,42 +45,80 @@ definitions:
start_from_page: 1
page_size: 250
inject_on_first_request: false
paginator_modern_api:
type: DefaultPaginator
page_token_option:
type: RequestOption
inject_into: request_parameter
field_name: cursor
page_size_option:
inject_into: request_parameter
type: RequestOption
field_name: limit
pagination_strategy:
type: CursorPagination
page_size: 1
cursor_value: '{{ response.get("next_cursor", {}) }}'
stop_condition: '{{ not response.get("next_cursor", {}) }}'

# REQUESTERS
default_requester:
requester_deprecated_api:
description: >-
Default Base Requester for Full Refresh streams
type: HttpRequester
url_base: https://api.rechargeapps.com/
path: "{{ parameters['name'] }}"
http_method: GET
authenticator:
$ref: "#/definitions/authenticator"
request_headers:
# for deprecated retriever we should use `2021-01` api version
$ref: "#/definitions/deprecated_api_version"
error_handler:
type: "DefaultErrorHandler"

requester_modern_api:
description: >-
Default Base Requester for Full Refresh streams
# TODO:
# When the default HttpRequester is used, there is no option to omit the additional
# query params passed along with `next_page_token`; the fix should probably be made at the CDK level.
type: HttpRequester
url_base: https://api.rechargeapps.com/
path: "{{ parameters['name'] }}"
http_method: GET
authenticator:
$ref: "#/definitions/authenticator"
request_headers:
# for modern retriever we should use >= `2021-11` api version
$ref: "#/definitions/modern_api_version"
error_handler:
type: "DefaultErrorHandler"
# DEFAULT REQUESTER FOR MODERN API
default_retriever:
request_parameters:
updated_at_min: '{{ stream_state.get(''updated_at'') if stream_state else config[''start_date''] }}'

# RETRIEVER FOR `DEPRECATED API`
retriever_api_deprecated:
description: >-
Base Deprecated Recharge API Retriever for Full Refresh streams.
Doc: https://developer.rechargepayments.com/2021-01/versioning
Default Retriever for Deprecated API `2021-01` Full Refresh streams.
record_selector:
$ref: "#/definitions/selector"
requester:
$ref: "#/definitions/default_requester"
request_headers:
$ref: "#/definitions/modern_api_version"
$ref: "#/definitions/requester_deprecated_api"
paginator:
$ref: "#/definitions/paginator"
$ref: "#/definitions/paginator_deprecated_api"

# RETRIEVER FOR `DEPRECATED API`
retriever_api_deprecated:
$ref: "#/definitions/default_retriever"
# Override to have deprecated api version calls
# RETRIEVER FOR `MODERN API`
retriever_api_modern:
description: >-
Default Retriever for Modern API `2021-11` Full Refresh streams.
record_selector:
$ref: "#/definitions/selector"
requester:
$ref: "#/definitions/default_requester"
request_headers:
# for deprecated retriever we should use `2021-01` api version
$ref: "#/definitions/deprecated_api_version"
$ref: "#/definitions/requester_modern_api"
paginator:
$ref: "#/definitions/paginator_modern_api"

# RETRIEVER FOR `SHOP` STREAM
retriever_shop_stream:
$ref: "#/definitions/retriever_api_deprecated"
Expand All @@ -97,28 +136,82 @@ definitions:
$ref: "#/definitions/schema_loader"
retriever:
$ref: "#/definitions/retriever_api_deprecated"
$parameters:
start_date: "{{ config['start_date'] }}"
raise_on_http_errors: true
# $parameters:
# start_date: "{{ config['start_date'] }}"
# raise_on_http_errors: true

base_modern_api_stream:
primary_key: "id"
schema_loader:
$ref: "#/definitions/schema_loader"
retriever:
$ref: "#/definitions/retriever_api_modern"

# BASE INCREMENTAL STREAMS
base_deprecated_api_incremental_stream:
$ref: "#/definitions/base_deprecated_api_stream"
# incremental_sync:
# type: CustomIncrementalSync
# class_name: source_recharge.components.RechargeDateTimeBasedCursor
# cursor_field: "updated_at"
# cursor_datetime_formats:
# - '%Y-%m-%dT%H:%M:%S%z'
# datetime_format: '%Y-%m-%dT%H:%M:%S%z'
# start_datetime:
# type: MinMaxDatetime
# datetime: '{{ config[''start_date''] }}'
# datetime_format: '%Y-%m-%dT%H:%M:%SZ'
# start_time_option:
# inject_into: request_parameter
# type: RequestOption
# field_name: 'updated_at_min'

base_modern_api_incremental_stream:
$ref: "#/definitions/base_modern_api_stream"
incremental_sync:
# type: DatetimeBasedCursor
# The custom incremental sync is applied because the cursor in the
# close_slice() method is otherwise determined as NOW(),
# instead of the real cursor field (`updated_at`) value from the record.
type: CustomIncrementalSync
class_name: source_recharge.components.datetime_based_cursor.RechargeDateTimeBasedCursor
cursor_field: "updated_at"
cursor_datetime_formats:
- '%Y-%m-%dT%H:%M:%S%z'
datetime_format: '%Y-%m-%dT%H:%M:%S%z'
start_datetime:
type: MinMaxDatetime
datetime: '{{ config[''start_date''] }}'
datetime_format: '%Y-%m-%dT%H:%M:%SZ'

# FULL-REFRESH STREAMS
# SHOP
shop_stream:
$ref: "#/definitions/base_deprecated_api_stream"
retriever:
$ref: "#/definitions/retriever_shop_stream"
primary_key: ["shop", "store"]
$parameters:
name: "shop"

# PRODUCTS
products_stream:
$ref: "#/definitions/base_deprecated_api_stream"
$parameters:
name: "products"
data_path: "products"

# INCREMENTAL STREAMS
# ORDERS
orders_stream:
$ref: "#/definitions/base_modern_api_incremental_stream"
$parameters:
name: "orders"
data_path: "orders"

streams:
- "#/definitions/shop_stream"
- "#/definitions/products_stream"
- "#/definitions/orders_stream"

check:
type: CheckStream
Expand Down

0 comments on commit 0b8030c

Please sign in to comment.