Skip to content

Commit 6111a0d

Browse files
authored
enhance: [2.4]Add schema update time verification for insert and upsert to use cache (#39405)
enhance: Add schema update time verification for insert and upsert to use cache issue: #39093 Related to pr: #39096 Signed-off-by: Xianhui.Lin <[email protected]>
1 parent d07ae21 commit 6111a0d

19 files changed

+233
-150
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ require (
2626
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
2727
github.com/klauspost/compress v1.17.9
2828
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
29-
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21
29+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7
3030
github.com/minio/minio-go/v7 v7.0.73
3131
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
3232
github.com/prometheus/client_golang v1.14.0

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -612,8 +612,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
612612
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
613613
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
614614
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
615-
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21 h1:Imb0uGFp/kyPPI5f6dCne8GCJIceuQWzI1H20p5aa4c=
616-
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
615+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7 h1:putot5l1gpiucE4CBrYzLoPCAci/BdYFe76GP15xsg4=
616+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
617617
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI=
618618
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
619619
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=

internal/metastore/model/collection.go

+4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type Collection struct {
2828
Properties []*commonpb.KeyValuePair
2929
State pb.CollectionState
3030
EnableDynamicField bool
31+
UpdateTimestamp uint64
3132
}
3233

3334
func (c *Collection) Available() bool {
@@ -54,6 +55,7 @@ func (c *Collection) Clone() *Collection {
5455
Properties: common.CloneKeyValuePairs(c.Properties),
5556
State: c.State,
5657
EnableDynamicField: c.EnableDynamicField,
58+
UpdateTimestamp: c.UpdateTimestamp,
5759
}
5860
}
5961

@@ -110,6 +112,7 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection {
110112
State: coll.State,
111113
Properties: coll.Properties,
112114
EnableDynamicField: coll.Schema.EnableDynamicField,
115+
UpdateTimestamp: coll.UpdateTimestamp,
113116
}
114117
}
115118

@@ -171,6 +174,7 @@ func marshalCollectionModelWithConfig(coll *Collection, c *config) *pb.Collectio
171174
StartPositions: coll.StartPositions,
172175
State: coll.State,
173176
Properties: coll.Properties,
177+
UpdateTimestamp: coll.UpdateTimestamp,
174178
}
175179

176180
if c.withPartitions {

internal/proxy/impl.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -2610,10 +2610,11 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
26102610
Version: msgpb.InsertDataVersion_ColumnBased,
26112611
},
26122612
},
2613-
idAllocator: node.rowIDAllocator,
2614-
segIDAssigner: node.segAssigner,
2615-
chMgr: node.chMgr,
2616-
chTicker: node.chTicker,
2613+
idAllocator: node.rowIDAllocator,
2614+
segIDAssigner: node.segAssigner,
2615+
chMgr: node.chMgr,
2616+
chTicker: node.chTicker,
2617+
schemaTimestamp: request.SchemaTimestamp,
26172618
}
26182619

26192620
constructFailedResponse := func(err error) *milvuspb.MutationResult {
@@ -2847,10 +2848,11 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
28472848
},
28482849
},
28492850

2850-
idAllocator: node.rowIDAllocator,
2851-
segIDAssigner: node.segAssigner,
2852-
chMgr: node.chMgr,
2853-
chTicker: node.chTicker,
2851+
idAllocator: node.rowIDAllocator,
2852+
segIDAssigner: node.segAssigner,
2853+
chMgr: node.chMgr,
2854+
chTicker: node.chTicker,
2855+
schemaTimestamp: request.SchemaTimestamp,
28542856
}
28552857

28562858
log.Debug("Enqueue upsert request in Proxy",

internal/proxy/meta_cache.go

+2
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ type collectionInfo struct {
103103
createdUtcTimestamp uint64
104104
consistencyLevel commonpb.ConsistencyLevel
105105
partitionKeyIsolation bool
106+
updateTimestamp uint64
106107
}
107108

108109
type databaseInfo struct {
@@ -478,6 +479,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
478479
createdUtcTimestamp: collection.CreatedUtcTimestamp,
479480
consistencyLevel: collection.ConsistencyLevel,
480481
partitionKeyIsolation: isolation,
482+
updateTimestamp: collection.UpdateTimestamp,
481483
}
482484

483485
log.Ctx(ctx).Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName),

internal/proxy/task.go

