Skip to content

Commit

Permalink
Removed feature flag to enable pk sync
Browse files Browse the repository at this point in the history
Renamed and removed pk sync only jdbc tests
bumped version in metadata and dockerfile
update mysql change logs
  • Loading branch information
nguyenaiden committed Sep 21, 2023
1 parent c07c003 commit 89ba1b9
Show file tree
Hide file tree
Showing 11 changed files with 807 additions and 1,370 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ ENV APPLICATION source-mysql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=3.0.9
LABEL io.airbyte.version=3.1.0

LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.0.9
dockerImageTag: 3.1.0
dockerRepository: airbyte/source-mysql-strict-encrypt
githubIssueLabel: source-mysql
icon: mysql.svg
Expand Down

This file was deleted.

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=3.0.9
LABEL io.airbyte.version=3.1.0

LABEL io.airbyte.name=airbyte/source-mysql
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.0.9
dockerImageTag: 3.1.0
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,41 +355,39 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
return MySqlInitialReadUtil.getCdcReadIterators(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString());
} else {
if (isAnyStreamIncrementalSyncMode(catalog)) {
if (mySqlFeatureFlags.isStandardInitialSyncViaPkEnabled()) {
LOGGER.info("Syncing via Primary Key");
final MySqlCursorBasedStateManager cursorBasedStateManager = new MySqlCursorBasedStateManager(stateManager.getRawStateMessages(), catalog);
final InitialLoadStreams initialLoadStreams = streamsForInitialPrimaryKeyLoad(cursorBasedStateManager, catalog);
final Map<AirbyteStreamNameNamespacePair, CursorBasedStatus> pairToCursorBasedStatus =
getCursorBasedSyncStatusForStreams(database, initialLoadStreams.streamsForInitialLoad(), stateManager, quoteString);
final CursorBasedStreams cursorBasedStreams =
new CursorBasedStreams(MySqlInitialReadUtil.identifyStreamsForCursorBased(catalog, initialLoadStreams.streamsForInitialLoad()),
pairToCursorBasedStatus);

logStreamSyncStatus(initialLoadStreams.streamsForInitialLoad(), "Primary Key");
logStreamSyncStatus(cursorBasedStreams.streamsForCursorBased(), "Cursor");

final MySqlInitialLoadStreamStateManager mySqlInitialLoadStreamStateManager =
new MySqlInitialLoadStreamStateManager(catalog, initialLoadStreams,
initPairToPrimaryKeyInfoMap(database, initialLoadStreams, tableNameToTable, quoteString));
final MySqlInitialLoadHandler initialLoadHandler =
new MySqlInitialLoadHandler(sourceConfig, database, new MySqlSourceOperations(), getQuoteString(), mySqlInitialLoadStreamStateManager,
namespacePair -> Jsons.jsonNode(pairToCursorBasedStatus.get(convertNameNamespacePairFromV0(namespacePair))),
getTableSizeInfoForStreams(database, catalog.getStreams(), getQuoteString()));
final List<AutoCloseableIterator<AirbyteMessage>> initialLoadIterator = new ArrayList<>(initialLoadHandler.getIncrementalIterators(
new ConfiguredAirbyteCatalog().withStreams(initialLoadStreams.streamsForInitialLoad()),
tableNameToTable,
emittedAt));

// Build Cursor based iterator
final List<AutoCloseableIterator<AirbyteMessage>> cursorBasedIterator =
new ArrayList<>(super.getIncrementalIterators(database,
new ConfiguredAirbyteCatalog().withStreams(
cursorBasedStreams.streamsForCursorBased()),
tableNameToTable,
cursorBasedStateManager, emittedAt));

return Stream.of(initialLoadIterator, cursorBasedIterator).flatMap(Collection::stream).collect(Collectors.toList());
}
LOGGER.info("Syncing via Primary Key");
final MySqlCursorBasedStateManager cursorBasedStateManager = new MySqlCursorBasedStateManager(stateManager.getRawStateMessages(), catalog);
final InitialLoadStreams initialLoadStreams = streamsForInitialPrimaryKeyLoad(cursorBasedStateManager, catalog);
final Map<AirbyteStreamNameNamespacePair, CursorBasedStatus> pairToCursorBasedStatus =
getCursorBasedSyncStatusForStreams(database, initialLoadStreams.streamsForInitialLoad(), stateManager, quoteString);
final CursorBasedStreams cursorBasedStreams =
new CursorBasedStreams(MySqlInitialReadUtil.identifyStreamsForCursorBased(catalog, initialLoadStreams.streamsForInitialLoad()),
pairToCursorBasedStatus);

logStreamSyncStatus(initialLoadStreams.streamsForInitialLoad(), "Primary Key");
logStreamSyncStatus(cursorBasedStreams.streamsForCursorBased(), "Cursor");

final MySqlInitialLoadStreamStateManager mySqlInitialLoadStreamStateManager =
new MySqlInitialLoadStreamStateManager(catalog, initialLoadStreams,
initPairToPrimaryKeyInfoMap(database, initialLoadStreams, tableNameToTable, quoteString));
final MySqlInitialLoadHandler initialLoadHandler =
new MySqlInitialLoadHandler(sourceConfig, database, new MySqlSourceOperations(), getQuoteString(), mySqlInitialLoadStreamStateManager,
namespacePair -> Jsons.jsonNode(pairToCursorBasedStatus.get(convertNameNamespacePairFromV0(namespacePair))),
getTableSizeInfoForStreams(database, catalog.getStreams(), getQuoteString()));
final List<AutoCloseableIterator<AirbyteMessage>> initialLoadIterator = new ArrayList<>(initialLoadHandler.getIncrementalIterators(
new ConfiguredAirbyteCatalog().withStreams(initialLoadStreams.streamsForInitialLoad()),
tableNameToTable,
emittedAt));

// Build Cursor based iterator
final List<AutoCloseableIterator<AirbyteMessage>> cursorBasedIterator =
new ArrayList<>(super.getIncrementalIterators(database,
new ConfiguredAirbyteCatalog().withStreams(
cursorBasedStreams.streamsForCursorBased()),
tableNameToTable,
cursorBasedStateManager, emittedAt));

return Stream.of(initialLoadIterator, cursorBasedIterator).flatMap(Collection::stream).collect(Collectors.toList());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,12 @@
// Feature flags to gate new primary key load features.
public class MySqlFeatureFlags {

public static final String STANDARD_VIA_PK = "standard_via_pk";
private final JsonNode sourceConfig;

public MySqlFeatureFlags(final JsonNode sourceConfig) {
this.sourceConfig = sourceConfig;
}

public boolean isStandardInitialSyncViaPkEnabled() {
return getFlagValue(STANDARD_VIA_PK);
}


private boolean getFlagValue(final String flag) {
return sourceConfig.has(flag) && sourceConfig.get(flag).asBoolean();
}
Expand Down
Loading

0 comments on commit 89ba1b9

Please sign in to comment.