From 4744bbff9b9d42676d9ef66d252470a42bad3c57 Mon Sep 17 00:00:00 2001 From: Jatin Yadav Date: Mon, 19 Aug 2024 14:12:32 +0530 Subject: [PATCH 1/3] fix: DPA-2740 Oracle data type issue for integer in custom SQL --- .../source/jdbc/AbstractJdbcSource.java | 224 +++++++++++------- 1 file changed, 137 insertions(+), 87 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.java index f54569e9ff4a..64792604efbf 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.java @@ -91,7 +91,8 @@ * relational DB source which can be accessed via JDBC driver. If you are implementing a connector * for a relational DB which has a JDBC driver, make an effort to use this class. */ -public abstract class AbstractJdbcSource extends AbstractDbSource implements Source { +public abstract class AbstractJdbcSource extends + AbstractDbSource implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcSource.class); @@ -102,8 +103,8 @@ public abstract class AbstractJdbcSource extends AbstractDbSource dataSources = new ArrayList<>(); public AbstractJdbcSource(final String driverClass, - final Supplier streamingQueryConfigProvider, - final JdbcCompatibleSourceOperations sourceOperations) { + final Supplier streamingQueryConfigProvider, + final JdbcCompatibleSourceOperations sourceOperations) { super(driverClass); this.streamingQueryConfigProvider = streamingQueryConfigProvider; this.sourceOperations = sourceOperations; @@ -125,14 +126,15 @@ protected AutoCloseableIterator queryTableFullRefresh(final JdbcDataba if (syncMode.equals(SyncMode.INCREMENTAL) && getStateEmissionFrequency() > 0) { final String quotedCursorField = enquoteIdentifier(cursorField.get(), getQuoteString()); String query = ""; - if (!whereClause.equals("")) { + if (!whereClause.isEmpty()) { query = String.format("SELECT %s FROM %s where %s ORDER BY %s ASC", enquoteIdentifierList(columnNames, getQuoteString()), getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()), whereClause, quotedCursorField); - } else if (customSQL != null && !customSQL.equals("")) { - query = String.format("SELECT * FROM (%s) sc ORDER BY %s ASC", customSQL, quotedCursorField); + } else if (customSQL != null && !customSQL.isEmpty()) { + query = String.format("SELECT * FROM (%s) sc ORDER BY %s ASC", customSQL, + quotedCursorField); } else { query = String.format("SELECT %s FROM %s ORDER BY %s ASC", enquoteIdentifierList(columnNames, getQuoteString()), @@ -169,10 +171,12 @@ protected AutoCloseableIterator queryTableFullRefresh(final JdbcDataba * @return list of consumers that run queries for the check command. */ @Trace(operationName = CHECK_TRACE_OPERATION_NAME) - protected List> getCheckOperations(final JsonNode config) throws Exception { + protected List> getCheckOperations(final JsonNode config) + throws Exception { return ImmutableList.of(database -> { LOGGER.info("Attempting to get metadata from the database to see if we can connect."); - database.bufferedResultSetQuery(connection -> connection.getMetaData().getCatalogs(), sourceOperations::rowToJson); + database.bufferedResultSetQuery(connection -> connection.getMetaData().getCatalogs(), + sourceOperations::rowToJson); }); } @@ -182,36 +186,42 @@ protected List> getCheckOperations(fina * @return a map by StreamName to associated list of primary keys */ @VisibleForTesting - public static Map> aggregatePrimateKeys(final List entries) { + public static Map> aggregatePrimateKeys( + final List entries) { final Map> result = new HashMap<>(); - entries.stream().sorted(Comparator.comparingInt(PrimaryKeyAttributesFromDb::keySequence)).forEach(entry -> { - if (!result.containsKey(entry.streamName())) { - result.put(entry.streamName(), new ArrayList<>()); - } - result.get(entry.streamName()).add(entry.primaryKey()); - }); + entries.stream().sorted(Comparator.comparingInt(PrimaryKeyAttributesFromDb::keySequence)) + .forEach(entry -> { + if (!result.containsKey(entry.streamName())) { + result.put(entry.streamName(), new ArrayList<>()); + } + result.get(entry.streamName()).add(entry.primaryKey()); + }); return result; } private String getCatalog(final SqlDatabase database) { - return (database.getSourceConfig().has(JdbcUtils.DATABASE_KEY) ? database.getSourceConfig().get(JdbcUtils.DATABASE_KEY).asText() : null); + return (database.getSourceConfig().has(JdbcUtils.DATABASE_KEY) ? database.getSourceConfig() + .get(JdbcUtils.DATABASE_KEY).asText() : null); } @Override - protected List>> discoverInternal(final JdbcDatabase database, final String schema) throws Exception { + protected List>> discoverInternal(final JdbcDatabase database, + final String schema) throws Exception { final Set internalSchemas = new HashSet<>(getExcludedInternalNameSpaces()); LOGGER.info("Internal schemas to exclude: {}", internalSchemas); - final Set tablesWithSelectGrantPrivilege = getPrivilegesTableForCurrentUser(database, schema); + final Set tablesWithSelectGrantPrivilege = getPrivilegesTableForCurrentUser( + database, schema); return database.bufferedResultSetQuery( - // retrieve column metadata from the database - connection -> connection.getMetaData().getColumns(getCatalog(database), schema, null, null), - // store essential column metadata to a Json object from the result set about each column - this::getColumnMetadata) + // retrieve column metadata from the database + connection -> connection.getMetaData().getColumns(getCatalog(database), schema, null, null), + // store essential column metadata to a Json object from the result set about each column + this::getColumnMetadata) .stream() .filter(excludeNotAccessibleTables(internalSchemas, tablesWithSelectGrantPrivilege)) // group by schema and table name to handle the case where a table with the same name exists in // multiple schemas. - .collect(Collectors.groupingBy(t -> ImmutablePair.of(t.get(INTERNAL_SCHEMA_NAME).asText(), t.get(INTERNAL_TABLE_NAME).asText()))) + .collect(Collectors.groupingBy(t -> ImmutablePair.of(t.get(INTERNAL_SCHEMA_NAME).asText(), + t.get(INTERNAL_TABLE_NAME).asText()))) .values() .stream() .map(fields -> TableInfo.>builder() @@ -229,7 +239,8 @@ protected List>> discoverInternal(final JdbcData f.get(INTERNAL_COLUMN_SIZE).asInt(), f.get(INTERNAL_IS_NULLABLE).asBoolean(), jsonType); - return new CommonField(f.get(INTERNAL_COLUMN_NAME).asText(), datatype) {}; + return new CommonField(f.get(INTERNAL_COLUMN_NAME).asText(), datatype) { + }; }) .collect(Collectors.toList())) .cursorFields(extractCursorFields(fields)) @@ -245,7 +256,7 @@ private List extractCursorFields(final List fields) { } protected Predicate excludeNotAccessibleTables(final Set internalSchemas, - final Set tablesWithSelectGrantPrivilege) { + final Set tablesWithSelectGrantPrivilege) { return jsonNode -> { if (tablesWithSelectGrantPrivilege.isEmpty()) { return isNotInternalSchema(jsonNode, internalSchemas); @@ -253,26 +264,29 @@ protected Predicate excludeNotAccessibleTables(final Set inter return tablesWithSelectGrantPrivilege.stream() .anyMatch(e -> e.getSchemaName().equals(jsonNode.get(INTERNAL_SCHEMA_NAME).asText())) && tablesWithSelectGrantPrivilege.stream() - .anyMatch(e -> e.getTableName().equals(jsonNode.get(INTERNAL_TABLE_NAME).asText())) + .anyMatch(e -> e.getTableName().equals(jsonNode.get(INTERNAL_TABLE_NAME).asText())) && !internalSchemas.contains(jsonNode.get(INTERNAL_SCHEMA_NAME).asText()); }; } // needs to override isNotInternalSchema for connectors that override // getPrivilegesTableForCurrentUser() - protected boolean isNotInternalSchema(final JsonNode jsonNode, final Set internalSchemas) { + protected boolean isNotInternalSchema(final JsonNode jsonNode, + final Set internalSchemas) { return !internalSchemas.contains(jsonNode.get(INTERNAL_SCHEMA_NAME).asText()); } /** * @param resultSet Description of a column available in the table catalog. - * @return Essential information about a column to determine which table it belongs to and its type. + * @return Essential information about a column to determine which table it belongs to and its + * type. */ private JsonNode getColumnMetadata(final ResultSet resultSet) throws SQLException { final var fieldMap = ImmutableMap.builder() // we always want a namespace, if we cannot get a schema, use db name. .put(INTERNAL_SCHEMA_NAME, - resultSet.getObject(JDBC_COLUMN_SCHEMA_NAME) != null ? resultSet.getString(JDBC_COLUMN_SCHEMA_NAME) + resultSet.getObject(JDBC_COLUMN_SCHEMA_NAME) != null ? resultSet.getString( + JDBC_COLUMN_SCHEMA_NAME) : resultSet.getObject(JDBC_COLUMN_DATABASE_NAME)) .put(INTERNAL_TABLE_NAME, resultSet.getString(JDBC_COLUMN_TABLE_NAME)) .put(INTERNAL_COLUMN_NAME, resultSet.getString(JDBC_COLUMN_COLUMN_NAME)) @@ -306,40 +320,53 @@ public record PrimaryKeyAttributesFromDb(String streamName, @Override protected Map> discoverPrimaryKeys(final JdbcDatabase database, - final List>> tableInfos) { - LOGGER.info("Discover primary keys for tables: " + tableInfos.stream().map(TableInfo::getName).collect( - Collectors.toSet())); + final List>> tableInfos) { + LOGGER.info( + "Discover primary keys for tables: " + tableInfos.stream().map(TableInfo::getName).collect( + Collectors.toSet())); try { // Get all primary keys without specifying a table name - final Map> tablePrimaryKeys = aggregatePrimateKeys(database.bufferedResultSetQuery( - connection -> connection.getMetaData().getPrimaryKeys(getCatalog(database), null, null), - r -> { - final String schemaName = - r.getObject(JDBC_COLUMN_SCHEMA_NAME) != null ? r.getString(JDBC_COLUMN_SCHEMA_NAME) : r.getString(JDBC_COLUMN_DATABASE_NAME); - final String streamName = JdbcUtils.getFullyQualifiedTableName(schemaName, r.getString(JDBC_COLUMN_TABLE_NAME)); - final String primaryKey = r.getString(JDBC_COLUMN_COLUMN_NAME); - final int keySeq = r.getInt(KEY_SEQ); - return new PrimaryKeyAttributesFromDb(streamName, primaryKey, keySeq); - })); + final Map> tablePrimaryKeys = aggregatePrimateKeys( + database.bufferedResultSetQuery( + connection -> connection.getMetaData() + .getPrimaryKeys(getCatalog(database), null, null), + r -> { + final String schemaName = + r.getObject(JDBC_COLUMN_SCHEMA_NAME) != null ? r.getString( + JDBC_COLUMN_SCHEMA_NAME) : r.getString(JDBC_COLUMN_DATABASE_NAME); + final String streamName = JdbcUtils.getFullyQualifiedTableName(schemaName, + r.getString(JDBC_COLUMN_TABLE_NAME)); + final String primaryKey = r.getString(JDBC_COLUMN_COLUMN_NAME); + final int keySeq = r.getInt(KEY_SEQ); + return new PrimaryKeyAttributesFromDb(streamName, primaryKey, keySeq); + })); if (!tablePrimaryKeys.isEmpty()) { return tablePrimaryKeys; } } catch (final SQLException e) { - LOGGER.debug(String.format("Could not retrieve primary keys without a table name (%s), retrying", e)); + LOGGER.debug( + String.format("Could not retrieve primary keys without a table name (%s), retrying", e)); } // Get primary keys one table at a time return tableInfos.stream() .collect(Collectors.toMap( - tableInfo -> JdbcUtils.getFullyQualifiedTableName(tableInfo.getNameSpace(), tableInfo.getName()), + tableInfo -> JdbcUtils.getFullyQualifiedTableName(tableInfo.getNameSpace(), + tableInfo.getName()), tableInfo -> { - final String streamName = JdbcUtils.getFullyQualifiedTableName(tableInfo.getNameSpace(), tableInfo.getName()); + final String streamName = JdbcUtils.getFullyQualifiedTableName( + tableInfo.getNameSpace(), tableInfo.getName()); try { - final Map> primaryKeys = aggregatePrimateKeys(database.bufferedResultSetQuery( - connection -> connection.getMetaData().getPrimaryKeys(getCatalog(database), tableInfo.getNameSpace(), tableInfo.getName()), - r -> new PrimaryKeyAttributesFromDb(streamName, r.getString(JDBC_COLUMN_COLUMN_NAME), r.getInt(KEY_SEQ)))); + final Map> primaryKeys = aggregatePrimateKeys( + database.bufferedResultSetQuery( + connection -> connection.getMetaData() + .getPrimaryKeys(getCatalog(database), tableInfo.getNameSpace(), + tableInfo.getName()), + r -> new PrimaryKeyAttributesFromDb(streamName, + r.getString(JDBC_COLUMN_COLUMN_NAME), r.getInt(KEY_SEQ)))); return primaryKeys.getOrDefault(streamName, Collections.emptyList()); } catch (final SQLException e) { - LOGGER.error(String.format("Could not retrieve primary keys for %s: %s", streamName, e)); + LOGGER.error( + String.format("Could not retrieve primary keys for %s: %s", streamName, e)); return Collections.emptyList(); } })); @@ -357,13 +384,13 @@ public boolean isCursorType(final Datatype type) { @Override public AutoCloseableIterator queryTableIncremental(final JdbcDatabase database, - final List columnNames, - final String schemaName, - final String tableName, - final CursorInfo cursorInfo, - final Datatype cursorFieldType, - final String whereClause, - final String customSQL) { + final List columnNames, + final String schemaName, + final String tableName, + final CursorInfo cursorInfo, + final Datatype cursorFieldType, + final String whereClause, + final String customSQL) { LOGGER.info("Queueing query for table: {}", tableName); final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair airbyteStream = AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName); @@ -372,16 +399,20 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase final Stream stream = database.unsafeQuery( connection -> { LOGGER.info("Preparing query for table: {}", tableName); - final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()); - final String quotedCursorField = enquoteIdentifier(cursorInfo.getCursorField(), getQuoteString()); + final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, + tableName, getQuoteString()); + final String quotedCursorField = enquoteIdentifier(cursorInfo.getCursorField(), + getQuoteString()); final String operator; if (cursorInfo.getCursorRecordCount() <= 0L) { operator = ">"; } else { final long actualRecordCount = getActualCursorRecordCount( - connection, fullTableName, quotedCursorField, cursorFieldType, cursorInfo.getCursor()); - LOGGER.info("Table {} cursor count: expected {}, actual {}", tableName, cursorInfo.getCursorRecordCount(), actualRecordCount); + connection, fullTableName, quotedCursorField, cursorFieldType, + cursorInfo.getCursor()); + LOGGER.info("Table {} cursor count: expected {}, actual {}", tableName, + cursorInfo.getCursorRecordCount(), actualRecordCount); if (actualRecordCount == cursorInfo.getCursorRecordCount()) { operator = ">"; } else { @@ -389,10 +420,13 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase } } - final String wrappedColumnNames = getWrappedColumnNames(database, connection, columnNames, schemaName, tableName); + final String wrappedColumnNames = getWrappedColumnNames(database, connection, + columnNames, schemaName, tableName); StringBuilder sql = new StringBuilder(); - if (customSQL!= null && !customSQL.equals("")) { - sql = new StringBuilder(String.format("SELECT * FROM (%s) sc WHERE %s %s ?", customSQL, quotedCursorField, operator)); + if (customSQL != null && !customSQL.equals("")) { + sql = new StringBuilder( + String.format("SELECT * FROM (%s) sc WHERE %s %s ?", customSQL, + quotedCursorField, operator)); } else { sql = new StringBuilder(String.format("SELECT %s FROM %s WHERE %s %s ?", wrappedColumnNames, @@ -409,9 +443,11 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase sql.append(String.format(" ORDER BY %s ASC", quotedCursorField)); } - final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString()); + final PreparedStatement preparedStatement = connection.prepareStatement( + sql.toString()); LOGGER.info("Executing query for table {}: {}", tableName, sql); - sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, cursorInfo.getCursor()); + sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, + cursorInfo.getCursor()); return preparedStatement; }, sourceOperations::rowToJson); @@ -426,10 +462,10 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase * Some databases need special column names in the query. */ protected String getWrappedColumnNames(final JdbcDatabase database, - final Connection connection, - final List columnNames, - final String schemaName, - final String tableName) + final Connection connection, + final List columnNames, + final String schemaName, + final String tableName) throws SQLException { return enquoteIdentifierList(columnNames, getQuoteString()); } @@ -439,15 +475,16 @@ protected String getCountColumnName() { } protected long getActualCursorRecordCount(final Connection connection, - final String fullTableName, - final String quotedCursorField, - final Datatype cursorFieldType, - final String cursor) + final String fullTableName, + final String quotedCursorField, + final Datatype cursorFieldType, + final String cursor) throws SQLException { final String columnName = getCountColumnName(); final PreparedStatement cursorRecordStatement; if (cursor == null) { - final String cursorRecordQuery = String.format("SELECT COUNT(*) AS %s FROM %s WHERE %s IS NULL", + final String cursorRecordQuery = String.format( + "SELECT COUNT(*) AS %s FROM %s WHERE %s IS NULL", columnName, fullTableName, quotedCursorField); @@ -457,7 +494,8 @@ protected long getActualCursorRecordCount(final Connection connection, columnName, fullTableName, quotedCursorField); - cursorRecordStatement = connection.prepareStatement(cursorRecordQuery);; + cursorRecordStatement = connection.prepareStatement(cursorRecordQuery); + ; sourceOperations.setCursorField(cursorRecordStatement, 1, cursorFieldType, cursor); } final ResultSet resultSet = cursorRecordStatement.executeQuery(); @@ -473,13 +511,17 @@ public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLExcept return createDatabase(sourceConfig, JdbcDataSourceUtils.DEFAULT_JDBC_PARAMETERS_DELIMITER); } - public JdbcDatabase createDatabase(final JsonNode sourceConfig, String delimiter) throws SQLException { + public JdbcDatabase createDatabase(final JsonNode sourceConfig, String delimiter) + throws SQLException { final JsonNode jdbcConfig = toDatabaseConfig(sourceConfig); - Map connectionProperties = JdbcDataSourceUtils.getConnectionProperties(sourceConfig, delimiter); + Map connectionProperties = JdbcDataSourceUtils.getConnectionProperties( + sourceConfig, delimiter); // Create the data source final DataSource dataSource = DataSourceFactory.create( - jdbcConfig.has(JdbcUtils.USERNAME_KEY) ? jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText() : null, - jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null, + jdbcConfig.has(JdbcUtils.USERNAME_KEY) ? jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText() + : null, + jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() + : null, driverClassName, jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(), connectionProperties, @@ -492,7 +534,8 @@ public JdbcDatabase createDatabase(final JsonNode sourceConfig, String delimiter sourceOperations, streamingQueryConfigProvider); - quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString); + quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() + : quoteString); database.setSourceConfig(sourceConfig); database.setDatabaseConfig(jdbcConfig); return database; @@ -502,11 +545,12 @@ public JdbcDatabase createDatabase(final JsonNode sourceConfig, String delimiter * {@inheritDoc} * * @param database database instance - * @param catalog schema of the incoming messages. + * @param catalog schema of the incoming messages. * @throws SQLException */ @Override - protected void logPreSyncDebugData(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog) + protected void logPreSyncDebugData(final JdbcDatabase database, + final ConfiguredAirbyteCatalog catalog) throws SQLException { LOGGER.info("Data source product recognized as {}:{}", database.getMetaData().getDatabaseProductName(), @@ -525,20 +569,25 @@ public void close() { dataSources.clear(); } - protected List identifyStreamsToSnapshot(final ConfiguredAirbyteCatalog catalog, final StateManager stateManager) { - final Set alreadySyncedStreams = stateManager.getCdcStateManager().getInitialStreamsSynced(); + protected List identifyStreamsToSnapshot( + final ConfiguredAirbyteCatalog catalog, final StateManager stateManager) { + final Set alreadySyncedStreams = stateManager.getCdcStateManager() + .getInitialStreamsSynced(); if (alreadySyncedStreams.isEmpty() && (stateManager.getCdcStateManager().getCdcState() == null || stateManager.getCdcStateManager().getCdcState().getState() == null)) { return Collections.emptyList(); } - final Set allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog); + final Set allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog( + catalog); - final Set newlyAddedStreams = new HashSet<>(Sets.difference(allStreams, alreadySyncedStreams)); + final Set newlyAddedStreams = new HashSet<>( + Sets.difference(allStreams, alreadySyncedStreams)); return catalog.getStreams().stream() .filter(c -> c.getSyncMode() == SyncMode.INCREMENTAL) - .filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream()))) + .filter(stream -> newlyAddedStreams.contains( + AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream()))) .map(Jsons::clone) .collect(Collectors.toList()); } @@ -617,6 +666,7 @@ private TableInfo> getColumnMetadataForCustomSQL(ResultSet .put(INTERNAL_COLUMN_TYPE, metaData.getColumnType(i)) .put(INTERNAL_COLUMN_TYPE_NAME, metaData.getColumnTypeName(i)) .put(INTERNAL_COLUMN_SIZE, metaData.getColumnDisplaySize(i)) + .put(INTERNAL_DECIMAL_DIGITS, metaData.getScale(i)) .put(INTERNAL_IS_NULLABLE, metaData.isNullable(i)).build()) ); return new CommonField<>(columnName, datatype); From 2f8eb6e668a9e78ee610f32fe41b9e0829869b4a Mon Sep 17 00:00:00 2001 From: Jatin Yadav Date: Mon, 19 Aug 2024 14:15:28 +0530 Subject: [PATCH 2/3] files added --- airbyte-integrations/connectors/source-oracle/metadata.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-oracle/metadata.yaml b/airbyte-integrations/connectors/source-oracle/metadata.yaml index 1438d7d10843..152b46deccf0 100644 --- a/airbyte-integrations/connectors/source-oracle/metadata.yaml +++ b/airbyte-integrations/connectors/source-oracle/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: b39a7370-74c3-45a6-ac3a-380d48520a83 - dockerImageTag: 0.5.2-v1.0.4 + dockerImageTag: 0.5.2-v1.0.5 dockerRepository: datapipes-airbyte/source-oracle documentationUrl: https://docs.airbyte.com/integrations/sources/oracle githubIssueLabel: source-oracle From a0ebe146f062f255e372a62f56ffa73627270dc6 Mon Sep 17 00:00:00 2001 From: Jatin Yadav Date: Mon, 19 Aug 2024 14:18:23 +0530 Subject: [PATCH 3/3] files added --- .../source/jdbc/AbstractJdbcSource.java | 223 +++++++----------- 1 file changed, 87 insertions(+), 136 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.java index 64792604efbf..feb04a6e52a5 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.java @@ -91,8 +91,7 @@ * relational DB source which can be accessed via JDBC driver. If you are implementing a connector * for a relational DB which has a JDBC driver, make an effort to use this class. */ -public abstract class AbstractJdbcSource extends - AbstractDbSource implements Source { +public abstract class AbstractJdbcSource extends AbstractDbSource implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcSource.class); @@ -103,8 +102,8 @@ public abstract class AbstractJdbcSource extends protected Collection dataSources = new ArrayList<>(); public AbstractJdbcSource(final String driverClass, - final Supplier streamingQueryConfigProvider, - final JdbcCompatibleSourceOperations sourceOperations) { + final Supplier streamingQueryConfigProvider, + final JdbcCompatibleSourceOperations sourceOperations) { super(driverClass); this.streamingQueryConfigProvider = streamingQueryConfigProvider; this.sourceOperations = sourceOperations; @@ -126,15 +125,14 @@ protected AutoCloseableIterator queryTableFullRefresh(final JdbcDataba if (syncMode.equals(SyncMode.INCREMENTAL) && getStateEmissionFrequency() > 0) { final String quotedCursorField = enquoteIdentifier(cursorField.get(), getQuoteString()); String query = ""; - if (!whereClause.isEmpty()) { + if (!whereClause.equals("")) { query = String.format("SELECT %s FROM %s where %s ORDER BY %s ASC", enquoteIdentifierList(columnNames, getQuoteString()), getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()), whereClause, quotedCursorField); - } else if (customSQL != null && !customSQL.isEmpty()) { - query = String.format("SELECT * FROM (%s) sc ORDER BY %s ASC", customSQL, - quotedCursorField); + } else if (customSQL != null && !customSQL.equals("")) { + query = String.format("SELECT * FROM (%s) sc ORDER BY %s ASC", customSQL, quotedCursorField); } else { query = String.format("SELECT %s FROM %s ORDER BY %s ASC", enquoteIdentifierList(columnNames, getQuoteString()), @@ -171,12 +169,10 @@ protected AutoCloseableIterator queryTableFullRefresh(final JdbcDataba * @return list of consumers that run queries for the check command. */ @Trace(operationName = CHECK_TRACE_OPERATION_NAME) - protected List> getCheckOperations(final JsonNode config) - throws Exception { + protected List> getCheckOperations(final JsonNode config) throws Exception { return ImmutableList.of(database -> { LOGGER.info("Attempting to get metadata from the database to see if we can connect."); - database.bufferedResultSetQuery(connection -> connection.getMetaData().getCatalogs(), - sourceOperations::rowToJson); + database.bufferedResultSetQuery(connection -> connection.getMetaData().getCatalogs(), sourceOperations::rowToJson); }); } @@ -186,42 +182,36 @@ protected List> getCheckOperations(fina * @return a map by StreamName to associated list of primary keys */ @VisibleForTesting - public static Map> aggregatePrimateKeys( - final List entries) { + public static Map> aggregatePrimateKeys(final List entries) { final Map> result = new HashMap<>(); - entries.stream().sorted(Comparator.comparingInt(PrimaryKeyAttributesFromDb::keySequence)) - .forEach(entry -> { - if (!result.containsKey(entry.streamName())) { - result.put(entry.streamName(), new ArrayList<>()); - } - result.get(entry.streamName()).add(entry.primaryKey()); - }); + entries.stream().sorted(Comparator.comparingInt(PrimaryKeyAttributesFromDb::keySequence)).forEach(entry -> { + if (!result.containsKey(entry.streamName())) { + result.put(entry.streamName(), new ArrayList<>()); + } + result.get(entry.streamName()).add(entry.primaryKey()); + }); return result; } private String getCatalog(final SqlDatabase database) { - return (database.getSourceConfig().has(JdbcUtils.DATABASE_KEY) ? database.getSourceConfig() - .get(JdbcUtils.DATABASE_KEY).asText() : null); + return (database.getSourceConfig().has(JdbcUtils.DATABASE_KEY) ? database.getSourceConfig().get(JdbcUtils.DATABASE_KEY).asText() : null); } @Override - protected List>> discoverInternal(final JdbcDatabase database, - final String schema) throws Exception { + protected List>> discoverInternal(final JdbcDatabase database, final String schema) throws Exception { final Set internalSchemas = new HashSet<>(getExcludedInternalNameSpaces()); LOGGER.info("Internal schemas to exclude: {}", internalSchemas); - final Set tablesWithSelectGrantPrivilege = getPrivilegesTableForCurrentUser( - database, schema); + final Set tablesWithSelectGrantPrivilege = getPrivilegesTableForCurrentUser(database, schema); return database.bufferedResultSetQuery( - // retrieve column metadata from the database - connection -> connection.getMetaData().getColumns(getCatalog(database), schema, null, null), - // store essential column metadata to a Json object from the result set about each column - this::getColumnMetadata) + // retrieve column metadata from the database + connection -> connection.getMetaData().getColumns(getCatalog(database), schema, null, null), + // store essential column metadata to a Json object from the result set about each column + this::getColumnMetadata) .stream() .filter(excludeNotAccessibleTables(internalSchemas, tablesWithSelectGrantPrivilege)) // group by schema and table name to handle the case where a table with the same name exists in // multiple schemas. - .collect(Collectors.groupingBy(t -> ImmutablePair.of(t.get(INTERNAL_SCHEMA_NAME).asText(), - t.get(INTERNAL_TABLE_NAME).asText()))) + .collect(Collectors.groupingBy(t -> ImmutablePair.of(t.get(INTERNAL_SCHEMA_NAME).asText(), t.get(INTERNAL_TABLE_NAME).asText()))) .values() .stream() .map(fields -> TableInfo.>builder() @@ -239,8 +229,7 @@ protected List>> discoverInternal(final JdbcData f.get(INTERNAL_COLUMN_SIZE).asInt(), f.get(INTERNAL_IS_NULLABLE).asBoolean(), jsonType); - return new CommonField(f.get(INTERNAL_COLUMN_NAME).asText(), datatype) { - }; + return new CommonField(f.get(INTERNAL_COLUMN_NAME).asText(), datatype) {}; }) .collect(Collectors.toList())) .cursorFields(extractCursorFields(fields)) @@ -256,7 +245,7 @@ private List extractCursorFields(final List fields) { } protected Predicate excludeNotAccessibleTables(final Set internalSchemas, - final Set tablesWithSelectGrantPrivilege) { + final Set tablesWithSelectGrantPrivilege) { return jsonNode -> { if (tablesWithSelectGrantPrivilege.isEmpty()) { return isNotInternalSchema(jsonNode, internalSchemas); @@ -264,29 +253,26 @@ protected Predicate excludeNotAccessibleTables(final Set inter return tablesWithSelectGrantPrivilege.stream() .anyMatch(e -> e.getSchemaName().equals(jsonNode.get(INTERNAL_SCHEMA_NAME).asText())) && tablesWithSelectGrantPrivilege.stream() - .anyMatch(e -> e.getTableName().equals(jsonNode.get(INTERNAL_TABLE_NAME).asText())) + .anyMatch(e -> e.getTableName().equals(jsonNode.get(INTERNAL_TABLE_NAME).asText())) && !internalSchemas.contains(jsonNode.get(INTERNAL_SCHEMA_NAME).asText()); }; } // needs to override isNotInternalSchema for connectors that override // getPrivilegesTableForCurrentUser() - protected boolean isNotInternalSchema(final JsonNode jsonNode, - final Set internalSchemas) { + protected boolean isNotInternalSchema(final JsonNode jsonNode, final Set internalSchemas) { return !internalSchemas.contains(jsonNode.get(INTERNAL_SCHEMA_NAME).asText()); } /** * @param resultSet Description of a column available in the table catalog. - * @return Essential information about a column to determine which table it belongs to and its - * type. + * @return Essential information about a column to determine which table it belongs to and its type. */ private JsonNode getColumnMetadata(final ResultSet resultSet) throws SQLException { final var fieldMap = ImmutableMap.builder() // we always want a namespace, if we cannot get a schema, use db name. .put(INTERNAL_SCHEMA_NAME, - resultSet.getObject(JDBC_COLUMN_SCHEMA_NAME) != null ? resultSet.getString( - JDBC_COLUMN_SCHEMA_NAME) + resultSet.getObject(JDBC_COLUMN_SCHEMA_NAME) != null ? resultSet.getString(JDBC_COLUMN_SCHEMA_NAME) : resultSet.getObject(JDBC_COLUMN_DATABASE_NAME)) .put(INTERNAL_TABLE_NAME, resultSet.getString(JDBC_COLUMN_TABLE_NAME)) .put(INTERNAL_COLUMN_NAME, resultSet.getString(JDBC_COLUMN_COLUMN_NAME)) @@ -320,53 +306,40 @@ public record PrimaryKeyAttributesFromDb(String streamName, @Override protected Map> discoverPrimaryKeys(final JdbcDatabase database, - final List>> tableInfos) { - LOGGER.info( - "Discover primary keys for tables: " + tableInfos.stream().map(TableInfo::getName).collect( - Collectors.toSet())); + final List>> tableInfos) { + LOGGER.info("Discover primary keys for tables: " + tableInfos.stream().map(TableInfo::getName).collect( + Collectors.toSet())); try { // Get all primary keys without specifying a table name - final Map> tablePrimaryKeys = aggregatePrimateKeys( - database.bufferedResultSetQuery( - connection -> connection.getMetaData() - .getPrimaryKeys(getCatalog(database), null, null), - r -> { - final String schemaName = - r.getObject(JDBC_COLUMN_SCHEMA_NAME) != null ? r.getString( - JDBC_COLUMN_SCHEMA_NAME) : r.getString(JDBC_COLUMN_DATABASE_NAME); - final String streamName = JdbcUtils.getFullyQualifiedTableName(schemaName, - r.getString(JDBC_COLUMN_TABLE_NAME)); - final String primaryKey = r.getString(JDBC_COLUMN_COLUMN_NAME); - final int keySeq = r.getInt(KEY_SEQ); - return new PrimaryKeyAttributesFromDb(streamName, primaryKey, keySeq); - })); + final Map> tablePrimaryKeys = aggregatePrimateKeys(database.bufferedResultSetQuery( + connection -> connection.getMetaData().getPrimaryKeys(getCatalog(database), null, null), + r -> { + final String schemaName = + r.getObject(JDBC_COLUMN_SCHEMA_NAME) != null ? r.getString(JDBC_COLUMN_SCHEMA_NAME) : r.getString(JDBC_COLUMN_DATABASE_NAME); + final String streamName = JdbcUtils.getFullyQualifiedTableName(schemaName, r.getString(JDBC_COLUMN_TABLE_NAME)); + final String primaryKey = r.getString(JDBC_COLUMN_COLUMN_NAME); + final int keySeq = r.getInt(KEY_SEQ); + return new PrimaryKeyAttributesFromDb(streamName, primaryKey, keySeq); + })); if (!tablePrimaryKeys.isEmpty()) { return tablePrimaryKeys; } } catch (final SQLException e) { - LOGGER.debug( - String.format("Could not retrieve primary keys without a table name (%s), retrying", e)); + LOGGER.debug(String.format("Could not retrieve primary keys without a table name (%s), retrying", e)); } // Get primary keys one table at a time return tableInfos.stream() .collect(Collectors.toMap( - tableInfo -> JdbcUtils.getFullyQualifiedTableName(tableInfo.getNameSpace(), - tableInfo.getName()), + tableInfo -> JdbcUtils.getFullyQualifiedTableName(tableInfo.getNameSpace(), tableInfo.getName()), tableInfo -> { - final String streamName = JdbcUtils.getFullyQualifiedTableName( - tableInfo.getNameSpace(), tableInfo.getName()); + final String streamName = JdbcUtils.getFullyQualifiedTableName(tableInfo.getNameSpace(), tableInfo.getName()); try { - final Map> primaryKeys = aggregatePrimateKeys( - database.bufferedResultSetQuery( - connection -> connection.getMetaData() - .getPrimaryKeys(getCatalog(database), tableInfo.getNameSpace(), - tableInfo.getName()), - r -> new PrimaryKeyAttributesFromDb(streamName, - r.getString(JDBC_COLUMN_COLUMN_NAME), r.getInt(KEY_SEQ)))); + final Map> primaryKeys = aggregatePrimateKeys(database.bufferedResultSetQuery( + connection -> connection.getMetaData().getPrimaryKeys(getCatalog(database), tableInfo.getNameSpace(), tableInfo.getName()), + r -> new PrimaryKeyAttributesFromDb(streamName, r.getString(JDBC_COLUMN_COLUMN_NAME), r.getInt(KEY_SEQ)))); return primaryKeys.getOrDefault(streamName, Collections.emptyList()); } catch (final SQLException e) { - LOGGER.error( - String.format("Could not retrieve primary keys for %s: %s", streamName, e)); + LOGGER.error(String.format("Could not retrieve primary keys for %s: %s", streamName, e)); return Collections.emptyList(); } })); @@ -384,13 +357,13 @@ public boolean isCursorType(final Datatype type) { @Override public AutoCloseableIterator queryTableIncremental(final JdbcDatabase database, - final List columnNames, - final String schemaName, - final String tableName, - final CursorInfo cursorInfo, - final Datatype cursorFieldType, - final String whereClause, - final String customSQL) { + final List columnNames, + final String schemaName, + final String tableName, + final CursorInfo cursorInfo, + final Datatype cursorFieldType, + final String whereClause, + final String customSQL) { LOGGER.info("Queueing query for table: {}", tableName); final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair airbyteStream = AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName); @@ -399,20 +372,16 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase final Stream stream = database.unsafeQuery( connection -> { LOGGER.info("Preparing query for table: {}", tableName); - final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, - tableName, getQuoteString()); - final String quotedCursorField = enquoteIdentifier(cursorInfo.getCursorField(), - getQuoteString()); + final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()); + final String quotedCursorField = enquoteIdentifier(cursorInfo.getCursorField(), getQuoteString()); final String operator; if (cursorInfo.getCursorRecordCount() <= 0L) { operator = ">"; } else { final long actualRecordCount = getActualCursorRecordCount( - connection, fullTableName, quotedCursorField, cursorFieldType, - cursorInfo.getCursor()); - LOGGER.info("Table {} cursor count: expected {}, actual {}", tableName, - cursorInfo.getCursorRecordCount(), actualRecordCount); + connection, fullTableName, quotedCursorField, cursorFieldType, cursorInfo.getCursor()); + LOGGER.info("Table {} cursor count: expected {}, actual {}", tableName, cursorInfo.getCursorRecordCount(), actualRecordCount); if (actualRecordCount == cursorInfo.getCursorRecordCount()) { operator = ">"; } else { @@ -420,13 +389,10 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase } } - final String wrappedColumnNames = getWrappedColumnNames(database, connection, - columnNames, schemaName, tableName); + final String wrappedColumnNames = getWrappedColumnNames(database, connection, columnNames, schemaName, tableName); StringBuilder sql = new StringBuilder(); - if (customSQL != null && !customSQL.equals("")) { - sql = new StringBuilder( - String.format("SELECT * FROM (%s) sc WHERE %s %s ?", customSQL, - quotedCursorField, operator)); + if (customSQL!= null && !customSQL.equals("")) { + sql = new StringBuilder(String.format("SELECT * FROM (%s) sc WHERE %s %s ?", customSQL, quotedCursorField, operator)); } else { sql = new StringBuilder(String.format("SELECT %s FROM %s WHERE %s %s ?", wrappedColumnNames, @@ -443,11 +409,9 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase sql.append(String.format(" ORDER BY %s ASC", quotedCursorField)); } - final PreparedStatement preparedStatement = connection.prepareStatement( - sql.toString()); + final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString()); LOGGER.info("Executing query for table {}: {}", tableName, sql); - sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, - cursorInfo.getCursor()); + sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, cursorInfo.getCursor()); return preparedStatement; }, sourceOperations::rowToJson); @@ -462,10 +426,10 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase * Some databases need special column names in the query. */ protected String getWrappedColumnNames(final JdbcDatabase database, - final Connection connection, - final List columnNames, - final String schemaName, - final String tableName) + final Connection connection, + final List columnNames, + final String schemaName, + final String tableName) throws SQLException { return enquoteIdentifierList(columnNames, getQuoteString()); } @@ -475,16 +439,15 @@ protected String getCountColumnName() { } protected long getActualCursorRecordCount(final Connection connection, - final String fullTableName, - final String quotedCursorField, - final Datatype cursorFieldType, - final String cursor) + final String fullTableName, + final String quotedCursorField, + final Datatype cursorFieldType, + final String cursor) throws SQLException { final String columnName = getCountColumnName(); final PreparedStatement cursorRecordStatement; if (cursor == null) { - final String cursorRecordQuery = String.format( - "SELECT COUNT(*) AS %s FROM %s WHERE %s IS NULL", + final String cursorRecordQuery = String.format("SELECT COUNT(*) AS %s FROM %s WHERE %s IS NULL", columnName, fullTableName, quotedCursorField); @@ -494,8 +457,7 @@ protected long getActualCursorRecordCount(final Connection connection, columnName, fullTableName, quotedCursorField); - cursorRecordStatement = connection.prepareStatement(cursorRecordQuery); - ; + cursorRecordStatement = connection.prepareStatement(cursorRecordQuery);; sourceOperations.setCursorField(cursorRecordStatement, 1, cursorFieldType, cursor); } final ResultSet resultSet = cursorRecordStatement.executeQuery(); @@ -511,17 +473,13 @@ public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLExcept return createDatabase(sourceConfig, JdbcDataSourceUtils.DEFAULT_JDBC_PARAMETERS_DELIMITER); } - public JdbcDatabase createDatabase(final JsonNode sourceConfig, String delimiter) - throws SQLException { + public JdbcDatabase createDatabase(final JsonNode sourceConfig, String delimiter) throws SQLException { final JsonNode jdbcConfig = toDatabaseConfig(sourceConfig); - Map connectionProperties = JdbcDataSourceUtils.getConnectionProperties( - sourceConfig, delimiter); + Map connectionProperties = JdbcDataSourceUtils.getConnectionProperties(sourceConfig, delimiter); // Create the data source final DataSource dataSource = DataSourceFactory.create( - jdbcConfig.has(JdbcUtils.USERNAME_KEY) ? jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText() - : null, - jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() - : null, + jdbcConfig.has(JdbcUtils.USERNAME_KEY) ? jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText() : null, + jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null, driverClassName, jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(), connectionProperties, @@ -534,8 +492,7 @@ public JdbcDatabase createDatabase(final JsonNode sourceConfig, String delimiter sourceOperations, streamingQueryConfigProvider); - quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() - : quoteString); + quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString); database.setSourceConfig(sourceConfig); database.setDatabaseConfig(jdbcConfig); return database; @@ -545,12 +502,11 @@ public JdbcDatabase createDatabase(final JsonNode sourceConfig, String delimiter * {@inheritDoc} * * @param database database instance - * @param catalog schema of the incoming messages. + * @param catalog schema of the incoming messages. * @throws SQLException */ @Override - protected void logPreSyncDebugData(final JdbcDatabase database, - final ConfiguredAirbyteCatalog catalog) + protected void logPreSyncDebugData(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog) throws SQLException { LOGGER.info("Data source product recognized as {}:{}", database.getMetaData().getDatabaseProductName(), @@ -569,25 +525,20 @@ public void close() { dataSources.clear(); } - protected List identifyStreamsToSnapshot( - final ConfiguredAirbyteCatalog catalog, final StateManager stateManager) { - final Set alreadySyncedStreams = stateManager.getCdcStateManager() - .getInitialStreamsSynced(); + protected List identifyStreamsToSnapshot(final ConfiguredAirbyteCatalog catalog, final StateManager stateManager) { + final Set alreadySyncedStreams = stateManager.getCdcStateManager().getInitialStreamsSynced(); if (alreadySyncedStreams.isEmpty() && (stateManager.getCdcStateManager().getCdcState() == null || stateManager.getCdcStateManager().getCdcState().getState() == null)) { return Collections.emptyList(); } - final Set allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog( - catalog); + final Set allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog); - final Set newlyAddedStreams = new HashSet<>( - Sets.difference(allStreams, alreadySyncedStreams)); + final Set newlyAddedStreams = new HashSet<>(Sets.difference(allStreams, alreadySyncedStreams)); return catalog.getStreams().stream() .filter(c -> c.getSyncMode() == SyncMode.INCREMENTAL) - .filter(stream -> newlyAddedStreams.contains( - AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream()))) + .filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream()))) .map(Jsons::clone) .collect(Collectors.toList()); }