Skip to content

Commit

Permalink
etcdutil: detect timeout with multi point
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Jun 21, 2023
1 parent ac31f87 commit 2ba4279
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 29 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
// Fix panic in unit test with go >= 1.14, ref: etcd-io/bbolt#201 https://github.com/etcd-io/bbolt/pull/201
go.etcd.io/bbolt v1.3.6 // indirect
go.etcd.io/etcd/api/v3 v3.5.9
go.uber.org/atomic v1.10.0
go.uber.org/dig v1.9.0 // indirect
go.uber.org/fx v1.12.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,8 @@ go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
go.etcd.io/etcd v0.5.0-alpha.5.0.20220915004622-85b640cee793 h1:fqmtdYQlwZ/vKWSz5amW+a4cnjg23ojz5iL7rjf08Wg=
go.etcd.io/etcd v0.5.0-alpha.5.0.20220915004622-85b640cee793/go.mod h1:eBhtbxXP1qpW0F6+WxoJ64DM1Mrfx46PHtVxEdkLe0I=
go.etcd.io/etcd/api/v3 v3.5.9 h1:4wSsluwyTbGGmyjJktOf3wFQoTBIURXHnq9n/G/JQHs=
go.etcd.io/etcd/api/v3 v3.5.9/go.mod h1:uyAal843mC8uUVSLWz6eHa/d971iDGnCRpmKd2Z+X8k=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (s *Server) initClient() error {
if err != nil {
return err
}
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0])
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(s.ctx, tlsConfig, []url.URL(u)[0])
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (s *Server) initClient() error {
if err != nil {
return err
}
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls[0])
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(s.ctx, tlsConfig, s.backendUrls[0])
return err
}

Expand Down
206 changes: 195 additions & 11 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,23 @@ package etcdutil
import (
"context"
"crypto/tls"
"fmt"
"math/rand"
"net/http"
"net/url"
"sort"
"sync"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver"
"go.etcd.io/etcd/mvcc/mvccpb"
Expand Down Expand Up @@ -197,8 +201,8 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value
}

// CreateClients creates etcd v3 client and http client.
func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) {
client, err := CreateEtcdClient(tlsConfig, acUrls)
func CreateClients(ctx context.Context, tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) {
client, err := CreateEtcdClient(ctx, tlsConfig, acUrls)
if err != nil {
return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}
Expand Down Expand Up @@ -245,23 +249,203 @@ func createEtcdClientWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL)
return client, err
}

// CreateEtcdClient creates etcd v3 client.
// Note: it is used by the legacy pd-server and connects to the leader only.
func CreateEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) {
// etcdStateGauge is a gauge vector exported as pd_server_etcd_client and
// partitioned by a single "type" label. Within this file the "ep" label value
// is set to the number of healthy endpoints each time the etcd client's
// endpoint list is replaced.
var etcdStateGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "server",
Name: "etcd_client",
Help: "Etcd raft states.",
}, []string{"type"})

// init registers etcdStateGauge with the default Prometheus registry at
// package load time. MustRegister panics if the collector cannot be
// registered.
func init() {
prometheus.MustRegister(etcdStateGauge)
}

// newClient builds an etcd v3 client that talks to the single endpoint acURL.
// The client is configured with a zap production log config using the
// project's log encoding, the default dial timeout, the supplied TLS config
// and the default keep-alive time and timeout.
// NOTE(review): this span is a rendered diff, so removed and added lines appear
// side by side below — confirm the final field list against the committed file.
func newClient(tlsConfig *tls.Config, acURL string) (*clientv3.Client, error) {
lgc := zap.NewProductionConfig()
lgc.Encoding = log.ZapEncodingName
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{acURL.String()},
DialTimeout: defaultEtcdClientTimeout,
TLS: tlsConfig,
LogConfig: &lgc,
Endpoints: []string{acURL},
DialTimeout: defaultEtcdClientTimeout,
TLS: tlsConfig,
LogConfig: &lgc,
DialKeepAliveTime: defaultDialKeepAliveTime,
DialKeepAliveTimeout: defaultDialKeepAliveTimeout,
})
if err == nil {
log.Info("create etcd v3 client", zap.String("endpoints", acURL.String()))
return client, err
}

