enhance: make pchannel level flusher
- Add a pchannel-level checkpoint for flush processing
- Refactor the recovery of the WAL flushers
- Build a shared WAL scanner first, then create multiple DataSyncServices on top of it (a sketch of this layout follows the change summary below)

Signed-off-by: chyezh <[email protected]>
chyezh committed Jan 15, 2025
1 parent 059bef1 commit 17e4298
Showing 37 changed files with 1,126 additions and 1,296 deletions.
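
The three bullets in the commit message describe the new shape of the flusher: one scanner per pchannel feeds every vchannel-level DataSyncService, and flush progress is tracked by a single pchannel-level checkpoint rather than per-flusher state. The standalone Go sketch below illustrates only that shape under stated assumptions; it is not the Milvus implementation, and every name in it (Msg, pipeline, sharedScanner, pchannelCheckpoint) is hypothetical.

package main

import (
	"fmt"
	"sync"
)

// Msg stands in for a WAL message; Pos is its position within the pchannel.
type Msg struct {
	VChannel string
	Pos      int64
}

// pipeline stands in for one vchannel-level DataSyncService.
type pipeline struct {
	mu      sync.Mutex
	input   chan Msg
	flushed int64 // highest position this pipeline has flushed
}

func (p *pipeline) run(wg *sync.WaitGroup) {
	defer wg.Done()
	for m := range p.input {
		// Real code would buffer and sync segments here; the sketch only records progress.
		p.mu.Lock()
		p.flushed = m.Pos
		p.mu.Unlock()
	}
}

// sharedScanner fans a single pchannel stream out to the per-vchannel pipelines.
func sharedScanner(msgs []Msg, pipelines map[string]*pipeline) {
	for _, m := range msgs {
		if p, ok := pipelines[m.VChannel]; ok {
			p.input <- m
		}
	}
	for _, p := range pipelines {
		close(p.input)
	}
}

// pchannelCheckpoint is the minimum flushed position across all pipelines:
// everything before it has been flushed everywhere and can be skipped on recovery.
func pchannelCheckpoint(pipelines map[string]*pipeline) int64 {
	min := int64(-1)
	for _, p := range pipelines {
		p.mu.Lock()
		f := p.flushed
		p.mu.Unlock()
		if min < 0 || f < min {
			min = f
		}
	}
	return min
}

func main() {
	pipelines := map[string]*pipeline{
		"vchannel-1": {input: make(chan Msg, 8)},
		"vchannel-2": {input: make(chan Msg, 8)},
	}
	var wg sync.WaitGroup
	for _, p := range pipelines {
		wg.Add(1)
		go p.run(&wg)
	}
	sharedScanner([]Msg{
		{VChannel: "vchannel-1", Pos: 1},
		{VChannel: "vchannel-2", Pos: 2},
		{VChannel: "vchannel-1", Pos: 3},
	}, pipelines)
	wg.Wait()
	fmt.Println("pchannel checkpoint:", pchannelCheckpoint(pipelines)) // 2: vchannel-2 stopped at 2
}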
1 change: 0 additions & 1 deletion internal/.mockery.yaml
@@ -36,7 +36,6 @@ packages:
Consumer:
github.com/milvus-io/milvus/internal/streamingnode/server/flusher:
interfaces:
Flusher:
FlushMsgHandler:
github.com/milvus-io/milvus/internal/streamingnode/server/wal:
interfaces:
26 changes: 26 additions & 0 deletions internal/flushcommon/pipeline/data_sync_service.go
@@ -18,10 +18,12 @@ package pipeline

import (
"context"
"fmt"
"sync"

"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/compaction"
"github.com/milvus-io/milvus/internal/flushcommon/broker"
"github.com/milvus-io/milvus/internal/flushcommon/io"
@@ -407,3 +409,27 @@ func NewStreamingNodeDataSyncService(
func NewDataSyncServiceWithMetaCache(metaCache metacache.MetaCache) *DataSyncService {
return &DataSyncService{metacache: metaCache}
}

func NewEmptyStreamingNodeDataSyncService(
initCtx context.Context,
pipelineParams *util.PipelineParams,
input <-chan *msgstream.MsgPack,
vchannelInfo *datapb.VchannelInfo,
schema *schemapb.CollectionSchema,
wbTaskObserverCallback writebuffer.TaskObserverCallback,
dropCallback func(),
) *DataSyncService {
watchInfo := &datapb.ChannelWatchInfo{
Vchan: vchannelInfo,
Schema: schema,
}
metaCache, err := getMetaCacheForStreaming(initCtx, pipelineParams, watchInfo, make([]*datapb.SegmentInfo, 0), make([]*datapb.SegmentInfo, 0))
if err != nil {
panic(fmt.Sprintf("new a empty streaming node data sync service should never be failed, %s", err.Error()))
}
ds, err := getServiceWithChannel(initCtx, pipelineParams, watchInfo, metaCache, make([]*datapb.SegmentInfo, 0), make([]*datapb.SegmentInfo, 0), input, wbTaskObserverCallback, dropCallback)
if err != nil {
panic(fmt.Sprintf("new a empty data sync service should never be failed, %s", err.Error()))
}
return ds
}
6 changes: 5 additions & 1 deletion internal/flushcommon/syncmgr/sync_manager.go
@@ -60,6 +60,7 @@ type syncManager struct {

tasks *typeutil.ConcurrentMap[string, Task]
taskStats *expirable.LRU[string, Task]
handler config.EventHandler
}

func NewSyncManager(chunkManager storage.ChunkManager) SyncManager {
@@ -75,7 +76,9 @@ func NewSyncManager(chunkManager storage.ChunkManager) SyncManager {
taskStats: expirable.NewLRU[string, Task](16, nil, time.Minute*15),
}
// setup config update watcher
params.Watch(params.DataNodeCfg.MaxParallelSyncMgrTasks.Key, config.NewHandler("datanode.syncmgr.poolsize", syncMgr.resizeHandler))
handler := config.NewHandler("datanode.syncmgr.poolsize", syncMgr.resizeHandler)
syncMgr.handler = handler
params.Watch(params.DataNodeCfg.MaxParallelSyncMgrTasks.Key, handler)
return syncMgr
}

@@ -155,6 +158,7 @@ func (mgr *syncManager) TaskStatsJSON() string {
}

func (mgr *syncManager) Close() error {
paramtable.Get().Unwatch(paramtable.Get().DataNodeCfg.MaxParallelSyncMgrTasks.Key, mgr.handler)
timeout := paramtable.Get().CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second)
return mgr.workerPool.ReleaseTimeout(timeout)
}
7 changes: 7 additions & 0 deletions internal/metastore/catalog.go
@@ -231,4 +231,11 @@ type StreamingNodeCataLog interface {

// SaveSegmentAssignments save the segment assignments for the wal.
SaveSegmentAssignments(ctx context.Context, pChannelName string, infos []*streamingpb.SegmentAssignmentMeta) error

// GetConsumeCheckpoint gets the consuming checkpoint of the wal.
// Return nil, nil if the checkpoint does not exist.
GetConsumeCheckpoint(ctx context.Context, pChannelName string) (*streamingpb.WALCheckpoint, error)

// SaveConsumeCheckpoint saves the consuming checkpoint of the wal.
SaveConsumeCheckpoint(ctx context.Context, pChannelName string, checkpoint *streamingpb.WALCheckpoint) error
}
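
These two methods are the persistence contract the pchannel-level flusher relies on when it restarts. The sketch below shows one plausible recovery flow against that contract; the Checkpointer interface mirrors only the two methods above, and everything else (memCatalog, recoverStartPosition, the MessageID field on the stand-in WALCheckpoint) is hypothetical, since the real streamingpb.WALCheckpoint message is not part of this diff.

package main

import (
	"context"
	"fmt"
)

// WALCheckpoint stands in for streamingpb.WALCheckpoint in this sketch.
type WALCheckpoint struct {
	MessageID string
}

// Checkpointer mirrors the two methods added to StreamingNodeCataLog.
type Checkpointer interface {
	GetConsumeCheckpoint(ctx context.Context, pChannelName string) (*WALCheckpoint, error)
	SaveConsumeCheckpoint(ctx context.Context, pChannelName string, checkpoint *WALCheckpoint) error
}

// memCatalog is an in-memory stand-in for the etcd-backed catalog.
type memCatalog struct{ cps map[string]*WALCheckpoint }

func (m *memCatalog) GetConsumeCheckpoint(_ context.Context, p string) (*WALCheckpoint, error) {
	return m.cps[p], nil // nil, nil when no checkpoint has been saved yet
}

func (m *memCatalog) SaveConsumeCheckpoint(_ context.Context, p string, cp *WALCheckpoint) error {
	m.cps[p] = cp
	return nil
}

// recoverStartPosition decides where a pchannel-level flusher should resume consuming.
func recoverStartPosition(ctx context.Context, c Checkpointer, pchannel string) (string, error) {
	cp, err := c.GetConsumeCheckpoint(ctx, pchannel)
	if err != nil {
		return "", err
	}
	if cp == nil {
		return "earliest", nil // no checkpoint yet: replay from the beginning
	}
	return cp.MessageID, nil // otherwise seek the shared scanner to the checkpoint
}

func main() {
	ctx := context.Background()
	catalog := &memCatalog{cps: map[string]*WALCheckpoint{}}

	pos, _ := recoverStartPosition(ctx, catalog, "pchannel-1")
	fmt.Println("first start:", pos) // earliest

	_ = catalog.SaveConsumeCheckpoint(ctx, "pchannel-1", &WALCheckpoint{MessageID: "456398247939"})
	pos, _ = recoverStartPosition(ctx, catalog, "pchannel-1")
	fmt.Println("after flush progress:", pos) // 456398247939
}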
2 changes: 2 additions & 0 deletions internal/metastore/kv/streamingnode/constant.go
@@ -5,4 +5,6 @@ const (

DirectoryWAL = "wal"
DirectorySegmentAssign = "segment-assign"

KeyConsumeCheckpoint = "consume-checkpoint"
)
35 changes: 35 additions & 0 deletions internal/metastore/kv/streamingnode/kv_catalog.go
@@ -13,6 +13,7 @@ import (
"github.com/milvus-io/milvus/pkg/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
)

// NewCataLog creates a new streaming-node catalog instance.
@@ -22,11 +23,13 @@ import (
// └── wal
//
// ├── pchannel-1
// │   ├── checkpoint
// │   └── segment-assign
// │   ├── 456398247934
// │   ├── 456398247936
// │   └── 456398247939
// └── pchannel-2
// ├── checkpoint
// └── segment-assign
// ├── 456398247934
// ├── 456398247935
@@ -96,6 +99,33 @@ func (c *catalog) SaveSegmentAssignments(ctx context.Context, pChannelName strin
return nil
}

// GetConsumeCheckpoint gets the consuming checkpoint of the wal.
func (c *catalog) GetConsumeCheckpoint(ctx context.Context, pchannelName string) (*streamingpb.WALCheckpoint, error) {
key := buildConsumeCheckpointPath(pchannelName)
value, err := c.metaKV.Load(ctx, key)
if errors.Is(err, merr.ErrIoKeyNotFound) {
return nil, nil
}
if err != nil {
return nil, err
}
val := &streamingpb.WALCheckpoint{}
if err = proto.Unmarshal([]byte(value), val); err != nil {
return nil, err
}
return val, nil
}

// SaveConsumeCheckpoint saves the consuming checkpoint of the wal.
func (c *catalog) SaveConsumeCheckpoint(ctx context.Context, pchannelName string, checkpoint *streamingpb.WALCheckpoint) error {
key := buildConsumeCheckpointPath(pchannelName)
value, err := proto.Marshal(checkpoint)
if err != nil {
return err
}
return c.metaKV.Save(ctx, key, string(value))
}

// buildSegmentAssignmentMetaPath builds the path for segment assignment
func buildSegmentAssignmentMetaPath(pChannelName string) string {
return path.Join(buildWALDirectory(pChannelName), DirectorySegmentAssign) + "/"
@@ -106,6 +136,11 @@ func buildSegmentAssignmentMetaPathOfSegment(pChannelName string, segmentID int6
return path.Join(buildWALDirectory(pChannelName), DirectorySegmentAssign, strconv.FormatInt(segmentID, 10))
}

// buildConsumeCheckpointPath builds the path for consume checkpoint
func buildConsumeCheckpointPath(pchannelName string) string {
return path.Join(buildWALDirectory(pchannelName), KeyConsumeCheckpoint)
}

// buildWALDirectory builds the path for wal directory
func buildWALDirectory(pchannelName string) string {
return path.Join(MetaPrefix, DirectoryWAL, pchannelName) + "/"
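
For reference, the key produced by buildConsumeCheckpointPath sits directly under the pchannel's WAL directory, next to the segment-assign subtree shown in the directory comment above. A tiny illustration, assuming a placeholder root (the real prefix is the MetaPrefix constant in this package, which is not shown in this diff):

package main

import (
	"fmt"
	"path"
)

func main() {
	metaPrefix := "streamingnode-meta" // placeholder; the real value is the MetaPrefix constant
	walDir := path.Join(metaPrefix, "wal", "pchannel-1") + "/" // what buildWALDirectory returns
	fmt.Println(path.Join(walDir, "consume-checkpoint"))       // what buildConsumeCheckpointPath returns
	// streamingnode-meta/wal/pchannel-1/consume-checkpoint
}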
107 changes: 107 additions & 0 deletions internal/mocks/mock_metastore/mock_StreamingNodeCataLog.go

Some generated files are not rendered by default.