diff --git a/go/testdata/small-slow-query-transactions.json b/go/testdata/small-slow-query-transactions.json index ce2669e..2061b06 100644 --- a/go/testdata/small-slow-query-transactions.json +++ b/go/testdata/small-slow-query-transactions.json @@ -2,17 +2,51 @@ "fileType": "transactions", "signatures": [ { + "count": 2, "query-signatures": [ - "update tblA where foo = :0 and id = :1 set apa", - "update tblB where bar = :0 and id = :2 set monkey" - ], - "predicates": [ - "tblA.foo = 0", - "tblA.id = ?", - "tblB.bar = 0", - "tblB.id = ?" - ], - "count": 2 + { + "op": "update", + "affected_table": "tblA", + "updated_columns": [ + "apa" + ], + "predicates": [ + { + "table": "tblA", + "col": "foo", + "op": 0, + "val": 0 + }, + { + "table": "tblA", + "col": "id", + "op": 0, + "val": -1 + } + ] + }, + { + "op": "update", + "affected_table": "tblB", + "updated_columns": [ + "monkey" + ], + "predicates": [ + { + "table": "tblB", + "col": "bar", + "op": 0, + "val": 0 + }, + { + "table": "tblB", + "col": "id", + "op": 0, + "val": -1 + } + ] + } + ] } ] } diff --git a/go/transactions/transaction_signature.go b/go/transactions/transaction_signature.go index 3af1248..0bfc75b 100644 --- a/go/transactions/transaction_signature.go +++ b/go/transactions/transaction_signature.go @@ -17,9 +17,9 @@ limitations under the License. package transactions import ( - "cmp" "encoding/json" "fmt" + "hash" "hash/fnv" "sort" "strconv" @@ -29,9 +29,15 @@ import ( type ( TxSignature struct { - Queries []string `json:"queries"` - Predicates []predicateInfo `json:"predicates"` - Count int `json:"count"` + Count int `json:"count"` + Queries []TxQuery `json:"qqueries"` + } + + TxQuery struct { + Op string `json:"op"` + AffectedTable string `json:"affected_table"` + UpdatedColumns []string `json:"updated_columns,omitempty"` + Predicates []predicateInfo `json:"predicates,omitempty"` } txSignatureMap struct { @@ -39,10 +45,10 @@ type ( } predicateInfo struct { - Table string - Col string - Op sqlparser.ComparisonExprOperator - Val int + Table string `json:"table"` + Col string `json:"col"` + Op sqlparser.ComparisonExprOperator `json:"op"` + Val int `json:"val"` } ) @@ -54,33 +60,13 @@ func (pi predicateInfo) String() string { return fmt.Sprintf("%s.%s %s %s", pi.Table, pi.Col, pi.Op.ToString(), val) } -func (pi predicateInfo) compareTo(b predicateInfo) int { - if pi.Table != b.Table { - return cmp.Compare(pi.Table, b.Table) - } - if pi.Col != b.Col { - return cmp.Compare(pi.Col, b.Col) - } - if pi.Op != b.Op { - return cmp.Compare(pi.Op, b.Op) - } - return cmp.Compare(pi.Val, b.Val) -} - func (tx *TxSignature) MarshalJSON() ([]byte, error) { - predicateStrings := make([]string, len(tx.Predicates)) - for i, predicate := range tx.Predicates { - predicateStrings[i] = predicate.String() - } - return json.Marshal(struct { - Queries []string `json:"query-signatures"` - Predicates []string `json:"predicates"` - Count int `json:"count"` + Count int `json:"count"` + Queries []TxQuery `json:"query-signatures"` }{ - Queries: tx.Queries, - Predicates: predicateStrings, - Count: tx.Count, + Count: tx.Count, + Queries: tx.Queries, }) } @@ -88,35 +74,55 @@ func (tx *TxSignature) Hash64() uint64 { hasher := fnv.New64a() for _, query := range tx.Queries { - _, _ = hasher.Write([]byte(query)) - _, _ = hasher.Write([]byte{0}) - } - - for _, pred := range tx.Predicates { - _, _ = hasher.Write([]byte(pred.String())) - _, _ = hasher.Write([]byte{0}) + query.addToHash(hasher) } return hasher.Sum64() } -func (tx *TxSignature) addPredicate(predicates []predicateInfo) { - for _, predicate := range predicates { - index := sort.Search(len(tx.Predicates), func(i int) bool { - return tx.Predicates[i].compareTo(predicate) >= 0 - }) +func (tx TxQuery) addToHash(hash hash.Hash64) { + _, _ = hash.Write([]byte(tx.Op)) + _, _ = hash.Write([]byte{0}) + _, _ = hash.Write([]byte(tx.AffectedTable)) + _, _ = hash.Write([]byte{0}) - if index < len(tx.Predicates) && tx.Predicates[index].compareTo(predicate) == 0 { - continue // Predicate already exists; skip it - } + for _, col := range tx.UpdatedColumns { + _, _ = hash.Write([]byte(col)) + _, _ = hash.Write([]byte{0}) + } - // Insert the predicate at the correct position - tx.Predicates = append(tx.Predicates, predicate) // Expand the slice by one - copy(tx.Predicates[index+1:], tx.Predicates[index:]) // Shift elements to the right - tx.Predicates[index] = predicate // Place the new predicate + for _, pred := range tx.Predicates { + _, _ = hash.Write([]byte(pred.String())) + _, _ = hash.Write([]byte{0}) } } +func (tx TxQuery) Equals(other TxQuery) bool { + if tx.Op != other.Op { + return false + } + if tx.AffectedTable != other.AffectedTable { + return false + } + if len(tx.UpdatedColumns) != len(other.UpdatedColumns) { + return false + } + for i := range tx.UpdatedColumns { + if tx.UpdatedColumns[i] != other.UpdatedColumns[i] { + return false + } + } + if len(tx.Predicates) != len(other.Predicates) { + return false + } + for i := range tx.Predicates { + if tx.Predicates[i] != other.Predicates[i] { + return false + } + } + return true +} + func newTxSignatureMap() *txSignatureMap { return &txSignatureMap{ data: make(map[uint64][]*TxSignature), @@ -152,16 +158,7 @@ func (tx *TxSignature) Equals(other *TxSignature) bool { return false } for i := range tx.Queries { - if tx.Queries[i] != other.Queries[i] { - return false - } - } - - if len(tx.Predicates) != len(other.Predicates) { - return false - } - for i := range tx.Predicates { - if tx.Predicates[i] != other.Predicates[i] { + if !tx.Queries[i].Equals(other.Queries[i]) { return false } } @@ -171,37 +168,47 @@ func (tx *TxSignature) Equals(other *TxSignature) bool { // CleanUp removes values that are only used once and replaces them with -1 func (tx *TxSignature) CleanUp() *TxSignature { - newPredicates := make([]predicateInfo, 0, len(tx.Predicates)) usedValues := make(map[int]int) // First let's count how many times each value is used - for _, pred := range tx.Predicates { - usedValues[pred.Val]++ + for _, query := range tx.Queries { + for _, predicate := range query.Predicates { + usedValues[predicate.Val]++ + } } // Now we replace values only used once with -1 newCount := 0 newValues := make(map[int]int) - for _, pred := range tx.Predicates { - if usedValues[pred.Val] == 1 { - pred.Val = -1 - } else { - newVal, found := newValues[pred.Val] - if !found { - // Assign a new value to this predicate - newVal = newCount - newCount++ - newValues[pred.Val] = newVal + newQueries := make([]TxQuery, 0, len(tx.Queries)) + for _, query := range tx.Queries { + newPredicates := make([]predicateInfo, 0, len(query.Predicates)) + for _, predicate := range query.Predicates { + if usedValues[predicate.Val] == 1 { + predicate.Val = -1 + } else { + newVal, found := newValues[predicate.Val] + if !found { + // Assign a new value to this predicate + newVal = newCount + newCount++ + newValues[predicate.Val] = newVal + } + predicate.Val = newVal } - pred.Val = newVal + newPredicates = append(newPredicates, predicate) } - newPredicates = append(newPredicates, pred) + newQueries = append(newQueries, TxQuery{ + Op: query.Op, + AffectedTable: query.AffectedTable, + UpdatedColumns: query.UpdatedColumns, + Predicates: newPredicates, + }) } return &TxSignature{ - Queries: tx.Queries, - Predicates: newPredicates, - Count: tx.Count, + Queries: newQueries, + Count: tx.Count, } } diff --git a/go/transactions/transaction_signature_test.go b/go/transactions/transaction_signature_test.go deleted file mode 100644 index fde26d5..0000000 --- a/go/transactions/transaction_signature_test.go +++ /dev/null @@ -1,92 +0,0 @@ -/* -Copyright 2024 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package transactions - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "vitess.io/vitess/go/vt/sqlparser" -) - -func TestTxSignature_addPredicate(t *testing.T) { - tests := []struct { - name string - existing []predicateInfo - newOnes []predicateInfo - expectedResult []predicateInfo - }{ - { - name: "Add single predicate to empty list", - existing: []predicateInfo{}, - newOnes: []predicateInfo{ - {Table: "table", Col: "id", Op: sqlparser.EqualOp, Val: 1}, - }, - expectedResult: []predicateInfo{ - {Table: "table", Col: "id", Op: sqlparser.EqualOp, Val: 1}, - }, - }, - { - name: "Add one predicates, have one", - existing: []predicateInfo{ - {Table: "table", Col: "id", Op: sqlparser.EqualOp, Val: 1}, - }, - newOnes: []predicateInfo{ - {Table: "table", Col: "name", Op: sqlparser.LikeOp, Val: 2}, - }, - expectedResult: []predicateInfo{ - {Table: "table", Col: "id", Op: sqlparser.EqualOp, Val: 1}, - {Table: "table", Col: "name", Op: sqlparser.LikeOp, Val: 2}, - }, - }, - { - name: "Add one predicates, have one, reverse order", - existing: []predicateInfo{ - {Table: "table", Col: "name", Op: sqlparser.LikeOp, Val: 2}, - }, - newOnes: []predicateInfo{ - {Table: "table", Col: "id", Op: sqlparser.EqualOp, Val: 1}, - }, - expectedResult: []predicateInfo{ - {Table: "table", Col: "id", Op: sqlparser.EqualOp, Val: 1}, - {Table: "table", Col: "name", Op: sqlparser.LikeOp, Val: 2}, - }, - }, - { - name: "Add existing predicate", - existing: []predicateInfo{ - {Table: "table", Col: "id", Op: sqlparser.EqualOp, Val: 1}, - }, - newOnes: []predicateInfo{ - {Table: "table", Col: "id", Op: sqlparser.EqualOp, Val: 1}, - }, - expectedResult: []predicateInfo{ - {Table: "table", Col: "id", Op: sqlparser.EqualOp, Val: 1}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tx := &TxSignature{ - Predicates: tt.existing, - } - tx.addPredicate(tt.newOnes) - assert.Equal(t, tt.expectedResult, tx.Predicates) - }) - } -} diff --git a/go/transactions/transactions.go b/go/transactions/transactions.go index de35cd8..b6469e2 100644 --- a/go/transactions/transactions.go +++ b/go/transactions/transactions.go @@ -21,7 +21,6 @@ import ( "fmt" "io" "os" - "strconv" "strings" "sync" @@ -223,92 +222,46 @@ func (s *state) consume(ch <-chan []sqlparser.Statement, wg *sync.WaitGroup) { } } -func querySignatureForUpd(query *sqlparser.Update) string { - buffer := sqlparser.NewTrackedBuffer(nil) - builder := buffer.Builder - builder.WriteString("update ") - - for i, tbl := range query.TableExprs { - tbl.Format(buffer) - if i < len(query.TableExprs)-1 { - buffer.WriteString(", ") - } - } - +func (s *state) consumeUpdate(query *sqlparser.Update, st *semantics.SemTable, n *normalizer, tx *TxSignature) { + // Find all predicates in the where clause that use a column and a literal + var predicates []predicateInfo if query.Where != nil { - query.Where.Format(buffer) + predicates = getPredicates(query.Where.Expr, st, n) } - builder.WriteString(" set ") - - for i, expr := range query.Exprs { - expr.Name.Format(buffer) - if i < len(query.Exprs)-1 { - buffer.WriteString(", ") - } + updatedColumns := make([]string, 0, len(query.Exprs)) + for _, expr := range query.Exprs { + updatedColumns = append(updatedColumns, sqlparser.String(expr.Name.Name)) } - return builder.String() -} - -func (s *state) consumeUpdate(query *sqlparser.Update, st *semantics.SemTable, n *normalizer, tx *TxSignature) { - defer func() { - tx.Queries = append(tx.Queries, querySignatureForUpd(query)) - }() - - if query.Where == nil { - return + if len(query.TableExprs) != 1 { + // TODO: Implement support for multi-table updates + panic("multi-table updates not supported") } - // Find all predicates in the where clause that use a column and a literal - tx.addPredicate(getPredicates(query.Where.Expr, st, n)) - query.Where = normalizeWhere(query.Where, n) + tx.Queries = append(tx.Queries, TxQuery{ + Op: "update", + AffectedTable: sqlparser.String(query.TableExprs[0]), + UpdatedColumns: updatedColumns, + Predicates: predicates, + }) } func (s *state) consumeDelete(del *sqlparser.Delete, st *semantics.SemTable, n *normalizer, tx *TxSignature) { - defer func() { - tx.Queries = append(tx.Queries, sqlparser.String(del)) - }() - - if del.Where == nil { - return + var predicates []predicateInfo + if del.Where != nil { + predicates = getPredicates(del.Where.Expr, st, n) } - - // Find all predicates in the where clause that use a column and a literal - tx.addPredicate(getPredicates(del.Where.Expr, st, n)) - del.Where = normalizeWhere(del.Where, n) -} - -func normalizeWhere(where *sqlparser.Where, n *normalizer) (newWhere *sqlparser.Where) { - newWhere = new(sqlparser.Where) - predicates := sqlparser.SplitAndExpression(nil, where.Expr) - for _, predicate := range predicates { - switch cmp := predicate.(type) { - case *sqlparser.ComparisonExpr: - lhs, lhsOK := cmp.Left.(*sqlparser.Literal) - rhs, rhsOK := cmp.Right.(*sqlparser.Literal) - if !lhsOK && !rhsOK || lhsOK && rhsOK { - newWhere.Expr = sqlparser.AndExpressions(newWhere.Expr) - continue - } - - var newCmp sqlparser.ComparisonExpr - newCmp.Operator = cmp.Operator - if lhsOK { - id := n.normalize(lhs.Val) - newCmp.Left = sqlparser.NewArgument(strconv.Itoa(id)) - newCmp.Right = cmp.Right - } else { - id := n.normalize(rhs.Val) - newCmp.Right = sqlparser.NewArgument(strconv.Itoa(id)) - newCmp.Left = cmp.Left - } - newWhere.Expr = sqlparser.AndExpressions(newWhere.Expr, &newCmp) - default: - newWhere.Expr = sqlparser.AndExpressions(newWhere.Expr, predicate) - } + if len(del.TableExprs) != 1 { + // TODO: Implement support for multi-table deletes + panic("multi-table updates not supported") } - return + + tx.Queries = append(tx.Queries, TxQuery{ + Op: "delete", + AffectedTable: sqlparser.String(del.TableExprs[0]), + Predicates: predicates, + }) } func (s *state) addSignature(tx *TxSignature) { @@ -324,10 +277,13 @@ func (s *state) run(out io.Writer, cfg Config) { loader := cfg.Loader.Load(cfg.FileName) ch := make(chan []sqlparser.Statement, 1000) + noOfConsumers := 1 var wg sync.WaitGroup - wg.Add(1) + for range noOfConsumers { + wg.Add(1) + go s.consume(ch, &wg) + } - go s.consume(ch, &wg) go func() { s.startProducing(loader, defaultAutocommit, ch) close(ch)