Skip to content

Commit

Permalink
[source-postgres/mysql/mssql/mongodb-v2] Use latest CDK to adopt late…
Browse files Browse the repository at this point in the history
…st apache sshd mina to handle tcpkeepalive requests (#45639)
  • Loading branch information
theyueli authored Sep 18, 2024
1 parent 398a1bb commit 2e62443
Show file tree
Hide file tree
Showing 18 changed files with 334 additions and 330 deletions.
598 changes: 299 additions & 299 deletions airbyte-cdk/java/airbyte-cdk/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.44.17'
cdkVersionRequired = '0.45.1'
features = ['db-sources', 'datastore-mongo']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.5.9
dockerImageTag: 1.5.10
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ public boolean isHeartbeatSupported() {
public boolean reachedTargetPosition(final ChangeEventWithMetadata changeEventWithMetadata) {
if (changeEventWithMetadata.isSnapshotEvent()) {
return false;
} else if (SnapshotMetadata.LAST == changeEventWithMetadata.snapshotMetadata()) {
} else if (SnapshotMetadata.LAST == changeEventWithMetadata.getSnapshotMetadata()) {
LOGGER.info("Signalling close because Snapshot is complete");
return true;
} else {
final BsonTimestamp eventResumeTokenTimestamp =
MongoDbResumeTokenHelper.extractTimestampFromEvent(changeEventWithMetadata.eventValueAsJson());
MongoDbResumeTokenHelper.extractTimestampFromEvent(changeEventWithMetadata.getEventValueAsJson());
final boolean isEventResumeTokenAfter = resumeTokenTimestamp.compareTo(eventResumeTokenTimestamp) <= 0;
if (isEventResumeTokenAfter) {
LOGGER.info("Signalling close because record's event timestamp {} is after target event timestamp {}.",
Expand Down Expand Up @@ -84,7 +84,7 @@ public boolean isEventAheadOffset(final Map<String, String> offset, final Change
return false;
}

return MongoDbResumeTokenHelper.extractTimestampFromEvent(event.eventValueAsJson()).getValue() >= ResumeTokens
return MongoDbResumeTokenHelper.extractTimestampFromEvent(event.getEventValueAsJson()).getValue() >= ResumeTokens
.getTimestamp(ResumeTokens.fromData(getResumeToken(offset))).getValue();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public MongoDbDebeziumEventConverter(

@Override
public AirbyteMessage toAirbyteMessage(ChangeEventWithMetadata event) {
final JsonNode debeziumEventKey = event.eventKeyAsJson();
final JsonNode debeziumEvent = event.eventValueAsJson();
final JsonNode debeziumEventKey = event.getEventKeyAsJson();
final JsonNode debeziumEvent = event.getEventValueAsJson();
final JsonNode before = debeziumEvent.get(DebeziumEventConverter.BEFORE_EVENT);
final JsonNode after = debeziumEvent.get(DebeziumEventConverter.AFTER_EVENT);
final JsonNode source = debeziumEvent.get(DebeziumEventConverter.SOURCE_EVENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,12 +533,12 @@ void testReachedTargetPosition() {
assertFalse(targetPosition.reachedTargetPosition(changeEventWithMetadata));

when(changeEventWithMetadata.isSnapshotEvent()).thenReturn(false);
when(changeEventWithMetadata.snapshotMetadata()).thenReturn(SnapshotMetadata.LAST);
when(changeEventWithMetadata.getSnapshotMetadata()).thenReturn(SnapshotMetadata.LAST);

assertTrue(targetPosition.reachedTargetPosition(changeEventWithMetadata));

when(changeEventWithMetadata.snapshotMetadata()).thenReturn(SnapshotMetadata.FIRST);
when(changeEventWithMetadata.eventValueAsJson()).thenReturn(Jsons.jsonNode(
when(changeEventWithMetadata.getSnapshotMetadata()).thenReturn(SnapshotMetadata.FIRST);
when(changeEventWithMetadata.getEventValueAsJson()).thenReturn(Jsons.jsonNode(
Map.of(MongoDbDebeziumConstants.ChangeEvent.SOURCE,
Map.of(MongoDbDebeziumConstants.ChangeEvent.SOURCE_TIMESTAMP_MS, eventTimestamp,
MongoDbDebeziumConstants.ChangeEvent.SOURCE_ORDER, order))));
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.44.22'
cdkVersionRequired = '0.45.1'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.1.13
dockerImageTag: 4.1.14
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.44.17'
cdkVersionRequired = '0.45.1'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.7.2
dockerImageTag: 3.7.3
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ public static MySqlCdcTargetPosition targetPosition(final JdbcDatabase database)
public boolean reachedTargetPosition(final ChangeEventWithMetadata changeEventWithMetadata) {
if (changeEventWithMetadata.isSnapshotEvent()) {
return false;
} else if (SnapshotMetadata.LAST == changeEventWithMetadata.snapshotMetadata()) {
} else if (SnapshotMetadata.LAST == changeEventWithMetadata.getSnapshotMetadata()) {
LOGGER.info("Signalling close because Snapshot is complete");
return true;
} else {
final String eventFileName = changeEventWithMetadata.eventValueAsJson().get("source").get("file").asText();
final long eventPosition = changeEventWithMetadata.eventValueAsJson().get("source").get("pos").asLong();
final String eventFileName = changeEventWithMetadata.getEventValueAsJson().get("source").get("file").asText();
final long eventPosition = changeEventWithMetadata.getEventValueAsJson().get("source").get("pos").asLong();
final boolean isEventPositionAfter =
eventFileName.compareTo(targetPosition.fileName) > 0 || (eventFileName.compareTo(
targetPosition.fileName) == 0 && eventPosition >= targetPosition.position);
Expand Down Expand Up @@ -110,8 +110,8 @@ public boolean isEventAheadOffset(final Map<String, String> offset, final Change
return false;
}

final String eventFileName = event.eventValueAsJson().get("source").get("file").asText();
final long eventPosition = event.eventValueAsJson().get("source").get("pos").asLong();
final String eventFileName = event.getEventValueAsJson().get("source").get("file").asText();
final long eventPosition = event.getEventValueAsJson().get("source").get("pos").asLong();

final JsonNode offsetJson = Jsons.deserialize((String) offset.values().toArray()[0]);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.44.17'
cdkVersionRequired = '0.45.1'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.6.18
dockerImageTag: 3.6.19
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ public static PostgresCdcTargetPosition targetPosition(final JdbcDatabase databa
public boolean reachedTargetPosition(final ChangeEventWithMetadata changeEventWithMetadata) {
if (changeEventWithMetadata.isSnapshotEvent()) {
return false;
} else if (SnapshotMetadata.LAST == changeEventWithMetadata.snapshotMetadata()) {
} else if (SnapshotMetadata.LAST == changeEventWithMetadata.getSnapshotMetadata()) {
LOGGER.info("Signalling close because Snapshot is complete");
return true;
} else {
final PgLsn eventLsn = extractLsn(changeEventWithMetadata.eventValueAsJson());
final PgLsn eventLsn = extractLsn(changeEventWithMetadata.getEventValueAsJson());
final boolean isEventLSNAfter = targetLsn.compareTo(eventLsn) <= 0;
if (isEventLSNAfter) {
LOGGER.info("Signalling close because record's LSN : " + eventLsn + " is after target LSN : " + targetLsn);
Expand Down Expand Up @@ -135,7 +135,7 @@ public boolean isEventAheadOffset(final Map<String, String> offset, final Change
See https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events for the full event structure.
@formatter:on
*/
final JsonNode lsnSequenceNode = event.eventValueAsJson().get("source").get("sequence");
final JsonNode lsnSequenceNode = event.getEventValueAsJson().get("source").get("sequence");
List<String> lsnSequence = objectMapper.readValue(lsnSequenceNode.asText(), listType);
// The sequence field is a pair of [lsn_commit, lsn_processed]. We want to make sure
// lsn_commit(event) is compared against lsn_commit(state_offset). For the event, either of the lsn
Expand Down
13 changes: 7 additions & 6 deletions docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,17 +198,18 @@ For more information regarding configuration parameters, please see [MongoDb Doc
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :------------------------------------------------------- |:----------------------------------------------------------------------------------------------------------|
| 1.5.9 | 2024-08-28 | [42927](https://github.com/airbytehq/airbyte/pull/42927) | Support binary subtype. |
| 1.5.8 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
| 1.5.10 | 2024-09-17 | [45639](https://github.com/airbytehq/airbyte/pull/45639) | Adopt latest CDK to use the latest apache sshd mina to handle tcpkeepalive requests. |
| 1.5.9 | 2024-08-28 | [42927](https://github.com/airbytehq/airbyte/pull/42927) | Support binary subtype. |
| 1.5.8 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
| 1.5.7 | 2024-08-27 | [44846](https://github.com/airbytehq/airbyte/pull/44846) | DBZ filters in related streams only. |
| 1.5.6 | 2024-08-27 | [44839](https://github.com/airbytehq/airbyte/pull/44839) | DBZ filters in related streams only. |
| 1.5.5 | 2024-08-26 | [44779](https://github.com/airbytehq/airbyte/pull/44779) | Revert permission check on oplog.rs. |
| 1.5.4 | 2024-08-20 | [44490](https://github.com/airbytehq/airbyte/pull/44490) | Add read permission check on oplog.rs collection used by CDC. |
| 1.5.3 | 2024-08-08 | [43410](https://github.com/airbytehq/airbyte/pull/43410) | Adopt latest CDK. |
| 1.5.2 | 2024-08-06 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. |
| 1.5.3 | 2024-08-08 | [43410](https://github.com/airbytehq/airbyte/pull/43410) | Adopt latest CDK. |
| 1.5.2 | 2024-08-06 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. |
| 1.5.1 | 2024-08-01 | [42549](https://github.com/airbytehq/airbyte/pull/42549) | Centered the connector icon. |
| 1.5.0 | 2024-07-26 | [42561](https://github.com/airbytehq/airbyte/pull/42561) | Implement WASS algorithm. |
| 1.5.0 | 2024-07-26 | [42561](https://github.com/airbytehq/airbyte/pull/42561) | Implement WASS algorithm. |
| 1.4.3 | 2024-07-22 | [39145](https://github.com/airbytehq/airbyte/pull/39145) | Warn (vs fail) on different \_id types in collection. |
| 1.4.2 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. |
| 1.4.1 | 2024-06-11 | [39530](https://github.com/airbytehq/airbyte/pull/39530) | Adopt new CDK. |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.1.14 | 2024-09-17 | [45639](https://github.com/airbytehq/airbyte/pull/45639) | Adopt latest CDK to use the latest apache sshd mina to handle tcpkeepalive requests. |
| 4.1.13 | 2024-09-05 | [45181](https://github.com/airbytehq/airbyte/pull/45181) | Fix incorrect categorizing resumable/nonresumable full refresh streams. |
| 4.1.12 | 2024-09-10 | [45368](https://github.com/airbytehq/airbyte/pull/45368) | Remove excessive debezium logging. |
| 4.1.11 | 2024-09-04 | [45142](https://github.com/airbytehq/airbyte/pull/45142) | Fix incorrect datetimeoffset format in cursor state. |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.7.3 | 2024-09-17 | [45639](https://github.com/airbytehq/airbyte/pull/45639) | Adopt latest CDK to use the latest apache sshd mina to handle tcpkeepalive requests. |
| 3.7.2 | 2024-09-05 | [45181](https://github.com/airbytehq/airbyte/pull/45181) | Fix incorrect categorizing resumable/nonresumable full refresh streams. |
| 3.7.1 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
| 3.7.0 | 2024-08-13 | [44013](https://github.com/airbytehq/airbyte/pull/44013) | Upgrading to Debezium 2.7.1.Final |
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.6.18 | 2024-08-28 | [44878](https://github.com/airbytehq/airbyte/pull/44878) | Enable tcpKeepAlive for jdbc connection. |
| 3.6.19 | 2024-09-17 | [45639](https://github.com/airbytehq/airbyte/pull/45639) | Adopt latest CDK to use the latest apache sshd mina to handle tcpkeepalive requests. |
| 3.6.18 | 2024-08-28 | [44878](https://github.com/airbytehq/airbyte/pull/44878) | Enable tcpKeepAlive for jdbc connection. |
| 3.6.17 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
| 3.6.16 | 2024-08-15 | [44119](https://github.com/airbytehq/airbyte/pull/44119) | Fix incorrect final state on initial read in CDC mode. |
| 3.6.15 | 2024-08-12 | [43945](https://github.com/airbytehq/airbyte/pull/43945) | Add missing replication slot config error. |
Expand Down

0 comments on commit 2e62443

Please sign in to comment.