🐙 source-twilio: per partition states for nested streams #49097

Open · wants to merge 13 commits into master
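This PR replaces the single, stream-wide cursor in `IncrementalTwilioStream` with one cursor per partition, so nested streams (messages per conversation, usage per sub-account, and so on) each checkpoint independently. A minimal sketch of the state shape the diff below introduces — the `states`/`partition`/`cursor` keys come from the code, while the concrete values are illustrative:

```python
# Hypothetical per-partition state for a nested incremental stream.
# Each entry pairs a parent identifier (the partition) with its own cursor.
state = {
    "states": [
        {"partition": {"account_sid": "AC001"}, "cursor": {"date_created": "2022-11-16T00:00:00Z"}},
        {"partition": {"account_sid": "AC002"}, "cursor": {"date_created": "2022-11-10T00:00:00Z"}},
    ]
}
```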
airbyte-integrations/connectors/source-twilio/metadata.yaml
@@ -13,7 +13,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: b9dc6155-672e-42ea-b10d-9f1f1fb95ab1
-  dockerImageTag: 0.11.13
+  dockerImageTag: 0.12.0
   dockerRepository: airbyte/source-twilio
   documentationUrl: https://docs.airbyte.com/integrations/sources/twilio
   githubIssueLabel: source-twilio
airbyte-integrations/connectors/source-twilio/pyproject.toml
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
 build-backend = "poetry.core.masonry.api"

 [tool.poetry]
-version = "0.11.13"
+version = "0.12.0"
 name = "source-twilio"
 description = "Source implementation for Twilio."
 authors = [ "Airbyte <[email protected]>",]
112 changes: 68 additions & 44 deletions airbyte-integrations/connectors/source-twilio/source_twilio/streams.py
@@ -11,6 +11,7 @@
 import pendulum
 import requests
 from airbyte_cdk.models import SyncMode
+from airbyte_cdk.sources.declarative.types import StreamSlice
 from airbyte_cdk.sources.streams import IncrementalMixin
 from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
 from airbyte_cdk.sources.streams.http import HttpStream
@@ -133,7 +134,7 @@ def __init__(
         self._slice_step = slice_step and pendulum.duration(days=slice_step)
         self._start_date = start_date if start_date is not None else "1970-01-01T00:00:00Z"
         self._lookback_window = lookback_window
-        self._cursor_value = None
+        self._state = {"states": []}

     @property
     def slice_step(self):
@@ -155,29 +156,26 @@ def upper_boundary_filter_field(self) -> str:

     @property
     def state(self) -> Mapping[str, Any]:
-        if self._cursor_value:
-            return {
-                self.cursor_field: self._cursor_value,
-            }
-
-        return {}
+        return self._state

     @state.setter
     def state(self, value: MutableMapping[str, Any]):
-        if self._lookback_window and value.get(self.cursor_field):
-            new_start_date = (
-                pendulum.parse(value[self.cursor_field]) - pendulum.duration(minutes=self._lookback_window)
-            ).to_iso8601_string()
-            if new_start_date > self._start_date:
-                value[self.cursor_field] = new_start_date
-        self._cursor_value = value.get(self.cursor_field)
+        if self._lookback_window:
+            lookback_duration = pendulum.duration(minutes=self._lookback_window)
+            for state in value.get("states", []):
+                cursor = state.get("cursor", {})
+                if self.cursor_field in cursor:
+                    new_start_date = (pendulum.parse(cursor[self.cursor_field]) - lookback_duration).to_iso8601_string()
+                    if new_start_date > self._start_date:
+                        cursor[self.cursor_field] = new_start_date
+        self._state = value

-    def generate_date_ranges(self) -> Iterable[Optional[MutableMapping[str, Any]]]:
+    def generate_date_ranges(self, partition: MutableMapping[str, Any]) -> Iterable[Optional[MutableMapping[str, Any]]]:
         def align_to_dt_format(dt: DateTime) -> DateTime:
             return pendulum.parse(dt.format(self.time_filter_template))

         end_datetime = pendulum.now("utc")
-        start_datetime = min(end_datetime, pendulum.parse(self.state.get(self.cursor_field, self._start_date)))
+        start_datetime = min(end_datetime, self._min_datetime(partition))
         current_start = start_datetime
         current_end = start_datetime
         # Aligning to a datetime format is done to avoid the following scenario:
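The rewritten setter walks every entry in `states` and rewinds each partition's cursor by the lookback window, instead of adjusting one top-level value. A self-contained sketch of that loop, assuming `pendulum` is installed; the 60-minute window, cursor field, and dates are illustrative:

```python
import pendulum

# Illustrative inputs: a configured start date, a lookback window, and
# incoming per-partition state as the setter would receive it.
start_date = "2000-01-01T00:00:00Z"
lookback = pendulum.duration(minutes=60)
value = {
    "states": [
        {"partition": {"account_sid": "AC001"}, "cursor": {"date_created": "2022-11-16T12:00:00Z"}},
    ]
}

for state in value.get("states", []):
    cursor = state.get("cursor", {})
    if "date_created" in cursor:
        # Rewind this partition's cursor, but never before the start date.
        new_start_date = (pendulum.parse(cursor["date_created"]) - lookback).to_iso8601_string()
        if new_start_date > start_date:
            cursor["date_created"] = new_start_date

print(value["states"][0]["cursor"]["date_created"])  # one hour earlier than before
```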
@@ -195,23 +193,21 @@ def align_to_dt_format(dt: DateTime) -> DateTime:
             current_start = current_end + self.slice_granularity

     def stream_slices(
-        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
+        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: StreamSlice = None
     ) -> Iterable[Optional[Mapping[str, Any]]]:
         for super_slice in super().stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state):
-            for dt_range in self.generate_date_ranges():
-                slice_ = copy.deepcopy(super_slice) if super_slice else {}
-                slice_.update(dt_range)
-                yield slice_
+            for dt_range in self.generate_date_ranges(super_slice.partition if super_slice else {}):
+                yield StreamSlice(partition=super_slice.partition if super_slice else {}, cursor_slice=dt_range)

     def request_params(
         self,
         stream_state: Mapping[str, Any],
-        stream_slice: Mapping[str, Any] = None,
+        stream_slice: StreamSlice = None,
         next_page_token: Mapping[str, Any] = None,
     ) -> MutableMapping[str, Any]:
         params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
-        lower_bound = stream_slice and stream_slice.get(self.lower_boundary_filter_field)
-        upper_bound = stream_slice and stream_slice.get(self.upper_boundary_filter_field)
+        lower_bound = stream_slice and stream_slice.cursor_slice.get(self.lower_boundary_filter_field)
+        upper_bound = stream_slice and stream_slice.cursor_slice.get(self.upper_boundary_filter_field)
         if lower_bound:
             params[self.lower_boundary_filter_field] = lower_bound
         if upper_bound:
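Slicing and request building now split a slice in two: the parent keys ride in `StreamSlice.partition`, the date boundaries in `StreamSlice.cursor_slice`, and `request_params` reads only the latter. A sketch of that hand-off using the CDK's `StreamSlice`; the filter field names mirror the Messages stream, and the values are made up:

```python
from airbyte_cdk.sources.declarative.types import StreamSlice

# One slice of a nested incremental stream: the parent record's key in
# `partition`, the generated date range in `cursor_slice`.
slice_ = StreamSlice(
    partition={"account_sid": "AC001"},
    cursor_slice={"DateSent>": "2022-11-13 23:39:00Z", "DateSent<": "2022-11-14 23:39:00Z"},
)

# request_params only consults the cursor side of the slice.
params = {}
for boundary in ("DateSent>", "DateSent<"):
    if slice_.cursor_slice.get(boundary):
        params[boundary] = slice_.cursor_slice[boundary]

print(params)  # {'DateSent>': '2022-11-13 23:39:00Z', 'DateSent<': '2022-11-14 23:39:00Z'}
```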
@@ -222,18 +218,36 @@ def read_records(
         self,
         sync_mode: SyncMode,
         cursor_field: List[str] = None,
-        stream_slice: Mapping[str, Any] = None,
+        stream_slice: StreamSlice = None,
         stream_state: Mapping[str, Any] = None,
     ) -> Iterable[Mapping[str, Any]]:
         unsorted_records = []
+        if stream_slice is None:
+            stream_slice = StreamSlice(partition={}, cursor_slice={})
+        max_cursor_value = self._get_partition_state(stream_slice.partition).get(self.cursor_field, self._start_date)
         for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state):
             record[self.cursor_field] = pendulum.parse(record[self.cursor_field], strict=False).to_iso8601_string()
             unsorted_records.append(record)
         sorted_records = sorted(unsorted_records, key=lambda x: x[self.cursor_field])
         for record in sorted_records:
-            if record[self.cursor_field] >= self.state.get(self.cursor_field, self._start_date):
-                self._cursor_value = record[self.cursor_field]
-                yield record
+            if record[self.cursor_field] >= max_cursor_value:
+                max_cursor_value = record[self.cursor_field]
+            yield record
+        self._state = self._update_partition_state(stream_slice.partition, {self.cursor_field: max_cursor_value})
+
+    def _update_partition_state(self, partition: Mapping[str, Any], cursor: Mapping[str, Any]) -> Mapping[str, Any]:
+        states = self._state.get("states", [])
+        for state in states:
+            if state.get("partition") == partition:
+                state.update({"cursor": cursor})
+                return self._state
+        states.append({"partition": partition, "cursor": cursor})
+        return {"states": states}
+
+    def _get_partition_state(self, partition: Mapping[str, Any]) -> Mapping[str, Any]:
+        for state in self._state.get("states", []):
+            if state.get("partition") == partition:
+                return state.get("cursor", {})
+        return {}
+
+    def _min_datetime(self, partition: Mapping[str, Any]) -> DateTime:
+        return pendulum.parse(self._get_partition_state(partition).get(self.cursor_field, self._start_date))


 class TwilioNestedStream(TwilioStream):
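The two new helpers give `read_records` a small lookup/commit protocol over the `states` list: fetch the cursor for the current partition before reading, then write back the max cursor seen. A standalone sketch of the same logic; the function and variable names here are illustrative, not part of the connector:

```python
from typing import Any, Mapping, MutableMapping


def update_partition_state(
    state: MutableMapping[str, Any], partition: Mapping[str, Any], cursor: Mapping[str, Any]
) -> Mapping[str, Any]:
    states = state.get("states", [])
    for entry in states:
        if entry.get("partition") == partition:
            entry.update({"cursor": cursor})  # known partition: overwrite its cursor
            return state
    states.append({"partition": partition, "cursor": cursor})  # first sighting: append
    return {"states": states}


state = {"states": []}
state = update_partition_state(state, {"account_sid": "AC001"}, {"date_created": "2022-11-16T00:00:00Z"})
state = update_partition_state(state, {"account_sid": "AC001"}, {"date_created": "2022-11-17T00:00:00Z"})
assert len(state["states"]) == 1  # re-syncing a partition updates it in place
```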
@@ -264,7 +278,7 @@ def parent_stream_instance(self):
         return self.parent_stream(authenticator=self._session.auth)

     def parent_record_to_stream_slice(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
-        return {"subresource_uri": record["subresource_uris"][self.subresource_uri_key]}
+        return StreamSlice(partition={"subresource_uri": record["subresource_uris"][self.subresource_uri_key]}, cursor_slice={})

     def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
         stream_instance = self.parent_stream_instance
@@ -308,7 +322,7 @@ def path(self, stream_slice: Mapping[str, Any], **kwargs):
         return f"Accounts/{stream_slice['account_sid']}/Addresses/{stream_slice['sid']}/DependentPhoneNumbers.json"

     def parent_record_to_stream_slice(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
-        return {"sid": record["sid"], "account_sid": record["account_sid"]}
+        return StreamSlice(partition={"sid": record["sid"], "account_sid": record["account_sid"]}, cursor_slice={})


 class Applications(TwilioNestedStream):
@@ -429,7 +443,7 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs):
         return f"Flows/{ stream_slice['flow_sid'] }/Executions"

     def parent_record_to_stream_slice(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
-        return {"flow_sid": record["sid"]}
+        return StreamSlice(partition={"flow_sid": record["sid"]}, cursor_slice={})


 class Step(TwilioNestedStream):
@@ -446,7 +460,7 @@ def path(self, stream_slice: Mapping[str, Any], **kwargs):
         return f"Flows/{stream_slice['flow_sid']}/Executions/{stream_slice['execution_sid']}/Steps"

     def parent_record_to_stream_slice(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
-        return {"flow_sid": record["flow_sid"], "execution_sid": record["sid"]}
+        return StreamSlice(partition={"flow_sid": record["flow_sid"], "execution_sid": record["sid"]}, cursor_slice={})


 class OutgoingCallerIds(TwilioNestedStream):
@@ -502,7 +516,7 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs):
         return f"Services/{ stream_slice['service_sid'] }/Roles"

     def parent_record_to_stream_slice(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
-        return {"service_sid": record["sid"]}
+        return StreamSlice(partition={"service_sid": record["sid"]}, cursor_slice={})


 class Transcriptions(TwilioNestedStream):
@@ -569,7 +583,7 @@ def path(self, stream_slice: Mapping[str, Any], **kwargs):
         return f"Accounts/{stream_slice['account_sid']}/Usage/{self.path_name}.json"

     def parent_record_to_stream_slice(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
-        return {"account_sid": record["sid"]}
+        return StreamSlice(partition={"account_sid": record["sid"], "date_created": record["date_created"]}, cursor_slice={})


 class UsageRecords(IncrementalTwilioStream, UsageNestedStream):
@@ -581,10 +595,15 @@ class UsageRecords(IncrementalTwilioStream, UsageNestedStream):
     cursor_field = "start_date"
     time_filter_template = "YYYY-MM-DD"
     slice_granularity = pendulum.duration(days=1)
-    path_name = "Records"
-    primary_key = [["account_sid"], ["category"]]
+    path_name = "Records/Daily"
+    primary_key = [["account_sid"], ["category"], ["start_date"], ["end_date"]]
     changeable_fields = ["as_of"]

+    def _min_datetime(self, partition: Mapping[str, Any]) -> DateTime:
+        cursor_value = pendulum.parse(self._get_partition_state(partition).get(self.cursor_field, self._start_date))
+
+        return max(cursor_value, pendulum.parse(partition.get("date_created", self._start_date), strict=False))
+

 class UsageTriggers(UsageNestedStream):
     """https://www.twilio.com/docs/usage/api/usage-trigger#read-multiple-usagetrigger-resources"""
@@ -593,6 +612,11 @@ class UsageTriggers(UsageNestedStream):
     subresource_uri_key = "triggers"
     path_name = "Triggers"

+    def _min_datetime(self, partition: Mapping[str, Any]) -> DateTime:
+        cursor_value = pendulum.parse(self._get_partition_state(partition).get(self.cursor_field, self._start_date))
+
+        return max(cursor_value, pendulum.parse(partition.get("date_created", self._start_date), strict=False))
+

 class Alerts(IncrementalTwilioStream):
     """https://www.twilio.com/docs/usage/monitor-alert#read-multiple-alert-resources"""
@@ -627,7 +651,7 @@ def path(self, stream_slice: Mapping[str, Any], **kwargs):
         return f"Conversations/{stream_slice['conversation_sid']}/Participants"

     def parent_record_to_stream_slice(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
-        return {"conversation_sid": record["sid"]}
+        return StreamSlice(partition={"conversation_sid": record["sid"]}, cursor_slice={})


 class ConversationMessages(TwilioNestedStream):
@@ -642,7 +666,7 @@ def path(self, stream_slice: Mapping[str, Any], **kwargs):
         return f"Conversations/{stream_slice['conversation_sid']}/Messages"

     def parent_record_to_stream_slice(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
-        return {"conversation_sid": record["sid"]}
+        return StreamSlice(partition={"conversation_sid": record["sid"]}, cursor_slice={})


 class Users(TwilioStream):
@@ -667,4 +691,4 @@ def path(self, stream_slice: Mapping[str, Any], **kwargs):
         return f"Users/{stream_slice['user_sid']}/Conversations"

     def parent_record_to_stream_slice(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
-        return {"user_sid": record["sid"]}
+        return StreamSlice(partition={"user_sid": record["sid"]}, cursor_slice={})
airbyte-integrations/connectors/source-twilio/unit_tests/test_streams.py
@@ -8,6 +8,7 @@
 import pendulum
 import pytest
 import requests
+from airbyte_cdk.sources.declarative.types import StreamSlice
 from airbyte_cdk.sources.streams.http import HttpStream
 from freezegun import freeze_time
 from source_twilio.auth import HttpBasicAuthenticator
@@ -168,7 +169,7 @@ class TestIncrementalTwilioStream:
         [
             (
                 Calls,
-                {"EndTime>": "2022-01-01", "EndTime<": "2022-01-02"},
+                StreamSlice(partition={}, cursor_slice={"EndTime>": "2022-01-01", "EndTime<": "2022-01-02"}),
                 {"Page": "2", "PageSize": "1000", "PageToken": "PAAD42931b949c0dedce94b2f93847fdcf95"},
                 {
                     "EndTime>": "2022-01-01",
@@ -194,7 +195,7 @@ def test_request_params(self, stream_cls, stream_slice, next_page_token, expecte
     def test_read_records(self, stream_cls, record, expected):
         stream = stream_cls(**self.CONFIG)
         with patch.object(HttpStream, "read_records", return_value=record):
-            result = stream.read_records(sync_mode=None)
+            result = stream.read_records(sync_mode=None, stream_slice=StreamSlice(partition={}, cursor_slice={}))
         assert list(result) == expected

     @pytest.mark.parametrize(
@@ -230,17 +231,42 @@ def test_stream_slices(self, mocker, stream_cls, parent_cls_records, extra_slice
         (
             (
                 Messages,
-                {"date_sent": "2022-11-13 23:39:00"},
+                {
+                    "states": [
+                        {
+                            "partition": {"key": "value"},
+                            "cursor": {"date_sent": "2022-11-13 23:39:00"},
+                        }
+                    ]
+                },
                 [
                     {"DateSent>": "2022-11-13 23:39:00Z", "DateSent<": "2022-11-14 23:39:00Z"},
                     {"DateSent>": "2022-11-14 23:39:00Z", "DateSent<": "2022-11-15 23:39:00Z"},
                     {"DateSent>": "2022-11-15 23:39:00Z", "DateSent<": "2022-11-16 12:03:11Z"},
                 ],
             ),
-            (UsageRecords, {"start_date": "2021-11-16 00:00:00"}, [{"StartDate": "2021-11-16", "EndDate": "2022-11-16"}]),
+            (
+                UsageRecords,
+                {
+                    "states": [
+                        {
+                            "partition": {"key": "value"},
+                            "cursor": {"start_date": "2021-11-16 00:00:00"},
+                        }
+                    ]
+                },
+                [{"StartDate": "2021-11-16", "EndDate": "2022-11-16"}],
+            ),
             (
                 Recordings,
-                {"date_created": "2021-11-16 00:00:00"},
+                {
+                    "states": [
+                        {
+                            "partition": {"key": "value"},
+                            "cursor": {"date_created": "2021-11-16 00:00:00"},
+                        }
+                    ]
+                },
                 [
                     {"DateCreated>": "2021-11-16 00:00:00Z", "DateCreated<": "2022-11-16 00:00:00Z"},
                     {"DateCreated>": "2022-11-16 00:00:00Z", "DateCreated<": "2022-11-16 12:03:11Z"},
@@ -251,7 +277,7 @@ def test_generate_dt_ranges(self, stream_cls, state, expected_dt_ranges):
     def test_generate_dt_ranges(self, stream_cls, state, expected_dt_ranges):
         stream = stream_cls(authenticator=TEST_CONFIG.get("authenticator"), start_date="2000-01-01 00:00:00")
         stream.state = state
-        dt_ranges = list(stream.generate_date_ranges())
+        dt_ranges = list(stream.generate_date_ranges({"key": "value"}))
         assert dt_ranges == expected_dt_ranges


@@ -279,13 +305,13 @@ def test_media_exist_validation(self, stream_cls, expected):
                 Addresses,
                 Accounts,
                 [{"subresource_uris": {"addresses": "123"}}],
-                [{"subresource_uri": "123"}],
+                [StreamSlice(partition={"subresource_uri": "123"}, cursor_slice={})],
             ),
             (
                 DependentPhoneNumbers,
                 Addresses,
                 [{"subresource_uris": {"addresses": "123"}, "sid": "123", "account_sid": "456"}],
-                [{"sid": "123", "account_sid": "456"}],
+                [StreamSlice(partition={"sid": "123", "account_sid": "456"}, cursor_slice={})],
             ),
         ],
     )
@@ -319,8 +345,8 @@ def test_path_name(self, stream_cls, expected):
             (
                 UsageTriggers,
                 Accounts,
-                [{"sid": "234", "account_sid": "678"}],
-                [{"account_sid": "234"}],
+                [{"sid": "234", "account_sid": "678", "date_created": "2022-11-16 00:00:00"}],
+                [StreamSlice(partition={"account_sid": "234", "date_created": "2022-11-16 00:00:00"}, cursor_slice={})],
             ),
         ],
     )