Skip to content

Commit

Permalink
Use TableSettings instead of strings for wider use cases
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Feb 18, 2025
1 parent b40d3eb commit bdc7309
Show file tree
Hide file tree
Showing 9 changed files with 1,430 additions and 1,375 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,19 @@ var (
)

func commandAddReferenceTables(cmd *cobra.Command, args []string) error {
tableSettings := []*vtctldatapb.TableMaterializeSettings{}
for _, table := range addReferenceTablesOptions.Tables {
tableSettings = append(tableSettings, &vtctldatapb.TableMaterializeSettings{
TargetTable: table,
})
}

_, err := common.GetClient().MaterializeAddTables(common.GetCommandCtx(), &vtctldatapb.MaterializeAddTablesRequest{
Workflow: common.BaseOptions.Workflow,
Keyspace: common.BaseOptions.TargetKeyspace,
Tables: addReferenceTablesOptions.Tables,
Workflow: common.BaseOptions.Workflow,
Keyspace: common.BaseOptions.TargetKeyspace,
TableSettings: tableSettings,
})

if err != nil {
return err
}
Expand Down
2,434 changes: 1,219 additions & 1,215 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

45 changes: 26 additions & 19 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3104,7 +3104,7 @@ func (s *VtctldServer) MaterializeAddTables(ctx context.Context, req *vtctldatap

span.Annotate("workflow", req.Workflow)
span.Annotate("keyspace", req.Keyspace)
span.Annotate("tables", req.Tables)
span.Annotate("table_settings", req.TableSettings)

err = s.ws.MaterializeAddTables(ctx, req)
return resp, err
Expand Down
119 changes: 63 additions & 56 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe
// Each tablet needs its own copy of the request as it will have a unique
// BinlogSource.
tabletReq := req.CloneVT()
tabletReq.BinlogSource, err = mz.generateBinlogSources(mz.ctx, target, sourceShards, streamKeyRangesEqual)
tabletReq.BinlogSource, err = mz.generateBinlogSources(target, sourceShards, streamKeyRangesEqual)
if err != nil {
return err
}
Expand All @@ -178,7 +178,7 @@ func (mz *materializer) getTenantClause() (*sqlparser.Expr, error) {
return getTenantClause(mz.ms.WorkflowOptions, mz.targetVSchema, mz.env.Parser())
}

func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo, keyRangesEqual bool) ([]*binlogdatapb.BinlogSource, error) {
func (mz *materializer) generateBinlogSources(targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo, keyRangesEqual bool) ([]*binlogdatapb.BinlogSource, error) {
blses := make([]*binlogdatapb.BinlogSource, 0, len(mz.sourceShards))
for _, sourceShard := range sourceShards {
bls := &binlogdatapb.BinlogSource{
Expand All @@ -202,69 +202,76 @@ func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *
}

for _, ts := range mz.ms.TableSettings {
rule := &binlogdatapb.Rule{
Match: ts.TargetTable,
rule, err := mz.generateRule(ts, targetShard, tenantClause, keyRangesEqual)
if err != nil {
return nil, err
}
bls.Filter.Rules = append(bls.Filter.Rules, rule)
}
blses = append(blses, bls)
}
return blses, nil
}

if ts.SourceExpression == "" {
bls.Filter.Rules = append(bls.Filter.Rules, rule)
continue
}
func (mz *materializer) generateRule(ts *vtctldatapb.TableMaterializeSettings, targetShard *topo.ShardInfo, tenantClause *sqlparser.Expr, keyRangesEqual bool) (*binlogdatapb.Rule, error) {
rule := &binlogdatapb.Rule{
Match: ts.TargetTable,
}

// Validate non-empty query.
stmt, err := mz.env.Parser().Parse(ts.SourceExpression)
if ts.SourceExpression == "" {
return rule, nil
}

// Validate non-empty query.
stmt, err := mz.env.Parser().Parse(ts.SourceExpression)
if err != nil {
return nil, err
}
sel, ok := stmt.(*sqlparser.Select)
if !ok {
return nil, fmt.Errorf("unrecognized statement: %s", ts.SourceExpression)
}
if !keyRangesEqual && mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference {
cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable])
if err != nil {
return nil, err
}
mappedCols := make([]*sqlparser.ColName, 0, len(cv.Columns))
for _, col := range cv.Columns {
colName, err := matchColInSelect(col, sel)
if err != nil {
return nil, err
}
sel, ok := stmt.(*sqlparser.Select)
if !ok {
return nil, fmt.Errorf("unrecognized statement: %s", ts.SourceExpression)
mappedCols = append(mappedCols, colName)
}
subExprs := make([]sqlparser.Expr, 0, len(mappedCols)+2)
for _, mappedCol := range mappedCols {
subExprs = append(subExprs, mappedCol)
}
var vindexName string
if mz.workflowType == binlogdatapb.VReplicationWorkflowType_Migrate {
// For a Migrate, if the TargetKeyspace name is different from the SourceKeyspace name, we need to use the
// SourceKeyspace name to determine the vindex since the TargetKeyspace name is not known to the source.
// Note: it is expected that the source and target keyspaces have the same vindex name and data type.
keyspace := mz.ms.TargetKeyspace
if mz.ms.ExternalCluster != "" {
keyspace = mz.ms.SourceKeyspace
}
if !keyRangesEqual && mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference {
cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable])
if err != nil {
return nil, err
}
mappedCols := make([]*sqlparser.ColName, 0, len(cv.Columns))
for _, col := range cv.Columns {
colName, err := matchColInSelect(col, sel)
if err != nil {
return nil, err
}
mappedCols = append(mappedCols, colName)
}
subExprs := make([]sqlparser.Expr, 0, len(mappedCols)+2)
for _, mappedCol := range mappedCols {
subExprs = append(subExprs, mappedCol)
}
var vindexName string
if mz.workflowType == binlogdatapb.VReplicationWorkflowType_Migrate {
// For a Migrate, if the TargetKeyspace name is different from the SourceKeyspace name, we need to use the
// SourceKeyspace name to determine the vindex since the TargetKeyspace name is not known to the source.
// Note: it is expected that the source and target keyspaces have the same vindex name and data type.
keyspace := mz.ms.TargetKeyspace
if mz.ms.ExternalCluster != "" {
keyspace = mz.ms.SourceKeyspace
}
vindexName = fmt.Sprintf("%s.%s", keyspace, cv.Name)
} else {
vindexName = fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
}

subExprs = append(subExprs, sqlparser.NewStrLiteral(vindexName))
subExprs = append(subExprs, sqlparser.NewStrLiteral(key.KeyRangeString(targetShard.KeyRange)))
inKeyRange := sqlparser.NewFuncExpr("in_keyrange", subExprs...)
addFilter(sel, inKeyRange)
}
if tenantClause != nil {
addFilter(sel, *tenantClause)
}
rule.Filter = sqlparser.String(sel)
bls.Filter.Rules = append(bls.Filter.Rules, rule)
vindexName = fmt.Sprintf("%s.%s", keyspace, cv.Name)
} else {
vindexName = fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
}
blses = append(blses, bls)

subExprs = append(subExprs, sqlparser.NewStrLiteral(vindexName))
subExprs = append(subExprs, sqlparser.NewStrLiteral(key.KeyRangeString(targetShard.KeyRange)))
inKeyRange := sqlparser.NewFuncExpr("in_keyrange", subExprs...)
addFilter(sel, inKeyRange)
}
return blses, nil
if tenantClause != nil {
addFilter(sel, *tenantClause)
}
rule.Filter = sqlparser.String(sel)
return rule, nil
}

func (mz *materializer) deploySchema() error {
Expand Down
Loading

0 comments on commit bdc7309

Please sign in to comment.