Skip to content

Commit

Permalink
Destination Redshift - additional check method check, fix s3 file del…
Browse files Browse the repository at this point in the history
…etion (#34186)

Signed-off-by: Gireesh Sreepathi <[email protected]>
Co-authored-by: Sitaram Shelke <[email protected]>
Co-authored-by: Marcos Marx <[email protected]>
Co-authored-by: Cynthia Yin <[email protected]>
Co-authored-by: Baz <[email protected]>
Co-authored-by: bazarnov <[email protected]>
Co-authored-by: kekiss <[email protected]>
Co-authored-by: Anatolii Yatsuk <[email protected]>
Co-authored-by: Edward Gao <[email protected]>
Co-authored-by: Augustin <[email protected]>
Co-authored-by: Joe Reuter <[email protected]>
Co-authored-by: Alexandre Cuoci <[email protected]>
Co-authored-by: perangel <[email protected]>
Co-authored-by: Aaron ("AJ") Steers <[email protected]>
Co-authored-by: Ben Church <[email protected]>
Co-authored-by: Gireesh Sreepathi <[email protected]>
  • Loading branch information
16 people authored Jan 16, 2024
1 parent 07579bd commit db83e14
Show file tree
Hide file tree
Showing 18 changed files with 82 additions and 91 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.12.1 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Add hook for additional destination specific checks to JDBC destination check method |
| 0.12.0 | 2024-01-10 | [\#33875](https://github.com/airbytehq/airbyte/pull/33875) | Upgrade sshd-mina to 2.11.1 |
| 0.11.5 | 2024-01-10 | [\#34119](https://github.com/airbytehq/airbyte/pull/34119) | Remove wal2json support for postgres+debezium. |
| 0.11.4 | 2024-01-09 | [\#33305](https://github.com/airbytehq/airbyte/pull/33305) | Source stats in incremental syncs |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.12.0
version=0.12.1
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
final var v2RawSchema = namingResolver.getIdentifier(TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
attemptTableOperations(v2RawSchema, database, namingResolver, sqlOperations, false);
destinationSpecificTableOperations(database);
}
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (final ConnectionErrorException ex) {
Expand All @@ -114,6 +115,15 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}
}

/**
* Specific Databases may have additional checks unique to them which they need to perform, override
* this method to add additional checks.
*
* @param database the database to run checks against
* @throws Exception
*/
protected void destinationSpecificTableOperations(final JdbcDatabase database) throws Exception {}

/**
* This method is deprecated. It verifies table creation, but not insert right to a newly created
* table. Use attemptTableOperations with the attemptInsert argument instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -22,6 +23,16 @@
@Slf4j
public class GeneralStagingFunctions {

// using a random string here as a placeholder for the moment.
// This would avoid mixing data in the staging area between different syncs (especially if they
// manipulate streams with similar names)
// if we replaced the random connection id by the actual connection_id, we'd gain the opportunity to
// leverage data that was uploaded to stage
// in a previous attempt but failed to load to the warehouse for some reason (interrupted?) instead.
// This would also allow other programs/scripts
// to load (or reload backups?) in the connection's staging area to be loaded at the next sync.
public static final UUID RANDOM_CONNECTION_ID = UUID.randomUUID();

public static OnStartFunction onStartFunction(final JdbcDatabase database,
final StagingOperations stagingOperations,
final List<WriteConfig> writeConfigs,
Expand All @@ -34,7 +45,6 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database,
final String schema = writeConfig.getOutputSchemaName();
final String stream = writeConfig.getStreamName();
final String dstTableName = writeConfig.getOutputTableName();
final String stageName = stagingOperations.getStageName(schema, dstTableName);
final String stagingPath =
stagingOperations.getStagingPath(SerialStagingConsumerFactory.RANDOM_CONNECTION_ID, schema, stream, writeConfig.getOutputTableName(),
writeConfig.getWriteDatetime());
Expand All @@ -44,7 +54,7 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database,

stagingOperations.createSchemaIfNotExists(database, schema);
stagingOperations.createTableIfNotExists(database, schema, dstTableName);
stagingOperations.createStageIfNotExists(database, stageName);
stagingOperations.createStageIfNotExists();

/*
* When we're in OVERWRITE, clear out the table at the start of a sync, this is an expected side
Expand All @@ -68,7 +78,6 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database,
* upload was unsuccessful
*/
public static void copyIntoTableFromStage(final JdbcDatabase database,
final String stageName,
final String stagingPath,
final List<String> stagedFiles,
final String tableName,
Expand All @@ -83,7 +92,7 @@ public static void copyIntoTableFromStage(final JdbcDatabase database,
final Lock rawTableInsertLock = typerDeduper.getRawTableInsertLock(streamNamespace, streamName);
rawTableInsertLock.lock();
try {
stagingOperations.copyIntoTableFromStage(database, stageName, stagingPath, stagedFiles,
stagingOperations.copyIntoTableFromStage(database, stagingPath, stagedFiles,
tableName, schemaName);
} finally {
rawTableInsertLock.unlock();
Expand All @@ -96,8 +105,6 @@ public static void copyIntoTableFromStage(final JdbcDatabase database,
typerDeduperValve.updateTimeAndIncreaseInterval(streamId);
}
} catch (final Exception e) {
stagingOperations.cleanUpStage(database, stageName, stagedFiles);
log.info("Cleaning stage path {}", stagingPath);
throw new RuntimeException("Failed to upload data from stage " + stagingPath, e);
}
}
Expand All @@ -124,10 +131,15 @@ public static OnCloseFunction onCloseFunction(final JdbcDatabase database,
for (final WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputSchemaName();
if (purgeStagingData) {
final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getOutputTableName());
final String stagePath = stagingOperations.getStagingPath(
RANDOM_CONNECTION_ID,
schemaName,
writeConfig.getStreamName(),
writeConfig.getOutputTableName(),
writeConfig.getWriteDatetime());
log.info("Cleaning stage in destination started for stream {}. schema {}, stage: {}", writeConfig.getStreamName(), schemaName,
stageName);
stagingOperations.dropStageIfExists(database, stageName);
stagePath);
stagingOperations.dropStageIfExists(database, stagePath);
}
}
typerDeduper.commitFinalTables();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,14 @@ public static FlushBufferFunction function(

final WriteConfig writeConfig = pairToWriteConfig.get(pair);
final String schemaName = writeConfig.getOutputSchemaName();
final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getOutputTableName());
final String stagingPath =
stagingOperations.getStagingPath(
SerialStagingConsumerFactory.RANDOM_CONNECTION_ID, schemaName, writeConfig.getStreamName(),
writeConfig.getOutputTableName(), writeConfig.getWriteDatetime());
try (writer) {
writer.flush();
final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stageName, stagingPath);
GeneralStagingFunctions.copyIntoTableFromStage(database, stageName, stagingPath, List.of(stagedFile), writeConfig.getOutputTableName(),
final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stagingPath);
GeneralStagingFunctions.copyIntoTableFromStage(database, stagingPath, List.of(stagedFile), writeConfig.getOutputTableName(),
schemaName,
stagingOperations,
writeConfig.getNamespace(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,6 @@
*/
public interface StagingOperations extends SqlOperations {

/**
* Returns the staging environment's name
*
* @param namespace Name of schema
* @param streamName Name of the stream
* @return Fully qualified name of the staging environment
*/
String getStageName(String namespace, String streamName);

/**
* @param outputTableName The name of the table this staging file will be loaded into (typically a
* raw table). Not all destinations use the table name in the staging path (e.g. Snowflake
Expand All @@ -37,48 +28,36 @@ public interface StagingOperations extends SqlOperations {
/**
* Create a staging folder where to upload temporary files before loading into the final destination
*/
void createStageIfNotExists(JdbcDatabase database, String stageName) throws Exception;
void createStageIfNotExists() throws Exception;

/**
* Upload the data file into the stage area.
*
* @param database database used for syncing
* @param recordsData records stored in in-memory buffer
* @param schemaName name of schema
* @param stageName name of the staging area folder
* @param stagingPath path of staging folder to data files
* @return the name of the file that was uploaded.
*/
String uploadRecordsToStage(JdbcDatabase database, SerializableBuffer recordsData, String schemaName, String stageName, String stagingPath)
String uploadRecordsToStage(JdbcDatabase database, SerializableBuffer recordsData, String schemaName, String stagingPath)
throws Exception;

/**
* Load the data stored in the stage area into a temporary table in the destination
*
* @param database database interface
* @param stageName name of staging area folder
* @param stagingPath path to staging files
* @param stagedFiles collection of staged files
* @param tableName name of table to write staging files to
* @param schemaName name of schema
*/
void copyIntoTableFromStage(JdbcDatabase database,
String stageName,
String stagingPath,
List<String> stagedFiles,
String tableName,
String schemaName)
throws Exception;

/**
* Remove files that were just staged
*
* @param database database used for syncing
* @param stageName name of staging area folder
* @param stagedFiles collection of the staging files to remove
*/
void cleanUpStage(JdbcDatabase database, String stageName, List<String> stagedFiles) throws Exception;

/**
* Delete the stage area and all staged files that was in it
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ protected BlobStorageOperations() {
*
* @return the name of the file that was uploaded.
*/
public abstract String uploadRecordsToBucket(SerializableBuffer recordsData, String namespace, String streamName, String objectPath)
public abstract String uploadRecordsToBucket(SerializableBuffer recordsData, String namespace, String objectPath)
throws Exception;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ private FlushBufferFunction flushBufferFunction(final BlobStorageOperations stor
writeConfig.addStoredFile(storageOperations.uploadRecordsToBucket(
writer,
writeConfig.getNamespace(),
writeConfig.getStreamName(),
writeConfig.getFullOutputPath()));
} catch (final Exception e) {
LOGGER.error("Failed to flush and upload buffer to storage:", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ protected boolean doesBucketExist(final String bucket) {
@Override
public String uploadRecordsToBucket(final SerializableBuffer recordsData,
final String namespace,
final String streamName,
final String objectPath) {
final List<Exception> exceptionsThrown = new ArrayList<>();
while (exceptionsThrown.size() < UPLOAD_RETRY_LIMIT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,17 @@ public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessag

final WriteConfig writeConfig = streamDescToWriteConfig.get(decs);
final String schemaName = writeConfig.getOutputSchemaName();
final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getOutputTableName());
final String stagingPath =
stagingOperations.getStagingPath(
StagingConsumerFactory.RANDOM_CONNECTION_ID,
GeneralStagingFunctions.RANDOM_CONNECTION_ID,
schemaName,
writeConfig.getStreamName(),
writeConfig.getOutputTableName(),
writeConfig.getWriteDatetime());
try {
final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stageName, stagingPath);
final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stagingPath);
GeneralStagingFunctions.copyIntoTableFromStage(
database,
stageName,
stagingPath,
List.of(stagedFile),
writeConfig.getOutputTableName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import org.joda.time.DateTime;
Expand All @@ -49,16 +48,7 @@ public class StagingConsumerFactory extends SerialStagingConsumerFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(StagingConsumerFactory.class);

// using a random string here as a placeholder for the moment.
// This would avoid mixing data in the staging area between different syncs (especially if they
// manipulate streams with similar names)
// if we replaced the random connection id by the actual connection_id, we'd gain the opportunity to
// leverage data that was uploaded to stage
// in a previous attempt but failed to load to the warehouse for some reason (interrupted?) instead.
// This would also allow other programs/scripts
// to load (or reload backups?) in the connection's staging area to be loaded at the next sync.
private static final DateTime SYNC_DATETIME = DateTime.now(DateTimeZone.UTC);
public static final UUID RANDOM_CONNECTION_ID = UUID.randomUUID();

public SerializedAirbyteMessageConsumer createAsync(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.12.0'
cdkVersionRequired = '0.12.1'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}

//remove once upgrading the CDK version to 0.4.x or later
java {
compileJava {
options.compilerArgs.remove("-Werror")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 0.7.14
dockerImageTag: 0.7.15
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator;
import io.airbyte.integrations.destination.redshift.util.RedshiftUtil;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -60,6 +61,11 @@ public DataSource getDataSource(final JsonNode config) {
Duration.ofMinutes(2));
}

@Override
protected void destinationSpecificTableOperations(final JdbcDatabase database) throws Exception {
RedshiftUtil.checkSvvTableAccess(database);
}

@Override
public JdbcDatabase getDatabase(final DataSource dataSource) {
return new DefaultJdbcDatabase(dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator;
import io.airbyte.integrations.destination.redshift.util.RedshiftUtil;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;
Expand Down Expand Up @@ -103,7 +104,8 @@ public AirbyteConnectionStatus check(final JsonNode config) {
try {
final JdbcDatabase database = new DefaultJdbcDatabase(dataSource);
final String outputSchema = super.getNamingResolver().getIdentifier(config.get(JdbcUtils.SCHEMA_KEY).asText());
attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, redshiftS3StagingSqlOperations);
attemptTableOperations(outputSchema, database, nameTransformer, redshiftS3StagingSqlOperations, false);
RedshiftUtil.checkSvvTableAccess(database);
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final ConnectionErrorException e) {
final String message = getErrorMessage(e.getStateCode(), e.getErrorCode(), e.getExceptionMessage(), e);
Expand Down
Loading

0 comments on commit db83e14

Please sign in to comment.