diff --git a/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py b/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py index 47a6f3971db8..939bd1eac42b 100644 --- a/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py +++ b/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py @@ -223,11 +223,13 @@ def read_records( ) -> Iterable[Mapping[str, Any]]: if stream_slice is None: stream_slice = StreamSlice(partition={}, cursor_slice={}) + max_cursor_value = self._get_partition_state(stream_slice.partition).get(self.cursor_field, self._start_date) for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state): record[self.cursor_field] = pendulum.parse(record[self.cursor_field], strict=False).to_iso8601_string() - if record[self.cursor_field] >= self._get_partition_state(stream_slice.partition).get(self.cursor_field, self._start_date): - self._state = self._update_partition_state(stream_slice.partition, {self.cursor_field: record[self.cursor_field]}) + if record[self.cursor_field] >= max_cursor_value: + max_cursor_value = record[self.cursor_field] yield record + self._state = self._update_partition_state(stream_slice.partition, {self.cursor_field: max_cursor_value}) def _update_partition_state(self, partition: Mapping[str, Any], cursor: Mapping[str, Any]) -> Mapping[str, Any]: states = copy.deepcopy(self._state.get("states", []))