Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

for pick #7

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 75 additions & 17 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand All @@ -30,6 +31,11 @@ import (
"go.uber.org/zap"
)

const (
// watchLoopUnhealthyTimeout is the default for how long Watch tolerates etcd
// being reported unhealthy, measured from the last time it was seen healthy,
// before it logs an error and returns. The fastTick failpoint shortens it.
watchLoopUnhealthyTimeout = 60 * time.Second
// detectHealthyInterval is the default period of the ticker that makes Watch
// re-check etcd health. The fastTick failpoint shortens it.
detectHealthyInterval = 10 * time.Second
)

// GetLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
leader := &pdpb.Member{}
Expand Down Expand Up @@ -182,26 +188,81 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
if ls == nil {
return
}

interval := detectHealthyInterval
timeout := watchLoopUnhealthyTimeout
failpoint.Inject("fastTick", func() {
timeout = 5 * time.Second
interval = 1 * time.Second
})
ticker := time.NewTicker(interval)
defer ticker.Stop()
lastHealthyTime := time.Now()

watcher := clientv3.NewWatcher(ls.client)
defer watcher.Close()
ctx, cancel := context.WithCancel(serverCtx)
defer cancel()
// The revision is the revision of last modification on this key.
// If the revision has already been compacted, the watch fails with a "required revision has been compacted" error.
// In this case, use the compact revision to re-watch the key.
var watchChanCancel *context.CancelFunc
defer func() {
if watchChanCancel != nil {
(*watchChanCancel)()
}
}()
for {
failpoint.Inject("delayWatcher", nil)
rch := watcher.Watch(ctx, ls.leaderKey, clientv3.WithRev(revision))
for wresp := range rch {
if watchChanCancel != nil {
(*watchChanCancel)()
}
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watchChanCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx))
watchChanCancel = &cancel

// When etcd is not available, the watcher.Watch will block,
// so we check the etcd availability first.
if !etcdutil.IsHealthy(serverCtx, ls.client) {
if time.Since(lastHealthyTime) > timeout {
log.Error("the connect of leadership watcher is unhealthy",
zap.Int64("revision", revision),
zap.String("leader-key", ls.leaderKey),
zap.String("purpose", ls.purpose))
return
}
select {
case <-serverCtx.Done():
// server closed, return
return
case <-ticker.C:
// continue to check the etcd availability
continue
}
}

watchChan := watcher.Watch(watchChanCtx, ls.leaderKey, clientv3.WithRev(revision))
WatchChan:
select {
case <-serverCtx.Done():
// server closed, return
return
case <-ticker.C:
if !etcdutil.IsHealthy(serverCtx, ls.client) {
if time.Since(lastHealthyTime) > watchLoopUnhealthyTimeout {
log.Error("the connect of leadership watcher is unhealthy",
zap.Int64("revision", revision),
zap.String("leader-key", ls.leaderKey),
zap.String("purpose", ls.purpose))
return
}
goto WatchChan
}
case wresp := <-watchChan:
// meet compacted error, use the compact revision.
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
break
}
if wresp.Canceled {
continue
} else if wresp.Err() != nil { // wresp.Err() contains CompactRevision not equal to 0
log.Error("leadership watcher is canceled with",
zap.Int64("revision", revision),
zap.String("leader-key", ls.leaderKey),
Expand All @@ -213,19 +274,16 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
for _, ev := range wresp.Events {
if ev.Type == mvccpb.DELETE {
log.Info("current leadership is deleted",
zap.Int64("revision", wresp.Header.Revision),
zap.String("leader-key", ls.leaderKey),
zap.String("purpose", ls.purpose))
return
}
}
revision = wresp.Header.Revision + 1
}

select {
case <-ctx.Done():
// server closed, return
return
default:
}
lastHealthyTime = time.Now()
goto WatchChan // use goto to avoid to create a new watchChan
}
}

Expand Down
86 changes: 86 additions & 0 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/testutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)
Expand Down Expand Up @@ -118,3 +120,87 @@ func TestLeadership(t *testing.T) {
re.NoError(lease1.Close())
re.NoError(lease2.Close())
}

