[Importer] Add keep-existing-columns flag #1076

Draft: wants to merge 126 commits into base branch main

Commits (126)
658af63
New build system
SferaDev Dec 23, 2022
25f674f
Update pipeline
SferaDev Dec 23, 2022
5372277
Add dependency
SferaDev Dec 23, 2022
0b181fb
Change pipeline config
SferaDev Dec 23, 2022
a87964e
Attempt pnpm cache
SferaDev Dec 23, 2022
5c6cfa2
Swap setup pnpm
SferaDev Dec 23, 2022
3f3ae58
Add to build step
SferaDev Dec 23, 2022
b465240
Update pipeline
SferaDev Dec 23, 2022
9a13652
Add empty changeset
SferaDev Dec 23, 2022
bf10153
Keep vitest old
SferaDev Dec 23, 2022
69432a1
Try
SferaDev Dec 23, 2022
5812e22
Newlines
SferaDev Dec 23, 2022
960cea8
Add EOF
SferaDev Dec 23, 2022
a976d4c
Update command
SferaDev Dec 23, 2022
81b7c2a
Remove prepack
SferaDev Dec 23, 2022
788bed5
Update dependencies
SferaDev Jan 2, 2023
9288261
Add missing dep
SferaDev Jan 2, 2023
5cc40cf
Add one more
SferaDev Jan 2, 2023
7881d6e
Add new importer logic
SferaDev Jan 4, 2023
7faf775
Merge remote-tracking branch 'origin/main' into new-importer
SferaDev Jan 4, 2023
3ea9eac
Merge branch 'main' into new-importer
SferaDev Jan 6, 2023
b6f1f61
Add plugin
SferaDev Jan 9, 2023
7aae51d
Use plugins in CLI
SferaDev Jan 9, 2023
29be927
Add dummy commands
SferaDev Jan 9, 2023
6c832bc
Update command
SferaDev Jan 10, 2023
927b85d
Merge remote-tracking branch 'origin/main' into new-importer
SferaDev Jan 12, 2023
5eced24
Add nyc pizza
SferaDev Jan 13, 2023
cbe497e
Merge branch 'main' into new-importer
SferaDev Jan 13, 2023
aad8e65
Updated client readme (#855)
richardgill Jan 13, 2023
9ca8ec0
Update dependencies (#857)
xata-bot Jan 16, 2023
8dad3c1
Reduce column selection depth (#852)
SferaDev Jan 16, 2023
f1cbb09
Fix execution of workflow with pnpm (#859)
SferaDev Jan 16, 2023
0722fda
Merge remote-tracking branch 'origin/main' into new-importer
SferaDev Jan 16, 2023
20541fa
Work in progress changes
SferaDev Jan 18, 2023
18af9ab
Merge remote-tracking branch 'origin/main' into new-importer
SferaDev Feb 20, 2023
21ac365
Merge remote-tracking branch 'origin/main' into new-importer
SferaDev Feb 23, 2023
4fb27da
Refactor to use XataApiPlugin
SferaDev Feb 23, 2023
7bfc9f5
Merge remote-tracking branch 'origin/refactor-plugin' into new-importer
SferaDev Feb 23, 2023
7f2749c
Merge remote-tracking branch 'origin/main' into new-importer
SferaDev Apr 4, 2023
10fd251
got code running
richardgill Jun 13, 2023
361355f
added some tests
richardgill Jun 16, 2023
5a16b81
rename import -> parse
richardgill Jun 16, 2023
22f75ca
changed api interfaces
richardgill Jun 16, 2023
d38257a
removed unneeded code
richardgill Jun 16, 2023
993d78f
WIP
richardgill Jun 19, 2023
f0717f1
WIP
richardgill Jun 21, 2023
70a3755
tidy up
richardgill Jun 22, 2023
2401555
import with retries working
richardgill Jun 23, 2023
5a7c09e
progress calculation
richardgill Jun 23, 2023
1e7e24c
progress on cli
richardgill Jun 23, 2023
0271e60
tidy up
richardgill Jun 23, 2023
32a460b
removed glob
richardgill Jun 23, 2023
2abad0d
more tidy
richardgill Jun 23, 2023
8043c31
fixed tests
richardgill Jun 23, 2023
2ca0342
merge in main
richardgill Jun 23, 2023
aae0af0
remove ndjson code
richardgill Jun 23, 2023
c93bb36
removed unused code
richardgill Jun 23, 2023
7fde609
removed ndJson properly
richardgill Jun 23, 2023
3e81ad7
removed failing test
richardgill Jun 23, 2023
e0cfbae
improvements
richardgill Jun 23, 2023
b621214
return csv meta
richardgill Jun 26, 2023
e61ae35
concurrency of transaction API calls
richardgill Jun 26, 2023
fecd58b
slight optimization
richardgill Jun 26, 2023
1f28772
perf improvement
richardgill Jun 26, 2023
f71322d
improvements / todos
richardgill Jun 26, 2023
c493770
removed auto delimiter
richardgill Jun 27, 2023
87f1750
allow any delimiter
richardgill Jun 27, 2023
915523b
empty
richardgill Jun 28, 2023
1f7fc52
small improvements
richardgill Jun 30, 2023
16a2a12
fixed types
richardgill Jun 30, 2023
4628d00
added comment about applying schema edits
richardgill Jun 30, 2023
4073865
merged in main
richardgill Jun 30, 2023
0941603
fixed build
richardgill Jun 30, 2023
ecc64f8
fixed build
richardgill Jun 30, 2023
4d42b80
fixed tests
richardgill Jun 30, 2023
1cfe20b
switched function syntax
richardgill Jun 30, 2023
7b51e27
column test cases
richardgill Jun 30, 2023
48afa56
cleaned up
richardgill Jun 30, 2023
5b17b12
coerce rows test cases
richardgill Jul 3, 2023
445e677
tidied up code
richardgill Jul 3, 2023
a9d9a15
add tests for csv parsing
richardgill Jul 4, 2023
481c656
upgraded pnpm
richardgill Jul 4, 2023
68f063a
updated lock file
richardgill Jul 4, 2023
cf5df07
updated npm install instructions for dev builds
richardgill Jul 4, 2023
90b9b26
added csv parsing tests
richardgill Jul 4, 2023
ddc80b0
removed snapshot tests
richardgill Jul 4, 2023
0bc3da7
switched boolean to be a function
richardgill Jul 4, 2023
54a7550
tidied up tests
richardgill Jul 4, 2023
231e393
refactored csvStreamParser
richardgill Jul 4, 2023
858b2f0
jsdoc comments
richardgill Jul 4, 2023
27e92f2
tidied whitespace
richardgill Jul 4, 2023
b14dfdf
added missing jsdoc comment
richardgill Jul 4, 2023
948fce7
reverted CLI code closer to orignal
richardgill Jul 4, 2023
3f78437
merged in main
richardgill Jul 5, 2023
79a83b3
upgraded any-date-parser
richardgill Jul 5, 2023
1e09bfa
moved line to minimize diff
richardgill Jul 5, 2023
d2af569
fixed line to minimize diff
richardgill Jul 5, 2023
5499077
fixed file handle warning
richardgill Jul 5, 2023
55c7729
validate column types arg
richardgill Jul 5, 2023
0c120de
used batchSize
richardgill Jul 5, 2023
1677bfd
migrate schema
richardgill Jul 5, 2023
b202227
added more test cases for datetimes
richardgill Jul 5, 2023
376fd59
removed find table
richardgill Jul 5, 2023
2d156ac
removed normalization flag
richardgill Jul 5, 2023
a66ef61
added comment
richardgill Jul 5, 2023
8f50b64
whitespace
richardgill Jul 6, 2023
118dcb9
to lower case
richardgill Jul 6, 2023
75e8446
fixed progress
richardgill Jul 6, 2023
8e3c0e6
small fixes
richardgill Jul 6, 2023
ae96f58
fixed build
richardgill Jul 6, 2023
b0287b4
fixed BOM
richardgill Jul 6, 2023
c6bb37c
added import batch retry logic
richardgill Jul 6, 2023
f3aa35b
removed comment
richardgill Jul 6, 2023
51c2548
added delimitersToGuess flag
richardgill Jul 7, 2023
0869b30
added error logging
richardgill Jul 7, 2023
0082c90
fixed id import
richardgill Jul 7, 2023
56ffba1
cache database parse resonse & allow create
richardgill Jul 7, 2023
5c214b4
id columns are always strings
richardgill Jul 11, 2023
a57fd54
added destructive confirmation
richardgill Jul 11, 2023
fc2cb2a
Merge branch 'main' of https://github.com/xataio/client-ts into new-i…
richardgill Jul 11, 2023
517ab58
Create wise-glasses-swim.md
richardgill Jul 11, 2023
634650f
updated changelog
richardgill Jul 11, 2023
5edc1a6
Allow links
SferaDev Jul 12, 2023
0b86e48
Add keep-existing-columns
SferaDev Jul 12, 2023
174dfca
Merge remote-tracking branch 'origin/main' into cli-importer-improvem…
SferaDev Jul 12, 2023
d0c81b8
Add comment
SferaDev Jul 12, 2023
62 changes: 55 additions & 7 deletions cli/src/commands/import/csv.ts
@@ -55,6 +55,9 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
'null-value': Flags.string({
description: 'Value to use for null values',
multiple: true
}),
'keep-existing-columns': Flags.boolean({
description: 'Whether to keep existing columns when updating the schema'
})
};

@@ -73,7 +76,8 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
'max-rows': limit,
delimiter,
'delimiters-to-guess': delimitersToGuess,
'null-value': nullValues
'null-value': nullValues,
'keep-existing-columns': keepExistingColumns
} = flags;
const header = !noHeader;
let columns = flagsToColumns(flags);
@@ -104,44 +108,55 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
if (!parseResults.success) {
throw new Error(`Failed to parse CSV file ${parseResults.errors.join(' ')}`);
}

if (!columns) {
columns = parseResults.columns;
}

if (!columns) {
throw new Error('No columns found');
}
await this.migrateSchema({ table, columns, create });

const { schemaColumns } = await this.migrateSchema({ table, columns, create, keepExistingColumns });

let importSuccessCount = 0;
const errors: string[] = [];
let progress = 0;
const fileStream = await getFileStream();

await xata.import.parseCsvStreamBatches({
fileStream: fileStream,
fileSizeBytes: await getFileSizeBytes(file),
parserOptions: { ...csvOptions, columns },
parserOptions: { ...csvOptions, columns: schemaColumns },
batchRowCount: batchSize,
onBatch: async (parseResults, meta) => {
if (!parseResults.success) {
throw new Error('Failed to parse CSV file');
}

const dbBranchName = `${database}:${branch}`;
const importResult = await xata.import.importBatch(
// @ts-ignore
// @ts-ignore - TODO: fix this
{ dbBranchName: dbBranchName, region, workspace: workspace, database },
{ columns: parseResults.columns, table, batch: parseResults }
);

importSuccessCount += importResult.successful.results.length;

if (importResult.errors) {
const formattedErrors = importResult.errors.map(
(error) => `${error.error}. Record: ${JSON.stringify(error.row)}`
);

const errorsToLog = formattedErrors.slice(0, Math.abs(ERROR_CONSOLE_LOG_LIMIT - errors.length));

for (const error of errorsToLog) {
this.logToStderr(`Import Error: ${error}`);
}

errors.push(...formattedErrors);
}

progress = Math.max(progress, meta.estimatedProgress);
this.info(
`${importSuccessCount} rows successfully imported ${errors.length} errors. ${Math.ceil(
@@ -150,10 +165,12 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
);
}
});

if (errors.length > 0) {
await writeFile(ERROR_LOG_FILE, errors.join('\n'), 'utf8');
this.log(`Import errors written to ${ERROR_LOG_FILE}`);
}

fileStream.close();
this.success('Completed');
process.exit(0);
@@ -174,12 +191,14 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
async migrateSchema({
table,
columns,
create
create,
keepExistingColumns = false
}: {
table: string;
columns: Column[];
create: boolean;
}): Promise<void> {
keepExistingColumns?: boolean;
}): Promise<{ schemaColumns: Column[] }> {
const xata = await this.getXataClient();
const { workspace, region, database, branch } = await this.parseDatabase();
const { schema: existingSchema } = await xata.api.branches.getBranchDetails({
@@ -188,19 +207,24 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
database,
branch
});

const existingColumns = existingSchema.tables.find((t) => t.name === table)?.columns ?? [];
const schemaColumns = getSchemaColumns({ newColumns: columns, existingColumns, keepExistingColumns });
const newSchema = {
tables: [
...existingSchema.tables.filter((t) => t.name !== table),
{ name: table, columns: columns.filter((c) => c.name !== 'id') }
{ name: table, columns: schemaColumns.filter(({ name }) => name !== 'id') }
]
};

const { edits } = await xata.api.migrations.compareBranchWithUserSchema({
workspace,
region,
database,
branch: 'main',
schema: newSchema
});

if (edits.operations.length > 0) {
const destructiveOperations = edits.operations
.map((op) => {
@@ -241,6 +265,8 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
}
await xata.api.migrations.applyBranchSchemaEdit({ workspace, region, database, branch, edits });
}

return { schemaColumns };
}
}

@@ -249,12 +275,15 @@ const flagsToColumns = (flags: {
columns: string | undefined;
}): Schemas.Column[] | undefined => {
if (!flags.columns && !flags.types) return undefined;

if (flags.columns && !flags.types) {
throw new Error('Must specify types when specifying columns');
}

if (!flags.columns && flags.types) {
throw new Error('Must specify columns when specifying types');
}

const columns = splitCommas(flags.columns);
const types = splitCommas(flags.types);
const invalidTypes = types.filter((t) => !importColumnTypes.safeParse(t).success);
@@ -270,11 +299,15 @@ const flagsToColumns = (flags: {
if (columns?.length !== types?.length) {
throw new Error('Must specify same number of columns and types');
}

return columns.map((name, i) => {
const type = importColumnTypes.parse(types[i]);

// For link columns, we do a best effort to guess the table name
if (type === 'link') {
return { name, type, link: { table: name } };
}

return { name, type };
});
};
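The `flagsToColumns` validation in the hunk above reduces to a small pure function. This is a minimal self-contained sketch of that behavior, not the real implementation: `VALID_TYPES` is a simplified stand-in for the zod-based `importColumnTypes` validator, and the `Column` shape is trimmed to the fields the logic touches.

```typescript
type Column = { name: string; type: string; link?: { table: string } };

// Assumption: a plain whitelist standing in for the importColumnTypes zod schema.
const VALID_TYPES = new Set(['string', 'int', 'float', 'bool', 'datetime', 'link']);

const flagsToColumns = (columnsFlag?: string, typesFlag?: string): Column[] | undefined => {
  // Neither flag given: let the importer infer columns from the CSV itself.
  if (!columnsFlag && !typesFlag) return undefined;
  // The flags only make sense as a pair.
  if (columnsFlag && !typesFlag) throw new Error('Must specify types when specifying columns');
  if (!columnsFlag && typesFlag) throw new Error('Must specify columns when specifying types');

  const columns = columnsFlag!.split(',');
  const types = typesFlag!.split(',');

  const invalid = types.filter((t) => !VALID_TYPES.has(t));
  if (invalid.length > 0) throw new Error(`Invalid column types: ${invalid.join(', ')}`);
  if (columns.length !== types.length) throw new Error('Must specify same number of columns and types');

  return columns.map((name, i) => {
    const type = types[i];
    // For link columns, best effort: guess that the target table shares the column name.
    if (type === 'link') return { name, type, link: { table: name } };
    return { name, type };
  });
};

console.log(flagsToColumns('name,team', 'string,link'));
```

Mismatched lengths, unknown types, or a lone flag all fail fast here, which mirrors why the real command validates before touching the schema.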
@@ -292,3 +325,18 @@ const getFileSizeBytes = async (file: string) => {
await fileHandle.close();
return stat.size;
};

const getSchemaColumns = ({
newColumns,
existingColumns,
keepExistingColumns
}: {
newColumns: Column[];
existingColumns: Column[];
keepExistingColumns: boolean;
}): Column[] => {
if (!keepExistingColumns) return newColumns;

const columnsToCreate = newColumns.filter(({ name }) => !existingColumns.find((c) => c.name === name));
return [...existingColumns, ...columnsToCreate];
};