Skip to content

Commit

Permalink
for pick
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Jul 5, 2023
1 parent ac31f87 commit 560f354
Show file tree
Hide file tree
Showing 8 changed files with 505 additions and 128 deletions.
78 changes: 61 additions & 17 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand All @@ -30,6 +31,11 @@ import (
"go.uber.org/zap"
)

const (
watchLoopUnhealthyTimeout = 60 * time.Second
detectHealthyInterval = 10 * time.Second
)

// GetLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
leader := &pdpb.Member{}
Expand Down Expand Up @@ -182,26 +188,68 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
if ls == nil {
return
}

interval := detectHealthyInterval
timeout := watchLoopUnhealthyTimeout
failpoint.Inject("fastTick", func() {
timeout = 5 * time.Second
interval = 1 * time.Second
})
ticker := time.NewTicker(interval)
defer ticker.Stop()
lastHealthyTime := time.Now()

watcher := clientv3.NewWatcher(ls.client)
defer watcher.Close()
ctx, cancel := context.WithCancel(serverCtx)
defer cancel()
// The revision is the revision of last modification on this key.
// If the revision is compacted, will meet required revision has been compacted error.
// In this case, use the compact revision to re-watch the key.
for {
failpoint.Inject("delayWatcher", nil)
rch := watcher.Watch(ctx, ls.leaderKey, clientv3.WithRev(revision))
for wresp := range rch {
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watchChanCtx, watchChanCancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx))
defer watchChanCancel()

// When etcd is not available, the watcher.Watch will block,
// so we check the etcd availability first.
if !etcdutil.IsHealthy(serverCtx, ls.client) {
if time.Since(lastHealthyTime) > timeout {
log.Error("the connect of leadership watcher is unhealthy",
zap.Int64("revision", revision),
zap.String("leader-key", ls.leaderKey),
zap.String("purpose", ls.purpose))
return
}
select {
case <-serverCtx.Done():
// server closed, return
return
case <-ticker.C:
watchChanCancel()
continue
}
}

watchChan := watcher.Watch(watchChanCtx, ls.leaderKey, clientv3.WithRev(revision))
lastHealthyTime = time.Now()
WatchChan:
select {
case <-serverCtx.Done():
// server closed, return
return
case <-ticker.C:
if !etcdutil.IsHealthy(serverCtx, ls.client) {
watchChanCancel()
continue
}
case wresp := <-watchChan:
// meet compacted error, use the compact revision.
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
break
}
if wresp.Canceled {
watchChanCancel()
continue
} else if wresp.Err() != nil { // wresp.Err() contains CompactRevision not equal to 0
log.Error("leadership watcher is canceled with",
zap.Int64("revision", revision),
zap.String("leader-key", ls.leaderKey),
Expand All @@ -213,19 +261,15 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
for _, ev := range wresp.Events {
if ev.Type == mvccpb.DELETE {
log.Info("current leadership is deleted",
zap.Int64("revision", wresp.Header.Revision),
zap.String("leader-key", ls.leaderKey),
zap.String("purpose", ls.purpose))
return
}
}
revision = wresp.Header.Revision + 1
}

select {
case <-ctx.Done():
// server closed, return
return
default:
}
goto WatchChan // use goto to avoid to create a new watchChan
}
}

Expand Down
86 changes: 86 additions & 0 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/testutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)
Expand Down Expand Up @@ -118,3 +120,87 @@ func TestLeadership(t *testing.T) {
re.NoError(lease1.Close())
re.NoError(lease2.Close())
}

func TestExitWatch(t *testing.T) {
re := require.New(t)
leaderKey := "/test_leader"
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/fastTick", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))
// Case1: close the client before the watch loop starts
checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`))
client.Close()
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayWatcher"))
})
// Case2: close the client when the watch loop is running
checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
// Wait for the watch loop to start
time.Sleep(500 * time.Millisecond)
client.Close()
})
// Case3: delete the leader key
checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
leaderKey := leaderKey
_, err := client.Delete(context.Background(), leaderKey)
re.NoError(err)
})
// Case4: close the server before the watch loop starts
checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`))
server.Close()
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayWatcher"))
})
// Case5: close the server when the watch loop is running
checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
// Wait for the watch loop to start
time.Sleep(500 * time.Millisecond)
server.Close()
})
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/fastTick"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
}

func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client)) {
re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
re.NoError(err)

ep := cfg.LCUrls[0].String()
client1, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)
client2, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)

<-etcd.Server.ReadyNotify()

leadership1 := NewLeadership(client1, leaderKey, "test_leader_1")
leadership2 := NewLeadership(client2, leaderKey, "test_leader_2")
err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1")
re.NoError(err)
resp, err := client2.Get(context.Background(), leaderKey)
re.NoError(err)
done := make(chan struct{})
go func() {
leadership2.Watch(context.Background(), resp.Header.Revision)
done <- struct{}{}
}()

injectFunc(etcd, client2)

testutil.Eventually(re, func() bool {
select {
case <-done:
return true
default:
return false
}
})
}
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (s *Server) initClient() error {
if err != nil {
return err
}
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0])
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u))
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (s *Server) initClient() error {
if err != nil {
return err
}
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls[0])
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls)
return err
}

Expand Down
Loading

0 comments on commit 560f354

Please sign in to comment.