Skip to content

Commit

Permalink
optimize: optimize the speed of buildLockKey (#714)
Browse files Browse the repository at this point in the history
* optimize: optimize the speed of buildLockKey

* improve unit testing of buildLockKey

* fix the situation where primaryKeyValues contains nil value

* update the unit test of buildLockKey

---------

Co-authored-by: JayLiu <[email protected]>
  • Loading branch information
FinnTew and luky116 authored Dec 27, 2024
1 parent af53317 commit 1b72cb8
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 33 deletions.
46 changes: 26 additions & 20 deletions pkg/datasource/sql/undo/builder/basic_undo_log_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,35 +276,41 @@ func (b *BasicUndoLogBuilder) buildLockKey(rows driver.Rows, meta types.TableMet

// the string as local key. the local key example(multi pk): "t_user:1_a,2_b"
func (b *BasicUndoLogBuilder) buildLockKey2(records *types.RecordImage, meta types.TableMeta) string {
var (
lockKeys bytes.Buffer
filedSequence int
)
var lockKeys bytes.Buffer
lockKeys.WriteString(meta.TableName)
lockKeys.WriteString(":")

keys := meta.GetPrimaryKeyOnlyName()
keyIndexMap := make(map[string]int, len(keys))

for _, row := range records.Rows {
if filedSequence > 0 {
for idx, columnName := range keys {
keyIndexMap[columnName] = idx
}

primaryKeyRows := make([][]interface{}, len(records.Rows))

for i, row := range records.Rows {
primaryKeyValues := make([]interface{}, len(keys))
for _, column := range row.Columns {
if idx, exist := keyIndexMap[column.ColumnName]; exist {
primaryKeyValues[idx] = column.Value
}
}
primaryKeyRows[i] = primaryKeyValues
}

for i, primaryKeyValues := range primaryKeyRows {
if i > 0 {
lockKeys.WriteString(",")
}
pkSplitIndex := 0
for _, column := range row.Columns {
var hasKeyColumn bool
for _, key := range keys {
if column.ColumnName == key {
hasKeyColumn = true
if pkSplitIndex > 0 {
lockKeys.WriteString("_")
}
lockKeys.WriteString(fmt.Sprintf("%v", column.Value))
pkSplitIndex++
}
for j, pkVal := range primaryKeyValues {
if j > 0 {
lockKeys.WriteString("_")
}
if hasKeyColumn {
filedSequence++
if pkVal == nil {
continue
}
lockKeys.WriteString(fmt.Sprintf("%v", pkVal))
}
}

Expand Down
139 changes: 126 additions & 13 deletions pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,135 @@ func TestBuildWhereConditionByPKs(t *testing.T) {
}

func TestBuildLockKey(t *testing.T) {
metaData := types.TableMeta{
TableName: "test_name",
Indexs: map[string]types.IndexMeta{
"PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: []types.ColumnMeta{{ColumnName: "id"}, {ColumnName: "userId"}}},
},
var builder BasicUndoLogBuilder

columnID := types.ColumnMeta{
ColumnName: "id",
}
columnUserId := types.ColumnMeta{
ColumnName: "userId",
}
columnName := types.ColumnMeta{
ColumnName: "name",
}
columnAge := types.ColumnMeta{
ColumnName: "age",
}
columnNonExistent := types.ColumnMeta{
ColumnName: "non_existent",
}

columnsTwoPk := []types.ColumnMeta{columnID, columnUserId}
columnsMixPk := []types.ColumnMeta{columnName, columnAge}

records := types.RecordImage{
TableName: "test_name",
Rows: []types.RowImage{
{Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, ColumnName: "id", Value: 1}, {KeyType: types.IndexTypePrimaryKey, ColumnName: "userId", Value: "one"}}},
{Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, ColumnName: "id", Value: 2}, {KeyType: types.IndexTypePrimaryKey, ColumnName: "userId", Value: "two"}}},
getColumnImage := func(columnName string, value interface{}) types.ColumnImage {
return types.ColumnImage{KeyType: types.IndexTypePrimaryKey, ColumnName: columnName, Value: value}
}

tests := []struct {
name string
metaData types.TableMeta
records types.RecordImage
expected string
}{
{
"Two Primary Keys",
types.TableMeta{
TableName: "test_name",
Indexs: map[string]types.IndexMeta{
"PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: columnsTwoPk},
},
},
types.RecordImage{
TableName: "test_name",
Rows: []types.RowImage{
{[]types.ColumnImage{getColumnImage("id", 1), getColumnImage("userId", "one")}},
{[]types.ColumnImage{getColumnImage("id", 2), getColumnImage("userId", "two")}},
},
},
"test_name:1_one,2_two",
},
{
name: "Single Primary Key",
metaData: types.TableMeta{
TableName: "single_key",
Indexs: map[string]types.IndexMeta{
"PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: []types.ColumnMeta{columnID}},
},
},
records: types.RecordImage{
TableName: "single_key",
Rows: []types.RowImage{
{Columns: []types.ColumnImage{getColumnImage("id", 100)}},
},
},
expected: "single_key:100",
},
{
name: "Mixed Type Keys",
metaData: types.TableMeta{
TableName: "mixed_key",
Indexs: map[string]types.IndexMeta{
"PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: columnsMixPk},
},
},
records: types.RecordImage{
TableName: "mixed_key",
Rows: []types.RowImage{
{Columns: []types.ColumnImage{getColumnImage("name", "Alice"), getColumnImage("age", 25)}},
},
},
expected: "mixed_key:Alice_25",
},
{
name: "Empty Records",
metaData: types.TableMeta{
TableName: "empty",
Indexs: map[string]types.IndexMeta{
"PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: []types.ColumnMeta{columnID}},
},
},
records: types.RecordImage{TableName: "empty"},
expected: "empty:",
},
{
name: "Special Characters",
metaData: types.TableMeta{
TableName: "special",
Indexs: map[string]types.IndexMeta{
"PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: []types.ColumnMeta{columnID}},
},
},
records: types.RecordImage{
TableName: "special",
Rows: []types.RowImage{
{Columns: []types.ColumnImage{getColumnImage("id", "a,b_c")}},
},
},
expected: "special:a,b_c",
},
{
name: "Non-existent Key Name",
metaData: types.TableMeta{
TableName: "error_key",
Indexs: map[string]types.IndexMeta{
"PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: []types.ColumnMeta{columnNonExistent}},
},
},
records: types.RecordImage{
TableName: "error_key",
Rows: []types.RowImage{
{Columns: []types.ColumnImage{getColumnImage("id", 1)}},
},
},
expected: "error_key:",
},
}

builder := BasicUndoLogBuilder{}
lockKeys := builder.buildLockKey2(&records, metaData)
assert.Equal(t, "test_name:1_one,2_two", lockKeys)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
lockKeys := builder.buildLockKey2(&tt.records, tt.metaData)
assert.Equal(t, tt.expected, lockKeys)
})
}
}

0 comments on commit 1b72cb8

Please sign in to comment.