Skip to content

Commit

Permalink
✨ Source-Stripe: Add incremental stream support to accounts and to …
Browse files Browse the repository at this point in the history
…`customer_balance_transactions` streams (#46864)

Co-authored-by: Augustin <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
3 people authored Nov 29, 2024
1 parent e3ad816 commit 77e7c0d
Show file tree
Hide file tree
Showing 12 changed files with 91 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,10 @@
"description": "The date and time when the transaction was created",
"type": ["null", "integer"]
},
"updated": {
"description": "The date and time when the transaction was created",
"type": ["null", "integer"]
},
"credit_note": {
"description": "Credit note related to the balance transaction",
"type": ["null", "string"]
Expand Down Expand Up @@ -3289,6 +3293,10 @@
"description": "The timestamp when the account was created.",
"type": ["null", "integer"]
},
"updated": {
"description": "The timestamp when the account was updated.",
"type": ["null", "integer"]
},
"default_currency": {
"description": "The default currency used for transactions.",
"type": ["null", "string"]
Expand Down Expand Up @@ -3839,9 +3847,10 @@
}
}
},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["id"]],
"is_resumable": true
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated"],
"source_defined_primary_key": [["id"]]
},
{
"name": "shipping_rates",
Expand Down Expand Up @@ -12870,9 +12879,11 @@
}
}
},
"supported_sync_modes": ["full_refresh"],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["created"],
"source_defined_primary_key": [["id"]],
"is_resumable": false
"is_resumable": true
},
{
"name": "usage_records",
Expand Down
2 changes: 2 additions & 0 deletions airbyte-integrations/connectors/source-stripe/erd/source.dbml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ Table "customer_balance_transactions" {
"livemode" boolean
"metadata" object
"type" string
"updated" integer
}

Table "events" {
Expand Down Expand Up @@ -186,6 +187,7 @@ Table "accounts" {
"company" object
"country" string
"created" integer
"updated" integer
"default_currency" string
"details_submitted" boolean
"email" string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,5 +292,26 @@
"stream_state": { "updated": 10000000000 },
"stream_descriptor": { "name": "persons" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "updated": 10000000000 },
"stream_descriptor": { "name": "customer_balance_transactions" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "updated": 10000000000 },
"stream_descriptor": { "name": "accounts" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "created": 10000000000 },
"stream_descriptor": { "name": "transfer_reversals" }
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
"stream": {
"name": "accounts",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"source_defined_primary_key": [["id"]]
},
"primary_key": [["id"]],
"cursor_field": ["updated"],
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
Expand Down Expand Up @@ -183,7 +185,9 @@
"stream": {
"name": "customer_balance_transactions",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated"],
"source_defined_primary_key": [["id"]]
},
"primary_key": [["id"]],
Expand Down Expand Up @@ -599,11 +603,13 @@
{
"stream": {
"name": "transfer_reversals",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["created"],
"source_defined_primary_key": [["id"]]
},
"primary_key": [["id"]],
"cursor_field": ["created"],
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
dockerImageTag: 5.7.0
dockerImageTag: 5.8.0
dockerRepository: airbyte/source-stripe
documentationUrl: https://docs.airbyte.com/integrations/sources/stripe
erdUrl: https://dbdocs.io/airbyteio/source-stripe?view=relationships
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "5.7.0"
version = "5.8.0"
name = "source-stripe"
description = "Source implementation for Stripe."
authors = [ "Airbyte <[email protected]>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@
"description": "The timestamp when the account was created.",
"type": ["null", "integer"]
},
"updated": {
"description": "The timestamp when the account was updated.",
"type": ["null", "integer"]
},
"default_currency": {
"description": "The default currency used for transactions.",
"type": ["null", "string"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
"description": "The date and time when the transaction was created",
"type": ["null", "integer"]
},
"updated": {
"description": "The date and time when the transaction was created",
"type": ["null", "integer"]
},
"credit_note": {
"description": "Credit note related to the balance transaction",
"type": ["null", "string"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from source_stripe.streams import (
CreatedCursorIncrementalStripeStream,
CustomerBalanceTransactions,
Events,
IncrementalStripeStream,
ParentIncrementalStipeSubStream,
ParentIncrementalStripeSubStream,
SetupAttempts,
StripeLazySubStream,
StripeStream,
Expand Down Expand Up @@ -277,7 +276,6 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:

streams = [
checkout_sessions,
CustomerBalanceTransactions(**args),
Events(**incremental_args),
UpdatedCursorIncrementalStripeStream(
name="external_account_cards",
Expand Down Expand Up @@ -305,7 +303,19 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
**args,
),
SetupAttempts(**incremental_args),
StripeStream(name="accounts", path="accounts", use_cache=USE_CACHE, **args),
UpdatedCursorIncrementalStripeStream(
name="accounts",
path="accounts",
legacy_cursor_field="created",
event_types=[
"account.updated",
"account.external_account.created",
"account.external_account.updated",
"account.external_account.deleted",
],
use_cache=USE_CACHE,
**args,
),
CreatedCursorIncrementalStripeStream(name="shipping_rates", path="shipping_rates", **incremental_args),
CreatedCursorIncrementalStripeStream(name="balance_transactions", path="balance_transactions", **incremental_args),
CreatedCursorIncrementalStripeStream(name="files", path="files", **incremental_args),
Expand Down Expand Up @@ -479,6 +489,14 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
event_types=["topup.canceled", "topup.created", "topup.failed", "topup.reversed", "topup.succeeded"],
**args,
),
UpdatedCursorIncrementalStripeSubStream(
name="customer_balance_transactions",
path=lambda self, stream_slice, *args, **kwargs: f"customers/{stream_slice['parent']['id']}/balance_transactions",
parent=self.customers(**args),
legacy_cursor_field="created",
event_types=["customer_cash_balance_transaction.*"],
**args,
),
UpdatedCursorIncrementalStripeLazySubStream(
name="application_fees_refunds",
path=lambda self, stream_slice, *args, **kwargs: f"application_fees/{stream_slice['parent']['id']}/refunds",
Expand All @@ -504,7 +522,7 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
response_filter=lambda record: record["object"] == "bank_account",
**args,
),
ParentIncrementalStipeSubStream(
ParentIncrementalStripeSubStream(
name="checkout_sessions_line_items",
path=lambda self, stream_slice, *args, **kwargs: f"checkout/sessions/{stream_slice['parent']['id']}/line_items",
parent=checkout_sessions,
Expand Down Expand Up @@ -542,10 +560,11 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
**args,
),
subscription_items,
StripeSubStream(
ParentIncrementalStripeSubStream(
name="transfer_reversals",
path=lambda self, stream_slice, *args, **kwargs: f"transfers/{stream_slice['parent']['id']}/reversals",
parent=transfers,
cursor_field="created",
**args,
),
StripeSubStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,40 +522,6 @@ def read_records(
yield from self.parent_stream.read_records(sync_mode, cursor_field, stream_slice, stream_state)


class CustomerBalanceTransactions(StripeStream):
"""
API docs: https://stripe.com/docs/api/customer_balance_transactions/list
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.parent = IncrementalStripeStream(
name="customers",
path="customers",
use_cache=USE_CACHE,
event_types=["customer.created", "customer.updated", "customer.deleted"],
authenticator=kwargs.get("authenticator"),
account_id=self.account_id,
start_date=self.start_date,
)

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs):
return f"customers/{stream_slice['id']}/balance_transactions"

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
slices = self.parent.stream_slices(sync_mode=SyncMode.full_refresh)
for _slice in slices:
for customer in self.parent.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice):
# we use `get` here because some attributes may not be returned by some API versions
if customer.get("next_invoice_sequence") == 1 and customer.get("balance") == 0:
# We're making this check in order to speed up a sync. if a customer's balance is 0 and there are no
# associated invoices, he shouldn't have any balance transactions. So we're saving time of one API call per customer.
continue
yield customer


class SetupAttempts(CreatedCursorIncrementalStripeStream, HttpSubStream):
"""
Docs: https://stripe.com/docs/api/setup_attempts/list
Expand Down Expand Up @@ -781,7 +747,7 @@ def read_records(
)


class ParentIncrementalStipeSubStream(StripeSubStream):
class ParentIncrementalStripeSubStream(StripeSubStream):
is_resumable = True
"""
This stream differs from others in that it runs parent stream in exactly same sync mode it is run itself to generate stream slices.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import freezegun
import pendulum
import pytest
from source_stripe.streams import CustomerBalanceTransactions, SetupAttempts, StripeStream, UpdatedCursorIncrementalStripeSubStream
from source_stripe.streams import SetupAttempts, StripeStream, UpdatedCursorIncrementalStripeSubStream


def read_from_stream(stream, sync_mode, state):
Expand Down Expand Up @@ -523,26 +523,6 @@ def test_updated_cursor_incremental_stream_read_w_state(requests_mock, stream_by
]
assert records == [{"object": "credit_note", "invoice": "in_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716, "updated": 1691629292}]


def test_customer_balance_transactions_stream_slices(requests_mock, stream_args):
stream_args["start_date"] = pendulum.now().subtract(days=1).int_timestamp
requests_mock.get(
"/v1/customers",
json={
"data": [
{"id": 1, "next_invoice_sequence": 1, "balance": 0, "created": 1653341716},
{"id": 2, "created": 1653341000},
{"id": 3, "next_invoice_sequence": 13, "balance": 343.43, "created": 1651716334},
]
},
)
stream = CustomerBalanceTransactions(**stream_args)
assert list(stream.stream_slices("full_refresh")) == [
{"id": 2, "created": 1653341000, "updated": 1653341000},
{"id": 3, "next_invoice_sequence": 13, "balance": 343.43, "created": 1651716334, "updated": 1651716334},
]


@freezegun.freeze_time("2023-08-23T15:00:15Z")
def test_setup_attempts(requests_mock, incremental_stream_args):
requests_mock.get(
Expand Down
5 changes: 5 additions & 0 deletions docs/integrations/sources/stripe.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,11 @@ The Stripe source connector supports the following streams:
- [Transactions](https://stripe.com/docs/api/transfers/list) \(Incremental\)
- [Transfers](https://stripe.com/docs/api/transfers/list) \(Incremental\)
- [Transfer Reversals](https://stripe.com/docs/api/transfer_reversals/list)
<<<<<<< HEAD
- [Usage Records](https://stripe.com/docs/api/usage_records/)
=======
- [Usage Records](https://stripe.com/docs/api/usage_records)
>>>>>>> master
### Entity-Relationship Diagram (ERD)
<EntityRelationshipDiagram></EntityRelationshipDiagram>
Expand Down Expand Up @@ -241,6 +245,7 @@ Each record is marked with `is_deleted` flag when the appropriate event happens

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :-------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 5.8.0 | 2024-10-12 | [46864](https://github.com/airbytehq/airbyte/pull/46864) | Add incremental stream support to `accounts` stream |
| 5.7.0 | 2024-10-01 | [45860](https://github.com/airbytehq/airbyte/pull/45860) | Add incremental stream support to `invoice_line_items` and `subscription_items` streams |
| 5.6.2 | 2024-10-05 | [43881](https://github.com/airbytehq/airbyte/pull/43881) | Update dependencies |
| 5.6.1 | 2024-10-03 | [46327](https://github.com/airbytehq/airbyte/pull/46327) | Bump the cdk to 5.10.2 to stop using PrintBuffer optimization due to record count mismatches |
Expand Down

0 comments on commit 77e7c0d

Please sign in to comment.