Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Aug 23, 2023
1 parent 120a937 commit 670bc4a
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 15 deletions.
12 changes: 6 additions & 6 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@ import (
"sync/atomic"
"time"

"github.com/tikv/pd/client/backoff"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/tlsutil"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -159,7 +158,7 @@ type pdServiceDiscovery struct {
option *option

successReConnect chan struct{}
bo *backoff.Backoffer
bo *retry.Backoffer
}

// newPDServiceDiscovery returns a new PD service discovery-based client.
Expand All @@ -182,7 +181,7 @@ func newPDServiceDiscovery(
keyspaceID: keyspaceID,
tlsCfg: tlsCfg,
option: option,
bo: backoff.NewBackoffer(ctx, maxRetryTimes),
bo: retry.NewBackoffer(ctx, maxRetryTimes),
}
pdsd.urls.Store(urls)
return pdsd
Expand Down Expand Up @@ -470,10 +469,11 @@ func (c *pdServiceDiscovery) ScheduleCheckMemberChanged() {
select {
case c.checkMembershipCh <- struct{}{}:
if err := c.waitForReady(); err != nil {
if c.bo.GetBackoffTime(backoff.BoMemberUpdate.String()) >= 10 {
// If backoff times count is greater than 10, reset it.
if c.bo.GetBackoffTimeCnt(retry.BoMemberUpdate.String()) >= 10 {
c.bo.Reset()
}
e := c.bo.Backoff(backoff.BoMemberUpdate, err)
e := c.bo.Backoff(retry.BoMemberUpdate, err)
if e != nil {
log.Error("[pd] wait for ready backoff failed", errs.ZapError(e))
return
Expand Down
8 changes: 4 additions & 4 deletions client/backoff/backoff.go → client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package backoff
package retry

import (
"context"
Expand Down Expand Up @@ -51,7 +51,7 @@ func NewBackoffer(ctx context.Context, maxSleep int) *Backoffer {
// It returns a retryable error if total sleep time exceeds maxSleep.
func (b *Backoffer) Backoff(cfg *Config, err error) error {
if span := opentracing.SpanFromContext(b.ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan(fmt.Sprintf("tikv.backoff.%s", cfg), opentracing.ChildOf(span.Context()))
span1 := span.Tracer().StartSpan(fmt.Sprintf("pd.client.backoff.%s", cfg), opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(b.ctx, span1)
}
Expand Down Expand Up @@ -125,8 +125,8 @@ func (b *Backoffer) GetBackoffTimes() map[string]int {
return b.backoffTimes
}

// GetBackoffTime returns backoff time count by specific type.
func (b *Backoffer) GetBackoffTime(s string) int {
// GetBackoffTimeCnt returns backoff time count by specific type.
func (b *Backoffer) GetBackoffTimeCnt(s string) int {
return b.backoffTimes[s]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package backoff
package retry

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion client/backoff/config.go → client/retry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package backoff
package retry

import (
"context"
Expand Down
12 changes: 9 additions & 3 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ func TestClientClusterIDCheck(t *testing.T) {
func TestClientLeaderChange(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval", `return(true)`))
defer failpoint.Disable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval")
defer func() {
failpoint.Disable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval")
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 3)
Expand Down Expand Up @@ -315,7 +317,9 @@ func TestTSOFollowerProxy(t *testing.T) {
func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval", `return(true)`))
defer failpoint.Disable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval")
defer func() {
failpoint.Disable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval")
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 3)
Expand Down Expand Up @@ -380,7 +384,9 @@ func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) {
func TestGlobalAndLocalTSO(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval", `return(true)`))
defer failpoint.Disable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval")
defer func() {
failpoint.Disable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval")
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dcLocationConfig := map[string]string{
Expand Down

0 comments on commit 670bc4a

Please sign in to comment.