Skip to content

Commit

Permalink
Minor tweaks
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Feb 1, 2025
1 parent b87608b commit ab995b5
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 14 deletions.
6 changes: 3 additions & 3 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,8 +1093,8 @@ func TestVStreamHeartbeats(t *testing.T) {

// TestVStreamPushdownFilters confirms that pushdown filters are applied correctly
// when they are specified in the VStream API via the rule.Filter.
// It also confirms that we use the proper collations for the vstream filter when
// using varchar fields.
// It also confirms that we use the proper collation for the VStream filter when
// using VARCHAR fields.
func TestVStreamPushdownFilters(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
Expand All @@ -1118,7 +1118,7 @@ func TestVStreamPushdownFilters(t *testing.T) {
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

// Make sure that we get at least one paul event in the copy phase.
// Make sure that we get at least one paul row event in the copy phase.
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('PAUĹ')", ks), 1, false)
require.NoError(t, err)

Expand Down
18 changes: 10 additions & 8 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@ type Plan struct {
// of the table.
Filters []Filter

// Predicates in the Filter query that we can push down to
// MySQL to reduce the returned rows we need to filter.
// This will contain any valid expressions in the Filter's
// WHERE clause with the exception of the in_keyrange()
// function which is a filter that must be applied by the
// vstreamer (it's not a valid MySQL function).
// Predicates in the Filter query that we can push down to MySQL
// to reduce the returned rows we need to filter in the VStreamer
// during the copy phase. This will contain any valid expressions
// in the Filter's WHERE clause with the exception of the
// in_keyrange() function which is a filter that must be applied
// by the VStreamer (it's not a valid MySQL function). Note that
// the Filter cannot contain any MySQL functions because the
// VStreamer cannot filter binlog events using them.
whereExprsToPushDown []sqlparser.Expr

// Convert any integer values seen in the binlog events for ENUM or SET
Expand Down Expand Up @@ -631,8 +633,8 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
log.Errorf("DEBUG: adding to list of pushdown expressions: %v", sqlparser.String(expr))
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
case *sqlparser.FuncExpr:
// We cannot filter binlog events in vstreamer using MySQL functions so
// we only allow the in_keyrange() function, which is vstreamer specific.
// We cannot filter binlog events in VStreamer using MySQL functions so
// we only allow the in_keyrange() function, which is VStreamer specific.
if !expr.Name.EqualString("in_keyrange") {
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,14 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error
indexHint = fmt.Sprintf(" force index (%s)", escapedPKIndexName)
}
buf.Myprintf(" from %v%s", sqlparser.NewIdentifierCS(rs.plan.Table.Name), indexHint)
if len(rs.lastpk) != 0 {
if len(rs.lastpk) != 0 { // We're in the copy phase and need to resume
if len(rs.lastpk) != len(rs.pkColumns) {
return "", fmt.Errorf("cannot build a row streamer plan for the %s table as a lastpk value was provided and the number of primary key values within it (%v) does not match the number of primary key columns in the table (%d)",
st.Name, rs.lastpk, rs.pkColumns)
}
buf.WriteString(" where ")
addPushdownExpressions()
// First we add any predicates that should be pushed down.
addPushdownExpressions()
if len(rs.plan.whereExprsToPushDown) > 0 {
buf.Myprintf(" and ")
}
Expand All @@ -318,7 +318,7 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error
rs.lastpk[lastcol].EncodeSQL(buf)
buf.Myprintf(")")
}
} else if len(rs.plan.whereExprsToPushDown) > 0 {
} else if len(rs.plan.whereExprsToPushDown) > 0 { // We're in the running/replicating phase
buf.Myprintf(" where ")
addPushdownExpressions()
}
Expand Down

0 comments on commit ab995b5

Please sign in to comment.