Skip to content

Commit

Permalink
Merge branch 'master' into no-limit-scatter
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jul 28, 2023
2 parents 56034ed + 4db1735 commit 53b0f2e
Show file tree
Hide file tree
Showing 8 changed files with 857 additions and 94 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ require (
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/mailru/easyjson v0.7.6
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-runewidth v0.0.8 // indirect
Expand Down
18 changes: 16 additions & 2 deletions pkg/gc/safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,26 @@ import (
"math"
"time"

"github.com/pingcap/errors"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/server/config"
)

var blockGCSafePointErrmsg = "don't allow update gc safe point v1."
var blockServiceSafepointErrmsg = "don't allow update service safe point v1."

// SafePointManager is the manager for safePoint of GC and services.
type SafePointManager struct {
gcLock syncutil.Mutex
serviceGCLock syncutil.Mutex
store endpoint.GCSafePointStorage
cfg config.PDServerConfig
}

// NewSafePointManager creates a SafePointManager of GC and services.
func NewSafePointManager(store endpoint.GCSafePointStorage) *SafePointManager {
return &SafePointManager{store: store}
func NewSafePointManager(store endpoint.GCSafePointStorage, cfg config.PDServerConfig) *SafePointManager {
return &SafePointManager{store: store, cfg: cfg}
}

// LoadGCSafePoint loads current GC safe point from storage.
Expand All @@ -49,6 +55,11 @@ func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafe
if err != nil {
return
}
if manager.cfg.BlockSafePointV1 {
err = errors.Errorf(blockGCSafePointErrmsg)
return
}

if oldSafePoint >= newSafePoint {
return
}
Expand All @@ -58,6 +69,9 @@ func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafe

// UpdateServiceGCSafePoint update the safepoint for a specific service.
func (manager *SafePointManager) UpdateServiceGCSafePoint(serviceID string, newSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) {
if manager.cfg.BlockSafePointV1 {
return nil, false, errors.Errorf(blockServiceSafepointErrmsg)
}
manager.serviceGCLock.Lock()
defer manager.serviceGCLock.Unlock()
minServiceSafePoint, err = manager.store.LoadMinServiceGCSafePoint(now)
Expand Down
26 changes: 23 additions & 3 deletions pkg/gc/safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/server/config"
)

func newGCStorage() endpoint.GCSafePointStorage {
return endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
}

func TestGCSafePointUpdateSequentially(t *testing.T) {
gcSafePointManager := NewSafePointManager(newGCStorage())
gcSafePointManager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
re := require.New(t)
curSafePoint := uint64(0)
// update gc safePoint with asc value.
Expand Down Expand Up @@ -59,7 +60,7 @@ func TestGCSafePointUpdateSequentially(t *testing.T) {
}

func TestGCSafePointUpdateCurrently(t *testing.T) {
gcSafePointManager := NewSafePointManager(newGCStorage())
gcSafePointManager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
maxSafePoint := uint64(1000)
wg := sync.WaitGroup{}
re := require.New(t)
Expand All @@ -83,7 +84,7 @@ func TestGCSafePointUpdateCurrently(t *testing.T) {

func TestServiceGCSafePointUpdate(t *testing.T) {
re := require.New(t)
manager := NewSafePointManager(newGCStorage())
manager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
gcworkerServiceID := "gc_worker"
cdcServiceID := "cdc"
brServiceID := "br"
Expand Down Expand Up @@ -162,3 +163,22 @@ func TestServiceGCSafePointUpdate(t *testing.T) {
re.NoError(err)
re.True(updated)
}

func TestBlockUpdateSafePointV1(t *testing.T) {
re := require.New(t)
manager := NewSafePointManager(newGCStorage(), config.PDServerConfig{BlockSafePointV1: true})
gcworkerServiceID := "gc_worker"
gcWorkerSafePoint := uint64(8)

min, updated, err := manager.UpdateServiceGCSafePoint(gcworkerServiceID, gcWorkerSafePoint, math.MaxInt64, time.Now())
re.Error(err, blockServiceSafepointErrmsg)
re.Equal(err.Error(), blockServiceSafepointErrmsg)
re.False(updated)
re.Nil(min)

oldSafePoint, err := manager.UpdateGCSafePoint(gcWorkerSafePoint)
re.Error(err)
re.Equal(err.Error(), blockGCSafePointErrmsg)

re.Equal(uint64(0), oldSafePoint)
}
Loading

0 comments on commit 53b0f2e

Please sign in to comment.