From 58f5ad70be45ead94a712fc3f251e4685d05b7ab Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 15 Jan 2024 22:00:49 +0800 Subject: [PATCH 1/3] update Signed-off-by: lhy1024 --- pkg/utils/etcdutil/etcdutil.go | 23 ++++++++-- pkg/utils/etcdutil/etcdutil_test.go | 66 ++++++++++++++++++----------- 2 files changed, 62 insertions(+), 27 deletions(-) diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 11c640fe4ef..460dcd4f5d3 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -864,8 +864,6 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision } func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) { - ctx, cancel := context.WithTimeout(ctx, DefaultRequestTimeout) - defer cancel() startKey := lw.key // If limit is 0, it means no limit. // If limit is not 0, we need to add 1 to limit to get the next key. @@ -883,10 +881,29 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) zap.String("key", lw.key), zap.Error(err)) } }() + + // TODO: Convert WithPrefix to WithRange(endKey) in an easier way. + // In most cases, 'Get(foo, WithPrefix())' is equivalent to 'Get(foo, WithRange(GetPrefixRangeEnd(foo)))'. + // However, when the startKey changes, the two are no longer equivalent. + // For example, the end key for 'WithRange(GetPrefixRangeEnd(foo))' is consistently 'fop'. + // But when using 'Get(foo1, WithPrefix())', the end key becomes 'foo2', not 'fop'. + // In a future version, etcd will provide IsOptsWithPrefix() to determine whether WithPrefix was called. 
+ opts := append(lw.opts, + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), + clientv3.WithLimit(limit), + ) + op := clientv3.NewOp() + op.WithKeyBytes([]byte(lw.key)) + for _, opt := range lw.opts { + opt(op) + } + if len(op.RangeBytes()) != 0 { + opts = append(opts, clientv3.WithRange(string(op.RangeBytes()))) + } + for { // Sort by key to get the next key and we don't need to worry about the performance, // Because the default sort is just SortByKey and SortAscend - opts := append(lw.opts, clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), clientv3.WithLimit(limit)) resp, err := clientv3.NewKV(lw.client).Get(ctx, startKey, opts...) if err != nil { log.Error("load failed in watch loop", zap.String("name", lw.name), diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index d415d2d1873..2972c079bb7 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -399,12 +399,7 @@ func (suite *loopWatcherTestSuite) TearDownSuite() { func (suite *loopWatcherTestSuite) TestLoadWithoutKey() { re := suite.Require() - cache := struct { - syncutil.RWMutex - data map[string]struct{} - }{ - data: make(map[string]struct{}), - } + cache := make(map[string]struct{}) watcher := NewLoopWatcher( suite.ctx, &suite.wg, @@ -413,9 +408,7 @@ func (suite *loopWatcherTestSuite) TestLoadWithoutKey() { "TestLoadWithoutKey", func([]*clientv3.Event) error { return nil }, func(kv *mvccpb.KeyValue) error { - cache.Lock() - defer cache.Unlock() - cache.data[string(kv.Key)] = struct{}{} + cache[string(kv.Key)] = struct{}{} return nil }, func(kv *mvccpb.KeyValue) error { return nil }, @@ -424,9 +417,7 @@ func (suite *loopWatcherTestSuite) TestLoadWithoutKey() { watcher.StartWatchLoop() err := watcher.WaitLoad() re.NoError(err) // although no key, watcher returns no error - cache.RLock() - defer cache.RUnlock() - re.Empty(cache.data) + re.Empty(cache) } func (suite *loopWatcherTestSuite) TestCallBack() { @@ 
-499,12 +490,7 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() { for i := 0; i < count; i++ { suite.put(re, fmt.Sprintf("TestWatcherLoadLimit%d", i), "") } - cache := struct { - syncutil.RWMutex - data []string - }{ - data: make([]string, 0), - } + cache := make([]string, 0) watcher := NewLoopWatcher( ctx, &suite.wg, @@ -513,9 +499,7 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() { "TestWatcherLoadLimit", func([]*clientv3.Event) error { return nil }, func(kv *mvccpb.KeyValue) error { - cache.Lock() - defer cache.Unlock() - cache.data = append(cache.data, string(kv.Key)) + cache = append(cache, string(kv.Key)) return nil }, func(kv *mvccpb.KeyValue) error { @@ -526,17 +510,51 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() { }, clientv3.WithPrefix(), ) + watcher.SetLoadBatchSize(int64(limit)) watcher.StartWatchLoop() err := watcher.WaitLoad() re.NoError(err) - cache.RLock() - re.Len(cache.data, count) - cache.RUnlock() + re.Len(cache, count) cancel() } } } +func (suite *loopWatcherTestSuite) TestWatcherLoadLargeKey() { + re := suite.Require() + // use default limit to test 16384 key in etcd + count := 16384 + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + for i := 0; i < count; i++ { + suite.put(re, fmt.Sprintf("TestWatcherLoadLargeKey/test-%d", i), "") + } + cache := make([]string, 0) + watcher := NewLoopWatcher( + ctx, + &suite.wg, + suite.client, + "test", + "TestWatcherLoadLargeKey", + func([]*clientv3.Event) error { return nil }, + func(kv *mvccpb.KeyValue) error { + cache = append(cache, string(kv.Key)) + return nil + }, + func(kv *mvccpb.KeyValue) error { + return nil + }, + func([]*clientv3.Event) error { + return nil + }, + clientv3.WithPrefix(), + ) + watcher.StartWatchLoop() + err := watcher.WaitLoad() + re.NoError(err) + re.Len(cache, count) +} + func (suite *loopWatcherTestSuite) TestWatcherBreak() { re := suite.Require() cache := struct { From b5c4a7181f5879dd2421589761cada035f05f651 Mon Sep 17 
00:00:00 2001 From: lhy1024 Date: Mon, 15 Jan 2024 22:13:12 +0800 Subject: [PATCH 2/3] use clientv3.WithPrefix(), Signed-off-by: lhy1024 --- pkg/keyspace/tso_keyspace_group.go | 3 +-- pkg/tso/keyspace_group_manager.go | 4 +--- pkg/utils/etcdutil/etcdutil.go | 5 ++--- server/keyspace_service.go | 2 +- 4 files changed, 5 insertions(+), 9 deletions(-) diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 55c9adf66d9..92bfcbb7357 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -216,7 +216,6 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) { func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID uint64) { tsoServiceKey := discovery.TSOPath(clusterID) - tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey) putFn := func(kv *mvccpb.KeyValue) error { s := &discovery.ServiceRegistryEntry{} @@ -249,7 +248,7 @@ func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID ui putFn, deleteFn, func([]*clientv3.Event) error { return nil }, - clientv3.WithRange(tsoServiceEndKey), + clientv3.WithPrefix(), ) } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index c48c066a2aa..b4b3f4ee470 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -485,8 +485,6 @@ func (kgm *KeyspaceGroupManager) GetServiceConfig() ServiceConfig { // Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress} // Value: discover.ServiceRegistryEntry func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error { - tsoServiceEndKey := clientv3.GetPrefixRangeEnd(kgm.tsoServiceKey) + "/" - putFn := func(kv *mvccpb.KeyValue) error { s := &discovery.ServiceRegistryEntry{} if err := json.Unmarshal(kv.Value, s); err != nil { @@ -518,7 +516,7 @@ func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error { putFn, deleteFn, func([]*clientv3.Event) error { return nil }, - 
clientv3.WithRange(tsoServiceEndKey), + clientv3.WithPrefix(), ) kgm.tsoNodesWatcher.StartWatchLoop() if err := kgm.tsoNodesWatcher.WaitLoad(); err != nil { diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 460dcd4f5d3..59a0aa1e12a 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -691,10 +691,9 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 { ) ticker := time.NewTicker(defaultLoadFromEtcdRetryInterval) defer ticker.Stop() - ctx, cancel := context.WithTimeout(ctx, lw.loadTimeout) - defer cancel() - for i := 0; i < lw.loadRetryTimes; i++ { + ctx, cancel := context.WithTimeout(ctx, lw.loadTimeout) + defer cancel() failpoint.Inject("loadTemporaryFail", func(val failpoint.Value) { if maxFailTimes, ok := val.(int); ok && i < maxFailTimes { err = errors.New("fail to read from etcd") diff --git a/server/keyspace_service.go b/server/keyspace_service.go index 11d912a5f54..313d9f04fe8 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -116,7 +116,7 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques putFn, deleteFn, postEventsFn, - clientv3.WithRange(clientv3.GetPrefixRangeEnd(startKey)), + clientv3.WithPrefix(), ) watcher.StartWatchLoop() if err := watcher.WaitLoad(); err != nil { From 00d8112c25e9389e54d717b7351234a00eacef9b Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 15 Jan 2024 22:26:06 +0800 Subject: [PATCH 3/3] add more Signed-off-by: lhy1024 --- pkg/tso/keyspace_group_manager.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index b4b3f4ee470..b92e089e8b8 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -533,9 +533,8 @@ func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error { // Value: endpoint.KeyspaceGroup func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error 
{ rootPath := kgm.legacySvcRootPath - startKey := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(mcsutils.DefaultKeyspaceGroupID)}, "/") - endKey := strings.Join( - []string{rootPath, clientv3.GetPrefixRangeEnd(endpoint.KeyspaceGroupIDPrefix())}, "/") + startKey := strings.Join( + []string{rootPath, endpoint.KeyspaceGroupIDPrefix()}, "/") defaultKGConfigured := false putFn := func(kv *mvccpb.KeyValue) error { @@ -575,7 +574,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { putFn, deleteFn, postEventsFn, - clientv3.WithRange(endKey), + clientv3.WithPrefix(), ) if kgm.loadKeyspaceGroupsTimeout > 0 { kgm.groupWatcher.SetLoadTimeout(kgm.loadKeyspaceGroupsTimeout)