Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix etcdutil2 #34

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {

func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID uint64) {
tsoServiceKey := discovery.TSOPath(clusterID)
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey)

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
Expand Down Expand Up @@ -249,7 +248,7 @@ func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID ui
putFn,
deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithRange(tsoServiceEndKey),
clientv3.WithPrefix(),
)
}

Expand Down
11 changes: 4 additions & 7 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,6 @@ func (kgm *KeyspaceGroupManager) GetServiceConfig() ServiceConfig {
// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress}
// Value: discover.ServiceRegistryEntry
func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error {
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(kgm.tsoServiceKey) + "/"

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
if err := json.Unmarshal(kv.Value, s); err != nil {
Expand Down Expand Up @@ -518,7 +516,7 @@ func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error {
putFn,
deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithRange(tsoServiceEndKey),
clientv3.WithPrefix(),
)
kgm.tsoNodesWatcher.StartWatchLoop()
if err := kgm.tsoNodesWatcher.WaitLoad(); err != nil {
Expand All @@ -535,9 +533,8 @@ func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error {
// Value: endpoint.KeyspaceGroup
func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error {
rootPath := kgm.legacySvcRootPath
startKey := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(mcsutils.DefaultKeyspaceGroupID)}, "/")
endKey := strings.Join(
[]string{rootPath, clientv3.GetPrefixRangeEnd(endpoint.KeyspaceGroupIDPrefix())}, "/")
startKey := strings.Join(
[]string{rootPath, endpoint.KeyspaceGroupIDPrefix()}, "/")

defaultKGConfigured := false
putFn := func(kv *mvccpb.KeyValue) error {
Expand Down Expand Up @@ -577,7 +574,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error {
putFn,
deleteFn,
postEventsFn,
clientv3.WithRange(endKey),
clientv3.WithPrefix(),
)
if kgm.loadKeyspaceGroupsTimeout > 0 {
kgm.groupWatcher.SetLoadTimeout(kgm.loadKeyspaceGroupsTimeout)
Expand Down
28 changes: 22 additions & 6 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,10 +691,9 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 {
)
ticker := time.NewTicker(defaultLoadFromEtcdRetryInterval)
defer ticker.Stop()
ctx, cancel := context.WithTimeout(ctx, lw.loadTimeout)
defer cancel()

for i := 0; i < lw.loadRetryTimes; i++ {
ctx, cancel := context.WithTimeout(ctx, lw.loadTimeout)
defer cancel()
failpoint.Inject("loadTemporaryFail", func(val failpoint.Value) {
if maxFailTimes, ok := val.(int); ok && i < maxFailTimes {
err = errors.New("fail to read from etcd")
Expand Down Expand Up @@ -864,8 +863,6 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
}

func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) {
ctx, cancel := context.WithTimeout(ctx, DefaultRequestTimeout)
defer cancel()
startKey := lw.key
// If limit is 0, it means no limit.
// If limit is not 0, we need to add 1 to limit to get the next key.
Expand All @@ -883,10 +880,29 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error)
zap.String("key", lw.key), zap.Error(err))
}
}()

// TODO: Convert withPrefix to withRange(endKey) with easier way.
// In most cases, 'Get(foo, WithPrefix())' is equivalent to 'Get(foo, WithRange(GetPrefixRangeEnd(foo))'.
// However, when the startKey changes, the two are no longer equivalent.
// For example, the end key for 'WithRange(GetPrefixRangeEnd(foo))' is consistently 'fop'.
// But when using 'Get(foo1, WithPrefix())', the end key becomes 'foo2', not 'fop'.
// In future version, etcd provides IsOptsWithPrefix() to determine whether the WithPrefix is called.
opts := append(lw.opts,
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(limit),
)
op := clientv3.NewOp()
op.WithKeyBytes([]byte(lw.key))
for _, opt := range lw.opts {
opt(op)
}
if len(op.RangeBytes()) != 0 {
opts = append(opts, clientv3.WithRange(string(op.RangeBytes())))
}

for {
// Sort by key to get the next key and we don't need to worry about the performance,
// Because the default sort is just SortByKey and SortAscend
opts := append(lw.opts, clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), clientv3.WithLimit(limit))
resp, err := clientv3.NewKV(lw.client).Get(ctx, startKey, opts...)
if err != nil {
log.Error("load failed in watch loop", zap.String("name", lw.name),
Expand Down
66 changes: 42 additions & 24 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,12 +399,7 @@ func (suite *loopWatcherTestSuite) TearDownSuite() {

func (suite *loopWatcherTestSuite) TestLoadWithoutKey() {
re := suite.Require()
cache := struct {
syncutil.RWMutex
data map[string]struct{}
}{
data: make(map[string]struct{}),
}
cache := make(map[string]struct{})
watcher := NewLoopWatcher(
suite.ctx,
&suite.wg,
Expand All @@ -413,9 +408,7 @@ func (suite *loopWatcherTestSuite) TestLoadWithoutKey() {
"TestLoadWithoutKey",
func([]*clientv3.Event) error { return nil },
func(kv *mvccpb.KeyValue) error {
cache.Lock()
defer cache.Unlock()
cache.data[string(kv.Key)] = struct{}{}
cache[string(kv.Key)] = struct{}{}
return nil
},
func(kv *mvccpb.KeyValue) error { return nil },
Expand All @@ -424,9 +417,7 @@ func (suite *loopWatcherTestSuite) TestLoadWithoutKey() {
watcher.StartWatchLoop()
err := watcher.WaitLoad()
re.NoError(err) // although no key, watcher returns no error
cache.RLock()
defer cache.RUnlock()
re.Empty(cache.data)
re.Empty(cache)
}

func (suite *loopWatcherTestSuite) TestCallBack() {
Expand Down Expand Up @@ -499,12 +490,7 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() {
for i := 0; i < count; i++ {
suite.put(re, fmt.Sprintf("TestWatcherLoadLimit%d", i), "")
}
cache := struct {
syncutil.RWMutex
data []string
}{
data: make([]string, 0),
}
cache := make([]string, 0)
watcher := NewLoopWatcher(
ctx,
&suite.wg,
Expand All @@ -513,9 +499,7 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() {
"TestWatcherLoadLimit",
func([]*clientv3.Event) error { return nil },
func(kv *mvccpb.KeyValue) error {
cache.Lock()
defer cache.Unlock()
cache.data = append(cache.data, string(kv.Key))
cache = append(cache, string(kv.Key))
return nil
},
func(kv *mvccpb.KeyValue) error {
Expand All @@ -526,17 +510,51 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() {
},
clientv3.WithPrefix(),
)
watcher.SetLoadBatchSize(int64(limit))
watcher.StartWatchLoop()
err := watcher.WaitLoad()
re.NoError(err)
cache.RLock()
re.Len(cache.data, count)
cache.RUnlock()
re.Len(cache, count)
cancel()
}
}
}

func (suite *loopWatcherTestSuite) TestWatcherLoadLargeKey() {
re := suite.Require()
// use default limit to test 16384 key in etcd
count := 16384
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()
for i := 0; i < count; i++ {
suite.put(re, fmt.Sprintf("TestWatcherLoadLargeKey/test-%d", i), "")
}
cache := make([]string, 0)
watcher := NewLoopWatcher(
ctx,
&suite.wg,
suite.client,
"test",
"TestWatcherLoadLargeKey",
func([]*clientv3.Event) error { return nil },
func(kv *mvccpb.KeyValue) error {
cache = append(cache, string(kv.Key))
return nil
},
func(kv *mvccpb.KeyValue) error {
return nil
},
func([]*clientv3.Event) error {
return nil
},
clientv3.WithPrefix(),
)
watcher.StartWatchLoop()
err := watcher.WaitLoad()
re.NoError(err)
re.Len(cache, count)
}

func (suite *loopWatcherTestSuite) TestWatcherBreak() {
re := suite.Require()
cache := struct {
Expand Down
2 changes: 1 addition & 1 deletion server/keyspace_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques
putFn,
deleteFn,
postEventsFn,
clientv3.WithRange(clientv3.GetPrefixRangeEnd(startKey)),
clientv3.WithPrefix(),
)
watcher.StartWatchLoop()
if err := watcher.WaitLoad(); err != nil {
Expand Down
Loading