From 96fd5bda1b8814af6c2a82789be80aecc8353bd1 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 23 Aug 2023 14:52:38 +0800 Subject: [PATCH] add more test Signed-off-by: lhy1024 --- pkg/election/leadership.go | 6 +++-- pkg/election/leadership_test.go | 41 +++++++++++++++++++++++++++++++++ pkg/utils/etcdutil/etcdutil.go | 6 +++-- 3 files changed, 49 insertions(+), 4 deletions(-) diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index 90719678a64..255037fe26d 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -265,11 +265,13 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { continue } // We need to request progress to etcd to prevent etcd hold the watchChan, - // note: we must use the same ctx with watcher. - if err := watcher.RequestProgress(watcherCtx); err != nil { + // note: the ctx must be from watcherCtx, otherwise, the RequestProgress request cannot be sent properly. + ctx, cancel := context.WithTimeout(watcherCtx, etcdutil.DefaultDialTimeout) + if err := watcher.RequestProgress(ctx); err != nil { log.Warn("failed to request progress in leader watch loop", zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose), zap.Error(err)) } + cancel() // If no message comes from an etcd watchChan for WatchChTimeoutDuration, // create a new one and need not to reset lastReceivedResponseTime. 
if time.Since(lastReceivedResponseTime) >= etcdutil.WatchChTimeoutDuration { diff --git a/pkg/election/leadership_test.go b/pkg/election/leadership_test.go index d481cc79326..fd35c334f74 100644 --- a/pkg/election/leadership_test.go +++ b/pkg/election/leadership_test.go @@ -190,6 +190,47 @@ func TestExitWatch(t *testing.T) { etcd2.Close() } }) + // Case7: lose the quorum when the watch loop is running + checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) func() { + tempStdoutFile, _ := os.CreateTemp("/tmp", "pd_tests") + defer os.Remove(tempStdoutFile.Name()) + logCfg := &log.Config{} + logCfg.File.Filename = tempStdoutFile.Name() + logCfg.Level = "info" + lg, p, _ := log.InitLogger(logCfg) + log.ReplaceGlobals(lg, p) + + cfg1 := server.Config() + cfg2 := etcdutil.NewTestSingleConfig(t) + cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0]) + cfg2.ClusterState = embed.ClusterStateFlagExisting + peerURL := cfg2.LPUrls[0].String() + addResp, err := etcdutil.AddEtcdMember(client, []string{peerURL}) + re.NoError(err) + etcd2, err := embed.StartEtcd(cfg2) + re.NoError(err) + re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) + <-etcd2.Server.ReadyNotify() + + cfg3 := etcdutil.NewTestSingleConfig(t) + cfg3.InitialCluster = cfg2.InitialCluster + fmt.Sprintf(",%s=%s", cfg3.Name, &cfg3.LPUrls[0]) + cfg3.ClusterState = embed.ClusterStateFlagExisting + peerURL = cfg3.LPUrls[0].String() + addResp, err = etcdutil.AddEtcdMember(client, []string{peerURL}) + re.NoError(err) + etcd3, err := embed.StartEtcd(cfg3) + re.NoError(err) + re.Equal(uint64(etcd3.Server.ID()), addResp.Member.ID) + <-etcd3.Server.ReadyNotify() + + resp2, err := client.MemberList(context.Background()) + re.NoError(err) + re.Equal(3, len(resp2.Members)) + + etcd2.Server.HardStop() + etcd3.Server.HardStop() + return func() {} + }) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/fastTick")) 
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick")) } diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index e8c0b902316..fcfd2d459ff 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -726,11 +726,13 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision continue case <-ticker.C: // We need to request progress to etcd to prevent etcd hold the watchChan, - // note: we need to use the same ctx with watcher. - if err := watcher.RequestProgress(watcherCtx); err != nil { + // note: the ctx must be from watcherCtx, otherwise, the RequestProgress request cannot be sent properly. + ctx, cancel := context.WithTimeout(watcherCtx, DefaultDialTimeout) + if err := watcher.RequestProgress(ctx); err != nil { log.Warn("failed to request progress in leader watch loop", zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err)) } + cancel() // If no message comes from an etcd watchChan for WatchChTimeoutDuration, // create a new one and need not to reset lastReceivedResponseTime. if time.Since(lastReceivedResponseTime) >= WatchChTimeoutDuration {