Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: multiupdate relation #21368

Merged
merged 3 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/sql/colexec/multi_update/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,23 @@ func TestDeleteS3TableWithUniqueKeyAndSecondaryKey(t *testing.T) {
// ----- util function ----
func buildDeleteTestCase(t *testing.T, hasUniqueKey bool, hasSecondaryKey bool) (*process.Process, *testCase) {
_, ctrl, proc := prepareTestCtx(t, false)
eng := prepareTestEng(ctrl)
eng := prepareTestEng(ctrl, false)

batchs, affectRows := prepareTestDeleteBatchs(proc.GetMPool(), 2, hasUniqueKey, hasSecondaryKey)
multiUpdateCtxs := prepareTestDeleteMultiUpdateCtx(hasUniqueKey, hasSecondaryKey)
action := UpdateWriteTable
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, affectRows, action)
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, affectRows, action, false)
return proc, retCase
}

func buildDeleteS3TestCase(t *testing.T, hasUniqueKey bool, hasSecondaryKey bool) (*process.Process, *testCase) {
_, ctrl, proc := prepareTestCtx(t, true)
eng := prepareTestEng(ctrl)
eng := prepareTestEng(ctrl, false)

batchs, _ := prepareTestDeleteBatchs(proc.GetMPool(), 12, hasUniqueKey, hasSecondaryKey)
multiUpdateCtxs := prepareTestDeleteMultiUpdateCtx(hasUniqueKey, hasSecondaryKey)
action := UpdateWriteS3
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, 0, action)
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, 0, action, false)
return proc, retCase
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/colexec/multi_update/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,37 +98,37 @@ func TestFlushS3Info(t *testing.T) {
hasSecondaryKey := false

_, ctrl, proc := prepareTestCtx(t, true)
eng := prepareTestEng(ctrl)
eng := prepareTestEng(ctrl, false)

batchs, rowCount := buildFlushS3InfoBatch(proc.GetMPool(), hasUniqueKey, hasSecondaryKey)

multiUpdateCtxs := prepareTestInsertMultiUpdateCtx(hasUniqueKey, hasSecondaryKey)
action := UpdateFlushS3Info
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, rowCount, action)
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, rowCount, action, false)

runTestCases(t, proc, []*testCase{retCase})
}

// ----- util function ----
func buildInsertTestCase(t *testing.T, hasUniqueKey bool, hasSecondaryKey bool) (*process.Process, *testCase) {
_, ctrl, proc := prepareTestCtx(t, false)
eng := prepareTestEng(ctrl)
eng := prepareTestEng(ctrl, false)

batchs, affectRows := prepareTestInsertBatchs(proc.GetMPool(), 2, hasUniqueKey, hasSecondaryKey)
multiUpdateCtxs := prepareTestInsertMultiUpdateCtx(hasUniqueKey, hasSecondaryKey)
action := UpdateWriteTable
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, affectRows, action)
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, affectRows, action, false)
return proc, retCase
}

func buildInsertS3TestCase(t *testing.T, hasUniqueKey bool, hasSecondaryKey bool) (*process.Process, *testCase) {
_, ctrl, proc := prepareTestCtx(t, true)
eng := prepareTestEng(ctrl)
eng := prepareTestEng(ctrl, false)

batchs, _ := prepareTestInsertBatchs(proc.GetMPool(), 10, hasUniqueKey, hasSecondaryKey)
multiUpdateCtxs := prepareTestInsertMultiUpdateCtx(hasUniqueKey, hasSecondaryKey)
action := UpdateWriteS3
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, 0, action)
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, 0, action, false)
return proc, retCase
}