// CreateEtcdClient creates etcd v3 client.
// Note: it will be used by legacy pd-server, and only connect to leader only.
//
// The returned client is initially connected to acURL. Two background
// goroutines then keep its endpoint list current:
//   - a health-check loop that probes every known endpoint every 3 seconds and
//     points the client at the endpoints that answered;
//   - a member-sync loop that lists the cluster members every 3 seconds and
//     feeds their client URLs to the health checker.
//
// Both loops stop when ctx is cancelled or when the returned client is closed,
// whichever happens first, so closing the client is enough to release them
// even if ctx is long-lived (for example context.Background()).
func CreateEtcdClient(ctx context.Context, tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) {
	client, err := newClient(tlsConfig, acURL.String())
	if err != nil {
		return nil, err
	}
	checker := &healthyChecker{
		tlsConfig: tlsConfig,
	}
	// Seed the checker synchronously so the first patrol already has targets.
	eps := syncUrls(ctx, client)
	checker.update(eps)

	// TODO: remove these goroutines after etcd client support balancer by custom.
	go func(ctx context.Context, client *clientv3.Client) {
		defer logutil.LogPanic()
		ticker := time.NewTicker(3 * time.Second)
		defer ticker.Stop()
		// The per-endpoint probe clients are owned by this helper; release them
		// when the loop ends. Cleanup lives in the patrol goroutine so that no
		// probe is in flight on a client while it is being closed.
		defer func() {
			checker.Range(func(key, value interface{}) bool {
				if closeErr := value.(*healthyClient).Close(); closeErr != nil {
					log.Warn("[etcd client] failed to close etcd healthy client",
						zap.String("endpoint", key.(string)), zap.Error(closeErr))
				}
				return true
			})
		}()
		for {
			select {
			case <-ctx.Done():
				return
			case <-client.Ctx().Done():
				// The client was closed by its owner; nothing left to maintain.
				return
			case <-ticker.C:
				healthyEps := checker.patrol(ctx)
				if len(healthyEps) == 0 {
					log.Error("[etcd client] no available endpoint")
					continue
				}
				usedEps := client.Endpoints()
				if !isEqual(healthyEps, usedEps) {
					client.SetEndpoints(healthyEps...)
					change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps))
					etcdStateGauge.WithLabelValues("ep").Set(float64(len(healthyEps)))
					log.Info("[etcd client] update endpoints", zap.String("num-change", change),
						zap.Strings("last-endpoints", usedEps), zap.Strings("endpoints", client.Endpoints()))
				}
			}
		}
	}(ctx, client)

	// Notes: use another goroutine to update endpoints to avoid blocking health check in the first goroutine.
	go func(ctx context.Context, client *clientv3.Client) {
		defer logutil.LogPanic()
		ticker := time.NewTicker(3 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-client.Ctx().Done():
				return
			case <-ticker.C:
				eps := syncUrls(ctx, client)
				checker.update(eps)
			}
		}
	}(ctx, client)

	return client, nil
}

// offlineTimeout is how long an etcd endpoint may go without a successful
// health check before the healthy checker stops tracking it.
const offlineTimeout = 30 * time.Minute

// healthyClient pairs an etcd client dedicated to one endpoint with the time
// of that endpoint's most recent successful health check.
type healthyClient struct {
*clientv3.Client
// lastHealth is set when the entry is created and refreshed every time a
// health probe against the endpoint succeeds.
lastHealth time.Time
}

// healthyChecker keeps one healthyClient per etcd endpoint so that every
// endpoint can be probed on its own. The embedded sync.Map is keyed by the
// endpoint string and is accessed from several goroutines.
type healthyChecker struct {
sync.Map // map[string]*healthyClient
// tlsConfig is used for every per-endpoint client the checker creates.
tlsConfig *tls.Config
}

