Skip to content

Commit

Permalink
Merge branch 'master' into init-rule-manager
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Aug 31, 2023
2 parents 9296f96 + 39cff3b commit 99ebf41
Show file tree
Hide file tree
Showing 20 changed files with 266 additions and 49 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ linters-settings:
excludes:
- G402
- G404
- G601
18 changes: 11 additions & 7 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,18 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/tlsutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)

const (
globalDCLocation = "global"
memberUpdateInterval = time.Minute
serviceModeUpdateInterval = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
globalDCLocation = "global"
memberUpdateInterval = time.Minute
serviceModeUpdateInterval = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
updateMemberBackOffBaseTime = 100 * time.Millisecond
)

type serviceType int
Expand Down Expand Up @@ -239,17 +241,19 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
ticker := time.NewTicker(memberUpdateInterval)
defer ticker.Stop()

bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout)
for {
select {
case <-ctx.Done():
log.Info("[pd] exit member loop due to context canceled")
return
case <-ticker.C:
case <-c.checkMembershipCh:
}
failpoint.Inject("skipUpdateMember", func() {
failpoint.Continue()
})
if err := c.updateMember(); err != nil {
if err := bo.Exec(ctx, c.updateMember); err != nil {
log.Error("[pd] failed to update member", zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err))
}
}
Expand Down Expand Up @@ -319,7 +323,7 @@ func (c *pdServiceDiscovery) GetKeyspaceGroupID() uint32 {
return defaultKeySpaceGroupID
}

// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls.
// DiscoverMicroservice discovers the microservice with the specified type and returns the server urls.
func (c *pdServiceDiscovery) DiscoverMicroservice(svcType serviceType) (urls []string, err error) {
switch svcType {
case apiService:
Expand Down Expand Up @@ -386,7 +390,7 @@ func (c *pdServiceDiscovery) ScheduleCheckMemberChanged() {
}
}

// Immediately check if there is any membership change among the leader/followers in a
// CheckMemberChanged Immediately check if there is any membership change among the leader/followers in a
// quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster.
func (c *pdServiceDiscovery) CheckMemberChanged() error {
return c.updateMember()
Expand Down
5 changes: 5 additions & 0 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,16 @@ func (c *client) createTokenDispatcher() {
tokenBatchController: newTokenBatchController(
make(chan *tokenRequest, 1)),
}
c.wg.Add(1)
go c.handleResourceTokenDispatcher(dispatcherCtx, dispatcher.tokenBatchController)
c.tokenDispatcher = dispatcher
}

func (c *client) handleResourceTokenDispatcher(dispatcherCtx context.Context, tbc *tokenBatchController) {
defer func() {
log.Info("[resource manager] exit resource token dispatcher")
c.wg.Done()
}()
var (
connection resourceManagerConnectionContext
firstRequest *tokenRequest
Expand Down
86 changes: 86 additions & 0 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"time"

"github.com/pingcap/failpoint"
)

// BackOffer is a backoff policy for retrying operations.
type BackOffer struct {
max time.Duration
next time.Duration
base time.Duration
}

// Exec is a helper function to exec backoff.
func (bo *BackOffer) Exec(
ctx context.Context,
fn func() error,
) error {
if err := fn(); err != nil {
select {
case <-ctx.Done():
case <-time.After(bo.nextInterval()):
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
})
}
return err
}
// reset backoff when fn() succeed.
bo.resetBackoff()
return nil
}

// InitialBackOffer make the initial state for retrying.
func InitialBackOffer(base, max time.Duration) BackOffer {
return BackOffer{
max: max,
base: base,
next: base,
}
}

// nextInterval for now use the `exponentialInterval`.
func (bo *BackOffer) nextInterval() time.Duration {
return bo.exponentialInterval()
}

// exponentialInterval returns the exponential backoff duration.
func (bo *BackOffer) exponentialInterval() time.Duration {
backoffInterval := bo.next
bo.next *= 2
if bo.next > bo.max {
bo.next = bo.max
}
return backoffInterval
}

