From 087bcab5901e7683217aaf0d628af85f1368837e Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Sun, 25 Jun 2023 21:55:06 +0800
Subject: [PATCH] add test for leadership

Signed-off-by: lhy1024
---
Review notes (text in this section is ignored by git-am):
- leadership_test.go: check the StartEtcd error before deferring Close,
  close both etcd clients, cancel the Watch context on return, signal
  completion with close(done), and disable the failpoints via defer.
- The index line of leadership_test.go still carries the blob ids of the
  original revision of this patch.

 pkg/election/leadership.go          |  36 ++++++++--
 pkg/election/leadership_test.go     | 104 ++++++++++++++++++++++++++++
 pkg/utils/etcdutil/etcdutil.go      |   6 +-
 pkg/utils/etcdutil/etcdutil_test.go |   2 +-
 4 files changed, 139 insertions(+), 9 deletions(-)

diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go
index 6b174c72932..ca51066411a 100644
--- a/pkg/election/leadership.go
+++ b/pkg/election/leadership.go
@@ -31,7 +31,10 @@ import (
 	"go.uber.org/zap"
 )
 
-const watchLoopUnhealthyTimeout = 60 * time.Second
+const (
+	watchLoopUnhealthyTimeout = 60 * time.Second
+	detectHealthyInterval     = 10 * time.Second
+)
 
 // GetLeader gets the corresponding leader from etcd by given leaderPath (as the key).
 func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
@@ -185,7 +188,17 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
 	if ls == nil {
 		return
 	}
+
+	interval := detectHealthyInterval
+	timeout := watchLoopUnhealthyTimeout
+	failpoint.Inject("fastTick", func() {
+		timeout = 5 * time.Second
+		interval = 1 * time.Second
+	})
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
 	lastHealthyTime := time.Now()
