Skip to content

Commit

Permalink
refactor: multiupdate relation (#21368)
Browse files Browse the repository at this point in the history
multiupdate relation using reset interface

Approved by: @ouyuanning
  • Loading branch information
huby2358 authored Feb 11, 2025
1 parent 82d59e6 commit ab9421f
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 34 deletions.
8 changes: 4 additions & 4 deletions pkg/sql/colexec/multi_update/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,23 @@ func TestDeleteS3TableWithUniqueKeyAndSecondaryKey(t *testing.T) {
// ----- util function ----
func buildDeleteTestCase(t *testing.T, hasUniqueKey bool, hasSecondaryKey bool) (*process.Process, *testCase) {
_, ctrl, proc := prepareTestCtx(t, false)
eng := prepareTestEng(ctrl)
eng := prepareTestEng(ctrl, false)

batchs, affectRows := prepareTestDeleteBatchs(proc.GetMPool(), 2, hasUniqueKey, hasSecondaryKey)
multiUpdateCtxs := prepareTestDeleteMultiUpdateCtx(hasUniqueKey, hasSecondaryKey)
action := UpdateWriteTable
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, affectRows, action)
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, affectRows, action, false)
return proc, retCase
}

func buildDeleteS3TestCase(t *testing.T, hasUniqueKey bool, hasSecondaryKey bool) (*process.Process, *testCase) {
_, ctrl, proc := prepareTestCtx(t, true)
eng := prepareTestEng(ctrl)
eng := prepareTestEng(ctrl, false)

batchs, _ := prepareTestDeleteBatchs(proc.GetMPool(), 12, hasUniqueKey, hasSecondaryKey)
multiUpdateCtxs := prepareTestDeleteMultiUpdateCtx(hasUniqueKey, hasSecondaryKey)
action := UpdateWriteS3
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, 0, action)
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, 0, action, false)
return proc, retCase
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/colexec/multi_update/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,37 +98,37 @@ func TestFlushS3Info(t *testing.T) {
hasSecondaryKey := false

_, ctrl, proc := prepareTestCtx(t, true)
eng := prepareTestEng(ctrl)
eng := prepareTestEng(ctrl, false)

batchs, rowCount := buildFlushS3InfoBatch(proc.GetMPool(), hasUniqueKey, hasSecondaryKey)

multiUpdateCtxs := prepareTestInsertMultiUpdateCtx(hasUniqueKey, hasSecondaryKey)
action := UpdateFlushS3Info
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, rowCount, action)
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, rowCount, action, false)

runTestCases(t, proc, []*testCase{retCase})
}

// ----- util function ----
func buildInsertTestCase(t *testing.T, hasUniqueKey bool, hasSecondaryKey bool) (*process.Process, *testCase) {
_, ctrl, proc := prepareTestCtx(t, false)
eng := prepareTestEng(ctrl)
eng := prepareTestEng(ctrl, false)

batchs, affectRows := prepareTestInsertBatchs(proc.GetMPool(), 2, hasUniqueKey, hasSecondaryKey)
multiUpdateCtxs := prepareTestInsertMultiUpdateCtx(hasUniqueKey, hasSecondaryKey)
action := UpdateWriteTable
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, affectRows, action)
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, affectRows, action, false)
return proc, retCase
}

func buildInsertS3TestCase(t *testing.T, hasUniqueKey bool, hasSecondaryKey bool) (*process.Process, *testCase) {
_, ctrl, proc := prepareTestCtx(t, true)
eng := prepareTestEng(ctrl)
eng := prepareTestEng(ctrl, false)

batchs, _ := prepareTestInsertBatchs(proc.GetMPool(), 10, hasUniqueKey, hasSecondaryKey)
multiUpdateCtxs := prepareTestInsertMultiUpdateCtx(hasUniqueKey, hasSecondaryKey)
action := UpdateWriteS3
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, 0, action)
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, 0, action, false)
return proc, retCase
}