// patrol probes every endpoint tracked by the checker concurrently and returns
// the endpoints that passed the probe, in no particular order.
//
// A probe is a Get of the key "health" that requires a leader and is bounded
// by DefaultRequestTimeout. It passes when the request succeeds or is rejected
// with rpctypes.ErrPermissionDenied. Every passing endpoint has its lastHealth
// timestamp refreshed; failing endpoints are left untouched so that update can
// eventually expire them.
func (checker *healthyChecker) patrol(ctx context.Context) []string {
	// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L105
	type probeTarget struct {
		ep     string
		client *healthyClient
	}
	// Take one snapshot of the tracked clients and derive everything from it.
	// The map can gain entries concurrently (update runs in another
	// goroutine); counting in one Range pass and spawning probes in a second
	// could start more senders than the result channel has room for, leaving
	// a sender blocked forever because the channel is only drained after
	// wg.Wait().
	var targets []probeTarget
	checker.Range(func(key, value interface{}) bool {
		targets = append(targets, probeTarget{ep: key.(string), client: value.(*healthyClient)})
		return true
	})

	var wg sync.WaitGroup
	// Buffered to the number of probes so no sender ever blocks.
	hch := make(chan string, len(targets))
	for _, target := range targets {
		wg.Add(1)
		go func(ep string, client *healthyClient) {
			defer wg.Done()
			defer logutil.LogPanic()
			ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), DefaultRequestTimeout)
			defer cancel()
			_, err := client.Get(ctx, "health")
			// permission denied is OK since proposal goes through consensus to get it
			if err == nil || err == rpctypes.ErrPermissionDenied {
				hch <- ep
				checker.Store(ep, &healthyClient{
					Client:     client.Client,
					lastHealth: time.Now(),
				})
			}
		}(target.ep, target.client)
	}
	wg.Wait()
	close(hch)

	healthyList := make([]string, 0, len(targets))
	for h := range hch {
		healthyList = append(healthyList, h)
	}
	return healthyList
}

// update reconciles the checker with the given endpoint list.
//
// For each endpoint in eps:
//   - if it is already tracked and its last successful health check is older
//     than offlineTimeout, it is removed from the checker and its client is
//     closed; a later call will create a fresh client if the endpoint is
//     still listed;
//   - if it is already tracked and still within the timeout, it is left alone;
//   - otherwise a new dedicated client is created and tracked, with lastHealth
//     initialised to the current time.
//
// Endpoints that are tracked but absent from eps are not touched.
func (checker *healthyChecker) update(eps []string) {
	for _, ep := range eps {
		// check if client exists, if not, create one, if exists, check if it's offline
		if existing, ok := checker.Load(ep); ok {
			tracked := existing.(*healthyClient)
			if time.Since(tracked.lastHealth) > offlineTimeout {
				log.Info("[etcd client] some endpoint maybe offline", zap.String("endpoint", ep))
				checker.Delete(ep)
				// Dropping the map entry alone would leave the client's
				// connection open, so release it explicitly.
				if err := tracked.Close(); err != nil {
					log.Warn("[etcd client] failed to close offline etcd healthy client",
						zap.String("endpoint", ep), zap.Error(err))
				}
			}
			continue
		}
		client, err := newClient(checker.tlsConfig, ep)
		if err != nil {
			log.Error("[etcd client] failed to create etcd healthy client", zap.Error(err))
			continue
		}
		checker.Store(ep, &healthyClient{
			Client:     client,
			lastHealth: time.Now(),
		})
	}
}

// syncUrls lists the cluster members through client and collects the client
// URLs of every member that has a non-empty name and is not a learner.
//
// The request requires a leader and is bounded by DefaultRequestTimeout. If
// listing fails the error is logged and an empty slice is returned. A warning
// is logged when the call takes longer than defaultEtcdClientTimeout.
func syncUrls(ctx context.Context, client *clientv3.Client) []string {
	// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/clientv3/client.go#L170
	reqCtx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), DefaultRequestTimeout)
	defer cancel()

	start := time.Now()
	resp, err := client.MemberList(reqCtx)
	if err != nil {
		log.Error("[etcd client] failed to list members", errs.ZapError(err))
		return []string{}
	}
	if time.Since(start) > defaultEtcdClientTimeout {
		log.Warn("[etcd client] sync etcd members slow", zap.Duration("cost", time.Since(start)), zap.Int("members", len(resp.Members)))
	}

	var urls []string
	for _, member := range resp.Members {
		// Skip unnamed members and learners.
		if len(member.Name) == 0 || member.IsLearner {
			continue
		}
		urls = append(urls, member.ClientURLs...)
	}
	return urls
}

// isEqual reports whether a and b hold the same strings regardless of order.
// Duplicates count: a value must occur the same number of times in both
// slices. Two empty (or nil) slices are equal.
//
// Neither argument is modified.
func isEqual(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	// Sort private copies: sorting the arguments in place would silently
	// reorder slices the caller still owns and may go on to use.
	sortedA := append([]string(nil), a...)
	sortedB := append([]string(nil), b...)
	sort.Strings(sortedA)
	sort.Strings(sortedB)
	for i, v := range sortedA {
		if v != sortedB[i] {
			return false
		}
	}
	return true
}

