Skip to content

Commit

Permalink
🐛 Source Recharge: fix pagination and slicing (#30756)
Browse files Browse the repository at this point in the history
  • Loading branch information
davydov-d authored Sep 26, 2023
1 parent 5bd2c1c commit 769e69b
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 139 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-recharge/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.0.1
LABEL io.airbyte.version=1.1.0
LABEL io.airbyte.name=airbyte/source-recharge
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{"stream": "addresses", "data": {"id": 69105381, "customer_id": 64817252, "payment_method_id": 12482012, "address1": "1 9th Ave", "address2": "1", "city": "San Francisco", "company": null, "country_code": "US", "created_at": "2021-05-12T12:04:06+00:00", "discounts": [], "first_name": "Jane", "last_name": "Doe", "order_attributes": [], "order_note": null, "phone": "1234567890", "presentment_currency": "USD", "province": "California", "shipping_lines_conserved": [], "shipping_lines_override": [], "updated_at": "2023-01-16T09:59:09+00:00", "zip": "94118"}, "emitted_at": 1680895024611}
{"stream": "charges", "data": {"id": 386976088, "address_id": 69105381, "analytics_data": {"utm_params": []}, "billing_address": {"address1": "1 9th Ave", "address2": "1", "city": "San Francisco", "company": null, "country_code": "US", "first_name": "Karina", "last_name": "Kuznetsova", "phone": null, "province": "California", "zip": "94118"}, "charge_attempts": 6, "client_details": {"browser_ip": null, "user_agent": null}, "created_at": "2021-05-12T12:04:07+00:00", "currency": "USD", "customer": {"id": 64817252, "email": "[email protected]", "external_customer_id": {"ecommerce": "5212085977259"}, "hash": "23dee52d73734a81"}, "discounts": [], "error": "None\r\n [May 12, 12:06AM] ['Inventory unavailable S / Black T1 6642695864491 requested qty. 1, inventory was: -1']\r\n [May 13, 4:10PM] ['Inventory unavailable S / Black T1 6642695864491 requested qty. 1, inventory was: -1']\r\n [May 19, 4:10PM] ['Inventory unavailable S / Black T1 6642695864491 requested qty. 1, inventory was: -1']\r\n [May 25, 4:10PM] ['Inventory unavailable S / Black T1 6642695864491 requested qty. 1, inventory was: -1']\r\n [May 31, 4:09PM] ['Inventory unavailable S / Black T1 6642695864491 requested qty. 1, inventory was: -1']\r\n [Jun 06, 4:10PM] ['Inventory unavailable S / Black T1 6642695864491 requested qty. 1, inventory was: -1']", "error_type": "CLOSED_MAX_RETRIES_REACHED", "external_order_id": {"ecommerce": null}, "external_transaction_id": {"payment_processor": null}, "external_variant_not_found": null, "has_uncommitted_changes": false, "last_charge_attempt": "2022-06-06T20:10:19+00:00", "line_items": [{"purchase_item_id": 153224593, "external_product_id": {"ecommerce": "6642695864491"}, "external_variant_id": {"ecommerce": "39684722131115"}, "grams": 0, "handle": null, "images": {"large": "https://cdn.shopify.com/s/files/1/0565/0628/6251/products/t_neon_green_47f548d4-fda5-4e21-8066-1d4caadbe581_large.jpg", "medium": "https://cdn.shopify.com/s/files/1/0565/0628/6251/products/t_neon_green_47f548d4-fda5-4e21-8066-1d4caadbe581_medium.jpg", "original": "https://cdn.shopify.com/s/files/1/0565/0628/6251/products/t_neon_green_47f548d4-fda5-4e21-8066-1d4caadbe581.jpg", "small": "https://cdn.shopify.com/s/files/1/0565/0628/6251/products/t_neon_green_47f548d4-fda5-4e21-8066-1d4caadbe581_small.jpg"}, "original_price": "24.30", "properties": [], "purchase_item_type": "subscription", "quantity": 1, "sku": "T1", "tax_due": "0.00", "tax_lines": [], "taxable": true, "taxable_amount": "24.30", "title": "Airbit Box Corner Short sleeve t-shirt", "total_price": "24.30", "unit_price": "24.30", "unit_price_includes_tax": false, "variant_title": "S / Black"}], "note": null, "order_attributes": [], "orders_count": 0, "payment_processor": "shopify_payments", "processed_at": null, "retry_date": "2022-06-12T04:00:00+00:00", "scheduled_at": "2022-05-12", "shipping_address": {"address1": "1 9th Ave", "address2": "1", "city": "San Francisco", "company": null, "country_code": "US", "first_name": "Jane", "last_name": "Doe", "phone": "1234567890", "province": "California", "zip": "94118"}, "shipping_lines": [{"code": "Economy", "price": "4.90", "source": "shopify", "tax_lines": [], "taxable": false, "title": "Economy"}], "status": "error", "subtotal_price": "24.30", "tags": "Subscription, Subscription Recurring Order", "tax_lines": "[]", "taxable": true, "taxes_included": false, "total_discounts": "0.00", "total_duties": "0.00", "total_line_items_price": "24.30", "total_price": "29.20", "total_refunds": "0.00", "total_tax": "0.00", "total_weight_grams": 0, "type": "recurring", "updated_at": "2023-01-16T18:08:54+00:00"}, "emitted_at": 1687184458990}
{"stream": "customers", "data": {"id": 64817252, "analytics_data": {"utm_params": []}, "created_at": "2021-05-12T12:04:06+00:00", "email": "[email protected]", "external_customer_id": {"ecommerce": "5212085977259"}, "first_charge_processed_at": "2021-05-12T16:03:59+00:00", "first_name": "Karina", "has_payment_method_in_dunning": false, "has_valid_payment_method": true, "hash": "23dee52d73734a81", "last_name": "Kuznetsova", "phone": null, "subscriptions_active_count": 0, "subscriptions_total_count": 1, "tax_exempt": false, "updated_at": "2023-01-16T18:08:45+00:00"}, "emitted_at": 1687184599794}
{"stream": "customers", "data": {"id": 64817252, "analytics_data": {"utm_params": []}, "apply_credit_to_next_recurring_charge": false, "created_at": "2021-05-12T12:04:06+00:00", "email": "[email protected]", "external_customer_id": {"ecommerce": "5212085977259"}, "first_charge_processed_at": "2021-05-12T16:03:59+00:00", "first_name": "Karina", "has_payment_method_in_dunning": false, "has_valid_payment_method": true, "hash": "23dee52d73734a81", "last_name": "Kuznetsova", "phone": null, "subscriptions_active_count": 0, "subscriptions_total_count": 1, "tax_exempt": false, "updated_at": "2023-01-16T18:08:45+00:00"}, "emitted_at": 1695738229285}
{"stream": "products", "data": {"collection_id": null, "created_at": "2021-05-13T07:27:34", "discount_amount": 5.0, "discount_type": "percentage", "handle": "i-make-beats-wool-blend-snapback", "id": 1853639, "images": {"large": "https://cdn.shopify.com/s/files/1/0565/0628/6251/products/c_black1_large.jpg", "medium": "https://cdn.shopify.com/s/files/1/0565/0628/6251/products/c_black1_medium.jpg", "original": "https://cdn.shopify.com/s/files/1/0565/0628/6251/products/c_black1.jpg", "small": "https://cdn.shopify.com/s/files/1/0565/0628/6251/products/c_black1_small.jpg"}, "product_id": 6644278001835, "shopify_product_id": 6644278001835, "subscription_defaults": {"apply_cutoff_date_to_checkout": false, "charge_interval_frequency": 30, "cutoff_day_of_month": null, "cutoff_day_of_week": null, "expire_after_specific_number_of_charges": null, "modifiable_properties": [], "number_charges_until_expiration": null, "order_day_of_month": null, "order_day_of_week": null, "order_interval_frequency_options": ["30"], "order_interval_unit": "day", "storefront_purchase_options": "subscription_and_onetime"}, "title": "I Make Beats Wool Blend Snapback", "updated_at": "2021-05-13T07:27:34"}, "emitted_at": 1680895030371}
{"stream": "products", "data": {"collection_id": null, "created_at": "2021-05-13T08:20:10", "discount_amount": 0.0, "discount_type": "percentage", "handle": "new-mug", "id": 1853655, "images": {"large": "https://cdn.shopify.com/s/files/1/0565/0628/6251/products/m_black_red_large.jpg", "medium": "https://cdn.shopify.com/s/files/1/0565/0628/6251/products/m_black_red_medium.jpg", "original": "https://cdn.shopify.com/s/files/1/0565/0628/6251/products/m_black_red.jpg", "small": "https://cdn.shopify.com/s/files/1/0565/0628/6251/products/m_black_red_small.jpg"}, "product_id": 6688261701803, "shopify_product_id": 6688261701803, "subscription_defaults": {"apply_cutoff_date_to_checkout": false, "charge_interval_frequency": 30, "cutoff_day_of_month": null, "cutoff_day_of_week": null, "expire_after_specific_number_of_charges": null, "modifiable_properties": [], "number_charges_until_expiration": null, "order_day_of_month": null, "order_day_of_week": null, "order_interval_frequency_options": ["30"], "order_interval_unit": "day", "storefront_purchase_options": "subscription_and_onetime"}, "title": "NEW!!! MUG", "updated_at": "2021-05-13T08:20:10"}, "emitted_at": 1680895030371}
{"stream": "shop", "data": {"shop": {"allow_customers_to_skip_delivery": 1, "checkout_logo_url": null, "created_at": "Wed, 21 Apr 2021 11:44:38 GMT", "currency": "USD", "customer_portal_domain": "", "disabled_currencies_historical": [], "domain": "airbyte.myshopify.com", "email": "[email protected]", "enabled_presentment_currencies": ["USD"], "enabled_presentment_currencies_symbols": [{"currency": "USD", "location": "before", "suffix": " USD", "symbol": "$"}], "external_platform": "shopify", "iana_timezone": "Europe/Zaporozhye", "id": 126593, "my_shopify_domain": "airbyte.myshopify.com", "name": "airbyte", "payment_processor": "shopify_payments", "platform_domain": "airbyte.myshopify.com", "shop_email": "[email protected]", "shop_phone": "1111111111", "subscriptions_enabled": 1, "test_mode": false, "timezone": "(GMT+02:00) Europe/Zaporozhye", "updated_at": "Wed, 05 Apr 2023 02:44:22 GMT"}, "store": {"checkout_logo_url": null, "checkout_platform": "shopify", "created_at": "Wed, 21 Apr 2021 11:44:38 GMT", "currency": "USD", "customer_portal_domain": "", "disabled_currencies_historical": [], "domain": "airbyte.myshopify.com", "email": "[email protected]", "enabled_presentment_currencies": ["USD"], "enabled_presentment_currencies_symbols": [{"currency": "USD", "location": "before", "suffix": " USD", "symbol": "$"}], "external_platform": "shopify", "iana_timezone": "Europe/Zaporozhye", "id": 126593, "my_shopify_domain": "airbyte.myshopify.com", "name": "airbyte", "payment_processor": "shopify_payments", "platform_domain": "airbyte.myshopify.com", "shop_email": "[email protected]", "shop_phone": "1111111111", "subscriptions_enabled": 1, "test_mode": false, "timezone": "(GMT+02:00) Europe/Zaporozhye", "updated_at": "Wed, 05 Apr 2023 02:44:22 GMT"}}, "emitted_at": 1680895031312}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 45d2e135-2ede-49e1-939f-3e3ec357a65e
dockerImageTag: 1.0.1
dockerImageTag: 1.1.0
dockerRepository: airbyte/source-recharge
githubIssueLabel: source-recharge
icon: recharge.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import ABC
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

import pendulum
Expand All @@ -11,9 +11,6 @@
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

API_VERSION_2021_11 = "2021-11"
API_VERSION_2021_01 = "2021-01"


class RechargeStream(HttpStream, ABC):
primary_key = "id"
Expand All @@ -35,32 +32,35 @@ def __init__(self, config, **kwargs):
def data_path(self):
return self.name

@property
@abstractmethod
def api_version(self) -> str:
pass

def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
return {"x-recharge-version": API_VERSION_2021_11}
return {"x-recharge-version": self.api_version}

def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return self.name

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
cursor = response.json().get("next_cursor")
if cursor:
return {"cursor": cursor}
pass

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
) -> MutableMapping[str, Any]:
params = {
"limit": self.limit,
"updated_at_min": (stream_slice or {}).get("start_date", self._start_date),
"updated_at_max": (stream_slice or {}).get("end_date", self._start_date),
}

