Skip to content

Commit

Permalink
replace
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Apr 2, 2024
1 parent 46385af commit 7602994
Show file tree
Hide file tree
Showing 21 changed files with 630 additions and 206 deletions.
22 changes: 16 additions & 6 deletions .github/workflows/pd-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,19 @@ jobs:
strategy:
fail-fast: true
matrix:
worker_id: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]
include:
- worker_id: 1
name: 'Unit Test'
- worker_id: 2
name: 'Tools Test'
- worker_id: 3
name: 'Client Integration Test'
- worker_id: 4
name: 'TSO Integration Test'
- worker_id: 5
name: 'MicroService Integration Test'
outputs:
job-total: 13
job-total: 5
steps:
- uses: actions/setup-go@v3
with:
Expand All @@ -42,12 +52,12 @@ jobs:
~/.cache/go-build
**/.tools
**/.dashboard_download_cache
key: ${{ runner.os }}-go-${{ matrix.worker_id }}-${{ hashFiles('**/go.sum') }}
- name: Make Test
key: ${{ runner.os }}-go-${{ matrix.name }}-${{ hashFiles('**/go.sum') }}
- name: ${{ matrix.name }}
env:
WORKER_ID: ${{ matrix.worker_id }}
WORKER_COUNT: 13
JOB_COUNT: 9 # 10 is tools test, 11, 12, 13 are for other integrations jobs
WORKER_COUNT: 5
JOB_COUNT: 5
run: |
make ci-test-job JOB_COUNT=$(($JOB_COUNT)) JOB_INDEX=$WORKER_ID
mv covprofile covprofile_$WORKER_ID
Expand Down
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ regions-dump:
stores-dump:
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/stores-dump stores-dump/main.go
pd-ut: pd-xprog
cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-ut pd-ut/ut.go
cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-ut pd-ut/ut.go pd-ut/coverProfile.go
pd-xprog:
cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -tags xprog -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/xprog pd-ut/xprog.go

Expand Down Expand Up @@ -254,9 +254,10 @@ basic-test: install-tools
go test $(BASIC_TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

ci-test-job: install-tools dashboard-ui
ci-test-job: install-tools dashboard-ui pd-ut
@$(FAILPOINT_ENABLE)
./scripts/ci-subtask.sh $(JOB_COUNT) $(JOB_INDEX) || { $(FAILPOINT_DISABLE); exit 1; }
@$(CLEAN_UT_BINARY)
@$(FAILPOINT_DISABLE)

TSO_INTEGRATION_TEST_PKGS := $(PD_PKG)/tests/server/tso
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
google.golang.org/grpc v1.59.0
Expand All @@ -33,7 +34,6 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
21 changes: 16 additions & 5 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.uber.org/multierr"
)

const maxRecordErrorCount = 20

// Backoffer is a backoff policy for retrying operations.
type Backoffer struct {
// base defines the initial time interval to wait before each retry.
Expand All @@ -34,6 +37,7 @@ type Backoffer struct {
// By default, all errors are retryable.
retryableChecker func(err error) bool

attempt int
next time.Duration
currentTotal time.Duration
}
Expand All @@ -45,11 +49,16 @@ func (bo *Backoffer) Exec(
) error {
defer bo.resetBackoff()
var (
err error
after *time.Timer
allErrors error
after *time.Timer
)
for {
err = fn()
err := fn()
bo.attempt++
if bo.attempt < maxRecordErrorCount {
// multierr.Append will ignore nil error.
allErrors = multierr.Append(allErrors, err)
}
if !bo.isRetryable(err) {
break
}
Expand All @@ -62,7 +71,7 @@ func (bo *Backoffer) Exec(
select {
case <-ctx.Done():
after.Stop()
return errors.Trace(ctx.Err())
return multierr.Append(allErrors, errors.Trace(ctx.Err()))
case <-after.C:
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
Expand All @@ -77,7 +86,7 @@ func (bo *Backoffer) Exec(
}
}
}
return err
return allErrors
}

// InitialBackoffer make the initial state for retrying.
Expand All @@ -102,6 +111,7 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer {
},
next: base,
currentTotal: 0,
attempt: 0,
}
}

Expand Down Expand Up @@ -141,6 +151,7 @@ func (bo *Backoffer) exponentialInterval() time.Duration {
func (bo *Backoffer) resetBackoff() {
bo.next = bo.base
bo.currentTotal = 0
bo.attempt = 0
}

// Only used for test.
Expand Down
1 change: 1 addition & 0 deletions client/retry/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestBackoffer(t *testing.T) {
return expectedErr
})
re.InDelta(total, time.Since(start), float64(250*time.Millisecond))
re.ErrorContains(err, "test; test; test; test")
re.ErrorIs(err, expectedErr)
re.Equal(4, execCount)
re.True(isBackofferReset(bo))
Expand Down
2 changes: 1 addition & 1 deletion client/testutil/tempurl.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func Alloc() string {
}

