Skip to content

Commit

Permalink
Save twilio state after the whole records batch has been processed
Browse files Browse the repository at this point in the history
  • Loading branch information
mkrawc committed Dec 11, 2024
1 parent 7995b47 commit 7c723b1
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,13 @@ def read_records(
) -> Iterable[Mapping[str, Any]]:
if stream_slice is None:
stream_slice = StreamSlice(partition={}, cursor_slice={})
max_cursor_value = self._get_partition_state(stream_slice.partition).get(self.cursor_field, self._start_date)
for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state):
record[self.cursor_field] = pendulum.parse(record[self.cursor_field], strict=False).to_iso8601_string()
if record[self.cursor_field] >= self._get_partition_state(stream_slice.partition).get(self.cursor_field, self._start_date):
self._state = self._update_partition_state(stream_slice.partition, {self.cursor_field: record[self.cursor_field]})
if record[self.cursor_field] >= max_cursor_value:
max_cursor_value = record[self.cursor_field]
yield record
self._state = self._update_partition_state(stream_slice.partition, {self.cursor_field: max_cursor_value})

def _update_partition_state(self, partition: Mapping[str, Any], cursor: Mapping[str, Any]) -> Mapping[str, Any]:
states = copy.deepcopy(self._state.get("states", []))
Expand Down

0 comments on commit 7c723b1

Please sign in to comment.