Skip to content

Commit

Permalink
fix: add resource key for import broadcast
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored and SimFG committed Jan 23, 2025
1 parent c79c193 commit dacf2c5
Show file tree
Hide file tree
Showing 10 changed files with 367 additions and 47 deletions.
1 change: 1 addition & 0 deletions internal/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ packages:
interfaces:
WALAccesser:
Utility:
Broadcast:
github.com/milvus-io/milvus/internal/streamingcoord/server/balancer:
interfaces:
Balancer:
Expand Down
11 changes: 10 additions & 1 deletion internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/componentutil"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/proto/internalpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
Expand Down Expand Up @@ -1750,7 +1752,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
return resp, nil
}
if importCollectionInfo == nil {
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("collection %d not found", in.GetCollectionID())))
resp.Status = merr.Status(merr.WrapErrCollectionNotFound(in.GetCollectionID()))
return resp, nil
}

Expand Down Expand Up @@ -1809,6 +1811,13 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("parse job id failed, err=%w", err)))
return resp, nil
}

// Import is asynchronous consumed from the wal, so we need to wait for the wal to release the resource key.
// The job can be seen by the user after the resource key is acked once at any vchannel.
if err := streaming.WAL().Broadcast().BlockUntilResourceKeyAckOnce(ctx, message.NewImportJobIDResourceKey(jobID)); err != nil {
return nil, err
}

job := s.importMeta.GetJob(ctx, jobID)
if job == nil {
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("import job does not exist, jobID=%d", jobID)))
Expand Down
52 changes: 36 additions & 16 deletions internal/datanode/msghandlerimpl/msg_handler_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@ package msghandlerimpl
import (
"context"

"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/flushcommon/broker"
"github.com/milvus-io/milvus/internal/flushcommon/util"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/proto/internalpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/retry"
)

type msgHandlerImpl struct {
Expand All @@ -51,22 +55,38 @@ func (m *msgHandlerImpl) HandleManualFlush(vchannel string, flushMsg message.Imm
}

func (m *msgHandlerImpl) HandleImport(ctx context.Context, vchannel string, importMsg *msgpb.ImportMsg) error {
importResp, err := m.broker.ImportV2(ctx, &internalpb.ImportRequestInternal{
CollectionID: importMsg.GetCollectionID(),
CollectionName: importMsg.GetCollectionName(),
PartitionIDs: importMsg.GetPartitionIDs(),
ChannelNames: []string{vchannel},
Schema: importMsg.GetSchema(),
Files: lo.Map(importMsg.GetFiles(), util.ConvertInternalImportFile),
Options: funcutil.Map2KeyValuePair(importMsg.GetOptions()),
DataTimestamp: importMsg.GetBase().GetTimestamp(),
JobID: importMsg.GetJobID(),
})
if err = merr.CheckRPCCall(importResp, err); err != nil {
return err
}
log.Ctx(ctx).Info("import message handled", zap.String("job_id", importResp.GetJobID()))
return nil
return retry.Do(ctx, func() (err error) {
defer func() {
if err == nil {
err = streaming.WAL().Broadcast().Ack(ctx, types.BroadcastAckRequest{
BroadcastID: uint64(importMsg.GetJobID()),
VChannel: vchannel,
})
}
}()
importResp, err := m.broker.ImportV2(ctx, &internalpb.ImportRequestInternal{
CollectionID: importMsg.GetCollectionID(),
CollectionName: importMsg.GetCollectionName(),
PartitionIDs: importMsg.GetPartitionIDs(),
ChannelNames: []string{vchannel},
Schema: importMsg.GetSchema(),
Files: lo.Map(importMsg.GetFiles(), util.ConvertInternalImportFile),
Options: funcutil.Map2KeyValuePair(importMsg.GetOptions()),
DataTimestamp: importMsg.GetBase().GetTimestamp(),
JobID: importMsg.GetJobID(),
})
err = merr.CheckRPCCall(importResp, err)
if errors.Is(err, merr.ErrCollectionNotFound) {
log.Ctx(ctx).Warn("import message failed because of collection not found, skip it", zap.String("job_id", importResp.GetJobID()), zap.Error(err))
return nil
}
if err != nil {
log.Ctx(ctx).Warn("import message failed", zap.String("job_id", importResp.GetJobID()), zap.Error(err))
return err
}
log.Ctx(ctx).Info("import message handled", zap.String("job_id", importResp.GetJobID()))
return nil
}, retry.AttemptAlways())
}

func NewMsgHandlerImpl(broker broker.Broker) *msgHandlerImpl {
Expand Down
239 changes: 239 additions & 0 deletions internal/mocks/distributed/mock_streaming/mock_Broadcast.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion internal/proxy/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1747,7 +1747,9 @@ func TestProxy_ImportV2(t *testing.T) {
assert.NoError(t, err)

wal := mock_streaming.NewMockWALAccesser(t)
wal.EXPECT().BroadcastAppend(mock.Anything, mock.Anything).Return(&types.BroadcastAppendResult{}, nil)
b := mock_streaming.NewMockBroadcast(t)
wal.EXPECT().Broadcast().Return(b)
b.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.BroadcastAppendResult{}, nil)
streaming.SetWALForTest(wal)
defer streaming.RecoverWALForTest()
rsp, err = node.ImportV2(ctx, &internalpb.ImportRequest{
Expand Down
4 changes: 3 additions & 1 deletion internal/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4706,7 +4706,9 @@ func TestProxy_Import(t *testing.T) {
assert.NoError(t, err)

wal := mock_streaming.NewMockWALAccesser(t)
wal.EXPECT().BroadcastAppend(mock.Anything, mock.Anything).Return(&types.BroadcastAppendResult{}, nil)
b := mock_streaming.NewMockBroadcast(t)
wal.EXPECT().Broadcast().Return(b)
b.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.BroadcastAppendResult{}, nil)
streaming.SetWALForTest(wal)
defer streaming.RecoverWALForTest()

Expand Down
Loading

0 comments on commit dacf2c5

Please sign in to comment.