Skip to content

Commit

Permalink
Merge branch 'dev' into feat/transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
dk-lockdown authored Aug 19, 2022
2 parents 1ef30ef + 47f1059 commit 1837a1b
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 12 deletions.
16 changes: 14 additions & 2 deletions pkg/executor/read_write_splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,25 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(
if !ok {
return nil, 0, errors.New("there is no transaction")
}
defer executor.localTransactionMap.Delete(connectionID)
if stmt.SavepointName == "" {
defer executor.localTransactionMap.Delete(connectionID)
}
tx = txi.(proto.Tx)
// TODO add metrics
if result, err = tx.Rollback(spanCtx, stmt); err != nil {
return nil, 0, err
}
return result, 0, err
case *ast.ReleaseSavepointStmt:
txi, ok := executor.localTransactionMap.Load(connectionID)
if !ok {
return nil, 0, errors.New("there is no transaction")
}
tx = txi.(proto.Tx)
if result, err = tx.ReleaseSavepoint(spanCtx, stmt.Name); err != nil {
return nil, 0, err
}
return result, 0, err
case *ast.InsertStmt, *ast.DeleteStmt, *ast.UpdateStmt:
txi, ok := executor.localTransactionMap.Load(connectionID)
if ok {
Expand Down Expand Up @@ -335,5 +347,5 @@ func (executor *ReadWriteSplittingExecutor) doPostFilter(ctx context.Context, re
return err
}
}
return nil
return err
}
2 changes: 1 addition & 1 deletion pkg/executor/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,5 +348,5 @@ func (executor *ShardingExecutor) doPostFilter(ctx context.Context, result proto
return err
}
}
return nil
return err
}
16 changes: 14 additions & 2 deletions pkg/executor/single_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,25 @@ func (executor *SingleDBExecutor) ExecutorComQuery(
if !ok {
return nil, 0, errors.New("there is no transaction")
}
defer executor.localTransactionMap.Delete(connectionID)
if stmt.SavepointName == "" {
defer executor.localTransactionMap.Delete(connectionID)
}
tx = txi.(proto.Tx)
// TODO add metrics
if result, err = tx.Rollback(spanCtx, stmt); err != nil {
return nil, 0, err
}
return result, 0, err
case *ast.ReleaseSavepointStmt:
txi, ok := executor.localTransactionMap.Load(connectionID)
if !ok {
return nil, 0, errors.New("there is no transaction")
}
tx = txi.(proto.Tx)
if result, err = tx.ReleaseSavepoint(spanCtx, stmt.Name); err != nil {
return nil, 0, err
}
return result, 0, err
default:
txi, ok := executor.localTransactionMap.Load(connectionID)
if ok {
Expand Down Expand Up @@ -285,5 +297,5 @@ func (executor *SingleDBExecutor) doPostFilter(ctx context.Context, result proto
return err
}
}
return nil
return err
}
3 changes: 3 additions & 0 deletions pkg/listener/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,9 @@ func (l *MysqlListener) ExecuteCommand(ctx context.Context, c *mysql.Conn, data
return err
}
}
case constant.ComResetConnection:
c.RecycleReadPacket()
return c.WriteOKPacket(0, 0, c.StatusFlags(), 0)
}
return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/proto/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ type (
ExecuteSqlDirectly(sql string, args ...interface{}) (Result, uint16, error)
Commit(ctx context.Context) (Result, error)
Rollback(ctx context.Context, stmt *ast.RollbackStmt) (Result, error)
ReleaseSavepoint(ctx context.Context, savepoint string) (result Result, err error)
}

DBManager interface {
Expand Down
19 changes: 17 additions & 2 deletions pkg/sql/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,24 @@ func (tx *Tx) Rollback(ctx context.Context, stmt *ast.RollbackStmt) (result prot
result, err = tx.conn.Execute(ctx, fmt.Sprintf("ROLLBACK TO %s", stmt.SavepointName), false)
} else {
result, err = tx.conn.Execute(ctx, "ROLLBACK", false)
tx.db.pool.Put(tx.conn)
tx.Close()
}
tx.db.pool.Put(tx.conn)
tx.Close()
return
}

func (tx *Tx) ReleaseSavepoint(ctx context.Context, savepoint string) (result proto.Result, err error) {
_, span := tracing.GetTraceSpan(ctx, tracing.TxReleaseSavePoint)
span.SetAttributes(attribute.KeyValue{Key: "db", Value: attribute.StringValue(tx.db.name)})
defer span.End()

if tx.closed.Load() {
return nil, nil
}
if tx.db == nil || tx.db.IsClosed() {
return nil, err2.ErrInvalidConn
}
result, err = tx.conn.Execute(ctx, fmt.Sprintf("RELEASE SAVEPOINT %s", savepoint), false)
return
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/tracing/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ const (
GroupTransactionBegin = "group_transaction_begin"

// tx
TxQuery = "tx_query"
TxExecSQL = "tx_exec_sql"
TxExecStmt = "tx_exec_stmt"
TxCommit = "db_tx_commit"
TxRollback = "db_tx_rollback"
TxQuery = "tx_query"
TxExecSQL = "tx_exec_sql"
TxExecStmt = "tx_exec_stmt"
TxCommit = "db_local_transaction_commit"
TxRollback = "db_local_transaction_rollback"
TxReleaseSavePoint = "db_local_transaction_release_savepoint"

// group tx
GroupTxQuery = "group_tx_query"
Expand Down
15 changes: 15 additions & 0 deletions testdata/mock_tx.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 1837a1b

Please sign in to comment.