Expand Down
19 changes: 14 additions & 5 deletions pkg/sql/colexec/multi_update/multi_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,22 @@ func (update *MultiUpdate) Prepare(proc *process.Process) error {

for _, updateCtx := range update.MultiUpdateCtx {
info := update.ctr.updateCtxInfos[updateCtx.TableDef.Name]
info.Sources = nil
if update.Action != UpdateWriteS3 {
rel, err := colexec.GetRelAndPartitionRelsByObjRef(proc.Ctx, proc, update.Engine, updateCtx.ObjRef)
if err != nil {
return err
if len(info.Sources) == 0 {
info.Sources = nil
rel, err := colexec.GetRelAndPartitionRelsByObjRef(proc.Ctx, proc, update.Engine, updateCtx.ObjRef)
if err != nil {
return err
}
info.Sources = append(info.Sources, rel)
} else {
for _, rel := range info.Sources {
err := rel.Reset(proc.GetTxnOperator())
if err != nil {
return err
}
}
}
info.Sources = append(info.Sources, rel)
}
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/colexec/multi_update/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,7 @@ func (update *MultiUpdate) Reset(proc *process.Process, pipelineFailed bool, err
if update.ctr.s3Writer != nil {
update.ctr.s3Writer.reset(proc)
}
for _, info := range update.ctr.updateCtxInfos {
info.Sources = nil
}

update.ctr.state = vm.Build
}

Expand Down
17 changes: 10 additions & 7 deletions pkg/sql/colexec/multi_update/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,18 @@ func TestUpdateSingleTable(t *testing.T) {
hasUniqueKey := false
hasSecondaryKey := false

proc, case1 := buildUpdateTestCase(t, hasUniqueKey, hasSecondaryKey)
proc, case1 := buildUpdateTestCase(t, hasUniqueKey, hasSecondaryKey, false)
runTestCases(t, proc, []*testCase{case1})

proc, case1 = buildUpdateTestCase(t, hasUniqueKey, hasSecondaryKey, true)
runTestCases(t, proc, []*testCase{case1})
}

func TestUpdateTableWithUniqueKey(t *testing.T) {
hasUniqueKey := true
hasSecondaryKey := false

proc, case1 := buildUpdateTestCase(t, hasUniqueKey, hasSecondaryKey)
proc, case1 := buildUpdateTestCase(t, hasUniqueKey, hasSecondaryKey, false)
runTestCases(t, proc, []*testCase{case1})
}

Expand All @@ -79,25 +82,25 @@ func TestUpdateS3TableWithUniqueKey(t *testing.T) {
}

// ----- util function ----
func buildUpdateTestCase(t *testing.T, hasUniqueKey bool, hasSecondaryKey bool) (*process.Process, *testCase) {
func buildUpdateTestCase(t *testing.T, hasUniqueKey bool, hasSecondaryKey bool, relResetExpectErr bool) (*process.Process, *testCase) {
_, ctrl, proc := prepareTestCtx(t, false)
eng := prepareTestEng(ctrl)
eng := prepareTestEng(ctrl, relResetExpectErr)

batchs, affectRows := prepareUpdateTestBatchs(proc.GetMPool(), 3, hasUniqueKey, hasSecondaryKey)
multiUpdateCtxs := prepareTestUpdateMultiUpdateCtx(hasUniqueKey, hasSecondaryKey)
action := UpdateWriteTable
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, affectRows, action)
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, affectRows, action, relResetExpectErr)
return proc, retCase
}

func buildUpdateS3TestCase(t *testing.T, hasUniqueKey bool, hasSecondaryKey bool) (*process.Process, *testCase) {
_, ctrl, proc := prepareTestCtx(t, true)
eng := prepareTestEng(ctrl)
eng := prepareTestEng(ctrl, false)

batchs, _ := prepareUpdateTestBatchs(proc.GetMPool(), 10, hasUniqueKey, hasSecondaryKey)
multiUpdateCtxs := prepareTestUpdateMultiUpdateCtx(hasUniqueKey, hasSecondaryKey)
action := UpdateWriteS3
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, 0, action)
retCase := buildTestCase(multiUpdateCtxs, eng, batchs, 0, action, false)
return proc, retCase
}

Expand Down
37 changes: 28 additions & 9 deletions pkg/sql/colexec/multi_update/util_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/golang/mock/gomock"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
Expand Down Expand Up @@ -49,10 +50,11 @@ var (
)

type testCase struct {
op *MultiUpdate
inputBatchs []*batch.Batch
expectErr bool
affectedRows uint64
op *MultiUpdate
inputBatchs []*batch.Batch
expectErr bool
relResetExpectErr bool
affectedRows uint64
}

func runTestCases(t *testing.T, proc *process.Process, tcs []*testCase) {
Expand Down Expand Up @@ -101,7 +103,17 @@ func runTestCases(t *testing.T, proc *process.Process, tcs []*testCase) {
if tc.op.ctr.s3Writer != nil {
tc.op.ctr.s3Writer.flushThreshold = 2 * mpool.MB
}
if tc.relResetExpectErr {
require.Error(t, err)
for _, bat := range tc.inputBatchs {
bat.Clean(proc.GetMPool())
}
tc.op.Children[0].Free(proc, false, nil)
tc.op.Free(proc, true, err)
continue
}
require.NoError(t, err)

for {
res, err = vm.Exec(tc.op, proc)
if res.Batch == nil || res.Status == vm.ExecStop {
Expand Down Expand Up @@ -172,7 +184,7 @@ func prepareTestCtx(t *testing.T, withFs bool) (context.Context, *gomock.Control
return ctx, ctrl, proc
}

func prepareTestEng(ctrl *gomock.Controller) engine.Engine {
func prepareTestEng(ctrl *gomock.Controller, relationResetReturnError bool) engine.Engine {
eng := mock_frontend.NewMockEngine(ctrl)
eng.EXPECT().New(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
eng.EXPECT().Hints().Return(engine.Hints{
Expand All @@ -184,6 +196,12 @@ func prepareTestEng(ctrl *gomock.Controller) engine.Engine {

relation := mock_frontend.NewMockRelation(ctrl)
relation.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
if relationResetReturnError {
relation.EXPECT().Reset(gomock.Any()).Return(moerr.NewInternalErrorNoCtx("")).AnyTimes()
} else {
relation.EXPECT().Reset(gomock.Any()).Return(nil).AnyTimes()
}

relation.EXPECT().Delete(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()

database.EXPECT().Relation(gomock.Any(), gomock.Any(), gomock.Any()).Return(relation, nil).AnyTimes()
Expand Down Expand Up @@ -293,7 +311,7 @@ func buildTestCase(
eng engine.Engine,
inputBats []*batch.Batch,
affectRows uint64,
action UpdateAction) *testCase {
action UpdateAction, relResetExpectErr bool) *testCase {

retCase := &testCase{
op: &MultiUpdate{
Expand All @@ -310,9 +328,10 @@ func buildTestCase(
},
},
},
inputBatchs: inputBats,
expectErr: false,
affectedRows: affectRows,
inputBatchs: inputBats,
expectErr: false,
relResetExpectErr: relResetExpectErr,
affectedRows: affectRows,
}

return retCase
Expand Down
Loading