+
 	watcher := clientv3.NewWatcher(ls.client)
 	defer watcher.Close()
 	for {
@@ -198,25 +211,34 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
 		// When etcd is not available, the watcher.Watch will block,
 		// so we check the etcd availability first.
 		if _, err := etcdutil.EtcdKVGet(ls.client, ls.leaderKey); err != nil {
-			if time.Since(lastHealthyTime) > watchLoopUnhealthyTimeout {
+			if time.Since(lastHealthyTime) > timeout {
 				log.Error("the connect of leadership watcher is unhealthy",
 					zap.Int64("revision", revision),
 					zap.String("leader-key", ls.leaderKey),
 					zap.String("purpose", ls.purpose))
 				return
 			}
-			// If the watcher is unhealthy, we should cancel the watchChan and retry.
-			// Because the etcdutil.EtcdKVGet has a timeout, we don't need to sleep here.
-			watchChanCancel()
-			continue
+			select {
+			case <-serverCtx.Done():
+				// server closed, return
+				return
+			case <-ticker.C:
+				watchChanCancel()
+				continue
+			}
 		}
+
 		watchChan := watcher.Watch(watchChanCtx, ls.leaderKey, clientv3.WithRev(revision))
 		lastHealthyTime = time.Now()
-
 		select {
 		case <-serverCtx.Done():
 			// server closed, return
 			return
+		case <-ticker.C:
+			if _, err := etcdutil.EtcdKVGet(ls.client, ls.leaderKey); err != nil {
+				watchChanCancel()
+				continue
+			}
 		case wresp := <-watchChan:
 			// meet compacted error, use the compact revision.
 			if wresp.CompactRevision != 0 {
diff --git a/pkg/election/leadership_test.go b/pkg/election/leadership_test.go
index 63b25378518..1be5cb6e113 100644
--- a/pkg/election/leadership_test.go
+++ b/pkg/election/leadership_test.go
@@ -19,8 +19,10 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/stretchr/testify/require"
 	"github.com/tikv/pd/pkg/utils/etcdutil"
+	"github.com/tikv/pd/pkg/utils/testutil"
 	"go.etcd.io/etcd/clientv3"
 	"go.etcd.io/etcd/embed"
 )
@@ -118,3 +120,105 @@ func TestLeadership(t *testing.T) {
 	re.NoError(lease1.Close())
 	re.NoError(lease2.Close())
 }
+
+func TestExitWatch(t *testing.T) {
+	re := require.New(t)
+	leaderKey := "/test_leader"
+	// Disable each failpoint in a defer so that it is cleaned up even when an
+	// assertion below aborts the test.
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/fastTick", "return(true)"))
+	defer func() {
+		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/fastTick"))
+	}()
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/lessTimeout", "return(true)"))
+	defer func() {
+		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/lessTimeout"))
+	}()
+	// Case1: close the client before the watch loop starts
+	// NOTE(review): this failpoint is enabled under the server package path
+	// while Watch is patched in pkg/election; confirm the path (also in Case4).
+	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
+		re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`))
+		client.Close()
+		re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayWatcher"))
+	})
+	// Case2: close the client when the watch loop is running
+	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
+		// Wait for the watch loop to start
+		time.Sleep(500 * time.Millisecond)
+		client.Close()
+	})
+	// Case3: delete the leader key
+	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
+		_, err := client.Delete(context.Background(), leaderKey)
+		re.NoError(err)
+	})
+	// Case4: close the server before the watch loop starts
+	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
+		re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`))
+		server.Close()
+		re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayWatcher"))
+	})
+	// Case5: close the server when the watch loop is running
+	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
+		// Wait for the watch loop to start
+		time.Sleep(500 * time.Millisecond)
+		server.Close()
+	})
+}
+
+// checkExitWatch starts a single-node etcd, campaigns with one leadership and
+// watches leaderKey with another, runs injectFunc, and then waits until the
+// Watch call has returned.
+func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client)) {
+	re := require.New(t)
+	cfg := etcdutil.NewTestSingleConfig(t)
+	etcd, err := embed.StartEtcd(cfg)
+	// Check the error before registering the cleanup: the returned server
+	// must not be closed when StartEtcd has failed.
+	re.NoError(err)
+	defer etcd.Close()
+
+	ep := cfg.LCUrls[0].String()
+	client1, err := clientv3.New(clientv3.Config{
+		Endpoints: []string{ep},
+	})
+	re.NoError(err)
+	defer client1.Close()
+	client2, err := clientv3.New(clientv3.Config{
+		Endpoints: []string{ep},
+	})
+	re.NoError(err)
+	// injectFunc may already have closed client2, so the error is ignored.
+	defer client2.Close()
+
+	<-etcd.Server.ReadyNotify()
+
+	leadership1 := NewLeadership(client1, leaderKey, "test_leader_1")
+	leadership2 := NewLeadership(client2, leaderKey, "test_leader_2")
+	err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1")
+	re.NoError(err)
+	resp, err := client2.Get(context.Background(), leaderKey)
+	re.NoError(err)
+	// Cancel the watch context on return so that the watcher goroutine does
+	// not outlive this helper when the wait below gives up.
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	done := make(chan struct{})
+	go func() {
+		// Close done instead of sending on it, so this goroutine never blocks.
+		defer close(done)
+		leadership2.Watch(ctx, resp.Header.Revision)
+	}()
+
+	injectFunc(etcd, client2)
+
+	testutil.Eventually(re, func() bool {
+		select {
+		case <-done:
+			return true
+		default:
+			return false
+		}
+	})
+}
diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go
index 28079ee20d3..15ca1786e03 100644
--- a/pkg/utils/etcdutil/etcdutil.go
+++ b/pkg/utils/etcdutil/etcdutil.go
@@ -127,7 +127,11 @@ func RemoveEtcdMember(client *clientv3.Client, id uint64) (*clientv3.MemberRemov
 
 // EtcdKVGet returns the etcd GetResponse by given key or key prefix
 func EtcdKVGet(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
-	ctx, cancel := context.WithTimeout(c.Ctx(), DefaultRequestTimeout)
+	timeout := DefaultRequestTimeout
+	failpoint.Inject("lessTimeout", func() {
+		timeout = 1 * time.Second
+	})
+	ctx, cancel := context.WithTimeout(c.Ctx(), timeout)
 	defer cancel()
 
 	start := time.Now()
diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go
index 25eed38e004..4caa0f46d1a 100644
--- a/pkg/utils/etcdutil/etcdutil_test.go
+++ b/pkg/utils/etcdutil/etcdutil_test.go
@@ -290,7 +290,7 @@ func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
 	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick"))
 }
 
-func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) {
+func TestEtcdScaleInAndOut(t *testing.T) {
 	re := require.New(t)
 	// Start a etcd server.
 	cfg1 := NewTestSingleConfig(t)