Skip to content

Commit 86ae07d

Browse files
authored
enhance: [2.4] Limit the maximum number of segments restored and fail the job if saving the binlog fails (#39473)
1. Limit the maximum number of restored segments to 1024. 2. Fail the import job if saving binlog fails. 3. Fail the import job if saving the import task fails to prevent repeatedly generating dirty importing segments. 4. Update proto. issue: #39331 pr: #39344 --------- Signed-off-by: bigsheeper <[email protected]>
1 parent 6111a0d commit 86ae07d

12 files changed

+31
-15
lines changed

client/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/blang/semver/v4 v4.0.0
77
github.com/cockroachdb/errors v1.9.1
88
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
9-
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21
9+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22
1010
github.com/milvus-io/milvus/pkg v0.0.2-0.20240317152703-17b4938985f3
1111
github.com/quasilyte/go-ruleguard/dsl v0.3.22
1212
github.com/samber/lo v1.27.0

client/go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -400,8 +400,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
400400
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
401401
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
402402
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
403-
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21 h1:Imb0uGFp/kyPPI5f6dCne8GCJIceuQWzI1H20p5aa4c=
404-
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
403+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22 h1:8CV4LUoo0KEFNSmDyDPkfdTapFrX6SBM64cE7UV5EVY=
404+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
405405
github.com/milvus-io/milvus/pkg v0.0.2-0.20240317152703-17b4938985f3 h1:ZBpRWhBa7FTFxW4YYVv9AUESoW1Xyb3KNXTzTqfkZmw=
406406
github.com/milvus-io/milvus/pkg v0.0.2-0.20240317152703-17b4938985f3/go.mod h1:jQ2BUZny1COsgv1Qbcv8dmbppW+V9J/c4YQZNb3EOm8=
407407
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ require (
2626
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
2727
github.com/klauspost/compress v1.17.9
2828
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
29-
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7
29+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22
3030
github.com/minio/minio-go/v7 v7.0.73
3131
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
3232
github.com/prometheus/client_golang v1.14.0

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -612,8 +612,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
612612
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
613613
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
614614
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
615-
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7 h1:putot5l1gpiucE4CBrYzLoPCAci/BdYFe76GP15xsg4=
616-
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
615+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22 h1:8CV4LUoo0KEFNSmDyDPkfdTapFrX6SBM64cE7UV5EVY=
616+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
617617
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI=
618618
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
619619
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=

internal/datacoord/import_checker.go

+4
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,10 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
235235
err = c.imeta.AddTask(t)
236236
if err != nil {
237237
log.Warn("add new import task failed", WrapTaskLog(t, zap.Error(err))...)
238+
updateErr := c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
239+
if updateErr != nil {
240+
log.Warn("failed to update job state to Failed", zap.Error(updateErr))
241+
}
238242
return
239243
}
240244
log.Info("add new import task", WrapTaskLog(t)...)

internal/datacoord/import_checker_test.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -258,13 +258,17 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() {
258258

259259
catalog.ExpectedCalls = nil
260260
catalog.EXPECT().SaveImportTask(mock.Anything).Return(mockErr)
261+
catalog.EXPECT().SaveImportJob(mock.Anything).Return(nil)
261262
s.checker.checkPreImportingJob(job)
262263
importTasks := s.imeta.GetTaskBy(WithJob(job.GetJobID()), WithType(ImportTaskType))
263264
s.Equal(0, len(importTasks))
264-
s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(job.GetJobID()).GetState())
265+
s.Equal(internalpb.ImportJobState_Failed, s.imeta.GetJob(job.GetJobID()).GetState())
265266

266267
alloc.ExpectedCalls = nil
267268
alloc.EXPECT().allocN(mock.Anything).Return(0, 0, mockErr)
269+
err := s.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
270+
s.NoError(err)
271+
s.checker.checkPreImportingJob(job)
268272
importTasks = s.imeta.GetTaskBy(WithJob(job.GetJobID()), WithType(ImportTaskType))
269273
s.Equal(0, len(importTasks))
270274
s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(job.GetJobID()).GetState())

internal/datacoord/import_scheduler.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,7 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
279279
return
280280
}
281281
if resp.GetState() == datapb.ImportTaskStateV2_Failed {
282-
err = s.imeta.UpdateJob(task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
283-
UpdateJobReason(resp.GetReason()))
282+
err = s.imeta.UpdateJob(task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(resp.GetReason()))
284283
if err != nil {
285284
log.Warn("failed to update job state to Failed", zap.Int64("jobID", task.GetJobID()), zap.Error(err))
286285
}
@@ -325,7 +324,11 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
325324
op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed)
326325
err = s.meta.UpdateSegmentsInfo(op1, op2)
327326
if err != nil {
328-
log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.Error(err))...)
327+
updateErr := s.imeta.UpdateJob(task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
328+
if updateErr != nil {
329+
log.Warn("failed to update job state to Failed", zap.Int64("jobID", task.GetJobID()), zap.Error(updateErr))
330+
}
331+
log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.String("err", err.Error()))...)
329332
return
330333
}
331334
select {

internal/datacoord/services.go

+5
Original file line numberDiff line numberDiff line change
@@ -1672,6 +1672,11 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
16721672
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("no binlog to import, input=%s", in.GetFiles())))
16731673
return resp, nil
16741674
}
1675+
if len(files) > paramtable.Get().DataCoordCfg.MaxFilesPerImportReq.GetAsInt() {
1676+
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("The max number of import files should not exceed %d, but got %d",
1677+
paramtable.Get().DataCoordCfg.MaxFilesPerImportReq.GetAsInt(), len(files))))
1678+
return resp, nil
1679+
}
16751680
log.Info("list binlogs prefixes for import", zap.Any("binlog_prefixes", files))
16761681
}
16771682

internal/proxy/task_insert.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
139139
if it.schemaTimestamp != 0 {
140140
if it.schemaTimestamp != colInfo.updateTimestamp {
141141
err := merr.WrapErrCollectionSchemaMisMatch(collectionName)
142-
log.Ctx(ctx).Warn("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
142+
log.Ctx(ctx).Info("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
143143
return err
144144
}
145145
}

internal/proxy/task_upsert.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
306306
if it.schemaTimestamp != 0 {
307307
if it.schemaTimestamp != colInfo.updateTimestamp {
308308
err := merr.WrapErrCollectionSchemaMisMatch(collectionName)
309-
log.Warn("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
309+
log.Info("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
310310
return err
311311
}
312312
}

pkg/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ require (
1212
github.com/expr-lang/expr v1.15.7
1313
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
1414
github.com/klauspost/compress v1.17.7
15-
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7
15+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22
1616
github.com/nats-io/nats-server/v2 v2.10.12
1717
github.com/nats-io/nats.go v1.34.1
1818
github.com/panjf2000/ants/v2 v2.7.2

pkg/go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -503,8 +503,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
503503
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
504504
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
505505
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
506-
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7 h1:putot5l1gpiucE4CBrYzLoPCAci/BdYFe76GP15xsg4=
507-
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
506+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22 h1:8CV4LUoo0KEFNSmDyDPkfdTapFrX6SBM64cE7UV5EVY=
507+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
508508
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
509509
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
510510
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=

0 commit comments

Comments
 (0)