Skip to content

Commit

Permalink
Adds Resumable Sync for higher stability of syncing large datasets (#…
Browse files Browse the repository at this point in the history
…3266)

Co-authored-by: Nick Zelei <[email protected]>
  • Loading branch information
alishakawaguchi and nickzelei authored Feb 16, 2025
1 parent dd8480b commit b612c8b
Show file tree
Hide file tree
Showing 64 changed files with 4,045 additions and 2,992 deletions.
1 change: 1 addition & 0 deletions backend/gen/go/db/dbschemas/postgresql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 57 additions & 1 deletion backend/gen/go/db/dbschemas/postgresql/system.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

239 changes: 183 additions & 56 deletions backend/gen/go/protos/mgmt/v1alpha1/connection_data.pb.go

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions backend/gen/go/protos/mgmt/v1alpha1/connection_data.pb.json.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2,582 changes: 1,292 additions & 1,290 deletions backend/gen/go/protos/mgmt/v1alpha1/job.pb.go

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions backend/internal/connectiondata/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,23 @@ func (s *SQLConnectionDataService) GetTableConstraints(
}
}

uniqueIndexesMap := map[string]*mgmtv1alpha1.UniqueIndexes{}
for table, uniqueIndexes := range tableConstraints.UniqueIndexes {
uniqueIndexesMap[table] = &mgmtv1alpha1.UniqueIndexes{
Indexes: []*mgmtv1alpha1.UniqueIndex{},
}
for _, ui := range uniqueIndexes {
uniqueIndexesMap[table].Indexes = append(uniqueIndexesMap[table].Indexes, &mgmtv1alpha1.UniqueIndex{
Columns: ui,
})
}
}

return &mgmtv1alpha1.GetConnectionTableConstraintsResponse{
ForeignKeyConstraints: fkConstraintsMap,
PrimaryKeyConstraints: pkConstraintsMap,
UniqueConstraints: uniqueConstraintsMap,
UniqueIndexes: uniqueIndexesMap,
}, nil
}

Expand Down
23 changes: 22 additions & 1 deletion backend/pkg/dbschemas/sql/postgresql/queries/system.sql
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,6 @@ FROM information_schema.table_constraints AS tc
JOIN pg_catalog.pg_constraint AS pgcon
ON pgcon.conname = tc.constraint_name
AND pgcon.connamespace = pn.oid

WHERE
tc.table_schema = ANY(sqlc.arg('schema')::TEXT[])
/* Exclude foreign keys */
Expand Down Expand Up @@ -680,3 +679,25 @@ GROUP BY
referenced_schema.nspname,
referenced_tbl.relname,
ref_columns.foreign_column_names;


-- name: GetUniqueIndexesBySchema :many
SELECT
ns.nspname AS table_schema, -- Schema name for the table
tbl.relname AS table_name, -- Name of the table the index belongs to
idx.relname AS index_name, -- Name of the index
array_agg(col.attname ORDER BY key_info.ordinality)::TEXT[] AS index_columns -- Comma-separated list of index columns
FROM pg_catalog.pg_class AS tbl
-- Join to get the schema information for the table
JOIN pg_catalog.pg_namespace AS ns ON tbl.relnamespace = ns.oid
-- Join to retrieve index metadata for the table
JOIN pg_catalog.pg_index AS idx_meta ON tbl.oid = idx_meta.indrelid
-- Join to get the index object details
JOIN pg_catalog.pg_class AS idx ON idx_meta.indexrelid = idx.oid
-- Unnest the index key attribute numbers along with their ordinal positions
JOIN unnest(idx_meta.indkey) WITH ORDINALITY AS key_info(attnum, ordinality) ON true
-- Join to get the column attributes corresponding to the index keys
JOIN pg_catalog.pg_attribute AS col ON col.attrelid = tbl.oid AND col.attnum = key_info.attnum
WHERE ns.nspname = ANY(sqlc.arg('schema')::TEXT[])
AND idx_meta.indisunique = true
GROUP BY ns.nspname, tbl.relname, idx.relname;
1 change: 1 addition & 0 deletions backend/pkg/mssql-querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Querier interface {
GetRolePermissions(ctx context.Context, db mysql_queries.DBTX) ([]*GetRolePermissionsRow, error)
GetTableConstraintsBySchemas(ctx context.Context, db mysql_queries.DBTX, schemas []string) ([]*GetTableConstraintsBySchemasRow, error)
GetViewsAndFunctionsBySchemas(ctx context.Context, db mysql_queries.DBTX, schemas []string) ([]*GetViewsAndFunctionsBySchemasRow, error)
GetUniqueIndexesBySchema(ctx context.Context, db mysql_queries.DBTX, schemas []string) ([]*GetUniqueIndexesBySchemaRow, error)
}

