Skip to content

Commit

Permalink
working cursor based incremental
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich committed May 20, 2024
1 parent d7f42b9 commit 53658da
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -805,4 +805,26 @@ abstract class AbstractJdbcSource<Datatype>(
return result
}
}

override fun createReadIterator(
database: JdbcDatabase,
airbyteStream: ConfiguredAirbyteStream,
catalog: ConfiguredAirbyteCatalog?,
table: TableInfo<CommonField<Datatype>>,
stateManager: StateManager?,
emittedAt: Instant
): AutoCloseableIterator<AirbyteMessage> {
val iterator = super.createReadIterator(
database,
airbyteStream,
catalog,
table,
stateManager,
emittedAt
)
return when (airbyteStream.syncMode) {
SyncMode.INCREMENTAL -> augmentWithStreamStatus(airbyteStream, iterator)
else -> iterator
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ protected constructor(driverClassName: String) :
* @param emittedAt Time when data was emitted from the Source database
* @return
*/
private fun createReadIterator(
protected open fun createReadIterator(
database: Database,
airbyteStream: ConfiguredAirbyteStream,
catalog: ConfiguredAirbyteCatalog?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
tableNameToTable,
cursorBasedStateManager, emittedAt));


return Stream.of(initialLoadIterator, cursorBasedIterator).flatMap(Collection::stream).collect(Collectors.toList());
}
}
Expand Down

0 comments on commit 53658da

Please sign in to comment.