if next_page_token:
params.update(next_page_token)
else:
params.update({"updated_at_min": (stream_state or {}).get("updated_at", self._start_date)})

return params

Expand Down Expand Up @@ -92,20 +92,37 @@ def stream_slices(

now = pendulum.now()

start_date = pendulum.parse(start_date)
# dates are inclusive, so we add 1 second so that time periods do not overlap
start_date = pendulum.parse(start_date).add(seconds=1)

while start_date <= now:
end_date = start_date.add(months=self.period_in_months)
yield {"start_date": start_date.strftime("%Y-%m-%d %H:%M:%S"), "end_date": end_date.strftime("%Y-%m-%d %H:%M:%S")}
start_date = end_date
start_date = end_date.add(seconds=1)


class RechargeStreamModernAPI(RechargeStream):
api_version = "2021-11"

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
cursor = response.json().get("next_cursor")
if cursor:
return {"cursor": cursor}


class RechargeStreamDeprecatedAPI(RechargeStream):
api_version = "2021-01"

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
stream_data = self.get_stream_data(response.json())
if len(stream_data) == self.limit:
self.page_num += 1
return {"page": self.page_num}


class IncrementalRechargeStream(RechargeStream, ABC):
cursor_field = "updated_at"

@property
def state_checkpoint_interval(self):
return self.limit
state_checkpoint_interval = 250

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
latest_benchmark = latest_record[self.cursor_field]
Expand All @@ -114,37 +131,37 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
return {self.cursor_field: latest_benchmark}


class Addresses(IncrementalRechargeStream):
class Addresses(RechargeStreamModernAPI, IncrementalRechargeStream):
"""
Addresses Stream: https://developer.rechargepayments.com/v1-shopify?python#list-addresses
"""


class Charges(IncrementalRechargeStream):
class Charges(RechargeStreamModernAPI, IncrementalRechargeStream):
"""
Charges Stream: https://developer.rechargepayments.com/v1-shopify?python#list-charges
"""


class Collections(RechargeStream):
class Collections(RechargeStreamModernAPI):
"""
Collections Stream
"""


class Customers(IncrementalRechargeStream):
class Customers(RechargeStreamModernAPI, IncrementalRechargeStream):
"""
Customers Stream: https://developer.rechargepayments.com/v1-shopify?python#list-customers
"""


class Discounts(IncrementalRechargeStream):
class Discounts(RechargeStreamModernAPI, IncrementalRechargeStream):
"""
Discounts Stream: https://developer.rechargepayments.com/v1-shopify?python#list-discounts
"""


class Metafields(RechargeStream):
class Metafields(RechargeStreamModernAPI):
"""
Metafields Stream: https://developer.rechargepayments.com/v1-shopify?python#list-metafields
"""
Expand All @@ -165,37 +182,27 @@ def stream_slices(
yield from [{"owner_resource": owner} for owner in owner_resources]


class Onetimes(IncrementalRechargeStream):
class Onetimes(RechargeStreamModernAPI, IncrementalRechargeStream):
"""
Onetimes Stream: https://developer.rechargepayments.com/v1-shopify?python#list-onetimes
"""


class Orders(IncrementalRechargeStream):
class Orders(RechargeStreamDeprecatedAPI, IncrementalRechargeStream):
"""
Orders Stream: https://developer.rechargepayments.com/v1-shopify?python#list-orders
Using old API version to avoid schema changes and loosing email, first_name, last_name columns, because in new version it not present
"""

def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
return {"x-recharge-version": API_VERSION_2021_01}


class Products(RechargeStream):
class Products(RechargeStreamDeprecatedAPI):
"""
Products Stream: https://developer.rechargepayments.com/v1-shopify?python#list-products
Products endpoint has 422 error with 2021-11 API version
"""

def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
return {"x-recharge-version": API_VERSION_2021_01}


class Shop(RechargeStream):
class Shop(RechargeStreamDeprecatedAPI):
"""
Shop Stream: https://developer.rechargepayments.com/v1-shopify?python#shop
Shop endpoint is not available in 2021-11 API version
Expand All @@ -204,13 +211,18 @@ class Shop(RechargeStream):
primary_key = ["shop", "store"]
data_path = None

def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
return {"x-recharge-version": API_VERSION_2021_01}
def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
return [{}]

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
) -> MutableMapping[str, Any]:
return {}


class Subscriptions(IncrementalRechargeStream):
class Subscriptions(RechargeStreamModernAPI, IncrementalRechargeStream):
"""
Subscriptions Stream: https://developer.rechargepayments.com/v1-shopify?python#list-subscriptions
"""
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@
"updated_at": {
"type": ["null", "string"],
"format": "date-time"
},
"apply_credit_to_next_recurring_charge": {
"type": ["null", "boolean"]
}
}
}
Loading

0 comments on commit 769e69b

Please sign in to comment.