Expand Down
19 changes: 14 additions & 5 deletions pkg/sql/colexec/multi_update/multi_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,22 @@ func (update *MultiUpdate) Prepare(proc *process.Process) error {

for _, updateCtx := range update.MultiUpdateCtx {
info := update.ctr.updateCtxInfos[updateCtx.TableDef.Name]
info.Sources = nil
if update.Action != UpdateWriteS3 {
rel, err := colexec.GetRelAndPartitionRelsByObjRef(proc.Ctx, proc, update.Engine, updateCtx.ObjRef)
if err != nil {
return err
if len(info.Sources) == 0 {
info.Sources = nil
rel, err := colexec.GetRelAndPartitionRelsByObjRef(proc.Ctx, proc, update.Engine, updateCtx.ObjRef)
if err != nil {
return err
}
info.Sources = append(info.Sources, rel)
} else {
for _, rel := range info.Sources {
err := rel.Reset(proc.GetTxnOperator())
if err != nil {
return err
}
}
}
info.Sources = append(info.Sources, rel)
}
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/colexec/multi_update/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,7 @@ func (update *MultiUpdate) Reset(proc *process.Process, pipelineFailed bool, err
if update.ctr.s3Writer != nil {
update.ctr.s3Writer.reset(proc)
}
for _, info := range update.ctr.updateCtxInfos {
info.Sources = nil
}

update.ctr.state = vm.Build
}

Expand Down
17 changes: 10 additions & 7 deletions pkg/sql/colexec/multi_update/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,18 @@ func TestUpdateSingleTable(t *testing.T) {
hasUniqueKey := false
hasSecondaryKey := false

proc, case1 := buildUpdateTestCase(t, hasUniqueKey, hasSecondaryKey)
proc, case1 := buildUpdateTestCase(t, hasUniqueKey, hasSecondaryKey, false)
runTestCases(t, proc, []*testCase{case1})

proc, case1 = buildUpdateTestCase(t, hasUniqueKey, hasSecondaryKey, true)
runTestCases(t, proc, []*testCase{case1})
}

func TestUpdateTableWithUniqueKey(t *testing.T) {
hasUniqueKey := true
hasSecondaryKey := false

proc, case1 := buildUpdateTestCase(t, hasUniqueKey, hasSecondaryKey)
proc, case1 := buildUpdateTestCase(t, hasUniqueKey, hasSecondaryKey, false)
runTestCases(t, proc, []*testCase{case1})
}

Expand All @@ -79,25 +82,25 @@ func TestUpdateS3TableWithUniqueKey(t *testing.T) {
}

// ----- util function ----
func buildUpdateTestCase(t *testing.T, hasUniqueKey bool, hasSecondaryKey bool) (*process.Process, *testCase) {
func buildUpdateTestCase(t *testing.T, hasUniqueKey bool, hasSecondaryKey bool, relResetExpectErr bool) (*process.Process, *testCase) {
_, ctrl, proc := prepareTestCtx(t, false)
eng := prepareTestEng(ctrl)
eng := prepareTestEng(ctrl, relResetExpectErr)

batchs, affectRows := prepareUpdateTestBatchs(proc.GetMPool(), 3, hasUniqueKey, hasSecondaryKey)
multiUpdateCtxs := prepareTestUpdateMultiUpdateCtx(hasUniqueKey, hasSecondaryKey)
action := UpdateWriteTable
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, affectRows, action)
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, affectRows, action, relResetExpectErr)
return proc, retCase
}

func buildUpdateS3TestCase(t *testing.T, hasUniqueKey bool, hasSecondaryKey bool) (*process.Process, *testCase) {
_, ctrl, proc := prepareTestCtx(t, true)
eng := prepareTestEng(ctrl)
eng := prepareTestEng(ctrl, false)

batchs, _ := prepareUpdateTestBatchs(proc.GetMPool(), 10, hasUniqueKey, hasSecondaryKey)
multiUpdateCtxs := prepareTestUpdateMultiUpdateCtx(hasUniqueKey, hasSecondaryKey)
action := UpdateWriteS3
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, 0, action)
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, 0, action, false)
return proc, retCase
}

Expand Down
37 changes: 28 additions & 9 deletions pkg/sql/colexec/multi_update/util_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/golang/mock/gomock"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
Expand Down Expand Up @@ -49,10 +50,11 @@ var (
)

type testCase struct {
op *MultiUpdate
inputBatchs []*batch.Batch
expectErr bool
affectedRows uint64
op *MultiUpdate
inputBatchs []*batch.Batch
expectErr bool
relResetExpectErr bool
affectedRows uint64
}

func runTestCases(t *testing.T, proc *process.Process, tcs []*testCase) {
Expand Down Expand Up @@ -101,7 +103,17 @@ func runTestCases(t *testing.T, proc *process.Process, tcs []*testCase) {
if tc.op.ctr.s3Writer != nil {
tc.op.ctr.s3Writer.flushThreshold = 2 * mpool.MB
}
if tc.relResetExpectErr {
require.Error(t, err)
for _, bat := range tc.inputBatchs {
bat.Clean(proc.GetMPool())
}
tc.op.Children[0].Free(proc, false, nil)
tc.op.Free(proc, true, err)
continue
}
require.NoError(t, err)

for {
res, err = vm.Exec(tc.op, proc)
if res.Batch == nil || res.Status == vm.ExecStop {
Expand Down Expand Up @@ -172,7 +184,7 @@ func prepareTestCtx(t *testing.T, withFs bool) (context.Context, *gomock.Control
return ctx, ctrl, proc
}

func prepareTestEng(ctrl *gomock.Controller) engine.Engine {
func prepareTestEng(ctrl *gomock.Controller, relationResetReturnError bool) engine.Engine {
eng := mock_frontend.NewMockEngine(ctrl)
eng.EXPECT().New(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
eng.EXPECT().Hints().Return(engine.Hints{
Expand All @@ -184,6 +196,12 @@ func prepareTestEng(ctrl *gomock.Controller) engine.Engine {

relation := mock_frontend.NewMockRelation(ctrl)
relation.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
if relationResetReturnError {
relation.EXPECT().Reset(gomock.Any()).Return(moerr.NewInternalErrorNoCtx("")).AnyTimes()
} else {
relation.EXPECT().Reset(gomock.Any()).Return(nil).AnyTimes()
}

relation.EXPECT().Delete(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()

database.EXPECT().Relation(gomock.Any(), gomock.Any(), gomock.Any()).Return(relation, nil).AnyTimes()
Expand Down Expand Up @@ -293,7 +311,7 @@ func buildTestCase(
eng engine.Engine,
inputBats []*batch.Batch,
affectRows uint64,
action UpdateAction) *testCase {
action UpdateAction, relResetExpectErr bool) *testCase {

retCase := &testCase{
op: &MultiUpdate{
Expand All @@ -310,9 +328,10 @@ func buildTestCase(
},
},
},
inputBatchs: inputBats,
expectErr: false,
affectedRows: affectRows,
inputBatchs: inputBats,
expectErr: false,
relResetExpectErr: relResetExpectErr,
affectedRows: affectRows,
}

return retCase
Expand Down

0 comments on commit ab9421f

Please sign in to comment.