Skip to content

Commit

Permalink
table: Update delta from column slice, instead of map (#53670)
Browse files Browse the repository at this point in the history
close #53660
  • Loading branch information
ekexium authored Jun 1, 2024
1 parent 4670bf5 commit 8483dc5
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 10 deletions.
29 changes: 29 additions & 0 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,35 @@ func (tc *TransactionContext) UpdateDeltaForTable(physicalTableID int64, delta i
tc.TableDeltaMap[physicalTableID] = item
}

// ColSize is a data struct to store the delta information for a table.
type ColSize struct {
ColID int64
Size int64
}

// UpdateDeltaForTableFromColSlice is the same as UpdateDeltaForTable, but it accepts a slice of column size.
func (tc *TransactionContext) UpdateDeltaForTableFromColSlice(
physicalTableID int64, delta int64,
count int64, colSizes []ColSize,
) {
tc.tdmLock.Lock()
defer tc.tdmLock.Unlock()
if tc.TableDeltaMap == nil {
tc.TableDeltaMap = make(map[int64]TableDelta)
}
item := tc.TableDeltaMap[physicalTableID]
if item.ColSize == nil && len(colSizes) > 0 {
item.ColSize = make(map[int64]int64, len(colSizes))
}
item.Delta += delta
item.Count += count
item.TableID = physicalTableID
for _, s := range colSizes {
item.ColSize[s.ColID] += s.Size
}
tc.TableDeltaMap[physicalTableID] = item
}

// GetKeyInPessimisticLockCache gets a key in pessimistic lock cache.
func (tc *TransactionContext) GetKeyInPessimisticLockCache(key kv.Key) (val []byte, ok bool) {
if tc.pessimisticLockCache == nil && tc.CurrentStmtPessimisticLockCache == nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/table/tables/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ import (
func BenchmarkAddRecordInPipelinedDML(b *testing.B) {
store, dom := testkit.CreateMockStoreAndDomain(b)
tk := testkit.NewTestKit(b, store)
_, err := tk.Session().Execute(context.Background(), "CREATE TABLE test.t (a int primary key auto_increment, b varchar(255))")
_, err := tk.Session().Execute(
context.Background(),
"CREATE TABLE test.t (a int primary key auto_increment, b varchar(255))",
)
require.NoError(b, err)
ctx := tk.Session()
vars := ctx.GetSessionVars()
Expand Down
18 changes: 9 additions & 9 deletions pkg/table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx table.MutateContext
return err
}
}
colSize := make(map[int64]int64, len(t.Cols()))
colSize := make([]variable.ColSize, len(t.Cols()))
for id, col := range t.Cols() {
size, err := codec.EstimateValueSize(sc.TypeCtx(), newData[id])
if err != nil {
Expand All @@ -683,9 +683,9 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx table.MutateContext
continue
}
oldLen := size - 1
colSize[col.ID] = int64(newLen - oldLen)
colSize[id] = variable.ColSize{ColID: col.ID, Size: int64(newLen - oldLen)}
}
sessVars.TxnCtx.UpdateDeltaForTable(t.physicalTableID, 0, 1, colSize)
sessVars.TxnCtx.UpdateDeltaForTableFromColSlice(t.physicalTableID, 0, 1, colSize)
return nil
}

Expand Down Expand Up @@ -1171,15 +1171,15 @@ func (t *TableCommon) AddRecord(sctx table.MutateContext, r []types.Datum, opts
sessVars.TxnCtx.InsertTTLRowsCount += 1
}

colSize := make(map[int64]int64, len(r))
colSize := make([]variable.ColSize, len(t.Cols()))
for id, col := range t.Cols() {
size, err := codec.EstimateValueSize(sc.TypeCtx(), r[id])
if err != nil {
continue
}
colSize[col.ID] = int64(size) - 1
colSize[id] = variable.ColSize{ColID: col.ID, Size: int64(size - 1)}
}
sessVars.TxnCtx.UpdateDeltaForTable(t.physicalTableID, 1, 1, colSize)
sessVars.TxnCtx.UpdateDeltaForTableFromColSlice(t.physicalTableID, 1, 1, colSize)
return recordID, nil
}

Expand Down Expand Up @@ -1448,15 +1448,15 @@ func (t *TableCommon) RemoveRecord(ctx table.MutateContext, h kv.Handle, r []typ
if ctx.GetSessionVars().TxnCtx == nil {
return nil
}
colSize := make(map[int64]int64, len(t.Cols()))
colSize := make([]variable.ColSize, len(t.Cols()))
for id, col := range t.Cols() {
size, err := codec.EstimateValueSize(sc.TypeCtx(), r[id])
if err != nil {
continue
}
colSize[col.ID] = -int64(size - 1)
colSize[id] = variable.ColSize{ColID: col.ID, Size: -int64(size - 1)}
}
ctx.GetSessionVars().TxnCtx.UpdateDeltaForTable(t.physicalTableID, -1, 1, colSize)
ctx.GetSessionVars().TxnCtx.UpdateDeltaForTableFromColSlice(t.physicalTableID, -1, 1, colSize)
return err
}

Expand Down

0 comments on commit 8483dc5

Please sign in to comment.