Skip to content

Commit

Permalink
[DB sources] : Add back Debezium heartbeat timeout error (#42055)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Jul 17, 2024
1 parent 1c06673 commit 27d1437
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 94 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ corresponds to that version.
=======
| Version | Date | Pull Request | Subject |
|:------------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.41.7 | 2024-07-17 | [\#42055](https://github.com/airbytehq/airbyte/pull/42055) | Add debezium heartbeat timeout back to shutdown debezium. |
| 0.41.6 | 2024-07-17 | [\#41996](https://github.com/airbytehq/airbyte/pull/41996) | Fix java interop compilation issue in Config/TransientErrorException. |
| 0.41.5 | 2024-07-16 | [\#42011] (https://github.com/airbytehq/airbyte/pull/42011) | Async consumer accepts null default namespace |
| 0.41.4 | 2024-07-15 | [\#41959](https://github.com/airbytehq/airbyte/pull/41959) | Allow setting `internal_message` in Config/TransientErrorException. Destinations: shorten error message for INCOMPLETE stream status. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.41.6
version=0.41.7
Original file line number Diff line number Diff line change
Expand Up @@ -255,16 +255,8 @@ class DebeziumRecordIterator<T>(
private fun heartbeatPosNotChanging(): Boolean {
if (this.tsLastHeartbeat == null) {
return false
} else if (!isTest() && receivedFirstRecord) {
// Closing debezium due to heartbeat position not changing only exists as an escape
// hatch
// for testing setups. In production, we rely on the platform heartbeats to kill the
// sync
// ONLY if we haven't received a record from Debezium. If a record has not been received
// from Debezium and the heartbeat isn't changing, the sync should be shut down due to
// heartbeat position not changing.
return false
}

val timeElapsedSinceLastHeartbeatTs =
Duration.between(this.tsLastHeartbeat, LocalDateTime.now())
return timeElapsedSinceLastHeartbeatTs.compareTo(firstRecordWaitTime) > 0
Expand All @@ -288,10 +280,6 @@ class DebeziumRecordIterator<T>(
}
}

private fun isTest(): Boolean {
return config.has("is_test") && config["is_test"].asBoolean()
}

/**
* [DebeziumRecordIterator.heartbeatEventSourceField] acts as a cache so that we avoid using
* reflection to setAccessible for each event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.41.6'
cdkVersionRequired = '0.41.7'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.5.0
dockerImageTag: 3.5.1
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Loading

0 comments on commit 27d1437

Please sign in to comment.