enhance: make pchannel level flusher #39275

Open
wants to merge 2 commits into base: master
4 changes: 3 additions & 1 deletion internal/.mockery.yaml
@@ -36,7 +36,6 @@ packages:
Consumer:
github.com/milvus-io/milvus/internal/streamingnode/server/flusher:
interfaces:
Flusher:
FlushMsgHandler:
github.com/milvus-io/milvus/internal/streamingnode/server/wal:
interfaces:
@@ -68,6 +67,9 @@ packages:
github.com/milvus-io/milvus/internal/util/segcore:
interfaces:
CSegment:
github.com/milvus-io/milvus/internal/storage:
interfaces:
ChunkManager:
github.com/milvus-io/milvus/internal/util/streamingutil/service/discoverer:
interfaces:
Discoverer:
@@ -74,11 +74,12 @@ func (rc *resumableConsumerImpl) resumeLoop() {
// The consumer needs to resume when an error occurs, so the message handler shouldn't be closed if the internal consumer encounters a failure.
nopCloseMH := nopCloseHandler{
Handler: rc.mh,
HandleInterceptor: func(ctx context.Context, msg message.ImmutableMessage, handle handleFunc) (bool, error) {
g := rc.metrics.StartConsume(msg.EstimateSize())
ok, err := handle(ctx, msg)
g.Finish()
return ok, err
HandleInterceptor: func(handleParam message.HandleParam, h message.Handler) message.HandleResult {
if handleParam.Message != nil {
g := rc.metrics.StartConsume(handleParam.Message.EstimateSize())
defer func() { g.Finish() }()
}
return h.Handle(handleParam)
},
}

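For readers following the handler refactor: the interceptor no longer receives a bare (ctx, msg, handleFunc) triple; it now gets the whole message.HandleParam and delegates to the wrapped message.Handler. Below is a minimal sketch of another interceptor written against the new shape. The Ctx and Message fields and the MessageHandled and Error result fields are taken from this diff; the logging itself is illustrative.

// Sketch only: an interceptor with the new signature that logs around the
// wrapped handler. It could be assigned to nopCloseHandler.HandleInterceptor
// the same way the metrics interceptor above is.
package consumer

import (
    "go.uber.org/zap"

    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/streaming/util/message"
)

func loggingInterceptor(param message.HandleParam, h message.Handler) message.HandleResult {
    if param.Message != nil {
        log.Info("handling message", zap.Uint64("timeTick", param.Message.TimeTick()))
    }
    result := h.Handle(param)
    if result.Error != nil {
        log.Warn("message handling failed", zap.Error(result.Error))
    }
    return result
}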
43 changes: 25 additions & 18 deletions internal/distributed/streaming/internal/consumer/consumer_test.go
@@ -12,6 +12,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/consumer"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
)
@@ -22,22 +23,25 @@ func TestResumableConsumer(t *testing.T) {
ch := make(chan struct{})
c.EXPECT().Done().Return(ch)
c.EXPECT().Error().Return(errors.New("test"))
c.EXPECT().Close().Return()
c.EXPECT().Close().Return(nil)
rc := NewResumableConsumer(func(ctx context.Context, opts *handler.ConsumerOptions) (consumer.Consumer, error) {
if i == 0 {
i++
ok, err := opts.MessageHandler.Handle(context.Background(), message.NewImmutableMesasge(
walimplstest.NewTestMessageID(123),
[]byte("payload"),
map[string]string{
"key": "value",
"_t": "1",
"_tt": message.EncodeUint64(456),
"_v": "1",
"_lc": walimplstest.NewTestMessageID(123).Marshal(),
}))
assert.True(t, ok)
assert.NoError(t, err)
result := opts.MessageHandler.Handle(message.HandleParam{
Ctx: context.Background(),
Message: message.NewImmutableMesasge(
walimplstest.NewTestMessageID(123),
[]byte("payload"),
map[string]string{
"key": "value",
"_t": "1",
"_tt": message.EncodeUint64(456),
"_v": "1",
"_lc": walimplstest.NewTestMessageID(123).Marshal(),
}),
})
assert.True(t, result.MessageHandled)
assert.NoError(t, result.Error)
return c, nil
} else if i == 1 {
i++
@@ -46,15 +50,15 @@
newC := mock_consumer.NewMockConsumer(t)
newC.EXPECT().Done().Return(make(<-chan struct{}))
newC.EXPECT().Error().Return(errors.New("test"))
newC.EXPECT().Close().Return()
newC.EXPECT().Close().Return(nil)
return newC, nil
}, &ConsumerOptions{
PChannel: "test",
DeliverPolicy: options.DeliverPolicyAll(),
DeliverFilters: []options.DeliverFilter{
options.DeliverFilterTimeTickGT(1),
},
MessageHandler: message.ChanMessageHandler(make(chan message.ImmutableMessage, 2)),
MessageHandler: adaptor.ChanMessageHandler(make(chan message.ImmutableMessage, 2)),
})

select {
@@ -76,10 +80,13 @@
func TestHandler(t *testing.T) {
ch := make(chan message.ImmutableMessage, 100)
hNop := nopCloseHandler{
Handler: message.ChanMessageHandler(ch),
Handler: adaptor.ChanMessageHandler(ch),
}
hNop.Handle(context.Background(), nil)
assert.Nil(t, <-ch)
hNop.Handle(message.HandleParam{
Ctx: context.Background(),
Message: message.NewImmutableMesasge(walimplstest.NewTestMessageID(123), []byte("payload"), nil),
})
assert.NotNil(t, <-ch)
hNop.Close()
select {
case <-ch:
12 changes: 4 additions & 8 deletions internal/distributed/streaming/internal/consumer/handler.go
@@ -1,25 +1,21 @@
package consumer

import (
"context"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
)

type handleFunc func(ctx context.Context, msg message.ImmutableMessage) (bool, error)

// nopCloseHandler is a handler that does nothing on close.
type nopCloseHandler struct {
message.Handler
HandleInterceptor func(ctx context.Context, msg message.ImmutableMessage, handle handleFunc) (bool, error)
HandleInterceptor func(handleParam message.HandleParam, h message.Handler) message.HandleResult
}

// Handle is the callback for handling a message.
func (nch nopCloseHandler) Handle(ctx context.Context, msg message.ImmutableMessage) (bool, error) {
func (nch nopCloseHandler) Handle(handleParam message.HandleParam) message.HandleResult {
if nch.HandleInterceptor != nil {
return nch.HandleInterceptor(ctx, msg, nch.Handler.Handle)
return nch.HandleInterceptor(handleParam, nch.Handler)
}
return nch.Handler.Handle(ctx, msg)
return nch.Handler.Handle(handleParam)
}

// Close is called after all messages are handled or handling is interrupted.
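Usage note: nopCloseHandler exists so that the wrapped handler survives consumer restarts; Close on the wrapper is a no-op while Handle still delegates. A short sketch assuming adaptor.ChanMessageHandler as the wrapped handler (the channel capacity is arbitrary):

// The channel-backed handler is not closed when the wrapper is closed,
// so it can be handed to the next consumer created after a resume.
ch := make(adaptor.ChanMessageHandler, 16)
wrapped := nopCloseHandler{Handler: ch}
wrapped.Close() // no-op: ch stays usable for the replacement consumer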
@@ -1,8 +1,6 @@
package consumer

import (
"context"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
)

@@ -13,16 +11,20 @@ type timeTickOrderMessageHandler struct {
lastTimeTick uint64
}

func (mh *timeTickOrderMessageHandler) Handle(ctx context.Context, msg message.ImmutableMessage) (bool, error) {
lastConfirmedMessageID := msg.LastConfirmedMessageID()
timetick := msg.TimeTick()
func (mh *timeTickOrderMessageHandler) Handle(handleParam message.HandleParam) message.HandleResult {
var lastConfirmedMessageID message.MessageID
var lastTimeTick uint64
if handleParam.Message != nil {
lastConfirmedMessageID = handleParam.Message.LastConfirmedMessageID()
lastTimeTick = handleParam.Message.TimeTick()
}

ok, err := mh.inner.Handle(ctx, msg)
if ok {
result := mh.inner.Handle(handleParam)
if result.MessageHandled {
mh.lastConfirmedMessageID = lastConfirmedMessageID
mh.lastTimeTick = timetick
mh.lastTimeTick = lastTimeTick
}
return ok, err
return result
}

func (mh *timeTickOrderMessageHandler) Close() {
5 changes: 3 additions & 2 deletions internal/distributed/streaming/internal/producer/producer.go
@@ -15,6 +15,7 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -77,7 +78,7 @@ type ResumableProducer struct {
}

// Produce produces a new message to the log service.
func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (result *producer.ProduceResult, err error) {
func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (result *types.AppendResult, err error) {
if !p.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, errors.Wrapf(errs.ErrClosed, "produce on closed producer")
}
@@ -94,7 +95,7 @@ func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMess
return nil, err
}

produceResult, err := producerHandler.Produce(ctx, msg)
produceResult, err := producerHandler.Append(ctx, msg)
if err == nil {
return produceResult, nil
}
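Caller-side view of the change: Produce now returns *types.AppendResult (its MessageID and TimeTick fields appear in the tests below) and forwards to the handler's Append internally. A hedged sketch of a caller; the helper name and the logging are made up, and the imports are assumed to match the ones already in this file:

// appendOne is a hypothetical helper showing the new return type in use.
func appendOne(ctx context.Context, p *ResumableProducer, msg message.MutableMessage) (*types.AppendResult, error) {
    result, err := p.Produce(ctx, msg)
    if err != nil {
        return nil, err
    }
    log.Info("message appended",
        zap.Any("messageID", result.MessageID),
        zap.Uint64("timeTick", result.TimeTick))
    return result, nil
}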
@@ -14,12 +14,13 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/producer"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)

func TestResumableProducer(t *testing.T) {
p := mock_producer.NewMockProducer(t)
msgID := mock_message.NewMockMessageID(t)
p.EXPECT().Produce(mock.Anything, mock.Anything).Return(&producer.ProduceResult{
p.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.AppendResult{
MessageID: msgID,
TimeTick: 100,
}, nil)
@@ -47,11 +48,11 @@ func TestResumableProducer(t *testing.T) {
} else if i == 2 {
p := mock_producer.NewMockProducer(t)
msgID := mock_message.NewMockMessageID(t)
p.EXPECT().Produce(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*producer.ProduceResult, error) {
p.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*types.AppendResult, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
return &producer.ProduceResult{
return &types.AppendResult{
MessageID: msgID,
TimeTick: 100,
}, nil
3 changes: 2 additions & 1 deletion internal/distributed/streaming/streaming_test.go
@@ -10,6 +10,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -109,7 +110,7 @@ func TestStreamingConsume(t *testing.T) {
t.Skip()
streaming.Init()
defer streaming.Release()
ch := make(message.ChanMessageHandler, 10)
ch := make(adaptor.ChanMessageHandler, 10)
s := streaming.WAL().Read(context.Background(), streaming.ReadOption{
VChannel: vChannels[0],
DeliverPolicy: options.DeliverPolicyAll(),
2 changes: 1 addition & 1 deletion internal/distributed/streaming/wal_test.go
@@ -70,7 +70,7 @@ func TestWAL(t *testing.T) {
return true
}
})
p.EXPECT().Produce(mock.Anything, mock.Anything).Return(&types.AppendResult{
p.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.AppendResult{
MessageID: walimplstest.NewTestMessageID(1),
TimeTick: 10,
TxnCtx: &message.TxnContext{
1 change: 0 additions & 1 deletion internal/distributed/streamingnode/service.go
@@ -242,7 +242,6 @@ func (s *Server) init() (err error) {
WithDataCoordClient(s.dataCoord).
WithSession(s.session).
WithMetaKV(s.metaKV).
WithChunkManager(s.chunkManager).
Build()
if err := s.streamingnode.Init(s.ctx); err != nil {
return errors.Wrap(err, "StreamingNode service init failed")
26 changes: 26 additions & 0 deletions internal/flushcommon/pipeline/data_sync_service.go
@@ -18,10 +18,12 @@ package pipeline

import (
"context"
"fmt"
"sync"

"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/compaction"
"github.com/milvus-io/milvus/internal/flushcommon/broker"
"github.com/milvus-io/milvus/internal/flushcommon/io"
@@ -407,3 +409,27 @@ func NewStreamingNodeDataSyncService(
func NewDataSyncServiceWithMetaCache(metaCache metacache.MetaCache) *DataSyncService {
return &DataSyncService{metacache: metaCache}
}

func NewEmptyStreamingNodeDataSyncService(
initCtx context.Context,
pipelineParams *util.PipelineParams,
input <-chan *msgstream.MsgPack,
vchannelInfo *datapb.VchannelInfo,
schema *schemapb.CollectionSchema,
wbTaskObserverCallback writebuffer.TaskObserverCallback,
dropCallback func(),
) *DataSyncService {
watchInfo := &datapb.ChannelWatchInfo{
Vchan: vchannelInfo,
Schema: schema,
}
metaCache, err := getMetaCacheForStreaming(initCtx, pipelineParams, watchInfo, make([]*datapb.SegmentInfo, 0), make([]*datapb.SegmentInfo, 0))
if err != nil {
panic(fmt.Sprintf("new a empty streaming node data sync service should never be failed, %s", err.Error()))
}
ds, err := getServiceWithChannel(initCtx, pipelineParams, watchInfo, metaCache, make([]*datapb.SegmentInfo, 0), make([]*datapb.SegmentInfo, 0), input, wbTaskObserverCallback, dropCallback)
if err != nil {
panic(fmt.Sprintf("new a empty data sync service should never be failed, %s", err.Error()))
}
return ds
}
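A hypothetical call site for the new constructor, for example a pchannel-level flusher opening a pipeline for a vchannel that has no recovered segments yet. Only the parameter list mirrors NewEmptyStreamingNodeDataSyncService above; every name below is an assumption:

// newEmptyPipeline is an illustrative helper, not part of this PR.
func newEmptyPipeline(
    ctx context.Context,
    params *util.PipelineParams,
    vchannel string,
    collectionID int64,
    schema *schemapb.CollectionSchema,
) (*DataSyncService, chan *msgstream.MsgPack) {
    input := make(chan *msgstream.MsgPack, 128)
    ds := NewEmptyStreamingNodeDataSyncService(
        ctx,
        params,
        input,
        &datapb.VchannelInfo{
            CollectionID: collectionID,
            ChannelName:  vchannel,
        },
        schema,
        nil, // no write-buffer task observer callback
        func() {
            // drop callback: the flusher would tear down this vchannel's pipeline here
        },
    )
    return ds, input
}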
6 changes: 5 additions & 1 deletion internal/flushcommon/syncmgr/sync_manager.go
@@ -60,6 +60,7 @@ type syncManager struct {

tasks *typeutil.ConcurrentMap[string, Task]
taskStats *expirable.LRU[string, Task]
handler config.EventHandler
}

func NewSyncManager(chunkManager storage.ChunkManager) SyncManager {
@@ -75,7 +76,9 @@ func NewSyncManager(chunkManager storage.ChunkManager) SyncManager {
taskStats: expirable.NewLRU[string, Task](16, nil, time.Minute*15),
}
// setup config update watcher
params.Watch(params.DataNodeCfg.MaxParallelSyncMgrTasks.Key, config.NewHandler("datanode.syncmgr.poolsize", syncMgr.resizeHandler))
handler := config.NewHandler("datanode.syncmgr.poolsize", syncMgr.resizeHandler)
syncMgr.handler = handler
params.Watch(params.DataNodeCfg.MaxParallelSyncMgrTasks.Key, handler)
return syncMgr
}

@@ -155,6 +158,7 @@ func (mgr *syncManager) TaskStatsJSON() string {
}

func (mgr *syncManager) Close() error {
paramtable.Get().Unwatch(paramtable.Get().DataNodeCfg.MaxParallelSyncMgrTasks.Key, mgr.handler)
timeout := paramtable.Get().CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second)
return mgr.workerPool.ReleaseTimeout(timeout)
}
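The handler is stored on the struct so that Close can deregister exactly the callback registered in NewSyncManager; without the Unwatch call the config watcher would keep invoking the handler of a closed manager. The same register/deregister pattern in isolation, assuming config.NewHandler and the Watch/Unwatch pair behave as used in this diff (the type and identifier names are made up):

// Sketch of the watch/unwatch lifecycle this change adopts.
type resizableWorker struct {
    handler config.EventHandler
}

func newResizableWorker() *resizableWorker {
    w := &resizableWorker{}
    w.handler = config.NewHandler("example.worker.poolsize", func(evt *config.Event) {
        // react to the updated pool size here
    })
    paramtable.Get().Watch(paramtable.Get().DataNodeCfg.MaxParallelSyncMgrTasks.Key, w.handler)
    return w
}

func (w *resizableWorker) Close() {
    // Unregister the same handler instance that was registered above.
    paramtable.Get().Unwatch(paramtable.Get().DataNodeCfg.MaxParallelSyncMgrTasks.Key, w.handler)
}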
7 changes: 7 additions & 0 deletions internal/metastore/catalog.go
@@ -231,4 +231,11 @@ type StreamingNodeCataLog interface {

// SaveSegmentAssignments save the segment assignments for the wal.
SaveSegmentAssignments(ctx context.Context, pChannelName string, infos []*streamingpb.SegmentAssignmentMeta) error

// GetConsumeCheckpoint gets the consuming checkpoint of the wal.
// Returns nil, nil if the checkpoint does not exist.
GetConsumeCheckpoint(ctx context.Context, pChannelName string) (*streamingpb.WALCheckpoint, error)

// SaveConsumeCheckpoint saves the consuming checkpoint of the wal.
SaveConsumeCheckpoint(ctx context.Context, pChannelName string, checkpoint *streamingpb.WALCheckpoint) error
}
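These two methods give the pchannel-level flusher a durable record of how far it has consumed the WAL. A hypothetical recovery-and-save flow; only the two method signatures, streamingpb.WALCheckpoint, and the StreamingNodeCataLog interface come from this diff:

// recoverAndSave is an illustrative flow, not part of this PR.
func recoverAndSave(ctx context.Context, catalog StreamingNodeCataLog, pChannelName string, newCheckpoint *streamingpb.WALCheckpoint) error {
    cp, err := catalog.GetConsumeCheckpoint(ctx, pChannelName)
    if err != nil {
        return err
    }
    if cp == nil {
        // No checkpoint persisted yet: the flusher would start consuming
        // from the earliest deliverable position in this case.
    }
    // After a flush round succeeds, persist the new consume position.
    return catalog.SaveConsumeCheckpoint(ctx, pChannelName, newCheckpoint)
}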
2 changes: 2 additions & 0 deletions internal/metastore/kv/streamingnode/constant.go
@@ -5,4 +5,6 @@ const (

DirectoryWAL = "wal"
DirectorySegmentAssign = "segment-assign"

KeyConsumeCheckpoint = "consume-checkpoint"
)
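Presumably the kv catalog composes this key with the existing directory constants and the pchannel name; a guess at the layout (the real construction lives in the catalog implementation, which is not part of this diff, and the path import is assumed):

// Illustrative only: one plausible way the checkpoint key could be composed,
// e.g. "wal/<pchannel>/consume-checkpoint".
func buildConsumeCheckpointPath(pChannelName string) string {
    return path.Join(DirectoryWAL, pChannelName, KeyConsumeCheckpoint)
}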