Skip to content

Commit

Permalink
Use writebuffer, sync manager refactory in datanode (#28320)
Browse files Browse the repository at this point in the history
See also #27675
This PR make previously merged refactory of datanode go online
- Use write node to replace insert/delete node
- Use write buffer manager to control all buffers
- Use sync manager to control sync tasks instead of flush manager

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Nov 15, 2023
1 parent fd3ae4c commit 0b90507
Show file tree
Hide file tree
Showing 63 changed files with 2,859 additions and 10,414 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,8 @@ generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=Broker --dir=$(PWD)/internal/datanode/broker --output=$(PWD)/internal/datanode/broker/ --filename=mock_broker.go --with-expecter --structname=MockBroker --outpkg=broker --inpackage
$(INSTALL_PATH)/mockery --name=MetaCache --dir=$(PWD)/internal/datanode/metacache --output=$(PWD)/internal/datanode/metacache --filename=mock_meta_cache.go --with-expecter --structname=MockMetaCache --outpkg=metacache --inpackage
$(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --output=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=Manager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockManager --outpkg=writebuffer --inpackage

generate-mockery-metastore: getdeps
$(INSTALL_PATH)/mockery --name=RootCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_rootcoord_catalog.go --with-expecter --structname=RootCoordCatalog --outpkg=mocks
Expand Down
2 changes: 2 additions & 0 deletions internal/datanode/binlog_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
var (
errUploadToBlobStorage = errors.New("upload to blob storage wrong")
errDownloadFromBlobStorage = errors.New("download from blob storage wrong")
// errStart used for retry start
errStart = errors.New("start")
)

type downloader interface {
Expand Down
12 changes: 6 additions & 6 deletions internal/datanode/binlog_io_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,19 +190,19 @@ func TestBinlogIOInnerMethods(t *testing.T) {

tests := []struct {
isvalid bool
deletepk primaryKey
deletepk storage.PrimaryKey
ts uint64

description string
}{
{true, newInt64PrimaryKey(1), 1111111, "valid input"},
{true, storage.NewInt64PrimaryKey(1), 1111111, "valid input"},
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
if test.isvalid {
k, v, err := b.genDeltaBlobs(&DeleteData{
Pks: []primaryKey{test.deletepk},
Pks: []storage.PrimaryKey{test.deletepk},
Tss: []uint64{test.ts},
}, meta.GetID(), 10, 1)

Expand All @@ -217,12 +217,12 @@ func TestBinlogIOInnerMethods(t *testing.T) {
})

t.Run("Test genDeltaBlobs error", func(t *testing.T) {
pk := newInt64PrimaryKey(1)
pk := storage.NewInt64PrimaryKey(1)

t.Run("Test serialize error", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
b := &binlogIO{cm, alloc}
k, v, err := b.genDeltaBlobs(&DeleteData{Pks: []primaryKey{pk}, Tss: []uint64{}}, 1, 1, 1)
k, v, err := b.genDeltaBlobs(&DeleteData{Pks: []storage.PrimaryKey{pk}, Tss: []uint64{}}, 1, 1, 1)
assert.Error(t, err)
assert.Empty(t, k)
assert.Empty(t, v)
Expand All @@ -232,7 +232,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().AllocOne().Call.Return(int64(0), fmt.Errorf("mock AllocOne error"))
bin := binlogIO{cm, alloc}
k, v, err := bin.genDeltaBlobs(&DeleteData{Pks: []primaryKey{pk}, Tss: []uint64{1}}, 1, 1, 1)
k, v, err := bin.genDeltaBlobs(&DeleteData{Pks: []storage.PrimaryKey{pk}, Tss: []uint64{1}}, 1, 1, 1)
assert.Error(t, err)
assert.Empty(t, k)
assert.Empty(t, v)
Expand Down
Loading

0 comments on commit 0b90507

Please sign in to comment.