Skip to content

Commit

Permalink
✨ Source Recharge: add new stream Events (#48382)
Browse files Browse the repository at this point in the history
Co-authored-by: Marcos Marx <[email protected]>
  • Loading branch information
jamhouston and marcosmarxm authored Nov 26, 2024
1 parent d93f6f7 commit f9c3c18
Show file tree
Hide file tree
Showing 14 changed files with 282 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ acceptance_tests:
bypass_reason: "The stream is tested with `Integration Tests`, since no data is available"
- name: onetimes
bypass_reason: "The stream is tested with `Integration Tests`, since no data is available"
- name: events
bypass_reason: "The stream is tested with `Integration Tests`, since no data is available"
- name: shop
bypass_reason: "The stream is not empty, but the primary key is the entire record, so it is constantly changing"
expect_records:
Expand All @@ -21,6 +23,8 @@ acceptance_tests:
bypass_reason: "The stream is tested with `Integration Tests`, since no data is available"
- name: onetimes
bypass_reason: "The stream is tested with `Integration Tests`, since no data is available"
- name: events
bypass_reason: "The stream is tested with `Integration Tests`, since no data is available"
- name: shop
bypass_reason: "The stream is not empty, but the primary key is the entire record, so it is constantly changing"
expect_records:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,12 @@
"stream_state": { "updated_at": "2050-05-18T00:00:00Z" },
"stream_descriptor": { "name": "subscriptions" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "created_at": "2050-05-18T00:00:00Z" },
"stream_descriptor": { "name": "events" }
}
}
]
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
{
"streams": [
{
"stream": {
"name": "events",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["created_at"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["created_at"]
},
{
"stream": {
"name": "subscriptions",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@
"destination_sync_mode": "append",
"cursor_field": ["updated_at"]
},
{
"stream": {
"name": "events",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["created_at"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["created_at"]
},
{
"stream": {
"name": "metafields",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ data:
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:2.0.0@sha256:c44839ba84406116e8ba68722a0f30e8f6e7056c726f447681bb9e9ece8bd916
definitionId: 45d2e135-2ede-49e1-939f-3e3ec357a65e
dockerImageTag: 2.4.15
dockerImageTag: 2.5.0
dockerRepository: airbyte/source-recharge
githubIssueLabel: source-recharge
icon: recharge.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.4.15"
version = "2.5.0"
name = "source-recharge"
description = "Source implementation for Recharge."
authors = [ "Airbyte <[email protected]>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,8 @@
},
"subscriptions": {
"updated_at": "2021-04-02T00:00:00"
},
"events": {
"created_at": "2021-04-02T00:00:00"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,23 @@ definitions:
type: RequestOption
field_name: "updated_at_min"
inject_into: request_parameter
base_incremental_events_stream:
$ref: "#/definitions/base_modern_api_stream"
incremental_sync:
type: DatetimeBasedCursor
cursor_field: created_at
cursor_datetime_formats:
- "%Y-%m-%dT%H:%M:%S%z"
- "%Y-%m-%dT%H:%M:%S"
datetime_format: "%Y-%m-%dT%H:%M:%S%z"
start_datetime:
type: MinMaxDatetime
datetime: "{{ config['start_date'] }}"
datetime_format: "%Y-%m-%dT%H:%M:%SZ"
start_time_option:
type: RequestOption
field_name: "created_at_min"
inject_into: request_parameter

# FULL-REFRESH STREAMS
# COLLECTIONS
Expand Down Expand Up @@ -256,6 +273,13 @@ definitions:
$parameters:
name: "subscriptions"
data_path: "subscriptions"
events_stream:
description: >-
Events Stream: https://developer.rechargepayments.com/2021-11/events/events_list
$ref: "#/definitions/base_incremental_events_stream"
$parameters:
name: "events"
data_path: "events"

streams:
- "#/definitions/addresses_stream"
Expand All @@ -268,6 +292,7 @@ streams:
- "#/definitions/products_stream"
- "#/definitions/shop_stream"
- "#/definitions/subscriptions_stream"
- "#/definitions/events_stream"
# The `orders` stream remains implemented in `streams.py` due to:
# 1. Inability to resolve `$ref` conditionally
# 2. Inability to dynamically switch between paginators (diff api versions, require diff pagination approach) (or create the CustomPaginator component)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": ["null", "integer"]
},
"customer_id": {
"type": ["null", "integer"]
},
"object_id": {
"type": ["null", "integer"]
},
"created_at": {
"type": ["null", "string"],
"format": "date-time"
},
"custom_attributes": {
"type": ["null", "array"]
},
"description": {
"type": ["null", "string"]
},
"object_type": {
"type": ["null", "string"]
},
"source": {
"type": ["null", "object"],
"properties": {
"account_id": {
"type": ["null", "integer"]
},
"api_token_id": {
"type": ["null", "integer"]
},
"account_email": {
"type": ["null", "string"]
},
"api_token_name": {
"type": ["null", "string"]
},
"origin": {
"type": ["null", "string"]
},
"user_type": {
"type": ["null", "string"]
}
}
},
"updated_attributes": {
"type": ["null", "array"]
},
"verb": {
"type": ["null", "string"]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ def with_access_token(self, access_token: str) -> RequestBuilder:
def with_old_api_version(self, api_version: str) -> RequestBuilder:
self._headers["X-Recharge-Version"] = api_version
return self

def with_created_min(self, value: str) -> RequestBuilder:
self._query_params["created_at_min"] = dt.datetime.strptime(value, DATE_TIME_FORMAT).strftime(DATE_TIME_FORMAT)
return self

def build(self) -> HttpRequest:
return HttpRequest(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#


from unittest import TestCase

import freezegun
from airbyte_cdk.test.mock_http import HttpMocker

from ..config import NOW, START_DATE
from ..response_builder import NEXT_PAGE_TOKEN, get_stream_record, get_stream_response
from ..utils import StreamTestCase, config, get_cursor_value_from_state_message, read_full_refresh, read_incremental

_STREAM_NAME = "events"
_CURSOR_FIELD = "created_at"


@freezegun.freeze_time(NOW.isoformat())
class TestFullRefresh(StreamTestCase):
_STREAM_NAME = "events"

@HttpMocker()
def test_given_one_page_when_read_then_return_records(self, http_mocker: HttpMocker) -> None:
req = self.stream_request().with_limit(250).with_created_min(START_DATE).build()
http_mocker.get(
req,
get_stream_response(_STREAM_NAME).with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD)).build(),
)
output = read_full_refresh(self._config, _STREAM_NAME)
assert len(output.records) == 1

@HttpMocker()
def test_given_multiple_pages_when_read_then_return_records(self, http_mocker: HttpMocker) -> None:

http_mocker.get(
self.stream_request().with_limit(250).with_next_page_token(NEXT_PAGE_TOKEN).build(),
get_stream_response(_STREAM_NAME).with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD)).build(),
)
http_mocker.get(
self.stream_request().with_limit(250).with_created_min(START_DATE).build(),
get_stream_response(_STREAM_NAME).with_pagination().with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD)).build(),
)

output = read_full_refresh(self._config, _STREAM_NAME)
assert len(output.records) == 2


@freezegun.freeze_time(NOW.isoformat())
class TestIncremental(StreamTestCase):
_STREAM_NAME = "events"

@HttpMocker()
def test_state_message_produced_while_read_and_state_match_latest_record(self, http_mocker: HttpMocker) -> None:
min_cursor_value = "2024-01-01T00:00:00+00:00"
max_cursor_value = "2024-02-01T00:00:00+00:00"

http_mocker.get(
self.stream_request().with_limit(250).with_created_min(START_DATE).build(),
get_stream_response(_STREAM_NAME)
.with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD).with_cursor(min_cursor_value))
.with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD).with_cursor(max_cursor_value))
.build(),
)

output = read_incremental(self._config, _STREAM_NAME)
test_cursor_value = get_cursor_value_from_state_message(output, _CURSOR_FIELD)
assert test_cursor_value == max_cursor_value

@HttpMocker()
def test_given_multiple_pages_when_read_then_return_records_with_state(self, http_mocker: HttpMocker) -> None:
min_cursor_value = "2024-01-01T00:00:00+00:00"
max_cursor_value = "2024-02-01T00:00:00+00:00"
http_mocker.get(
self.stream_request().with_limit(250).with_next_page_token(NEXT_PAGE_TOKEN).build(),
get_stream_response(_STREAM_NAME).with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD)).build(),
)
http_mocker.get(
self.stream_request().with_limit(250).with_created_min(START_DATE).build(),
get_stream_response(_STREAM_NAME)
.with_pagination()
.with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD).with_cursor(min_cursor_value))
.with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD).with_cursor(max_cursor_value))
.build(),
)

