Redshift table migrator
evantahler committed Dec 19, 2023
1 parent f28fb86 commit 6d6bd0c
Showing 2 changed files with 121 additions and 45 deletions.
150 changes: 112 additions & 38 deletions docs/release_notes/destinations_v2.js
@@ -1,5 +1,5 @@
-import React, {useState} from 'react';
-import CodeBlock from '@theme/CodeBlock';
+import React, { useState } from "react";
+import CodeBlock from "@theme/CodeBlock";

function concatenateRawTableName(namespace, name) {
let plainConcat = namespace + name;
@@ -8,18 +8,21 @@ function concatenateRawTableName(namespace, name) {
for (let i = 0; i < plainConcat.length; i++) {
// If we've found an underscore, count the number of consecutive underscores
let underscoreRun = 0;
-while (i < plainConcat.length && plainConcat.charAt(i) === '_') {
-underscoreRun++;
-i++;
+while (i < plainConcat.length && plainConcat.charAt(i) === "_") {
+underscoreRun++;
+i++;
}
longestUnderscoreRun = Math.max(longestUnderscoreRun, underscoreRun);
}
-return namespace + "_raw" + "_".repeat(longestUnderscoreRun + 1) + "stream_" + name;
+return (
+namespace + "_raw" + "_".repeat(longestUnderscoreRun + 1) + "stream_" + name
+);
}
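// For example: concatenateRawTableName("public", "users") yields
// "public_raw_stream_users", and concatenateRawTableName("my_schema", "users")
// yields "my_schema_raw__stream_users". The underscore padding always exceeds
// the longest underscore run in the plain concatenation, so distinct
// (namespace, name) pairs cannot collide in the generated raw table name.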

// Taken from StandardNameTransformer
function convertStreamName(str) {
-return str.normalize('NFKD')
+return str
+.normalize("NFKD")
.replaceAll(/\p{M}/gu, "")
.replaceAll(/\s+/g, "_")
.replaceAll(/[^A-Za-z0-9_]/g, "_");
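// For example: convertStreamName("Café Sales!") yields "Cafe_Sales_".
// NFKD splits the accent into a combining mark, which the \p{M} pass strips;
// whitespace and any remaining non-alphanumerics become underscores.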
@@ -44,9 +47,12 @@ export const BigQueryMigrationGenerator = () => {
}

function generateSql(og_namespace, new_namespace, name, raw_dataset) {
-let v2RawTableName = '`' + bigqueryConvertStreamName(concatenateRawTableName(new_namespace, name)) + '`';
-let v1namespace = '`' + escapeNamespace(og_namespace) + '`';
-let v1name = '`' + bigqueryConvertStreamName("_airbyte_raw_" + name) + '`';
+let v2RawTableName =
+"`" +
+bigqueryConvertStreamName(concatenateRawTableName(new_namespace, name)) +
+"`";
+let v1namespace = "`" + escapeNamespace(og_namespace) + "`";
+let v1name = "`" + bigqueryConvertStreamName("_airbyte_raw_" + name) + "`";
return `CREATE SCHEMA IF NOT EXISTS ${raw_dataset};
CREATE OR REPLACE TABLE \`${raw_dataset}\`.${v2RawTableName} (
_airbyte_raw_id STRING,
@@ -66,9 +72,9 @@ AS (
}

return (
-<MigrationGenerator destination="bigquery" generateSql={generateSql}/>
+<MigrationGenerator destination="bigquery" generateSql={generateSql} />
);
-}
+};

export const SnowflakeMigrationGenerator = () => {
// See SnowflakeSQLNameTransformer
@@ -81,7 +87,8 @@ export const SnowflakeMigrationGenerator = () => {
}
}
function generateSql(og_namespace, new_namespace, name, raw_schema) {
-let v2RawTableName = '"' + concatenateRawTableName(new_namespace, name) + '"';
+let v2RawTableName =
+'"' + concatenateRawTableName(new_namespace, name) + '"';
let v1namespace = snowflakeConvertStreamName(og_namespace);
let v1name = snowflakeConvertStreamName("_airbyte_raw_" + name);
return `CREATE SCHEMA IF NOT EXISTS "${raw_schema}";
@@ -100,53 +107,120 @@ AS (
)`;
}
return (
-<MigrationGenerator destination="snowflake" generateSql={generateSql}/>
+<MigrationGenerator destination="snowflake" generateSql={generateSql} />
);
-}
+};

-export const MigrationGenerator = ({destination, generateSql}) => {
-const defaultMessage =
-`Enter your stream's name and namespace to see the SQL output.
+export const RedshiftMigrationGenerator = () => {
+// See RedshiftSQLNameTransformer
+function redshiftConvertStreamName(str) {
+str = convertStreamName(str);
+if (str.charAt(0).match(/[A-Za-z_]/)) {
+return str;
+} else {
+return "_" + str;
+}
+}
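+// For example: redshiftConvertStreamName("2023_sales") yields "_2023_sales",
+// since Redshift identifiers must begin with a letter or an underscore.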
+function generateSql(og_namespace, new_namespace, name, raw_schema) {
+let v2RawTableName =
+'"' + concatenateRawTableName(new_namespace, name) + '"';
+let v1namespace = redshiftConvertStreamName(og_namespace);
+let v1name = redshiftConvertStreamName("_airbyte_raw_" + name);
+return `CREATE SCHEMA IF NOT EXISTS "${raw_schema}";
+DROP TABLE IF EXISTS "${raw_schema}".${v2RawTableName};
+CREATE TABLE "${raw_schema}".${v2RawTableName} (
+"_airbyte_raw_id" VARCHAR(36) NOT NULL PRIMARY KEY
+, "_airbyte_extracted_at" TIMESTAMPTZ DEFAULT NOW()
+, "_airbyte_loaded_at" TIMESTAMPTZ
+, "_airbyte_data" SUPER
+);
+INSERT INTO "${raw_schema}".${v2RawTableName} (
+SELECT
+_airbyte_ab_id AS "_airbyte_raw_id",
+_airbyte_emitted_at AS "_airbyte_extracted_at",
+CAST(NULL AS TIMESTAMPTZ) AS "_airbyte_loaded_at",
+_airbyte_data AS "_airbyte_data"
+FROM ${v1namespace}.${v1name}
+);`;
+}
+return (
+<MigrationGenerator destination="redshift" generateSql={generateSql} />
+);
+};
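+// For example: with original namespace "public", new namespace "public_new",
+// stream "users", and the default raw schema, the SQL above targets
+// "airbyte_internal"."public_new_raw__stream_users" and backfills it from
+// public."_airbyte_raw_users", renaming the v1 columns to their v2 names.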

+export const MigrationGenerator = ({ destination, generateSql }) => {
+const defaultMessage = `Enter your stream's name and namespace to see the SQL output.
If your stream has no namespace, take the default value from the destination connector's settings.`;
const [message, updateMessage] = useState({
-'message': defaultMessage,
-'language': 'text'
+message: defaultMessage,
+language: "text",
});
function updateSql(event) {
-let og_namespace = document.getElementById("og_stream_namespace_" + destination).value;
-let new_namespace = document.getElementById("new_stream_namespace_" + destination).value;
+let og_namespace = document.getElementById(
+"og_stream_namespace_" + destination
+).value;
+let new_namespace = document.getElementById(
+"new_stream_namespace_" + destination
+).value;
let name = document.getElementById("stream_name_" + destination).value;
-var raw_dataset = document.getElementById("raw_dataset_" + destination).value;
-if (raw_dataset === '') {
-raw_dataset = 'airbyte_internal';
+var raw_dataset = document.getElementById(
+"raw_dataset_" + destination
+).value;
+if (raw_dataset === "") {
+raw_dataset = "airbyte_internal";
}
let sql = generateSql(og_namespace, new_namespace, name, raw_dataset);
-if ([og_namespace, new_namespace, name].every(text => text != "")) {
+if ([og_namespace, new_namespace, name].every((text) => text != "")) {
updateMessage({
-'message': sql,
-'language': 'sql'
+message: sql,
+language: "sql",
});
} else {
updateMessage({
-'message': defaultMessage,
-'language': 'text'
+message: defaultMessage,
+language: "text",
});
}
}

return (
<div>
<label>Original Stream namespace </label>
-<input type="text" id={"og_stream_namespace_" + destination} onChange={ updateSql }/><br/>
+<input
+type="text"
+id={"og_stream_namespace_" + destination}
+onChange={updateSql}
+/>
+<br />
<label>New Stream namespace (to avoid overwriting)</label>
-<input type="text" id={"new_stream_namespace_" + destination} onChange={ updateSql }/><br/>
+<input
+type="text"
+id={"new_stream_namespace_" + destination}
+onChange={updateSql}
+/>
+<br />
<label>Stream name </label>
-<input type="text" id={"stream_name_" + destination} onChange={ updateSql }/><br/>
-<label>Raw table dataset/schema (defaults to <code>airbyte_internal</code>) </label>
-<input type="text" id={"raw_dataset_" + destination} onChange={ updateSql }/><br/>
-<CodeBlock id={ "sql_output_block_" + destination } language={ message['language'] }>
-{ message['message'] }
+<input
+type="text"
+id={"stream_name_" + destination}
+onChange={updateSql}
+/>
+<br />
+<label>
+Raw table dataset/schema (defaults to <code>airbyte_internal</code>){" "}
+</label>
+<input
+type="text"
+id={"raw_dataset_" + destination}
+onChange={updateSql}
+/>
+<br />
+<CodeBlock
+id={"sql_output_block_" + destination}
+language={message["language"]}
+>
+{message["message"]}
</CodeBlock>
</div>
);
-}
+};
16 changes: 9 additions & 7 deletions docs/release_notes/upgrading_to_destinations_v2.md
@@ -1,6 +1,6 @@
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
-import {SnowflakeMigrationGenerator, BigQueryMigrationGenerator} from './destinations_v2.js'
+import {SnowflakeMigrationGenerator, BigQueryMigrationGenerator, RedshiftMigrationGenerator} from './destinations_v2.js'

# Upgrading to Destinations V2

@@ -32,7 +32,7 @@ Note that Destinations V2 also removes the option to _only_ replicate raw data.
The following table details the delivered data modified by Destinations V2:

| Current Normalization Setting | Source Type | Impacted Data (Breaking Changes) |
-|-------------------------------|---------------------------------------|----------------------------------------------------------|
+| ----------------------------- | ------------------------------------- | -------------------------------------------------------- |
| Raw JSON | All | `_airbyte` metadata columns, raw table location |
| Normalized tabular data | API Source | Unnested tables, `_airbyte` metadata columns, SCD tables |
| Normalized tabular data | Tabular Source (database, file, etc.) | `_airbyte` metadata columns, SCD tables |
@@ -43,15 +43,14 @@ Whenever possible, we've taken this opportunity to use the best data type for st

## Quick Start to Upgrading

**The quickest path to upgrading is to click upgrade on any out-of-date connection in the UI**. The advanced options later in this document will allow you to test out the upgrade in more detail if you choose.

:::caution

**[Airbyte Open Source Only]** You should upgrade to 0.50.24+ of the Airbyte Platform _before_ updating to Destinations V2. Failure to do so may cause upgraded connections to fail.

:::


![Upgrade Path](./assets/airbyte_destinations_v2_upgrade_prompt.png)

After upgrading the out-of-date destination to a [Destinations V2 compatible version](#destinations-v2-effective-versions), the following will occur at the next sync **for each connection** sending data to the updated destination:
@@ -107,6 +106,9 @@ These steps allow you to dual-write for connections incrementally syncing data w
<TabItem value="snowflake" label="Snowflake">
<SnowflakeMigrationGenerator />
</TabItem>
+<TabItem value="redshift" label="Redshift">
+<RedshiftMigrationGenerator />
+</TabItem>
</Tabs>

2. Navigate to the existing connection you are duplicating, and navigate to the `Settings` tab. Open the `Advanced` settings to see the connection state (which manages incremental syncs). Copy the state to your clipboard.
@@ -132,7 +134,7 @@ If you have written downstream transformations directly from the output of raw t

- Multiple column names are being updated (from `airbyte_ab_id` to `airbyte_raw_id`, and `airbyte_emitted_at` to `airbyte_extracted_at`).
- The location of raw tables will from now on default to an `airbyte` schema in your destination.
- When you upgrade to a [Destinations V2 compatible version](#destinations-v2-effective-versions) of your destination, we will leave a copy of your existing raw tables as they are, and new syncs will work from a new copy we make in the new `airbyte_internal` schema. Although existing downstream dashboards will go stale, they will not be broken.
- You can dual write by following the [steps above](#upgrading-connections-one-by-one-with-dual-writing) and copying your raw data to the schema of your newly created connection.

We may make further changes to raw tables in the future, as these tables are intended to be a staging ground for Airbyte to optimize the performance of your syncs. We cannot guarantee the same level of stability as for final tables in your destination schema, nor will features like error handling be implemented in the raw tables.
@@ -144,7 +146,7 @@ As a user previously not running Normalization, Upgrading to Destinations V2 wil
For each [CDC-supported](https://docs.airbyte.com/understanding-airbyte/cdc) source connector, we recommend the following:

| CDC Source | Recommendation | Notes |
-|------------|--------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| ---------- | ------------------------------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Postgres | [Upgrade connection in place](#quick-start-to-upgrading) | You can optionally dual write, but this requires resyncing historical data from the source. You must create a new Postgres source with a different replication slot than your existing source to preserve the integrity of your existing connection. |
| MySQL | [All above upgrade paths supported](#advanced-upgrade-paths) | You can upgrade the connection in place, or dual write. When dual writing, Airbyte can leverage the state of an existing, active connection to ensure historical data is not re-replicated from MySQL. |

@@ -153,7 +155,7 @@ For each [CDC-supported](https://docs.airbyte.com/understanding-airbyte/cdc) sou
For each destination connector, Destinations V2 is effective as of the following versions:

| Destination Connector | Safe Rollback Version | Destinations V2 Compatible | Upgrade Deadline |
-|-----------------------|-----------------------|----------------------------|--------------------------|
+| --------------------- | --------------------- | -------------------------- | ------------------------ |
| BigQuery | 1.10.2 | 2.0.6+ | November 7, 2023 |
| Snowflake | 2.1.7 | 3.1.0+ | November 7, 2023 |
| Redshift | 0.6.11 | [coming soon] 2.0.0+ | [coming soon] early 2024 |