// TestExitWatch verifies that Leadership.Watch returns in each of these
// scenarios: the etcd client is closed, the etcd server is closed (both before
// the watch loop starts and while it is running), and the leader key is deleted.
func TestExitWatch(t *testing.T) {
	re := require.New(t)
	leaderKey := "/test_leader"

	// The delayWatcher failpoint is injected at the top of the watch loop in
	// this package (pkg/election), so it must be addressed with this package's
	// path; enabling it under another package's path would never pause Watch.
	const delayWatcher = "github.com/tikv/pd/pkg/election/delayWatcher"

	// fastTick shortens the health-check interval and unhealthy timeout used by
	// Watch so that the exit paths are reached quickly. Disable the failpoints
	// with defer so that they do not stay enabled for later tests if an
	// assertion below aborts this test early.
	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/fastTick", "return(true)"))
	defer func() {
		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/fastTick"))
	}()
	// NOTE(review): presumably this shortens the etcd health-check timing in
	// etcdutil as well; confirm against that package.
	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))
	defer func() {
		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
	}()

	// withPausedWatcher runs action while the watch loop is held at the
	// delayWatcher failpoint, then releases it.
	withPausedWatcher := func(action func()) {
		re.NoError(failpoint.Enable(delayWatcher, `pause`))
		action()
		re.NoError(failpoint.Disable(delayWatcher))
	}

	// Case1: close the client before the watch loop starts
	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
		withPausedWatcher(func() { client.Close() })
	})
	// Case2: close the client when the watch loop is running
	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
		// Wait for the watch loop to start
		time.Sleep(500 * time.Millisecond)
		client.Close()
	})
	// Case3: delete the leader key
	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
		_, err := client.Delete(context.Background(), leaderKey)
		re.NoError(err)
	})
	// Case4: close the server before the watch loop starts
	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
		withPausedWatcher(func() { server.Close() })
	})
	// Case5: close the server when the watch loop is running
	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
		// Wait for the watch loop to start
		time.Sleep(500 * time.Millisecond)
		server.Close()
	})
}

// checkExitWatch starts a single-node embedded etcd, lets one Leadership
// campaign on leaderKey, starts a second Leadership watching that key in a
// goroutine, runs injectFunc to disturb the setup, and then asserts that the
// Watch call eventually returns.
//
// injectFunc receives the embedded etcd server and the etcd client used by the
// watching Leadership.
func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client)) {
	re := require.New(t)
	cfg := etcdutil.NewTestSingleConfig(t)
	etcd, err := embed.StartEtcd(cfg)
	// Check the error before registering the cleanup: on failure etcd may be
	// nil, and a deferred Close on it would panic while the test is aborting.
	re.NoError(err)
	defer etcd.Close()

	ep := cfg.LCUrls[0].String()
	client1, err := clientv3.New(clientv3.Config{
		Endpoints: []string{ep},
	})
	re.NoError(err)
	defer func() {
		// Best-effort cleanup; the server may already be gone.
		_ = client1.Close()
	}()
	client2, err := clientv3.New(clientv3.Config{
		Endpoints: []string{ep},
	})
	re.NoError(err)
	defer func() {
		// Best-effort cleanup; injectFunc may already have closed this client.
		_ = client2.Close()
	}()

	<-etcd.Server.ReadyNotify()

	leadership1 := NewLeadership(client1, leaderKey, "test_leader_1")
	leadership2 := NewLeadership(client2, leaderKey, "test_leader_2")
	err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1")
	re.NoError(err)
	resp, err := client2.Get(context.Background(), leaderKey)
	re.NoError(err)

	// Closing the channel (instead of sending on it) lets the goroutine finish
	// even if nobody is receiving any more, e.g. after the wait below timed out.
	done := make(chan struct{})
	go func() {
		defer close(done)
		leadership2.Watch(context.Background(), resp.Header.Revision)
	}()

	injectFunc(etcd, client2)

	testutil.Eventually(re, func() bool {
		select {
		case <-done:
			return true
		default:
			return false
		}
	})
}
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (s *Server) initClient() error {
if err != nil {
return err
}
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0])
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u))
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (s *Server) initClient() error {
if err != nil {
return err
}
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls[0])
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls)
return err
}

Expand Down
Loading