Skip to content

Commit 27fb8d9

Browse files
authored
fix: create multiple identical indexes by accident (#40180)
issue: #40163 pr: #40179 --------- Signed-off-by: zhenshan.cao <[email protected]>
1 parent eee98fd commit 27fb8d9

7 files changed

+127
-166
lines changed

internal/datacoord/compaction_task_clustering_test.go

+5-7
Original file line numberDiff line numberDiff line change
@@ -660,13 +660,12 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
660660
s.Run("collection has index, segment is not indexed", func() {
661661
task := s.generateBasicTask(false)
662662
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing))
663-
index := &model.Index{
663+
664+
indexReq := &indexpb.CreateIndexRequest{
664665
CollectionID: 1,
665-
IndexID: 3,
666666
}
667-
668667
task.updateAndSaveTaskMeta(setResultSegments([]int64{10, 11}))
669-
err := s.meta.indexMeta.CreateIndex(context.TODO(), index)
668+
_, err := s.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 3)
670669
s.NoError(err)
671670

672671
s.False(task.Process())
@@ -676,11 +675,10 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
676675
s.Run("collection has index, segment indexed", func() {
677676
task := s.generateBasicTask(false)
678677
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing))
679-
index := &model.Index{
678+
indexReq := &indexpb.CreateIndexRequest{
680679
CollectionID: 1,
681-
IndexID: 3,
682680
}
683-
err := s.meta.indexMeta.CreateIndex(context.TODO(), index)
681+
_, err := s.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 3)
684682
s.NoError(err)
685683

686684
s.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{

internal/datacoord/handler_test.go

+27-31
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
mocks2 "github.com/milvus-io/milvus/internal/mocks"
1717
"github.com/milvus-io/milvus/pkg/v2/common"
1818
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
19+
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
1920
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
2021
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
2122
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
@@ -40,11 +41,11 @@ func TestGetQueryVChanPositionsRetrieveM2N(t *testing.T) {
4041
},
4142
},
4243
})
43-
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
44+
indexReq := &indexpb.CreateIndexRequest{
4445
CollectionID: 1,
4546
FieldID: 2,
46-
IndexID: 1,
47-
})
47+
}
48+
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
4849
require.NoError(t, err)
4950

5051
segArgs := []struct {
@@ -152,12 +153,12 @@ func TestGetQueryVChanPositions(t *testing.T) {
152153
},
153154
})
154155

155-
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
156-
TenantID: "",
156+
indexReq := &indexpb.CreateIndexRequest{
157157
CollectionID: 0,
158158
FieldID: 2,
159-
IndexID: 1,
160-
})
159+
}
160+
161+
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
161162
assert.NoError(t, err)
162163

163164
s1 := &datapb.SegmentInfo{
@@ -331,12 +332,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
331332
ID: 0,
332333
Schema: schema,
333334
})
334-
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
335-
TenantID: "",
335+
indexReq := &indexpb.CreateIndexRequest{
336336
CollectionID: 0,
337337
FieldID: 2,
338-
IndexID: 1,
339-
})
338+
}
339+
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
340340
assert.NoError(t, err)
341341
c := &datapb.SegmentInfo{
342342
ID: 1,
@@ -401,12 +401,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
401401
ID: 0,
402402
Schema: schema,
403403
})
404-
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
405-
TenantID: "",
404+
indexReq := &indexpb.CreateIndexRequest{
406405
CollectionID: 0,
407406
FieldID: 2,
408-
IndexID: 1,
409-
})
407+
}
408+
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
410409
assert.NoError(t, err)
411410
a := &datapb.SegmentInfo{
412411
ID: 99,
@@ -487,12 +486,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
487486
ID: 0,
488487
Schema: schema,
489488
})
490-
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
491-
TenantID: "",
489+
indexReq := &indexpb.CreateIndexRequest{
492490
CollectionID: 0,
493491
FieldID: 2,
494-
IndexID: 1,
495-
})
492+
}
493+
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
496494
assert.NoError(t, err)
497495
c := &datapb.SegmentInfo{
498496
ID: 1,
@@ -597,12 +595,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
597595
Partitions: []int64{0},
598596
Schema: schema,
599597
})
600-
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
601-
TenantID: "",
598+
indexReq := &indexpb.CreateIndexRequest{
602599
CollectionID: 0,
603600
FieldID: 2,
604-
IndexID: 1,
605-
})
601+
}
602+
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
606603
assert.NoError(t, err)
607604
seg1 := &datapb.SegmentInfo{
608605
ID: 1,
@@ -978,12 +975,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
978975
Partitions: []int64{0},
979976
Schema: schema,
980977
})
981-
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
982-
TenantID: "",
978+
indexReq := &indexpb.CreateIndexRequest{
983979
CollectionID: 0,
984980
FieldID: 2,
985-
IndexID: 1,
986-
})
981+
}
982+
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
987983
assert.NoError(t, err)
988984
seg1 := &datapb.SegmentInfo{
989985
ID: 1,
@@ -1182,12 +1178,12 @@ func TestGetCurrentSegmentsView(t *testing.T) {
11821178
Partitions: []int64{0},
11831179
Schema: schema,
11841180
})
1185-
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
1186-
TenantID: "",
1181+
1182+
indexReq := &indexpb.CreateIndexRequest{
11871183
CollectionID: 0,
11881184
FieldID: 2,
1189-
IndexID: 1,
1190-
})
1185+
}
1186+
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
11911187
assert.NoError(t, err)
11921188
seg1 := &datapb.SegmentInfo{
11931189
ID: 1,

internal/datacoord/index_meta.go

+44-9
Original file line numberDiff line numberDiff line change
@@ -326,10 +326,14 @@ func checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool
326326
return !notEq
327327
}
328328