func tryAllocTestURL() string {
l, err := net.Listen("tcp", "127.0.0.1:0")
l, err := net.Listen("tcp", "127.0.0.1:")
if err != nil {
log.Fatal("listen failed", zap.Error(err))
}
Expand Down
101 changes: 80 additions & 21 deletions pkg/progress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@ import (
"github.com/tikv/pd/pkg/utils/syncutil"
)

// speedStatisticalWindow is the speed calculation window
const speedStatisticalWindow = 10 * time.Minute
const (
// maxSpeedCalculationWindow is the maximum size of the time window used to calculate the speed,
// but it does not mean that all data in it will be used to calculate the speed,
// which data is used depends on the patrol region duration
maxSpeedCalculationWindow = 2 * time.Hour
// minSpeedCalculationWindow is the minimum speed calculation window
minSpeedCalculationWindow = 10 * time.Minute
)

// Manager is used to maintain the progresses we care about.
type Manager struct {
Expand All @@ -46,12 +52,28 @@ type progressIndicator struct {
remaining float64
// We use a fixed interval's history to calculate the latest average speed.
history *list.List
// We use speedStatisticalWindow / updateInterval to get the windowLengthLimit.
// Assume that the windowLengthLimit is 3, the init value is 1. after update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4].
// We use (maxSpeedCalculationWindow / updateInterval + 1) to get the windowCapacity.
// Assume that the windowCapacity is 4, the init value is 1. After update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4].
// Then we update it again with 5, the window will become [2, 3, 4, 5].
windowLengthLimit int
updateInterval time.Duration
lastSpeed float64
windowCapacity int
// windowLength is used to determine what data will be computed.
// Assume that the windowLength is 2, the init value is 1. The value that will be calculated are [1].
// After update 3 times with 2, 3, 4 separately. The value that will be calculated are [3,4] and the values in queue are [(1,2),3,4].
// It helps us avoid calculation results jumping change when patrol-region-interval changes.
windowLength int
// front is the first element which should be used.
// currentWindowLength indicates where the front is currently in the queue.
// Assume that the windowLength is 2, the init value is 1. The front is [1] and currentWindowLength is 1.
// After update 3 times with 2, 3, 4 separately.
// The front is [3], the currentWindowLength is 2, and values in queue are [(1,2),3,4]
// ^ front
// - - currentWindowLength = len([3,4]) = 2
// We will always keep the currentWindowLength equal to windowLength if the actual size is enough.
front *list.Element
currentWindowLength int

updateInterval time.Duration
lastSpeed float64
}

// Reset resets the progress manager.
Expand All @@ -62,52 +84,89 @@ func (m *Manager) Reset() {
m.progresses = make(map[string]*progressIndicator)
}

// Option is used to do some action for progressIndicator.
type Option func(*progressIndicator)

// WindowDurationOption changes the time window size.
func WindowDurationOption(dur time.Duration) func(*progressIndicator) {
return func(pi *progressIndicator) {
if dur < minSpeedCalculationWindow {
dur = minSpeedCalculationWindow
} else if dur > maxSpeedCalculationWindow {
dur = maxSpeedCalculationWindow
}
pi.windowLength = int(dur/pi.updateInterval) + 1
}
}

// AddProgress adds a progress into manager if it doesn't exist.
func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration) (exist bool) {
func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration, opts ...Option) (exist bool) {
m.Lock()
defer m.Unlock()

history := list.New()
history.PushBack(current)
if _, exist = m.progresses[progress]; !exist {
m.progresses[progress] = &progressIndicator{
total: total,
remaining: total,
history: history,
windowLengthLimit: int(speedStatisticalWindow / updateInterval),
updateInterval: updateInterval,
pi := &progressIndicator{
total: total,
remaining: total,
history: history,
windowCapacity: int(maxSpeedCalculationWindow/updateInterval) + 1,
windowLength: int(minSpeedCalculationWindow / updateInterval),
updateInterval: updateInterval,
}
for _, op := range opts {
op(pi)
}
m.progresses[progress] = pi
pi.front = history.Front()
pi.currentWindowLength = 1
}
return
}

// UpdateProgress updates the progress if it exists.
func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool) {
func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool, opts ...Option) {
m.Lock()
defer m.Unlock()

if p, exist := m.progresses[progress]; exist {
for _, op := range opts {
op(p)
}
p.remaining = remaining
if p.total < remaining {
p.total = remaining
}

if p.history.Len() > p.windowLengthLimit {
p.history.PushBack(current)
p.currentWindowLength++

// try to move `front` into correct place.
for p.currentWindowLength > p.windowLength {
p.front = p.front.Next()
p.currentWindowLength--
}
for p.currentWindowLength < p.windowLength && p.front.Prev() != nil {
p.front = p.front.Prev()
p.currentWindowLength++
}

for p.history.Len() > p.windowCapacity {
p.history.Remove(p.history.Front())
}
p.history.PushBack(current)

// It means it just init and we haven't update the progress
if p.history.Len() <= 1 {
p.lastSpeed = 0
} else if isInc {
// the value increases, e.g., [1, 2, 3]
p.lastSpeed = (p.history.Back().Value.(float64) - p.history.Front().Value.(float64)) /
(float64(p.history.Len()-1) * p.updateInterval.Seconds())
p.lastSpeed = (current - p.front.Value.(float64)) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())

Check warning on line 165 in pkg/progress/progress.go

View check run for this annotation

Codecov / codecov/patch

pkg/progress/progress.go#L164-L165

Added lines #L164 - L165 were not covered by tests
} else {
// the value decreases, e.g., [3, 2, 1]
p.lastSpeed = (p.history.Front().Value.(float64) - p.history.Back().Value.(float64)) /
(float64(p.history.Len()-1) * p.updateInterval.Seconds())
p.lastSpeed = (p.front.Value.(float64) - current) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
}
if p.lastSpeed < 0 {
p.lastSpeed = 0
Expand Down
Loading

0 comments on commit 7602994

Please sign in to comment.