Skip to content

Commit

Permalink
add more test
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Aug 23, 2023
1 parent 51584a9 commit 96fd5bd
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 4 deletions.
6 changes: 4 additions & 2 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,13 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
continue
}
// We need to request progress from etcd to prevent etcd from holding the watchChan,
// note: we must use the same ctx with watcher.
if err := watcher.RequestProgress(watcherCtx); err != nil {
// note: the ctx must be derived from watcherCtx; otherwise, the RequestProgress request cannot be sent properly.
ctx, cancel := context.WithTimeout(watcherCtx, etcdutil.DefaultDialTimeout)
if err := watcher.RequestProgress(ctx); err != nil {
log.Warn("failed to request progress in leader watch loop",
zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose), zap.Error(err))
}
cancel()
// If no message comes from an etcd watchChan for WatchChTimeoutDuration,
// create a new one; there is no need to reset lastReceivedResponseTime.
if time.Since(lastReceivedResponseTime) >= etcdutil.WatchChTimeoutDuration {
Expand Down
41 changes: 41 additions & 0 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,47 @@ func TestExitWatch(t *testing.T) {
etcd2.Close()
}
})
// Case7: lose the quorum while the watch loop is running
checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) func() {
tempStdoutFile, _ := os.CreateTemp("/tmp", "pd_tests")
defer os.Remove(tempStdoutFile.Name())
logCfg := &log.Config{}
logCfg.File.Filename = tempStdoutFile.Name()
logCfg.Level = "info"
lg, p, _ := log.InitLogger(logCfg)
log.ReplaceGlobals(lg, p)

cfg1 := server.Config()
cfg2 := etcdutil.NewTestSingleConfig(t)
cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0])
cfg2.ClusterState = embed.ClusterStateFlagExisting
peerURL := cfg2.LPUrls[0].String()
addResp, err := etcdutil.AddEtcdMember(client, []string{peerURL})
re.NoError(err)
etcd2, err := embed.StartEtcd(cfg2)
re.NoError(err)
re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID)
<-etcd2.Server.ReadyNotify()

cfg3 := etcdutil.NewTestSingleConfig(t)
cfg3.InitialCluster = cfg2.InitialCluster + fmt.Sprintf(",%s=%s", cfg3.Name, &cfg3.LPUrls[0])
cfg3.ClusterState = embed.ClusterStateFlagExisting
peerURL = cfg3.LPUrls[0].String()
addResp, err = etcdutil.AddEtcdMember(client, []string{peerURL})
re.NoError(err)
etcd3, err := embed.StartEtcd(cfg3)
re.NoError(err)
re.Equal(uint64(etcd3.Server.ID()), addResp.Member.ID)
<-etcd3.Server.ReadyNotify()

resp2, err := client.MemberList(context.Background())
re.NoError(err)
re.Equal(3, len(resp2.Members))

etcd2.Server.HardStop()
etcd3.Server.HardStop()
return func() {}
})
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/fastTick"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,11 +726,13 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
continue
case <-ticker.C:
// We need to request progress from etcd to prevent etcd from holding the watchChan,
// note: we need to use the same ctx with watcher.
if err := watcher.RequestProgress(watcherCtx); err != nil {
// note: the ctx must be derived from watcherCtx; otherwise, the RequestProgress request cannot be sent properly.
ctx, cancel := context.WithTimeout(watcherCtx, DefaultDialTimeout)
if err := watcher.RequestProgress(ctx); err != nil {
log.Warn("failed to request progress in leader watch loop",
zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err))
}
cancel()
// If no message comes from an etcd watchChan for WatchChTimeoutDuration,
// create a new one; there is no need to reset lastReceivedResponseTime.
if time.Since(lastReceivedResponseTime) >= WatchChTimeoutDuration {
Expand Down

0 comments on commit 96fd5bd

Please sign in to comment.