+1
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,7 @@ func (t *describeCollectionTask) Execute(ctx context.Context) error {
667667
t.result.Properties = result.Properties
668668
t.result.DbName = result.GetDbName()
669669
t.result.NumPartitions = result.NumPartitions
670+
t.result.UpdateTimestamp = result.UpdateTimestamp
670671
for _, field := range result.Schema.Fields {
671672
if field.IsDynamic {
672673
continue

internal/proxy/task_insert.go

+28-9
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,16 @@ type insertTask struct {
2929
insertMsg *BaseInsertTask
3030
ctx context.Context
3131

32-
result *milvuspb.MutationResult
33-
idAllocator *allocator.IDAllocator
34-
segIDAssigner *segIDAssigner
35-
chMgr channelsMgr
36-
chTicker channelsTimeTicker
37-
vChannels []vChan
38-
pChannels []pChan
39-
schema *schemapb.CollectionSchema
40-
partitionKeys *schemapb.FieldData
32+
result *milvuspb.MutationResult
33+
idAllocator *allocator.IDAllocator
34+
segIDAssigner *segIDAssigner
35+
chMgr channelsMgr
36+
chTicker channelsTimeTicker
37+
vChannels []vChan
38+
pChannels []pChan
39+
schema *schemapb.CollectionSchema
40+
partitionKeys *schemapb.FieldData
41+
schemaTimestamp uint64
4142
}
4243

4344
// TraceCtx returns insertTask context
@@ -125,6 +126,24 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
125126
return merr.WrapErrAsInputError(merr.WrapErrParameterTooLarge("insert request size exceeds maxInsertSize"))
126127
}
127128

129+
collID, err := globalMetaCache.GetCollectionID(context.Background(), it.insertMsg.GetDbName(), collectionName)
130+
if err != nil {
131+
log.Ctx(ctx).Warn("fail to get collection id", zap.Error(err))
132+
return err
133+
}
134+
colInfo, err := globalMetaCache.GetCollectionInfo(ctx, it.insertMsg.GetDbName(), collectionName, collID)
135+
if err != nil {
136+
log.Ctx(ctx).Warn("fail to get collection info", zap.Error(err))
137+
return err
138+
}
139+
if it.schemaTimestamp != 0 {
140+
if it.schemaTimestamp != colInfo.updateTimestamp {
141+
err := merr.WrapErrCollectionSchemaMisMatch(collectionName)
142+
log.Ctx(ctx).Warn("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
143+
return err
144+
}
145+
}
146+
128147
schema, err := globalMetaCache.GetCollectionSchema(ctx, it.insertMsg.GetDbName(), collectionName)
129148
if err != nil {
130149
log.Warn("get collection schema from global meta cache failed", zap.String("collectionName", collectionName), zap.Error(err))

internal/proxy/task_upsert.go

+20-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ type upsertTask struct {
6565
partitionKeys *schemapb.FieldData
6666
// automatic generate pk as new pk wehen autoID == true
6767
// delete task need use the oldIds
68-
oldIds *schemapb.IDs
68+
oldIds *schemapb.IDs
69+
schemaTimestamp uint64
6970
}
7071

7172
// TraceCtx returns upsertTask context
@@ -292,6 +293,24 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
292293
Timestamp: it.EndTs(),
293294
}
294295

296+
collID, err := globalMetaCache.GetCollectionID(context.Background(), it.req.GetDbName(), collectionName)
297+
if err != nil {
298+
log.Warn("fail to get collection id", zap.Error(err))
299+
return err
300+
}
301+
colInfo, err := globalMetaCache.GetCollectionInfo(ctx, it.req.GetDbName(), collectionName, collID)
302+
if err != nil {
303+
log.Warn("fail to get collection info", zap.Error(err))
304+
return err
305+
}
306+
if it.schemaTimestamp != 0 {
307+
if it.schemaTimestamp != colInfo.updateTimestamp {
308+
err := merr.WrapErrCollectionSchemaMisMatch(collectionName)
309+
log.Warn("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
310+
return err
311+
}
312+
}
313+
295314
schema, err := globalMetaCache.GetCollectionSchema(ctx, it.req.GetDbName(), collectionName)
296315
if err != nil {
297316
log.Warn("Failed to get collection schema",

internal/rootcoord/alter_collection_task.go

+12
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error {
7676
}
7777

7878
ts := a.GetTs()
79+
80+
tso, err := a.core.tsoAllocator.GenerateTSO(1)
81+
if err == nil {
82+
newColl.UpdateTimestamp = tso
83+
}
84+
7985
redoTask := newBaseRedoTask(a.core.stepExecutor)
8086
redoTask.AddSyncStep(&AlterCollectionStep{
8187
baseStep: baseStep{core: a.core},
@@ -185,6 +191,12 @@ func (a *alterCollectionFieldTask) Execute(ctx context.Context) error {
185191
return err
186192
}
187193
ts := a.GetTs()
194+
195+
tso, err := a.core.tsoAllocator.GenerateTSO(1)
196+
if err == nil {
197+
newColl.UpdateTimestamp = tso
198+
}
199+
188200
redoTask := newBaseRedoTask(a.core.stepExecutor)
189201
redoTask.AddSyncStep(&AlterCollectionStep{
190202
baseStep: baseStep{core: a.core},

internal/rootcoord/alter_collection_task_test.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
9494
).Return(errors.New("err"))
9595
meta.On("ListAliasesByID", mock.Anything).Return([]string{})
9696

97-
core := newTestCore(withValidProxyManager(), withMeta(meta))
97+
core := newTestCore(withValidProxyManager(), withMeta(meta), withInvalidTsoAllocator())
9898
task := &alterCollectionTask{
9999
baseTask: newBaseTask(context.Background(), core),
100100
Req: &milvuspb.AlterCollectionRequest{
@@ -129,7 +129,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
129129
return errors.New("err")
130130
}
131131

132-
core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker))
132+
core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker), withInvalidTsoAllocator())
133133
task := &alterCollectionTask{
134134
baseTask: newBaseTask(context.Background(), core),
135135
Req: &milvuspb.AlterCollectionRequest{
@@ -164,7 +164,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
164164
return errors.New("err")
165165
}
166166

167-
core := newTestCore(withInvalidProxyManager(), withMeta(meta), withBroker(broker))
167+
core := newTestCore(withInvalidProxyManager(), withMeta(meta), withBroker(broker), withInvalidTsoAllocator())
168168
task := &alterCollectionTask{
169169
baseTask: newBaseTask(context.Background(), core),
170170
Req: &milvuspb.AlterCollectionRequest{
@@ -198,7 +198,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
198198
},
199199
},
200200
}, nil)
201-
core := newTestCore(withValidProxyManager(), withMeta(meta))
201+
core := newTestCore(withValidProxyManager(), withMeta(meta), withInvalidTsoAllocator())
202202
task := &alterCollectionTask{
203203
baseTask: newBaseTask(context.Background(), core),
204204
Req: &milvuspb.AlterCollectionRequest{
@@ -238,7 +238,8 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
238238
return nil
239239
}
240240

241-
core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker))
241+
core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker), withInvalidTsoAllocator())
242+
242243
task := &alterCollectionTask{
243244
baseTask: newBaseTask(context.Background(), core),
244245
Req: &milvuspb.AlterCollectionRequest{

internal/rootcoord/create_collection_task.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,6 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
474474
State: pb.PartitionState_PartitionCreated,
475475
}
476476
}
477-
478477
collInfo := model.Collection{
479478
CollectionID: collID,
480479
DBID: t.dbID,
@@ -492,6 +491,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
492491
Partitions: partitions,
493492
Properties: t.Req.Properties,
494493
EnableDynamicField: t.schema.EnableDynamicField,
494+
UpdateTimestamp: ts,
495495
}
496496

497497
// We cannot check the idempotency inside meta table when adding collection, since we'll execute duplicate steps

internal/rootcoord/root_coord.go

+1
Original file line numberDiff line numberDiff line change
@@ -1178,6 +1178,7 @@ func convertModelToDesc(collInfo *model.Collection, aliases []string, dbName str
11781178
resp.Properties = collInfo.Properties
11791179
resp.NumPartitions = int64(len(collInfo.Partitions))
11801180
resp.DbId = collInfo.DBID
1181+
resp.UpdateTimestamp = collInfo.UpdateTimestamp
11811182
return resp
11821183
}
11831184

pkg/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ require (
1212
github.com/expr-lang/expr v1.15.7
1313
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
1414
github.com/klauspost/compress v1.17.7
15-
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21
15+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7
1616
github.com/nats-io/nats-server/v2 v2.10.12
1717
github.com/nats-io/nats.go v1.34.1
1818
github.com/panjf2000/ants/v2 v2.7.2

pkg/go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -503,8 +503,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
503503
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
504504
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
505505
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
506-
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21 h1:Imb0uGFp/kyPPI5f6dCne8GCJIceuQWzI1H20p5aa4c=
507-
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
506+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7 h1:putot5l1gpiucE4CBrYzLoPCAci/BdYFe76GP15xsg4=
507+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
508508
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
509509
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
510510
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=

pkg/proto/etcd_meta.proto

+1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ message CollectionInfo {
6969
CollectionState state = 13; // To keep compatible with older version, default state is `Created`.
7070
repeated common.KeyValuePair properties = 14;
7171
int64 db_id = 15;
72+
uint64 UpdateTimestamp = 16;
7273
}
7374

7475
message PartitionInfo {

0 commit comments

Comments
 (0)