Skip to content

Commit

Permalink
add etcd unit test
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Jun 26, 2023
1 parent 087bcab commit 12d592e
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 48 deletions.
4 changes: 2 additions & 2 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {

// When etcd is not available, the watcher.Watch will block,
// so we check the etcd availability first.
if _, err := etcdutil.EtcdKVGet(ls.client, ls.leaderKey); err != nil {
if !etcdutil.IsHealthy(serverCtx, ls.client) {
if time.Since(lastHealthyTime) > timeout {
log.Error("the connect of leadership watcher is unhealthy",
zap.Int64("revision", revision),
Expand All @@ -235,7 +235,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
// server closed, return
return
case <-ticker.C:
if _, err := etcdutil.EtcdKVGet(ls.client, ls.leaderKey); err != nil {
if !etcdutil.IsHealthy(serverCtx, ls.client) {
watchChanCancel()
continue
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func TestExitWatch(t *testing.T) {
re := require.New(t)
leaderKey := "/test_leader"
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/fastTick", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/lessTimeout", "return(true)"))
// Case1: close the client before the watch loop starts
checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`))
Expand Down Expand Up @@ -156,7 +155,6 @@ func TestExitWatch(t *testing.T) {
server.Close()
})
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/fastTick"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/lessTimeout"))
}

func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client)) {
Expand Down
26 changes: 16 additions & 10 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,7 @@ func RemoveEtcdMember(client *clientv3.Client, id uint64) (*clientv3.MemberRemov

// EtcdKVGet returns the etcd GetResponse by given key or key prefix
func EtcdKVGet(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
timeout := DefaultRequestTimeout
failpoint.Inject("lessTimeout", func() {
timeout = 1 * time.Second
})
ctx, cancel := context.WithTimeout(c.Ctx(), timeout)
ctx, cancel := context.WithTimeout(c.Ctx(), DefaultRequestTimeout)
defer cancel()

start := time.Now()
Expand All @@ -148,6 +144,20 @@ func EtcdKVGet(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clie
return resp, nil
}

// IsHealthy checks if the etcd is healthy.
func IsHealthy(ctx context.Context, client *clientv3.Client) bool {
timeout := DefaultRequestTimeout
failpoint.Inject("fastTick", func() {
timeout = 100 * time.Millisecond
})
ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), timeout)
defer cancel()
_, err := client.Get(ctx, "health")
// permission denied is OK since proposal goes through consensus to get it
// See: https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L124
return err == nil || err == rpctypes.ErrPermissionDenied
}

// GetValue gets value with key from etcd.
func GetValue(c *clientv3.Client, key string, opts ...clientv3.OpOption) ([]byte, error) {
resp, err := get(c, key, opts...)
Expand Down Expand Up @@ -328,13 +338,9 @@ func (checker *healthyChecker) patrol(ctx context.Context) []string {
go func(key, value interface{}) {
defer wg.Done()
defer logutil.LogPanic()
ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), DefaultRequestTimeout)
defer cancel()
ep := key.(string)
client := value.(*healthyClient)
_, err := client.Get(ctx, "health")
// permission denied is OK since proposal goes through consensus to get it
if err == nil || err == rpctypes.ErrPermissionDenied {
if IsHealthy(ctx, client.Client) {
hch <- ep
checker.Store(ep, &healthyClient{
Client: client.Client,
Expand Down
113 changes: 79 additions & 34 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"crypto/tls"
"fmt"
"io"
"math/rand"
"net"
"strings"
"sync"
Expand Down Expand Up @@ -244,10 +245,7 @@ func TestEtcdClientSync(t *testing.T) {
re.NoError(err)

// Create a etcd client with etcd1 as endpoint.
ep1 := cfg1.LCUrls[0].String()
urls, err := types.NewURLs([]string{ep1})
re.NoError(err)
client1, err := CreateEtcdClient(nil, urls)
client1, err := CreateEtcdClient(nil, cfg1.LCUrls)
defer func() {
client1.Close()
}()
Expand All @@ -258,7 +256,10 @@ func TestEtcdClientSync(t *testing.T) {
etcd2 := checkAddEtcdMember(t, cfg1, client1)
defer etcd2.Close()
checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2})
time.Sleep(200 * time.Millisecond) // wait for etcd client sync endpoints and client will be connected to etcd2
testutil.Eventually(re, func() bool {
// wait for etcd client sync endpoints
return len(client1.Endpoints()) == 2
})

// Remove the first member and close the etcd1.
_, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
Expand All @@ -267,29 +268,13 @@ func TestEtcdClientSync(t *testing.T) {

// Check the client can get the new member with the new endpoints.
testutil.Eventually(re, func() bool {
listResp, err := ListEtcdMembers(client1)
return err == nil && len(listResp.Members) == 1 && listResp.Members[0].ID == uint64(etcd2.Server.ID())
// wait for etcd client sync endpoints
return len(client1.Endpoints()) == 1
})

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
}

func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
re := require.New(t)
var err error
// Test with enable check.
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))
err = checkEtcdWithHangLeader(t)
re.NoError(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))

// Test with disable check.
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick", "return(true)"))
err = checkEtcdWithHangLeader(t)
re.Error(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick"))
}

func TestEtcdScaleInAndOut(t *testing.T) {
re := require.New(t)
// Start a etcd server.
Expand All @@ -299,18 +284,15 @@ func TestEtcdScaleInAndOut(t *testing.T) {
etcd1.Close()
}()
re.NoError(err)
ep1 := cfg1.LCUrls[0].String()
<-etcd1.Server.ReadyNotify()

// Create two etcd clients with etcd1 as endpoint.
urls, err := types.NewURLs([]string{ep1})
re.NoError(err)
client1, err := CreateEtcdClient(nil, urls) // execute member change operation with this client
client1, err := CreateEtcdClient(nil, cfg1.LCUrls) // execute member change operation with this client
defer func() {
client1.Close()
}()
re.NoError(err)
client2, err := CreateEtcdClient(nil, urls) // check member change with this client
client2, err := CreateEtcdClient(nil, cfg1.LCUrls) // check member change with this client
defer func() {
client2.Close()
}()
Expand All @@ -329,6 +311,71 @@ func TestEtcdScaleInAndOut(t *testing.T) {
checkMembers(re, client2, []*embed.Etcd{etcd2})
}

func TestRandomKillEtcd(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))
// Start a etcd server.
cfg1 := NewTestSingleConfig(t)
etcd1, err := embed.StartEtcd(cfg1)
re.NoError(err)
<-etcd1.Server.ReadyNotify()
client1, err := CreateEtcdClient(nil, cfg1.LCUrls)
re.NoError(err)
defer func() {
client1.Close()
}()

etcd2 := checkAddEtcdMember(t, cfg1, client1)
cfg2 := etcd2.Config()
<-etcd2.Server.ReadyNotify()

etcd3 := checkAddEtcdMember(t, &cfg2, client1)
<-etcd3.Server.ReadyNotify()

time.Sleep(1 * time.Second)
re.Len(client1.Endpoints(), 3)

// Randomly kill an etcd server and restart it
etcds := []*embed.Etcd{etcd1, etcd2, etcd3}
cfgs := []embed.Config{etcd1.Config(), etcd2.Config(), etcd3.Config()}
for i := 0; i < 10; i++ {
killIndex := rand.Intn(len(etcds))
etcds[killIndex].Close()
testutil.Eventually(re, func() bool {
return IsHealthy(context.Background(), client1)
})
etcd, err := embed.StartEtcd(&cfgs[killIndex])
re.NoError(err)
<-etcd.Server.ReadyNotify()
etcds[killIndex] = etcd
testutil.Eventually(re, func() bool {
return IsHealthy(context.Background(), client1)
})
}
for _, etcd := range etcds {
if etcd != nil {
etcd.Close()
}
}
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
}

func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
re := require.New(t)
var err error
// Test with enable check.
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))
err = checkEtcdWithHangLeader(t)
re.NoError(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))

// Test with disable check.
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick", "return(true)"))
err = checkEtcdWithHangLeader(t)
re.Error(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick"))
}

func checkEtcdWithHangLeader(t *testing.T) error {
re := require.New(t)
// Start a etcd server.
Expand All @@ -355,15 +402,15 @@ func checkEtcdWithHangLeader(t *testing.T) error {
}()
re.NoError(err)

// Add a new member and set the client endpoints to etcd1 and etcd2.
// Add a new member
etcd2 := checkAddEtcdMember(t, cfg1, client1)
defer etcd2.Close()
checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2})
time.Sleep(1 * time.Second) // wait for etcd client sync endpoints

// Hang the etcd1 and wait for the client to connect to etcd2.
enableDiscard.Store(true)
time.Sleep(defaultDialKeepAliveTime + defaultDialKeepAliveTimeout*2)
time.Sleep(time.Second)
_, err = EtcdKVGet(client1, "test/key1")
return err
}
Expand Down Expand Up @@ -473,16 +520,14 @@ func TestLoopWatcherTestSuite(t *testing.T) {
}

func (suite *loopWatcherTestSuite) SetupSuite() {
var err error
t := suite.T()
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cleans = make([]func(), 0)
// Start a etcd server and create a client with etcd1 as endpoint.
suite.config = NewTestSingleConfig(t)
suite.startEtcd()
ep1 := suite.config.LCUrls[0].String()
urls, err := types.NewURLs([]string{ep1})
suite.NoError(err)
suite.client, err = CreateEtcdClient(nil, urls)
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
suite.NoError(err)
suite.cleans = append(suite.cleans, func() {
suite.client.Close()
Expand Down

0 comments on commit 12d592e

Please sign in to comment.