var _ Querier = (*Queries)(nil)
63 changes: 63 additions & 0 deletions backend/pkg/mssql-querier/system.sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,3 +963,66 @@ func createSchemaTableParams(values []string) (argPlaceholders string, arguments

return strings.Join(placeholders, ","), args
}

const getUniqueIndexesBySchema = `-- name: getUniqueIndexesBySchema :many
SELECT
SCHEMA_NAME(t.schema_id) AS table_schema,
t.name AS table_name,
i.name AS index_name,
index_columns = (
SELECT STRING_AGG(c.name, ', ') WITHIN GROUP (ORDER BY ic.key_ordinal)
FROM sys.index_columns ic
JOIN sys.columns c
ON ic.object_id = c.object_id
AND ic.column_id = c.column_id
WHERE ic.object_id = i.object_id
AND ic.index_id = i.index_id
AND ic.is_included_column = 0
)
FROM sys.indexes i
JOIN sys.tables t
ON i.object_id = t.object_id
WHERE i.type > 0 -- valid index types only
AND i.is_unique = 1 -- only unique indexes
AND i.is_unique_constraint = 0 -- exclude UNIQUE constraints
AND i.is_primary_key = 0 -- exclude primary keys
AND SCHEMA_NAME(t.schema_id) IN (%s)
ORDER BY i.index_id;
`

type GetUniqueIndexesBySchemaRow struct {
TableSchema string
TableName string
IndexName string
IndexColumns string
}

