Skip to content

Commit

Permalink
Destination redshift: delete unused spec option (#36365)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Apr 5, 2024
1 parent 4342182 commit 9be3a8e
Show file tree
Hide file tree
Showing 5 changed files with 2 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 2.4.1
dockerImageTag: 2.4.2
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import static io.airbyte.integrations.destination.redshift.util.RedshiftUtil.findS3Options;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.cdk.db.factory.DataSourceFactory;
import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
Expand All @@ -30,7 +29,6 @@
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator;
import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryption;
import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryption.KeyType;
import io.airbyte.cdk.integrations.destination.s3.EncryptionConfig;
Expand Down Expand Up @@ -225,14 +223,6 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
: new NoEncryption();
final JsonNode s3Options = findS3Options(config);
final S3DestinationConfig s3Config = getS3DestinationConfig(s3Options);
final int numberOfFileBuffers = getNumberOfFileBuffers(s3Options);
if (numberOfFileBuffers > FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER) {
LOGGER.warn("""
Increasing the number of file buffers past {} can lead to increased performance but
leads to increased memory usage. If the number of file buffers exceeds the number
of streams {} this will create more buffers than necessary, leading to nonexistent gains
""", FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER, catalog.getStreams().size());
}

final String defaultNamespace = config.get("schema").asText();
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
Expand Down Expand Up @@ -287,26 +277,6 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
.createAsync();
}

/**
 * Resolves how many file buffers to use from the user-supplied configuration.
 * <p>
 * If {@code FileBuffer.FILE_BUFFER_COUNT_KEY} is not present in {@code config}, the default
 * ({@code FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER}) is returned. Otherwise the configured
 * value is first capped at {@code FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER} and then raised to the
 * default if it is smaller than the default.
 * <p>
 * NOTE: If Out Of Memory Exceptions (OOME) occur, this can be a likely cause, as this hard limit has
 * not been thoroughly load tested across all instance sizes.
 *
 * @param config user configurations
 * @return the configured number of file buffers, bounded as described above, or the default when
 *         the option is not configured
 */
@VisibleForTesting
public int getNumberOfFileBuffers(final JsonNode config) {
  final int floor = FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER;
  if (!config.has(FileBuffer.FILE_BUFFER_COUNT_KEY)) {
    // Option not configured: fall back to the default.
    return floor;
  }
  final int requested = config.get(FileBuffer.FILE_BUFFER_COUNT_KEY).asInt();
  final int capped = Math.min(requested, FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER);
  // The default doubles as the lower bound (the intended range is 10 <= value <= 50).
  return Math.max(capped, floor);
}

/**
 * Tells whether staging data should be purged.
 *
 * @param config user configurations
 * @return {@code true} when {@code purge_staging_data} is absent from {@code config}; otherwise the
 *         boolean value of that option
 */
private boolean isPurgeStagingData(final JsonNode config) {
  if (config.has("purge_staging_data")) {
    return config.get("purge_staging_data").asBoolean();
  }
  // Purging is the behavior when the option is not specified.
  return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,6 @@
}
],
"order": 7
},
"file_buffer_count": {
"title": "File Buffer Count",
"type": "integer",
"minimum": 10,
"maximum": 50,
"default": 10,
"description": "Number of file buffers allocated for writing data. Increasing this number is beneficial for connections using Change Data Capture (CDC) and up to the number of streams within a connection. Increasing the number of file buffers past the maximum number of streams has deteriorating effects",
"examples": ["10"]
}
}
},
Expand Down

This file was deleted.

6 changes: 1 addition & 5 deletions docs/integrations/destinations/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,6 @@ Optional parameters:
`bucketPath/namespace/streamName/syncDate_epochMillis_randomUuid.csv` containing three columns
(`ab_id`, `data`, `emitted_at`). Normally these files are deleted after the `COPY` command
completes; if you want to keep them for other purposes, set `purge_staging_data` to `false`.
- **File Buffer Count**
- Number of file buffers allocated for writing data. Increasing this number is beneficial for
connections using Change Data Capture (CDC) and up to the number of streams within a connection.
Increasing the number of file buffers past the maximum number of streams has deteriorating
effects.

NOTE: Even if an SSH Tunnel is configured, S3 staging does not use it for copying data. The SSH Tunnel
applies to the SQL connection only. S3 is secured through public HTTPS access only. Subsequent typing
Expand Down Expand Up @@ -247,6 +242,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 2.4.2 | 2024-04-05 | [\#36365](https://github.com/airbytehq/airbyte/pull/36365) | Remove unused config option |
| 2.4.1 | 2024-04-04 | [\#36846](https://github.com/airbytehq/airbyte/pull/36846) | Remove duplicate S3 Region |
| 2.4.0 | 2024-03-21 | [\#36589](https://github.com/airbytehq/airbyte/pull/36589) | Adapt to Kotlin cdk 0.28.19 |
| 2.3.2 | 2024-03-21 | [\#36374](https://github.com/airbytehq/airbyte/pull/36374) | Suppress Jooq DataAccessException error message in logs |
Expand Down

0 comments on commit 9be3a8e

Please sign in to comment.