🎉 Source Stripe: Implement StripePartition #31696

Closed
Changes from 3 commits
27 commits
e45c5aa
Implement Stripe Partition
yevhenii-ldv Oct 22, 2023
1c091df
Update version and docs
yevhenii-ldv Oct 22, 2023
f568b03
Update source_defined_primary_key logic for ThreadBasedConcurrentStream
yevhenii-ldv Oct 22, 2023
3686904
Merge branch 'master' into ykurochkin/source-stripe/implement-partition
girarda Oct 23, 2023
a03a954
Update after review
yevhenii-ldv Oct 25, 2023
44da760
Merge branch 'master' of github.com:airbytehq/airbyte into ykurochkin…
yevhenii-ldv Oct 25, 2023
f6cad71
Remove Dockerfile
yevhenii-ldv Oct 25, 2023
88764a5
Merge branch 'master' of github.com:airbytehq/airbyte into ykurochkin…
yevhenii-ldv Oct 26, 2023
9b9f351
Fix after merge master
yevhenii-ldv Oct 26, 2023
53e1fdd
Update version
yevhenii-ldv Oct 26, 2023
8770818
Add unit test
yevhenii-ldv Oct 26, 2023
d16cc86
Add record transform
yevhenii-ldv Oct 26, 2023
ebccd50
Merge branch 'master' of github.com:airbytehq/airbyte into ykurochkin…
yevhenii-ldv Oct 26, 2023
5db4806
Remove unused code
yevhenii-ldv Oct 26, 2023
c836834
Little updates
yevhenii-ldv Oct 27, 2023
93b34c5
Splitting CursorPaginationStrategy into LowCodeCursorPaginationStrate…
yevhenii-ldv Oct 31, 2023
a7bd8dd
Merge branch 'master' of github.com:airbytehq/airbyte into ykurochkin…
yevhenii-ldv Nov 2, 2023
9464124
Merge branch 'ykurochkin/cdk/decoupling-the-objects-from-declarative-…
yevhenii-ldv Nov 2, 2023
1872989
Update after cdk changed
yevhenii-ldv Nov 2, 2023
7b918f9
Merge branch 'ykurochkin/cdk/decoupling-the-objects-from-declarative-…
yevhenii-ldv Nov 2, 2023
468ed26
Add StripePaginator
yevhenii-ldv Nov 2, 2023
f7bf955
Merge branch 'ykurochkin/cdk/decoupling-the-objects-from-declarative-…
yevhenii-ldv Nov 3, 2023
d56902d
Merge branch 'ykurochkin/cdk/decoupling-the-objects-from-declarative-…
yevhenii-ldv Nov 3, 2023
1ba98c2
Change HttpRequester with SourceHttpRequester
yevhenii-ldv Nov 3, 2023
2619a23
Merge branch 'ykurochkin/cdk/decoupling-the-objects-from-declarative-…
yevhenii-ldv Nov 7, 2023
ee83ece
Add tests
yevhenii-ldv Nov 7, 2023
230e243
Bump version
yevhenii-ldv Nov 7, 2023
@@ -168,7 +168,10 @@ def as_airbyte_stream(self) -> AirbyteStream:

         keys = self._primary_key
         if keys and len(keys) > 0:
-            stream.source_defined_primary_key = [keys]
+            if isinstance(keys, str):
Contributor

Did you encounter a case where keys was a string and not a list? A string wouldn't respect the interface.

Contributor Author

Yep, agree, will fix

+                stream.source_defined_primary_key = [[keys]]
+            else:
+                stream.source_defined_primary_key = [keys]

         return stream
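
For illustration, here is a minimal sketch of what the two branches in this hunk produce. `wrap_primary_key` is a hypothetical helper written for this note, not part of the CDK; it only mirrors the wrapping logic shown in the diff, so a bare string and a one-element list end up with the same nested value.

```python
from typing import List, Optional, Union


def wrap_primary_key(keys: Optional[Union[str, List[str]]]) -> Optional[List[List[str]]]:
    """Hypothetical helper mirroring the branches in the hunk above."""
    if not keys:
        # Nothing to declare when no primary key is configured.
        return None
    if isinstance(keys, str):
        # A bare string is wrapped twice: once as a key, once as the list of keys.
        return [[keys]]
    # A list of strings is wrapped once.
    return [keys]


from_string = wrap_primary_key("id")
from_list = wrap_primary_key(["id"])
```

If the primary key is always passed as a list, as the review comment asks, the string branch is never needed.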

2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-stripe/Dockerfile
@@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


-LABEL io.airbyte.version=4.4.1
+LABEL io.airbyte.version=4.5.0
LABEL io.airbyte.name=airbyte/source-stripe
@@ -5,7 +5,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
-  dockerImageTag: 4.4.1
+  dockerImageTag: 4.5.0
   dockerRepository: airbyte/source-stripe
   githubIssueLabel: source-stripe
   icon: stripe.svg
@@ -0,0 +1,130 @@
#
Contributor

Can you unit test these classes?

Contributor Author

Done

# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
from typing import Any, Callable, Iterable, Mapping, Optional, Union

import requests
from airbyte_cdk.sources.declarative.extractors import RecordSelector
from airbyte_cdk.sources.declarative.requesters import Requester
from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_protocol.models import SyncMode


class PaginatedRequester:
    def __init__(self, requester: Requester, record_selector: RecordSelector, paginator: DefaultPaginator):
        self._requester = requester
        self._record_selector = record_selector
        self._page_token_option = paginator.page_token_option
Contributor

Why create a new field instead of always referring to paginator.page_token_option?

        self._paginator = paginator

    def send_requests(
        self,
        path: Optional[str] = None,
        request_headers: Optional[Mapping[str, Any]] = None,
        request_params: Optional[Mapping[str, Any]] = None,
        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
        request_body_json: Optional[Mapping[str, Any]] = None,
        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
    ) -> Iterable[Mapping]:
        next_page_token = None
        pagination_complete = False
        while not pagination_complete:
            if next_page_token:
                next_page_parameter = {self._page_token_option.field_name: next_page_token["next_page_token"]}

                if self._page_token_option.inject_into == RequestOptionType.request_parameter:
                    request_params = request_params | next_page_parameter
                elif self._page_token_option.inject_into == RequestOptionType.header:
                    request_headers = request_headers | next_page_parameter
                elif self._page_token_option.inject_into == RequestOptionType.body_data:
                    request_body_data = request_body_data | next_page_parameter
                elif self._page_token_option.inject_into == RequestOptionType.body_json:
                    request_body_json = request_body_json | next_page_parameter

            response = self._requester.send_request(
                path=path,
                request_headers=request_headers,
                request_params=request_params,
                request_body_data=request_body_data,
                request_body_json=request_body_json,
                next_page_token=next_page_token,
                log_formatter=log_formatter,
            )
            if not response:
                pagination_complete = True
            else:
                records = list(self._record_selector.select_records(response, {}))
                yield from records

                next_page_token = self._paginator.next_page_token(response, records)
                if not next_page_token:
                    pagination_complete = True
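
One thing the loop above leaves open: every request argument defaults to None, and `None | {...}` raises a TypeError in Python, so merging the page token fails whenever the target mapping was not supplied. The sketch below shows the same cursor loop with a None-tolerant merge. It is an illustration under stated assumptions, not the CDK implementation: `merge_token`, `read_all_pages`, `fake_fetch`, and the in-memory `PAGES` table are hypothetical names, and `starting_after` is used only as an example token field.

```python
from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional, Tuple

# Hypothetical callable type: takes request params, returns (records, next token or None).
FetchPage = Callable[[Mapping[str, Any]], Tuple[List[Dict[str, Any]], Optional[str]]]


def merge_token(params: Optional[Mapping[str, Any]], field_name: str, token: str) -> Dict[str, Any]:
    """Return a new dict with the page token added; tolerates params=None."""
    return {**(params or {}), field_name: token}


def read_all_pages(
    fetch_page: FetchPage,
    params: Optional[Mapping[str, Any]] = None,
    token_field: str = "starting_after",  # assumed field name, for illustration only
) -> Iterator[Dict[str, Any]]:
    """Yield records page by page until no next token is returned."""
    request_params: Dict[str, Any] = dict(params or {})
    while True:
        records, next_token = fetch_page(request_params)
        yield from records
        if not next_token:
            break
        request_params = merge_token(request_params, token_field, next_token)


# In-memory stand-in for the HTTP call: two pages keyed by the incoming token.
PAGES = {
    None: ([{"id": "a"}, {"id": "b"}], "b"),
    "b": ([{"id": "c"}], None),
}


def fake_fetch(request_params: Mapping[str, Any]) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    return PAGES[request_params.get("starting_after")]


all_records = list(read_all_pages(fake_fetch))
```

Building a new dict on each iteration also avoids mutating a mapping that the caller may reuse for another partition.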


class StripePartitionGenerator(PartitionGenerator):
    def __init__(self, stream: Stream, message_repository: MessageRepository, requester: PaginatedRequester):
        self._stream = stream
        self._message_repository = message_repository
        self._requester = requester

    def generate(self, sync_mode: SyncMode) -> Iterable[Partition]:
        for s in self._stream.stream_slices(sync_mode=sync_mode):
            yield StripePartition(copy.deepcopy(s), self._message_repository, self._requester)


class SourcePartition(Partition):
    def __init__(self, _slice: Mapping[str, Any], message_repository: MessageRepository, requester: PaginatedRequester):
        self._slice = _slice
        self._message_repository = message_repository
Contributor

Why is the message repository needed? It doesn't seem to be used.

        self._requester = requester

    @property
    def request_parameters(self) -> Optional[Mapping[str, Any]]:
Contributor

We shouldn't expose these properties publicly for an arbitrary source, because not all sources are HTTP API sources.

        return None

    @property
    def request_headers(self) -> Optional[Mapping[str, Any]]:
        return None

    @property
    def request_body_data(self) -> Optional[Union[Mapping[str, Any], str]]:
        return None

    @property
    def request_body_json(self) -> Optional[Mapping[str, Any]]:
        return None

    def read(self) -> Iterable[Record]:
        for r in self._requester.send_requests(
            request_params=self.request_parameters,
            request_headers=self.request_headers,
            request_body_data=self.request_body_data,
            request_body_json=self.request_body_json,
        ):
            yield Record(r)

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        return self._slice

    def __hash__(self) -> int:
        return hash(str(self._slice))
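
A note on `__hash__`: `str()` of a dict depends on insertion order, so two slices with equal contents can stringify differently. A possible order-independent variant is sketched below. `stable_slice_hash` is a hypothetical helper, and it assumes the slice holds JSON-friendly values (anything else falls back to `str` through `default=str`).

```python
import json
from typing import Any, Mapping, Optional


def stable_slice_hash(_slice: Optional[Mapping[str, Any]]) -> int:
    """Hash a slice through a key-sorted JSON dump so key order does not matter."""
    return hash(json.dumps(_slice, sort_keys=True, default=str))


# Same contents, different insertion order.
slice_a = {"created[gte]": 1, "created[lte]": 2}
slice_b = {"created[lte]": 2, "created[gte]": 1}

same_contents = slice_a == slice_b
same_repr = str(slice_a) == str(slice_b)
same_stable_hash = stable_slice_hash(slice_a) == stable_slice_hash(slice_b)
```

Whether this matters here depends on how the slices are built; if every slice comes from the same `stream_slices` code path, the key order is likely consistent already.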


class StripePartition(SourcePartition):
    @property
    def request_parameters(self) -> Mapping[str, Any]:
Contributor

Setting these as constructor arguments would remove the need to create one class per connector, which would make the codebase easier to maintain.

Contributor Author

Yes, I've already done it in a similar way. I'm finalizing the unit tests and will update the PR today.

        params = {
            "limit": 100,
Contributor

This parameter should be set by the paginator.

            "created[gte]": self._slice.get("created[gte]") if self._slice else None,
            "created[lte]": self._slice.get("created[lte]") if self._slice else None,
        }

        return {k: v for k, v in params.items() if v}
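
The review suggestion about constructor arguments could look roughly like the sketch below. `ParamPartition`, `slice_keys`, and `base_params` are hypothetical names chosen for this illustration and are not CDK classes or parameters. The sketch also filters on `is not None` rather than truthiness, because `if v` in the comprehension above would drop a legitimate value of 0. Following the other comment, `limit` is left out on the assumption that the paginator supplies it.

```python
from typing import Any, Dict, Mapping, Optional, Sequence


class ParamPartition:
    """Hypothetical, simplified stand-in for a partition; not a CDK class."""

    def __init__(
        self,
        _slice: Optional[Mapping[str, Any]],
        slice_keys: Sequence[str] = (),
        base_params: Optional[Mapping[str, Any]] = None,
    ):
        self._slice = _slice or {}
        # Names of the slice fields to forward as request parameters.
        self._slice_keys = tuple(slice_keys)
        # Static parameters shared by every request of this partition.
        self._base_params = dict(base_params or {})

    @property
    def request_parameters(self) -> Dict[str, Any]:
        params = dict(self._base_params)
        for key in self._slice_keys:
            value = self._slice.get(key)
            if value is not None:  # keep legitimate falsy values such as 0
                params[key] = value
        return params


# The connector passes its own keys instead of subclassing.
STRIPE_KEYS = ("created[gte]", "created[lte]")
partition = ParamPartition({"created[gte]": 0, "created[lte]": 1700000000}, slice_keys=STRIPE_KEYS)
empty = ParamPartition(None, slice_keys=STRIPE_KEYS)
```

With this shape, a connector-specific subclass such as StripePartition reduces to a set of constructor arguments.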