func (q *Queries) GetUniqueIndexesBySchema(ctx context.Context, db mysql_queries.DBTX, schemas []string) ([]*GetUniqueIndexesBySchemaRow, error) {
placeholders, args := createSchemaTableParams(schemas)
query := fmt.Sprintf(getUniqueIndexesBySchema, placeholders)
rows, err := db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*GetUniqueIndexesBySchemaRow
for rows.Next() {
var i GetUniqueIndexesBySchemaRow
if err := rows.Scan(
&i.TableSchema,
&i.TableName,
&i.IndexName,
&i.IndexColumns,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
41 changes: 35 additions & 6 deletions backend/pkg/sqlmanager/mssql/mssql-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
ee_sqlmanager_mssql "github.com/nucleuscloud/neosync/internal/ee/mssql-manager"
"github.com/nucleuscloud/neosync/internal/gotypeutil"
"github.com/nucleuscloud/neosync/internal/neosyncdb"
"golang.org/x/sync/errgroup"
)

type Manager struct {
Expand Down Expand Up @@ -188,18 +189,40 @@ func (m *Manager) GetTableConstraintsBySchema(ctx context.Context, schemas []str
if len(schemas) == 0 {
return &sqlmanager_shared.TableConstraints{}, nil
}
rows, err := m.querier.GetTableConstraintsBySchemas(ctx, m.db, schemas)
if err != nil && !neosyncdb.IsNoRows(err) {

errgrp, errctx := errgroup.WithContext(ctx)
constraints := []*mssql_queries.GetTableConstraintsBySchemasRow{}
errgrp.Go(func() error {
rows, err := m.querier.GetTableConstraintsBySchemas(ctx, m.db, schemas)
if err != nil && !neosyncdb.IsNoRows(err) {
return err
} else if err != nil && neosyncdb.IsNoRows(err) {
return nil
}
constraints = rows
return nil
})

uniqueIndexes := []*mssql_queries.GetUniqueIndexesBySchemaRow{}
errgrp.Go(func() error {
indexes, err := m.querier.GetUniqueIndexesBySchema(errctx, m.db, schemas)
if err != nil {
return err
}
uniqueIndexes = indexes
return nil
})

if err := errgrp.Wait(); err != nil {
return nil, err
} else if err != nil && neosyncdb.IsNoRows(err) {
return &sqlmanager_shared.TableConstraints{}, nil
}

foreignKeyMap := map[string][]*sqlmanager_shared.ForeignConstraint{}
primaryKeyMap := map[string][]string{}
uniqueConstraintsMap := map[string][][]string{}
uniqueIndexesMap := map[string][][]string{}

for _, row := range rows {
for _, row := range constraints {
tableName := sqlmanager_shared.BuildTable(row.SchemaName, row.TableName)
constraintCols := splitAndStrip(row.ConstraintColumns, ", ")

Expand Down Expand Up @@ -245,10 +268,16 @@ func (m *Manager) GetTableConstraintsBySchema(ctx context.Context, schemas []str
}
}

for _, row := range uniqueIndexes {
tableName := sqlmanager_shared.BuildTable(row.TableSchema, row.TableName)
uniqueIndexesMap[tableName] = append(uniqueIndexesMap[tableName], splitAndStrip(row.IndexColumns, ", "))
}

return &sqlmanager_shared.TableConstraints{
ForeignKeyConstraints: foreignKeyMap,
PrimaryKeyConstraints: primaryKeyMap,
UniqueConstraints: uniqueConstraintsMap,
UniqueIndexes: uniqueIndexesMap,
}, nil
}

Expand Down Expand Up @@ -289,7 +318,7 @@ func (m *Manager) GetRolePermissionsMap(ctx context.Context) (map[string][]strin
return schemaTablePrivsMap, err
}

func splitAndStrip(input, delim string) []string {
func splitAndStrip(input, delim string) []string { //nolint:unparam
output := []string{}

for _, piece := range strings.Split(input, delim) {
Expand Down
2 changes: 2 additions & 0 deletions backend/pkg/sqlmanager/mysql/mysql-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ func (m *MysqlManager) GetTableConstraintsBySchema(ctx context.Context, schemas
ForeignKeyConstraints: foreignKeyMap,
PrimaryKeyConstraints: primaryKeyMap,
UniqueConstraints: uniqueConstraintsMap,
// there is no real distinction between unique indexes and unique constraints in mysql
UniqueIndexes: map[string][][]string{},
}, nil
}

Expand Down
17 changes: 17 additions & 0 deletions backend/pkg/sqlmanager/postgres/postgres-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ func (p *PostgresManager) GetTableConstraintsBySchema(ctx context.Context, schem
return nil
})

uniqueIndexes := []*pg_queries.GetUniqueIndexesBySchemaRow{}
errgrp.Go(func() error {
indexes, err := p.querier.GetUniqueIndexesBySchema(errctx, p.db, schemas)
if err != nil {
return err
}
uniqueIndexes = indexes
return nil
})

if err := errgrp.Wait(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -184,10 +194,17 @@ func (p *PostgresManager) GetTableConstraintsBySchema(ctx context.Context, schem
})
}

uniqueIndexesMap := map[string][][]string{}
for _, row := range uniqueIndexes {
tableName := sqlmanager_shared.BuildTable(row.TableSchema, row.TableName)
uniqueIndexesMap[tableName] = append(uniqueIndexesMap[tableName], row.IndexColumns)
}

return &sqlmanager_shared.TableConstraints{
ForeignKeyConstraints: foreignKeyMap,
PrimaryKeyConstraints: primaryKeyMap,
UniqueConstraints: uniqueConstraintsMap,
UniqueIndexes: uniqueIndexesMap,
}, nil
}

Expand Down
5 changes: 5 additions & 0 deletions backend/pkg/sqlmanager/shared/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type TableConstraints struct {
ForeignKeyConstraints map[string][]*ForeignConstraint
PrimaryKeyConstraints map[string][]string
UniqueConstraints map[string][][]string
UniqueIndexes map[string][][]string
}

type DataType struct {
Expand Down Expand Up @@ -183,7 +184,11 @@ type InitSchemaStatements struct {
}

type SelectQuery struct {
// Query is the query used to get all data
Query string
// PageQuery is the query used to get a page of data based on a unique identifier like a primary key in the WHERE clause
PageQuery string
PageLimit int

// If true, this query could return rows that violate foreign key constraints
IsNotForeignKeySafeSubset bool
Expand Down
Loading

0 comments on commit b612c8b

Please sign in to comment.