Skip to content

Commit

Permalink
remove unnecessary ctx
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Jun 25, 2023
1 parent 03cb666 commit 3154498
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 40 deletions.
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (s *Server) initClient() error {
if err != nil {
return err
}
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(s.ctx, tlsConfig, []url.URL(u))
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u))
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (s *Server) initClient() error {
if err != nil {
return err
}
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(s.ctx, tlsConfig, s.backendUrls)
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls)
return err
}

Expand Down
28 changes: 12 additions & 16 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func newClient(tlsConfig *tls.Config, acURL ...string) (*clientv3.Client, error)
}

// CreateEtcdClient creates etcd v3 client with detecting endpoints.
func CreateEtcdClient(ctx context.Context, tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client, error) {
func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client, error) {
urls := make([]string, 0, len(acURLs))
for _, u := range acURLs {
urls = append(urls, u.String())
Expand All @@ -236,7 +236,7 @@ func CreateEtcdClient(ctx context.Context, tlsConfig *tls.Config, acURLs []url.U
checker := &healthyChecker{
tlsConfig: tlsConfig,
}
eps := syncUrls(ctx, client)
eps := syncUrls(client)
checker.update(eps)
tickerInterval := defaultDialKeepAliveTime
failpoint.Inject("fastTick", func() {
Expand All @@ -249,7 +249,7 @@ func CreateEtcdClient(ctx context.Context, tlsConfig *tls.Config, acURLs []url.U
return client, err
}

go func(ctx context.Context, client *clientv3.Client) {
go func(client *clientv3.Client) {
defer logutil.LogPanic()
ticker := time.NewTicker(tickerInterval)
defer ticker.Stop()
Expand All @@ -259,11 +259,9 @@ func CreateEtcdClient(ctx context.Context, tlsConfig *tls.Config, acURLs []url.U
select {
case <-client.Ctx().Done():
return
case <-ctx.Done():
return
case <-ticker.C:
usedEps := client.Endpoints()
healthyEps := checker.patrol(ctx)
healthyEps := checker.patrol(client.Ctx())
if len(healthyEps) == 0 {
// when all endpoints are unhealthy, try to reset endpoints rather than delete them
// to avoid blocking there is no any endpoint in client.
Expand All @@ -285,25 +283,23 @@ func CreateEtcdClient(ctx context.Context, tlsConfig *tls.Config, acURLs []url.U
}
}
}
}(ctx, client)
}(client)

// Notes: use another goroutine to update endpoints to avoid blocking health check in the first goroutine.
go func(ctx context.Context, client *clientv3.Client) {
go func(client *clientv3.Client) {
defer logutil.LogPanic()
ticker := time.NewTicker(tickerInterval)
defer ticker.Stop()
for {
select {
case <-client.Ctx().Done():
return
case <-ctx.Done():
return
case <-ticker.C:
eps := syncUrls(ctx, client)
eps := syncUrls(client)
checker.update(eps)
}
}
}(ctx, client)
}(client)

return client, err
}
Expand Down Expand Up @@ -393,9 +389,9 @@ func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) {
})
}

func syncUrls(ctx context.Context, client *clientv3.Client) []string {
func syncUrls(client *clientv3.Client) []string {
// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/clientv3/client.go#L170
ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), DefaultRequestTimeout)
ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(client.Ctx()), DefaultRequestTimeout)
defer cancel()
now := time.Now()
mresp, err := client.MemberList(ctx)
Expand Down Expand Up @@ -430,8 +426,8 @@ func isEqual(a, b []string) bool {
}

// CreateClients creates etcd v3 client and http client.
func CreateClients(ctx context.Context, tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
client, err := CreateEtcdClient(ctx, tlsConfig, acUrls)
func CreateClients(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
client, err := CreateEtcdClient(tlsConfig, acUrls)
if err != nil {
return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}
Expand Down
28 changes: 8 additions & 20 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,6 @@ func TestInitClusterID(t *testing.T) {
func TestEtcdClientSync(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}

// Start a etcd server.
cfg1 := NewTestSingleConfig(t)
Expand All @@ -249,7 +247,7 @@ func TestEtcdClientSync(t *testing.T) {
ep1 := cfg1.LCUrls[0].String()
urls, err := types.NewURLs([]string{ep1})
re.NoError(err)
client1, err := CreateEtcdClient(ctx, nil, urls)
client1, err := CreateEtcdClient(nil, urls)
defer func() {
client1.Close()
}()
Expand All @@ -273,8 +271,6 @@ func TestEtcdClientSync(t *testing.T) {
return err == nil && len(listResp.Members) == 1 && listResp.Members[0].ID == uint64(etcd2.Server.ID())
})

cancel()
wg.Wait()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
}

Expand All @@ -296,8 +292,6 @@ func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {

func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
// Start a etcd server.
cfg1 := NewTestSingleConfig(t)
etcd1, err := embed.StartEtcd(cfg1)
Expand All @@ -311,12 +305,12 @@ func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) {
// Create two etcd clients with etcd1 as endpoint.
urls, err := types.NewURLs([]string{ep1})
re.NoError(err)
client1, err := CreateEtcdClient(ctx, nil, urls) // execute member change operation with this client
client1, err := CreateEtcdClient(nil, urls) // execute member change operation with this client
defer func() {
client1.Close()
}()
re.NoError(err)
client2, err := CreateEtcdClient(ctx, nil, urls) // check member change with this client
client2, err := CreateEtcdClient(nil, urls) // check member change with this client
defer func() {
client2.Close()
}()
Expand All @@ -333,14 +327,10 @@ func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) {
_, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
re.NoError(err)
checkMembers(re, client2, []*embed.Etcd{etcd2})
cancel()
wg.Wait()
}

func checkEtcdWithHangLeader(t *testing.T) error {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
// Start a etcd server.
cfg1 := NewTestSingleConfig(t)
etcd1, err := embed.StartEtcd(cfg1)
Expand All @@ -359,7 +349,7 @@ func checkEtcdWithHangLeader(t *testing.T) error {
// Create a etcd client with etcd1 as endpoint.
urls, err := types.NewURLs([]string{proxyAddr})
re.NoError(err)
client1, err := CreateEtcdClient(ctx, nil, urls)
client1, err := CreateEtcdClient(nil, urls)
defer func() {
client1.Close()
}()
Expand All @@ -375,8 +365,6 @@ func checkEtcdWithHangLeader(t *testing.T) error {
enableDiscard.Store(true)
time.Sleep(defaultDialKeepAliveTime + defaultDialKeepAliveTimeout*2)
_, err = EtcdKVGet(client1, "test/key1")
cancel()
wg.Wait()
return err
}

Expand Down Expand Up @@ -494,7 +482,7 @@ func (suite *loopWatcherTestSuite) SetupSuite() {
ep1 := suite.config.LCUrls[0].String()
urls, err := types.NewURLs([]string{ep1})
suite.NoError(err)
suite.client, err = CreateEtcdClient(suite.ctx, nil, urls)
suite.client, err = CreateEtcdClient(nil, urls)
suite.NoError(err)
suite.cleans = append(suite.cleans, func() {
suite.client.Close()
Expand Down Expand Up @@ -697,23 +685,23 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() {

// Case2: close the etcd client and put a new value after watcher restarts
suite.client.Close()
suite.client, err = CreateEtcdClient(suite.ctx, nil, suite.config.LCUrls)
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
suite.NoError(err)
watcher.updateClientCh <- suite.client
suite.put("TestWatcherBreak", "2")
checkCache("2")

// Case3: close the etcd client and put a new value before watcher restarts
suite.client.Close()
suite.client, err = CreateEtcdClient(suite.ctx, nil, suite.config.LCUrls)
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
suite.NoError(err)
suite.put("TestWatcherBreak", "3")
watcher.updateClientCh <- suite.client
checkCache("3")

// Case4: close the etcd client and put a new value with compact
suite.client.Close()
suite.client, err = CreateEtcdClient(suite.ctx, nil, suite.config.LCUrls)
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
suite.NoError(err)
suite.put("TestWatcherBreak", "4")
resp, err := EtcdKVGet(suite.client, "TestWatcherBreak")
Expand Down
4 changes: 2 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (s *Server) startClient() (*clientv3.Client, *http.Client, error) {
if err != nil {
return nil, nil, err
}
return etcdutil.CreateClients(s.ctx, tlsConfig, etcdCfg.ACUrls)
return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls)
}

func (s *Server) startElectionClient() (*clientv3.Client, error) {
Expand All @@ -392,7 +392,7 @@ func (s *Server) startElectionClient() (*clientv3.Client, error) {
return nil, err
}

return etcdutil.CreateEtcdClient(s.ctx, tlsConfig, etcdCfg.ACUrls)
return etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls)
}

// AddStartCallback adds a callback in the startServer phase.
Expand Down

0 comments on commit 3154498

Please sign in to comment.