Skip to content

Commit

Permalink
add test for leadership
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Jun 25, 2023
1 parent 5ec12cc commit 087bcab
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 9 deletions.
36 changes: 29 additions & 7 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ import (
"go.uber.org/zap"
)

const watchLoopUnhealthyTimeout = 60 * time.Second
const (
// watchLoopUnhealthyTimeout is how long Leadership.Watch tolerates failing etcd
// availability checks, measured from the last time a check succeeded, before it
// logs an error and returns. The fastTick failpoint can shorten it for tests.
watchLoopUnhealthyTimeout = 60 * time.Second
// detectHealthyInterval is the period of the ticker used by Leadership.Watch,
// both to pace retries after a failed availability check and to re-check etcd
// while waiting on the watch channel. The fastTick failpoint can shorten it for tests.
detectHealthyInterval = 10 * time.Second
)

// GetLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
Expand Down Expand Up @@ -185,7 +188,17 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
if ls == nil {
return
}

interval := detectHealthyInterval
timeout := watchLoopUnhealthyTimeout
failpoint.Inject("fastTick", func() {
timeout = 5 * time.Second
interval = 1 * time.Second
})
ticker := time.NewTicker(interval)
defer ticker.Stop()
lastHealthyTime := time.Now()

watcher := clientv3.NewWatcher(ls.client)
defer watcher.Close()
for {
Expand All @@ -198,25 +211,34 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
// When etcd is not available, the watcher.Watch will block,
// so we check the etcd availability first.
if _, err := etcdutil.EtcdKVGet(ls.client, ls.leaderKey); err != nil {
if time.Since(lastHealthyTime) > watchLoopUnhealthyTimeout {
if time.Since(lastHealthyTime) > timeout {
log.Error("the connect of leadership watcher is unhealthy",
zap.Int64("revision", revision),
zap.String("leader-key", ls.leaderKey),
zap.String("purpose", ls.purpose))
return
}
// If the watcher is unhealthy, we should cancel the watchChan and retry.
// Because the etcdutil.EtcdKVGet has a timeout, we don't need to sleep here.
watchChanCancel()
continue
select {
case <-serverCtx.Done():
// server closed, return
return
case <-ticker.C:
watchChanCancel()
continue
}
}

watchChan := watcher.Watch(watchChanCtx, ls.leaderKey, clientv3.WithRev(revision))
lastHealthyTime = time.Now()

select {
case <-serverCtx.Done():
// server closed, return
return
case <-ticker.C:
if _, err := etcdutil.EtcdKVGet(ls.client, ls.leaderKey); err != nil {
watchChanCancel()
continue
}
case wresp := <-watchChan:
// meet compacted error, use the compact revision.
if wresp.CompactRevision != 0 {
Expand Down
85 changes: 85 additions & 0 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/testutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)
Expand Down Expand Up @@ -118,3 +120,86 @@ func TestLeadership(t *testing.T) {
re.NoError(lease1.Close())
re.NoError(lease2.Close())
}
// TestExitWatch checks that Leadership.Watch returns in each scenario where the
// watcher can no longer follow the leader: the etcd client is closed, the leader
// key is deleted, or the etcd server is shut down.
func TestExitWatch(t *testing.T) {
	re := require.New(t)
	leaderKey := "/test_leader"

	// fastTick shortens the watch loop's health-check interval and unhealthy
	// timeout, and lessTimeout shortens the etcd request timeout, so the test
	// does not wait for the production values. Both are disabled via defer so
	// that a failing case cannot leave them enabled for later tests in the package.
	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/fastTick", "return(true)"))
	defer func() {
		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/fastTick"))
	}()
	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/lessTimeout", "return(true)"))
	defer func() {
		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/lessTimeout"))
	}()

	// Case1: close the client before the watch loop starts
	// NOTE(review): delayWatcher is defined outside this file; presumably it
	// holds the watcher back while the client is being closed — confirm.
	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
		re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`))
		client.Close()
		re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayWatcher"))
	})
	// Case2: close the client when the watch loop is running
	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
		// Wait for the watch loop to start
		time.Sleep(500 * time.Millisecond)
		client.Close()
	})
	// Case3: delete the leader key
	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
		_, err := client.Delete(context.Background(), leaderKey)
		re.NoError(err)
	})
	// Case4: close the server before the watch loop starts
	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
		re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`))
		server.Close()
		re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayWatcher"))
	})
	// Case5: close the server when the watch loop is running
	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
		// Wait for the watch loop to start
		time.Sleep(500 * time.Millisecond)
		server.Close()
	})
}

// checkExitWatch starts a single-node embedded etcd, lets one Leadership
// campaign on leaderKey, runs Watch for a second Leadership in a goroutine,
// applies injectFunc to the server and the watcher's client, and then asserts
// that Watch eventually returns.
//
// injectFunc receives the embedded etcd server and the client used by the
// watching Leadership; it is expected to trigger the condition under test.
func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client)) {
	re := require.New(t)
	cfg := etcdutil.NewTestSingleConfig(t)
	etcd, err := embed.StartEtcd(cfg)
	// Check the error before registering Close: a failed start would otherwise
	// make the deferred Close run on a nil server.
	re.NoError(err)
	defer etcd.Close()

	ep := cfg.LCUrls[0].String()
	client1, err := clientv3.New(clientv3.Config{
		Endpoints: []string{ep},
	})
	re.NoError(err)
	// Closing may report an error when the server is already gone; this is
	// best-effort cleanup so the connection does not outlive the test.
	defer client1.Close()
	client2, err := clientv3.New(clientv3.Config{
		Endpoints: []string{ep},
	})
	re.NoError(err)
	// injectFunc may already have closed client2; a second Close is only
	// cleanup and its error is irrelevant here.
	defer client2.Close()

	<-etcd.Server.ReadyNotify()

	leadership1 := NewLeadership(client1, leaderKey, "test_leader_1")
	leadership2 := NewLeadership(client2, leaderKey, "test_leader_2")
	err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1")
	re.NoError(err)
	resp, err := client2.Get(context.Background(), leaderKey)
	re.NoError(err)

	// Cancel the watch context on the way out so the goroutine is released
	// even when the assertion below fails.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	done := make(chan struct{})
	go func() {
		// Close instead of send: the goroutine never blocks waiting for a
		// receiver that may have stopped polling.
		defer close(done)
		leadership2.Watch(ctx, resp.Header.Revision)
	}()

	injectFunc(etcd, client2)

	testutil.Eventually(re, func() bool {
		select {
		case <-done:
			return true
		default:
			return false
		}
	})
}
6 changes: 5 additions & 1 deletion pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,11 @@ func RemoveEtcdMember(client *clientv3.Client, id uint64) (*clientv3.MemberRemov

// EtcdKVGet returns the etcd GetResponse by given key or key prefix
func EtcdKVGet(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
ctx, cancel := context.WithTimeout(c.Ctx(), DefaultRequestTimeout)
timeout := DefaultRequestTimeout
failpoint.Inject("lessTimeout", func() {
timeout = 1 * time.Second
})
ctx, cancel := context.WithTimeout(c.Ctx(), timeout)
defer cancel()

start := time.Now()
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick"))
}

func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) {
func TestEtcdScaleInAndOut(t *testing.T) {
re := require.New(t)
// Start a etcd server.
cfg1 := NewTestSingleConfig(t)
Expand Down

0 comments on commit 087bcab

Please sign in to comment.