Skip to content

Commit

Permalink
Added the backend changes for check constraints
Browse files Browse the repository at this point in the history
  • Loading branch information
Vivek Yadav committed Nov 22, 2024
1 parent d51ff7d commit 4c22d00
Show file tree
Hide file tree
Showing 17 changed files with 809 additions and 71 deletions.
1 change: 1 addition & 0 deletions internal/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ const (
SequenceCreated
ForeignKeyActionNotSupported
NumericPKNotSupported
TypeMismatch
)

const (
Expand Down
7 changes: 6 additions & 1 deletion internal/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

type Counter struct {
counterMutex sync.Mutex
ObjectId string
ObjectId string
}

var Cntr Counter
Expand Down Expand Up @@ -65,6 +65,11 @@ func GenerateForeignkeyId() string {
func GenerateIndexesId() string {
return GenerateId("i")
}

func GenerateCheckConstrainstId() string {
return GenerateId("ck")
}

func GenerateRuleId() string {
return GenerateId("r")
}
Expand Down
13 changes: 13 additions & 0 deletions internal/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,19 @@ func ToSpannerIndexName(conv *Conv, srcIndexName string) string {
return getSpannerValidName(conv, srcIndexName)
}

// Note that the check constraints names in spanner have to be globally unique
// (across the database). But in some source databases, such as MySQL,
// they only have to be unique for a table. Hence we must map each source
// constraint name to a unique spanner constraint name.
func ToSpannerCheckConstraintName(conv *Conv, srcCheckConstraintName string) string {
return getSpannerValidName(conv, srcCheckConstraintName)
}

func GetSpannerValidExpression(cks []ddl.Checkconstraint) []ddl.Checkconstraint {
// TODO validate the check constraints data with batch verification then send back
return cks
}

// conv.UsedNames tracks Spanner names that have been used for table names, foreign key constraints
// and indexes. We use this to ensure we generate unique names when
// we map from source dbs to Spanner since Spanner requires all these names to be
Expand Down
7 changes: 7 additions & 0 deletions internal/reports/report_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,13 @@ func buildTableReportBody(conv *internal.Conv, tableId string, issues map[string
Description: fmt.Sprintf("UNIQUE constraint on column(s) '%s' replaced with primary key since table '%s' didn't have one. Spanner requires a primary key for every table", strings.Join(uniquePK, ", "), conv.SpSchema[tableId].Name),
}
l = append(l, toAppend)
case internal.TypeMismatch:
toAppend := Issue{
Category: IssueDB[i].Category,
Description: fmt.Sprintf("Table '%s': Type mismatch in '%s'column affecting check constraints. Verify data type compatibility with constraint logic", conv.SpSchema[tableId].Name, conv.SpSchema[tableId].ColDefs[colId].Name),
}
l = append(l, toAppend)

default:
toAppend := Issue{
Category: IssueDB[i].Category,
Expand Down
38 changes: 23 additions & 15 deletions schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,27 @@ import (

// Table represents a database table.
type Table struct {
Name string
Schema string
ColIds []string // List of column Ids (for predictable iteration order e.g. printing).
ColDefs map[string]Column // Details of columns.
ColNameIdMap map[string]string `json:"-"` // Computed every time just after conv is generated or after any column renaming
PrimaryKeys []Key
ForeignKeys []ForeignKey
Indexes []Index
Id string
Name string
Schema string
ColIds []string // List of column Ids (for predictable iteration order e.g. printing).
ColDefs map[string]Column // Details of columns.
ColNameIdMap map[string]string `json:"-"` // Computed every time just after conv is generated or after any column renaming
PrimaryKeys []Key
ForeignKeys []ForeignKey
CheckConstraints []CheckConstraints
Indexes []Index
Id string
}

// Column represents a database column.
// TODO: add support for foreign keys.
type Column struct {
Name string
Type Type
NotNull bool
Ignored Ignored
Id string
AutoGen ddl.AutoGenCol
Name string
Type Type
NotNull bool
Ignored Ignored
Id string
AutoGen ddl.AutoGenCol
}

// ForeignKey represents a foreign key.
Expand All @@ -76,6 +77,13 @@ type ForeignKey struct {
Id string
}

// CheckConstraints represents a Check Constrainst.
type CheckConstraints struct {
Name string
Expr string
Id string
}

// Key respresents a primary key or index key.
type Key struct {
ColId string
Expand Down
23 changes: 12 additions & 11 deletions sources/common/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type InfoSchema interface {
GetColumns(conv *internal.Conv, table SchemaAndName, constraints map[string][]string, primaryKeys []string) (map[string]schema.Column, []string, error)
GetRowsFromTable(conv *internal.Conv, srcTable string) (interface{}, error)
GetRowCount(table SchemaAndName) (int64, error)
GetConstraints(conv *internal.Conv, table SchemaAndName) ([]string, map[string][]string, error)
GetConstraints(conv *internal.Conv, table SchemaAndName) ([]string, []schema.CheckConstraints, map[string][]string, error)
GetForeignKeys(conv *internal.Conv, table SchemaAndName) (foreignKeys []schema.ForeignKey, err error)
GetIndexes(conv *internal.Conv, table SchemaAndName, colNameIdMp map[string]string) ([]schema.Index, error)
ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, spCols []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error
Expand Down Expand Up @@ -185,7 +185,7 @@ func (is *InfoSchemaImpl) processTable(conv *internal.Conv, table SchemaAndName,
var t schema.Table
fmt.Println("processing schema for table", table)
tblId := internal.GenerateTableId()
primaryKeys, constraints, err := infoSchema.GetConstraints(conv, table)
primaryKeys, checkConstraints, constraints, err := infoSchema.GetConstraints(conv, table)
if err != nil {
return t, fmt.Errorf("couldn't get constraints for table %s.%s: %s", table.Schema, table.Name, err)
}
Expand Down Expand Up @@ -215,15 +215,16 @@ func (is *InfoSchemaImpl) processTable(conv *internal.Conv, table SchemaAndName,
schemaPKeys = append(schemaPKeys, schema.Key{ColId: colNameIdMap[k]})
}
t = schema.Table{
Id: tblId,
Name: name,
Schema: table.Schema,
ColIds: colIds,
ColNameIdMap: colNameIdMap,
ColDefs: colDefs,
PrimaryKeys: schemaPKeys,
Indexes: indexes,
ForeignKeys: foreignKeys}
Id: tblId,
Name: name,
Schema: table.Schema,
ColIds: colIds,
ColNameIdMap: colNameIdMap,
ColDefs: colDefs,
PrimaryKeys: schemaPKeys,
CheckConstraints: checkConstraints,
Indexes: indexes,
ForeignKeys: foreignKeys}
return t, nil
}

Expand Down
32 changes: 24 additions & 8 deletions sources/common/toddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,15 @@ func (ss *SchemaToSpannerImpl) SchemaToSpannerDDLHelper(conv *internal.Conv, tod
}
comment := "Spanner schema for source table " + quoteIfNeeded(srcTable.Name)
conv.SpSchema[srcTable.Id] = ddl.CreateTable{
Name: spTableName,
ColIds: spColIds,
ColDefs: spColDef,
PrimaryKeys: cvtPrimaryKeys(srcTable.PrimaryKeys),
ForeignKeys: cvtForeignKeys(conv, spTableName, srcTable.Id, srcTable.ForeignKeys, isRestore),
Indexes: cvtIndexes(conv, srcTable.Id, srcTable.Indexes, spColIds, spColDef),
Comment: comment,
Id: srcTable.Id}
Name: spTableName,
ColIds: spColIds,
ColDefs: spColDef,
PrimaryKeys: cvtPrimaryKeys(srcTable.PrimaryKeys),
ForeignKeys: cvtForeignKeys(conv, spTableName, srcTable.Id, srcTable.ForeignKeys, isRestore),
CheckConstraint: cvtCheckConstraint(conv, srcTable.CheckConstraints),
Indexes: cvtIndexes(conv, srcTable.Id, srcTable.Indexes, spColIds, spColDef),
Comment: comment,
Id: srcTable.Id}
return nil
}

Expand Down Expand Up @@ -234,6 +235,21 @@ func cvtForeignKeys(conv *internal.Conv, spTableName string, srcTableId string,
return spKeys
}

func cvtCheckConstraint(conv *internal.Conv, srcKeys []schema.CheckConstraints) []ddl.Checkconstraint {
var spcks []ddl.Checkconstraint

for _, cks := range srcKeys {
spcks = append(spcks, ddl.Checkconstraint{
Id: cks.Id,
Name: internal.ToSpannerCheckConstraintName(conv, cks.Name),
Expr: cks.Expr,
})

}

return internal.GetSpannerValidExpression(spcks)
}

func CvtForeignKeysHelper(conv *internal.Conv, spTableName string, srcTableId string, srcKey schema.ForeignKey, isRestore bool) (ddl.Foreignkey, error) {
if len(srcKey.ColIds) != len(srcKey.ReferColumnIds) {
conv.Unexpected(fmt.Sprintf("ConvertForeignKeys: ColIds and referColumns don't have the same lengths: len(columns)=%d, len(referColumns)=%d for source tableId: %s, referenced table: %s", len(srcKey.ColIds), len(srcKey.ReferColumnIds), srcTableId, srcKey.ReferTableId))
Expand Down
30 changes: 30 additions & 0 deletions sources/common/toddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,33 @@ func Test_SchemaToSpannerSequenceHelper(t *testing.T) {
assert.Equal(t, expectedConv, conv)
}
}
func Test_cvtCheckContraint(t *testing.T) {

conv := internal.MakeConv()
srcSchema := []schema.CheckConstraints{
{
Id: "ck1",
Name: "check_1",
Expr: "age > 0",
},
{
Id: "ck1",
Name: "check_2",
Expr: "age < 99",
},
}
spSchema := []ddl.Checkconstraint{
{
Id: "ck1",
Name: "check_1",
Expr: "age > 0",
},
{
Id: "ck1",
Name: "check_2",
Expr: "age < 99",
},
}
result := cvtCheckConstraint(conv, srcSchema)
assert.Equal(t, spSchema, result)
}
100 changes: 89 additions & 11 deletions sources/mysql/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (isi InfoSchemaImpl) GetColumns(conv *internal.Conv, table common.SchemaAnd
// We've already filtered out PRIMARY KEY.
switch c {
case "CHECK":
ignored.Check = true

case "FOREIGN KEY", "PRIMARY KEY", "UNIQUE":
// Nothing to do here -- these are all handled elsewhere.
}
Expand Down Expand Up @@ -237,29 +237,107 @@ func (isi InfoSchemaImpl) GetColumns(conv *internal.Conv, table common.SchemaAnd
// other constraints. Note: we need to preserve ordinal order of
// columns in primary key constraints.
// Note that foreign key constraints are handled in getForeignKeys.
func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) ([]string, map[string][]string, error) {
func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) ([]string, []schema.CheckConstraints, map[string][]string, error) {
q := `SELECT k.COLUMN_NAME, t.CONSTRAINT_TYPE
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS t
INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS k
ON t.CONSTRAINT_NAME = k.CONSTRAINT_NAME AND t.CONSTRAINT_SCHEMA = k.CONSTRAINT_SCHEMA AND t.TABLE_NAME=k.TABLE_NAME
WHERE k.TABLE_SCHEMA = ? AND k.TABLE_NAME = ? ORDER BY k.ordinal_position;`
rows, err := isi.Db.Query(q, table.Schema, table.Name)

q1 := `SELECT
COALESCE(k.COLUMN_NAME, '') AS COLUMN_NAME,
t.CONSTRAINT_NAME,
t.CONSTRAINT_TYPE,
COALESCE(c.CHECK_CLAUSE, '') AS CHECK_CLAUSE
FROM
INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS t
LEFT JOIN
INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS k
ON t.CONSTRAINT_NAME = k.CONSTRAINT_NAME
AND t.CONSTRAINT_SCHEMA = k.CONSTRAINT_SCHEMA
AND t.TABLE_NAME = k.TABLE_NAME
LEFT JOIN
INFORMATION_SCHEMA.CHECK_CONSTRAINTS AS c
ON t.CONSTRAINT_NAME = c.CONSTRAINT_NAME
WHERE
t.TABLE_SCHEMA = ?
AND t.TABLE_NAME = ?
ORDER BY k.ORDINAL_POSITION;
`
checkQuery := `SELECT COUNT(*)
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'INFORMATION_SCHEMA'
AND TABLE_NAME = 'CHECK_CONSTRAINTS';`
var tableExistsCount int
rows1, err := isi.Db.Query(checkQuery)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
for rows1.Next() {
err1 := rows1.Scan(&tableExistsCount)
if err1 != nil {
conv.Unexpected(fmt.Sprintf("Can't scan: %v", err))
return nil, nil, nil, err
}
}

defer rows1.Close()

tableExists := tableExistsCount > 0

var finalQuery string
if tableExists {
finalQuery = q1
} else {
finalQuery = q
}

rows, err := isi.Db.Query(finalQuery, table.Schema, table.Name)

if err != nil {
return nil, nil, nil, err
}
defer rows.Close()
var primaryKeys []string
var col, constraint string
var checkKeys []schema.CheckConstraints
var col, constraintName, constraint, checkClause string
m := make(map[string][]string)
for rows.Next() {
err := rows.Scan(&col, &constraint)
if err != nil {
conv.Unexpected(fmt.Sprintf("Can't scan: %v", err))
continue
if tableExists {
err := rows.Scan(&col, &constraintName, &constraint, &checkClause)
if err != nil {
conv.Unexpected(fmt.Sprintf("Can't scan: %v", err))
continue
}
} else {
err := rows.Scan(&col, &constraintName, &constraint, &checkClause)
if err != nil {
conv.Unexpected(fmt.Sprintf("Can't scan: %v", err))
continue
}
}
if col == "" || constraint == "" {
conv.Unexpected(fmt.Sprintf("Got empty col or constraint"))

if tableExists {
if constraintName == "" || checkClause == "" {
conv.Unexpected(fmt.Sprintf("Got empty constraintName or checkClause"))
continue
}
switch constraint {
case "CHECK":
checkClause = strings.ReplaceAll(checkClause, "_utf8mb4\\", "")
checkClause = strings.ReplaceAll(checkClause, "\\", "")

checkKeys = append(checkKeys, schema.CheckConstraints{Name: constraintName, Expr: string(checkClause), Id: internal.GenerateCheckConstrainstId()})
default:
m[col] = append(m[col], constraint)
}
} else {
conv.Unexpected(fmt.Sprintf("Got empty col or constraint"))
}

continue

}
switch constraint {
case "PRIMARY KEY":
Expand All @@ -268,7 +346,7 @@ func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.Schem
m[col] = append(m[col], constraint)
}
}
return primaryKeys, m, nil
return primaryKeys, checkKeys, m, nil
}

// GetForeignKeys return list all the foreign keys constraints.
Expand Down
Loading

0 comments on commit 4c22d00

Please sign in to comment.