Skip to content

Commit

Permalink
enhance: erase the rpc level when wal is located at same node
Browse files Browse the repository at this point in the history
- Make the wal scanner interface same with streaming scanner.
- Use wal if the wal is located at current node.
- Otherwise fallback the old logic.

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Jan 2, 2025
1 parent a3dfb70 commit 1e80731
Show file tree
Hide file tree
Showing 38 changed files with 723 additions and 662 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ func (rc *resumableConsumerImpl) resumeLoop() {
// consumer need to resume when error occur, so message handler shouldn't close if the internal consumer encounter failure.
nopCloseMH := nopCloseHandler{
Handler: rc.mh,
HandleInterceptor: func(ctx context.Context, msg message.ImmutableMessage, handle handleFunc) (bool, error) {
g := rc.metrics.StartConsume(msg.EstimateSize())
ok, err := handle(ctx, msg)
g.Finish()
return ok, err
HandleInterceptor: func(handleParam message.HandleParam, h message.Handler) message.HandleResult {
if handleParam.Message != nil {
g := rc.metrics.StartConsume(handleParam.Message.EstimateSize())
defer func() { g.Finish() }()
}
return h.Handle(handleParam)
},
}

Expand Down
43 changes: 25 additions & 18 deletions internal/distributed/streaming/internal/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/consumer"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
)
Expand All @@ -22,22 +23,25 @@ func TestResumableConsumer(t *testing.T) {
ch := make(chan struct{})
c.EXPECT().Done().Return(ch)
c.EXPECT().Error().Return(errors.New("test"))
c.EXPECT().Close().Return()
c.EXPECT().Close().Return(nil)
rc := NewResumableConsumer(func(ctx context.Context, opts *handler.ConsumerOptions) (consumer.Consumer, error) {
if i == 0 {
i++
ok, err := opts.MessageHandler.Handle(context.Background(), message.NewImmutableMesasge(
walimplstest.NewTestMessageID(123),
[]byte("payload"),
map[string]string{
"key": "value",
"_t": "1",
"_tt": message.EncodeUint64(456),
"_v": "1",
"_lc": walimplstest.NewTestMessageID(123).Marshal(),
}))
assert.True(t, ok)
assert.NoError(t, err)
result := opts.MessageHandler.Handle(message.HandleParam{
Ctx: context.Background(),
Message: message.NewImmutableMesasge(
walimplstest.NewTestMessageID(123),
[]byte("payload"),
map[string]string{
"key": "value",
"_t": "1",
"_tt": message.EncodeUint64(456),
"_v": "1",
"_lc": walimplstest.NewTestMessageID(123).Marshal(),
}),
})
assert.True(t, result.MessageHandled)
assert.NoError(t, result.Error)
return c, nil
} else if i == 1 {
i++
Expand All @@ -46,15 +50,15 @@ func TestResumableConsumer(t *testing.T) {
newC := mock_consumer.NewMockConsumer(t)
newC.EXPECT().Done().Return(make(<-chan struct{}))
newC.EXPECT().Error().Return(errors.New("test"))
newC.EXPECT().Close().Return()
newC.EXPECT().Close().Return(nil)
return newC, nil
}, &ConsumerOptions{
PChannel: "test",
DeliverPolicy: options.DeliverPolicyAll(),
DeliverFilters: []options.DeliverFilter{
options.DeliverFilterTimeTickGT(1),
},
MessageHandler: message.ChanMessageHandler(make(chan message.ImmutableMessage, 2)),
MessageHandler: adaptor.ChanMessageHandler(make(chan message.ImmutableMessage, 2)),
})

select {
Expand All @@ -76,10 +80,13 @@ func TestResumableConsumer(t *testing.T) {
func TestHandler(t *testing.T) {
ch := make(chan message.ImmutableMessage, 100)
hNop := nopCloseHandler{
Handler: message.ChanMessageHandler(ch),
Handler: adaptor.ChanMessageHandler(ch),
}
hNop.Handle(context.Background(), nil)
assert.Nil(t, <-ch)
hNop.Handle(message.HandleParam{
Ctx: context.Background(),
Message: message.NewImmutableMesasge(walimplstest.NewTestMessageID(123), []byte("payload"), nil),
})
assert.NotNil(t, <-ch)
hNop.Close()
select {
case <-ch:
Expand Down
12 changes: 4 additions & 8 deletions internal/distributed/streaming/internal/consumer/handler.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
package consumer

import (
"context"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
)

type handleFunc func(ctx context.Context, msg message.ImmutableMessage) (bool, error)

// nopCloseHandler is a handler that do nothing when close.
type nopCloseHandler struct {
message.Handler
HandleInterceptor func(ctx context.Context, msg message.ImmutableMessage, handle handleFunc) (bool, error)
HandleInterceptor func(handleParam message.HandleParam, h message.Handler) message.HandleResult
}

// Handle is the callback for handling message.
func (nch nopCloseHandler) Handle(ctx context.Context, msg message.ImmutableMessage) (bool, error) {
func (nch nopCloseHandler) Handle(handleParam message.HandleParam) message.HandleResult {
if nch.HandleInterceptor != nil {
return nch.HandleInterceptor(ctx, msg, nch.Handler.Handle)
return nch.HandleInterceptor(handleParam, nch.Handler)
}
return nch.Handler.Handle(ctx, msg)
return nch.Handler.Handle(handleParam)
}

// Close is called after all messages are handled or handling is interrupted.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package consumer

import (
"context"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
)

Expand All @@ -13,16 +11,20 @@ type timeTickOrderMessageHandler struct {
lastTimeTick uint64
}

func (mh *timeTickOrderMessageHandler) Handle(ctx context.Context, msg message.ImmutableMessage) (bool, error) {
lastConfirmedMessageID := msg.LastConfirmedMessageID()
timetick := msg.TimeTick()
func (mh *timeTickOrderMessageHandler) Handle(handleParam message.HandleParam) message.HandleResult {
var lastConfirmedMessageID message.MessageID
var lastTimeTick uint64
if handleParam.Message != nil {
lastConfirmedMessageID = handleParam.Message.LastConfirmedMessageID()
lastTimeTick = handleParam.Message.TimeTick()
}

ok, err := mh.inner.Handle(ctx, msg)
if ok {
result := mh.inner.Handle(handleParam)
if result.MessageHandled {
mh.lastConfirmedMessageID = lastConfirmedMessageID
mh.lastTimeTick = timetick
mh.lastTimeTick = lastTimeTick
}
return ok, err
return result
}

func (mh *timeTickOrderMessageHandler) Close() {
Expand Down
5 changes: 3 additions & 2 deletions internal/distributed/streaming/internal/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand Down Expand Up @@ -77,7 +78,7 @@ type ResumableProducer struct {
}

// Produce produce a new message to log service.
func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (result *producer.ProduceResult, err error) {
func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (result *types.AppendResult, err error) {
if !p.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, errors.Wrapf(errs.ErrClosed, "produce on closed producer")
}
Expand All @@ -94,7 +95,7 @@ func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMess
return nil, err
}

produceResult, err := producerHandler.Produce(ctx, msg)
produceResult, err := producerHandler.Append(ctx, msg)
if err == nil {
return produceResult, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/producer"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)

func TestResumableProducer(t *testing.T) {
p := mock_producer.NewMockProducer(t)
msgID := mock_message.NewMockMessageID(t)
p.EXPECT().Produce(mock.Anything, mock.Anything).Return(&producer.ProduceResult{
p.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.AppendResult{
MessageID: msgID,
TimeTick: 100,
}, nil)
Expand Down Expand Up @@ -47,11 +48,11 @@ func TestResumableProducer(t *testing.T) {
} else if i == 2 {
p := mock_producer.NewMockProducer(t)
msgID := mock_message.NewMockMessageID(t)
p.EXPECT().Produce(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*producer.ProduceResult, error) {
p.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*types.AppendResult, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
return &producer.ProduceResult{
return &types.AppendResult{
MessageID: msgID,
TimeTick: 100,
}, nil
Expand Down
3 changes: 2 additions & 1 deletion internal/distributed/streaming/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"
"github.com/milvus-io/milvus/pkg/util/paramtable"
Expand Down Expand Up @@ -100,7 +101,7 @@ func TestStreamingConsume(t *testing.T) {
t.Skip()
streaming.Init()
defer streaming.Release()
ch := make(message.ChanMessageHandler, 10)
ch := make(adaptor.ChanMessageHandler, 10)
s := streaming.WAL().Read(context.Background(), streaming.ReadOption{
VChannel: vChannel,
DeliverPolicy: options.DeliverPolicyAll(),
Expand Down
2 changes: 1 addition & 1 deletion internal/distributed/streaming/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestWAL(t *testing.T) {
return true
}
})
p.EXPECT().Produce(mock.Anything, mock.Anything).Return(&types.AppendResult{
p.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.AppendResult{
MessageID: walimplstest.NewTestMessageID(1),
TimeTick: 10,
TxnCtx: &message.TxnContext{
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 1e80731

Please sign in to comment.