output = read_incremental(self._config, _STREAM_NAME)
assert len(output.records) == 3
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"next_cursor": null,
"previous_cursor": null,
"events": [
{
"id": 1,
"customer_id": 1,
"object_id": 1,
"created_at": "2024-10-28T12:56:16",
"custom_attributes": [],
"description": "Cancelled subscription for Something from Customer",
"object_type": "subscription",
"source": {
"account_id": null,
"api_token_id": null,
"account_email": null,
"api_token_name": null,
"origin": null,
"user_type": null
},
"updated_attributes": [],
"verb": "cancelled"
},
{
"id": 2,
"customer_id": 2,
"object_id": 2,
"created_at": "2024-10-28T14:41:32",
"custom_attributes": [],
"description": "Deleted charge #2",
"object_type": "charge",
"source": {
"account_id": null,
"api_token_id": null,
"account_email": null,
"api_token_name": null,
"origin": "somewhere",
"user_type": "customer"
},
"updated_attributes": [],
"verb": "deleted"
},
{
"id": 3,
"customer_id": 3,
"object_id": 3,
"created_at": "2024-10-28T15:41:02",
"custom_attributes": [],
"description": "Failed charge #3",
"object_type": "charge",
"source": {
"account_id": null,
"api_token_id": null,
"account_email": null,
"api_token_name": null,
"origin": "api",
"user_type": null
},
"updated_attributes": [],
"verb": "failed"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_get_auth_header(config) -> None:

def test_streams(config) -> None:
streams = SourceRecharge().streams(config)
assert len(streams) == 11
assert len(streams) == 12


class TestCommon:
Expand Down
2 changes: 2 additions & 0 deletions docs/integrations/sources/recharge.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Several output streams are available from this source:
- [Addresses](https://developer.rechargepayments.com/v1-shopify?python#list-addresses) \(Incremental sync\)
- [Charges](https://developer.rechargepayments.com/v1-shopify?python#list-charges) \(Incremental sync\)
- [Collections](https://developer.rechargepayments.com/v1-shopify)
- [Events](https://developer.rechargepayments.com/2021-11/events/events_list)
- [Customers](https://developer.rechargepayments.com/v1-shopify?python#list-customers) \(Incremental sync\)
- [Discounts](https://developer.rechargepayments.com/v1-shopify?python#list-discounts) \(Incremental sync\)
- [Metafields](https://developer.rechargepayments.com/v1-shopify?python#list-metafields)
Expand All @@ -79,6 +80,7 @@ The Recharge connector should gracefully handle Recharge API limitations under n

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :------------------------------------------------------- |:-------------------------------------------------------------------------------------------------------------------------------|
| 2.5.0 | 2024-11-26 | [48382](https://github.com/airbytehq/airbyte/pull/48382) | Add new stream `events` |
| 2.4.15 | 2024-11-04 | [48242](https://github.com/airbytehq/airbyte/pull/48242) | Update dependencies |
| 2.4.14 | 2024-10-29 | [47890](https://github.com/airbytehq/airbyte/pull/47890) | Update dependencies |
| 2.4.13 | 2024-10-28 | [47037](https://github.com/airbytehq/airbyte/pull/47037) | Update dependencies |
Expand Down

0 comments on commit f9c3c18

Please sign in to comment.