329+
// CanCreateIndex currently is used in Unittest
329330
func (m *indexMeta) CanCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, error) {
330331
m.RLock()
331332
defer m.RUnlock()
333+
return m.canCreateIndex(req)
334+
}
332335

336+
func (m *indexMeta) canCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, error) {
333337
indexes, ok := m.indexes[req.CollectionID]
334338
if !ok {
335339
return 0, nil
@@ -384,23 +388,51 @@ func (m *indexMeta) HasSameReq(req *indexpb.CreateIndexRequest) (bool, UniqueID)
384388
return false, 0
385389
}
386390

387-
func (m *indexMeta) CreateIndex(ctx context.Context, index *model.Index) error {
388-
log.Ctx(ctx).Info("meta update: CreateIndex", zap.Int64("collectionID", index.CollectionID),
389-
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName))
391+
func (m *indexMeta) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest, allocatedIndexID UniqueID) (UniqueID, error) {
390392
m.Lock()
391393
defer m.Unlock()
392394

395+
indexID, err := m.canCreateIndex(req)
396+
if err != nil {
397+
return indexID, err
398+
}
399+
400+
if indexID == 0 {
401+
indexID = allocatedIndexID
402+
} else {
403+
return indexID, nil
404+
}
405+
406+
// exclude the mmap.enable param, because it will be conflicted with the index's mmap.enable param
407+
typeParams := DeleteParams(req.GetTypeParams(), []string{common.MmapEnabledKey})
408+
index := &model.Index{
409+
CollectionID: req.GetCollectionID(),
410+
FieldID: req.GetFieldID(),
411+
IndexID: indexID,
412+
IndexName: req.GetIndexName(),
413+
TypeParams: typeParams,
414+
IndexParams: req.GetIndexParams(),
415+
CreateTime: req.GetTimestamp(),
416+
IsAutoIndex: req.GetIsAutoIndex(),
417+
UserIndexParams: req.GetUserIndexParams(),
418+
}
419+
if err := ValidateIndexParams(index); err != nil {
420+
return indexID, err
421+
}
422+
log.Ctx(ctx).Info("meta update: CreateIndex", zap.Int64("collectionID", index.CollectionID),
423+
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName))
424+
393425
if err := m.catalog.CreateIndex(ctx, index); err != nil {
394426
log.Ctx(ctx).Error("meta update: CreateIndex save meta fail", zap.Int64("collectionID", index.CollectionID),
395427
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID),
396428
zap.String("indexName", index.IndexName), zap.Error(err))
397-
return err
429+
return indexID, err
398430
}
399431

400432
m.updateCollectionIndex(index)
401433
log.Ctx(ctx).Info("meta update: CreateIndex success", zap.Int64("collectionID", index.CollectionID),
402434
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName))
403-
return nil
435+
return indexID, nil
404436
}
405437