// resetBackoff resets the backoff to initial state.
func (bo *BackOffer) resetBackoff() {
bo.next = bo.base
}

// Only used for test.
var testBackOffExecuteFlag = false

// TestBackOffExecute Only used for test.
func TestBackOffExecute() bool {
return testBackOffExecuteFlag
}
47 changes: 47 additions & 0 deletions client/retry/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/stretchr/testify/require"
)

func TestExponentialBackoff(t *testing.T) {
re := require.New(t)

baseBackoff := 100 * time.Millisecond
maxBackoff := 1 * time.Second

backoff := InitialBackOffer(baseBackoff, maxBackoff)
re.Equal(backoff.nextInterval(), baseBackoff)
re.Equal(backoff.nextInterval(), 2*baseBackoff)

for i := 0; i < 10; i++ {
re.LessOrEqual(backoff.nextInterval(), maxBackoff)
}
re.Equal(backoff.nextInterval(), maxBackoff)

// Reset backoff
backoff.resetBackoff()
err := backoff.Exec(context.Background(), func() error {
return errors.New("test")
})
re.Error(err)
}
4 changes: 3 additions & 1 deletion client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/timerpool"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -389,6 +390,7 @@ func (c *tsoClient) handleDispatcher(
// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(c.option.timeout)
defer streamLoopTimer.Stop()
bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout)
tsoBatchLoop:
for {
select {
Expand Down Expand Up @@ -498,7 +500,7 @@ tsoBatchLoop:
stream = nil
// Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP.
if IsLeaderChange(err) {
if err := c.svcDiscovery.CheckMemberChanged(); err != nil {
if err := bo.Exec(dispatcherCtx, c.svcDiscovery.CheckMemberChanged); err != nil {
select {
case <-dispatcherCtx.Done():
return
Expand Down
2 changes: 1 addition & 1 deletion client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (c *tsoServiceDiscovery) ScheduleCheckMemberChanged() {
}
}

// Immediately check if there is any membership change among the primary/secondaries in
// CheckMemberChanged Immediately check if there is any membership change among the primary/secondaries in
// a primary/secondary configured cluster.
func (c *tsoServiceDiscovery) CheckMemberChanged() error {
c.apiSvcDiscovery.CheckMemberChanged()
Expand Down
8 changes: 2 additions & 6 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1660,15 +1660,11 @@ func DiffRegionKeyInfo(origin *RegionInfo, other *RegionInfo) string {
}

// String converts slice of bytes to string without copy.
func String(b []byte) (s string) {
func String(b []byte) string {
if len(b) == 0 {
return ""
}
pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))
pstring := (*reflect.StringHeader)(unsafe.Pointer(&s))
pstring.Data = pbytes.Data
pstring.Len = pbytes.Len
return
return unsafe.String(unsafe.SliceData(b), len(b))
}

// ToUpperASCIIInplace bytes.ToUpper but zero-cost
Expand Down
2 changes: 1 addition & 1 deletion pkg/election/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (l *lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-c
}
res, err := l.lease.KeepAliveOnce(ctx1, leaseID)
if err != nil {
log.Warn("lease keep alive failed", zap.String("purpose", l.Purpose), errs.ZapError(err))
log.Warn("lease keep alive failed", zap.String("purpose", l.Purpose), zap.Time("start", start), errs.ZapError(err))
return
}
if res.TTL > 0 {
Expand Down
1 change: 1 addition & 0 deletions pkg/encryption/key_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"go.etcd.io/etcd/embed"
)

// #nosec G101
const (
testMasterKey = "8fd7e3e917c170d92f3e51a981dd7bc8fba11f3df7d8df994842f6e86f69b530"
testMasterKey2 = "8fd7e3e917c170d92f3e51a981dd7bc8fba11f3df7d8df994842f6e86f69b531"
Expand Down
6 changes: 3 additions & 3 deletions pkg/encryption/master_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestPlaintextMasterKey(t *testing.T) {
func TestEncrypt(t *testing.T) {
t.Parallel()
re := require.New(t)
keyHex := "2f07ec61e5a50284f47f2b402a962ec672e500b26cb3aa568bb1531300c74806"
keyHex := "2f07ec61e5a50284f47f2b402a962ec672e500b26cb3aa568bb1531300c74806" // #nosec G101
key, err := hex.DecodeString(keyHex)
re.NoError(err)
masterKey := &MasterKey{key: key}
Expand All @@ -68,7 +68,7 @@ func TestEncrypt(t *testing.T) {
func TestDecrypt(t *testing.T) {
t.Parallel()
re := require.New(t)
keyHex := "2f07ec61e5a50284f47f2b402a962ec672e500b26cb3aa568bb1531300c74806"
keyHex := "2f07ec61e5a50284f47f2b402a962ec672e500b26cb3aa568bb1531300c74806" // #nosec G101
key, err := hex.DecodeString(keyHex)
re.NoError(err)
plaintext := "this-is-a-plaintext"
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestNewFileMasterKeyLengthMismatch(t *testing.T) {
func TestNewFileMasterKey(t *testing.T) {
t.Parallel()
re := require.New(t)
key := "2f07ec61e5a50284f47f2b402a962ec672e500b26cb3aa568bb1531300c74806"
key := "2f07ec61e5a50284f47f2b402a962ec672e500b26cb3aa568bb1531300c74806" // #nosec G101
dir := t.TempDir()
path := dir + "/key"
os.WriteFile(path, []byte(key), 0600)
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ func (oc *Controller) AddOperator(ops ...*Operator) bool {
// but maybe user want to add operator when waiting queue is busy
if oc.exceedStoreLimitLocked(ops...) {
for _, op := range ops {
operatorCounter.WithLabelValues(op.Desc(), "exceed-limit").Inc()
_ = op.Cancel(ExceedStoreLimit)
oc.buryOperator(op)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/schedule/schedulers/transfer_witness_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (s *trasferWitnessLeaderScheduler) Schedule(cluster sche.SchedulerCluster,

func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeaderBatch(name, typ string, cluster sche.SchedulerCluster, batchSize int) []*operator.Operator {
var ops []*operator.Operator
batchLoop:
for i := 0; i < batchSize; i++ {
select {
case region := <-s.regions:
Expand All @@ -92,7 +93,7 @@ func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeaderBatch(name,
ops = append(ops, op)
}
default:
break
break batchLoop
}
}
return ops
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/hot_region_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ type HistoryHotRegion struct {

// HotRegionStorageHandler help hot region storage get hot region info.
type HotRegionStorageHandler interface {
// PackHistoryHotWriteRegions get read hot region info in HistoryHotRegion form.
// PackHistoryHotReadRegions get read hot region info in HistoryHotRegion form.
PackHistoryHotReadRegions() ([]HistoryHotRegion, error)
// PackHistoryHotWriteRegions get write hot region info in HistoryHotRegion form.
PackHistoryHotWriteRegions() ([]HistoryHotRegion, error)
// IsLeader return true means this server is leader.
IsLeader() bool
// GetHotRegionWriteInterval gets interval for PD to store Hot Region information..
// GetHotRegionsWriteInterval gets interval for PD to store Hot Region information.
GetHotRegionsWriteInterval() time.Duration
// GetHotRegionsReservedDays gets days hot region information is kept.
GetHotRegionsReservedDays() uint64
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/apiutil/apiutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
// PDRedirectorHeader is used to mark which PD redirected this request.
PDRedirectorHeader = "PD-Redirector"
// PDAllowFollowerHandleHeader is used to mark whether this request is allowed to be handled by the follower PD.
PDAllowFollowerHandleHeader = "PD-Allow-follower-handle"
PDAllowFollowerHandleHeader = "PD-Allow-follower-handle" // #nosec G101
// XForwardedForHeader is used to mark the client IP.
XForwardedForHeader = "X-Forwarded-For"
// XForwardedPortHeader is used to mark the client port.
Expand Down
Loading

0 comments on commit 99ebf41

Please sign in to comment.