Skip to content

Commit

Permalink
Fix checkpointing for declarative streams
Browse files Browse the repository at this point in the history
  • Loading branch information
lazebnyi committed Dec 17, 2024
1 parent ceebfda commit d997cf0
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions airbyte_cdk/sources/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,17 @@ def read( # type: ignore # ignoring typing for ConnectorStateManager because o
record_counter += 1

checkpoint_interval = self.state_checkpoint_interval
checkpoint = checkpoint_reader.get_checkpoint()
if (
should_checkpoint
and checkpoint_interval
and record_counter % checkpoint_interval == 0
and checkpoint is not None
):
airbyte_state_message = self._checkpoint_state(
checkpoint, state_manager=state_manager
)
yield airbyte_state_message
checkpoint = checkpoint_reader.get_checkpoint()
if checkpoint:
airbyte_state_message = self._checkpoint_state(
checkpoint, state_manager=state_manager
)
yield airbyte_state_message

if internal_config.is_limit_reached(record_counter):
break
Expand Down

0 comments on commit d997cf0

Please sign in to comment.