406438
func (m *indexMeta) AlterIndex(ctx context.Context, indexes ...*model.Index) error {
@@ -553,10 +585,7 @@ func (m *indexMeta) GetIndexesForCollection(collID UniqueID, indexName string) [
553585
return indexInfos
554586
}
555587

556-
func (m *indexMeta) GetFieldIndexes(collID, fieldID UniqueID, indexName string) []*model.Index {
557-
m.RLock()
558-
defer m.RUnlock()
559-
588+
func (m *indexMeta) getFieldIndexes(collID, fieldID UniqueID, indexName string) []*model.Index {
560589
indexInfos := make([]*model.Index, 0)
561590
for _, index := range m.indexes[collID] {
562591
if index.IsDeleted || index.FieldID != fieldID {
@@ -569,6 +598,12 @@ func (m *indexMeta) GetFieldIndexes(collID, fieldID UniqueID, indexName string)
569598
return indexInfos
570599
}
571600

601+
func (m *indexMeta) GetFieldIndexes(collID, fieldID UniqueID, indexName string) []*model.Index {
602+
m.RLock()
603+
defer m.RUnlock()
604+
return m.getFieldIndexes(collID, fieldID, indexName)
605+
}
606+
572607
// MarkIndexAsDeleted will mark the corresponding index as deleted, and recycleUnusedIndexFiles will recycle these tasks.
573608
func (m *indexMeta) MarkIndexAsDeleted(ctx context.Context, collID UniqueID, indexIDs []UniqueID) error {
574609
log.Ctx(ctx).Info("IndexCoord metaTable MarkIndexAsDeleted", zap.Int64("collectionID", collID),

internal/datacoord/index_meta_test.go

+16-29
Original file line numberDiff line numberDiff line change
@@ -269,21 +269,8 @@ func TestMeta_CanCreateIndex(t *testing.T) {
269269
tmpIndexID, err := m.CanCreateIndex(req)
270270
assert.NoError(t, err)
271271
assert.Equal(t, int64(0), tmpIndexID)
272-
index := &model.Index{
273-
TenantID: "",
274-
CollectionID: collID,
275-
FieldID: fieldID,
276-
IndexID: indexID,
277-
IndexName: indexName,
278-
IsDeleted: false,
279-
CreateTime: 0,
280-
TypeParams: typeParams,
281-
IndexParams: indexParams,
282-
IsAutoIndex: false,
283-
UserIndexParams: userIndexParams,
284-
}
285272

286-
err = m.CreateIndex(context.TODO(), index)
273+
_, err = m.CreateIndex(context.TODO(), req, indexID)
287274
assert.NoError(t, err)
288275

289276
tmpIndexID, err = m.CanCreateIndex(req)
@@ -458,21 +445,21 @@ func TestMeta_CreateIndex(t *testing.T) {
458445
Value: "FLAT",
459446
},
460447
}
461-
index := &model.Index{
462-
TenantID: "",
463-
CollectionID: 1,
464-
FieldID: 2,
465-
IndexID: 3,
466-
IndexName: "_default_idx",
467-
IsDeleted: false,
468-
CreateTime: 12,
469-
TypeParams: []*commonpb.KeyValuePair{
470-
{
471-
Key: common.DimKey,
472-
Value: "128",
473-
},
448+
449+
typeParams := []*commonpb.KeyValuePair{
450+
{
451+
Key: common.DimKey,
452+
Value: "128",
474453
},
454+
}
455+
456+
req := &indexpb.CreateIndexRequest{
457+
CollectionID: 1,
458+
FieldID: 2,
459+
IndexName: indexName,
460+
TypeParams: typeParams,
475461
IndexParams: indexParams,
462+
Timestamp: 12,
476463
IsAutoIndex: false,
477464
UserIndexParams: indexParams,
478465
}
@@ -485,7 +472,7 @@ func TestMeta_CreateIndex(t *testing.T) {
485472
).Return(nil)
486473

487474
m := newSegmentIndexMeta(sc)
488-
err := m.CreateIndex(context.TODO(), index)
475+
_, err := m.CreateIndex(context.TODO(), req, 3)
489476
assert.NoError(t, err)
490477
})
491478

@@ -497,7 +484,7 @@ func TestMeta_CreateIndex(t *testing.T) {
497484
).Return(errors.New("fail"))
498485

499486
m := newSegmentIndexMeta(ec)
500-
err := m.CreateIndex(context.TODO(), index)
487+
_, err := m.CreateIndex(context.TODO(), req, 4)
501488
assert.Error(t, err)
502489
})
503490
}

0 commit comments

Comments
 (0)