func createHTTPClient(tlsConfig *tls.Config) *http.Client {
return &http.Client{
Transport: &http.Transport{
Expand Down
50 changes: 44 additions & 6 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,44 @@ func TestEtcdClientSync(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
}

// TestEtcdClientSync2 checks that a client created by CreateEtcdClient keeps
// working across a membership change: a second member is added, the original
// member is removed and shut down, and the client must then still be able to
// list the remaining member.
func TestEtcdClientSync2(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start an etcd server.
	cfg1 := NewTestSingleConfig(t)
	etcd1, err := embed.StartEtcd(cfg1)
	// Assert before registering cleanup: on failure the returned handle may be
	// nil, and a deferred Close on it would hide the real error.
	re.NoError(err)
	defer func() {
		etcd1.Close()
	}()

	// Create an etcd client with etcd1 as endpoint.
	ep1 := cfg1.LCUrls[0].String()
	urls, err := types.NewURLs([]string{ep1})
	re.NoError(err)
	client1, err := CreateEtcdClient(ctx, nil, urls[0])
	re.NoError(err)
	defer func() {
		client1.Close()
	}()
	<-etcd1.Server.ReadyNotify()

	// Add a new member.
	etcd2 := checkAddEtcdMember(t, cfg1, client1)
	defer etcd2.Close()
	checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2})
	// Give the background loops time to pick up the new member.
	time.Sleep(5 * time.Second)

	// Remove the first member and close the etcd1.
	_, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
	re.NoError(err)
	etcd1.Close()
	time.Sleep(5 * time.Second)

	// Check the client can get the new member with the new endpoints.
	checkMembers(re, client1, []*embed.Etcd{etcd2})
	time.Sleep(5 * time.Second)
}

func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
re := require.New(t)
var err error
Expand Down Expand Up @@ -305,12 +343,12 @@ func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) {
// Create two etcd clients with etcd1 as endpoint.
urls, err := types.NewURLs([]string{ep1})
re.NoError(err)
client1, err := CreateEtcdClient(nil, urls[0]) // execute member change operation with this client
client1, err := CreateEtcdClient(context.Background(), nil, urls[0]) // execute member change operation with this client
defer func() {
client1.Close()
}()
re.NoError(err)
client2, err := CreateEtcdClient(nil, urls[0]) // check member change with this client
client2, err := CreateEtcdClient(context.Background(), nil, urls[0]) // check member change with this client
defer func() {
client2.Close()
}()
Expand Down Expand Up @@ -482,7 +520,7 @@ func (suite *loopWatcherTestSuite) SetupSuite() {
ep1 := suite.config.LCUrls[0].String()
urls, err := types.NewURLs([]string{ep1})
suite.NoError(err)
suite.client, err = CreateEtcdClient(nil, urls[0])
suite.client, err = CreateEtcdClient(context.Background(), nil, urls[0])
suite.NoError(err)
suite.cleans = append(suite.cleans, func() {
suite.client.Close()
Expand Down Expand Up @@ -685,23 +723,23 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() {

// Case2: close the etcd client and put a new value after watcher restarts
suite.client.Close()
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
suite.client, err = CreateEtcdClient(context.Background(), nil, suite.config.LCUrls[0])
suite.NoError(err)
watcher.updateClientCh <- suite.client
suite.put("TestWatcherBreak", "2")
checkCache("2")

// Case3: close the etcd client and put a new value before watcher restarts
suite.client.Close()
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
suite.client, err = CreateEtcdClient(context.Background(), nil, suite.config.LCUrls[0])
suite.NoError(err)
suite.put("TestWatcherBreak", "3")
watcher.updateClientCh <- suite.client
checkCache("3")

// Case4: close the etcd client and put a new value with compact
suite.client.Close()
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
suite.client, err = CreateEtcdClient(context.Background(), nil, suite.config.LCUrls[0])
suite.NoError(err)
suite.put("TestWatcherBreak", "4")
resp, err := EtcdKVGet(suite.client, "TestWatcherBreak")
Expand Down
Loading

0 comments on commit 2ba4279

Please sign in to comment.