Skip to content

Commit

Permalink
Merge branch 'master' into no-limit-scatter
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored Jul 27, 2023
2 parents 2f2b16b + 7ac9e6b commit 54f9277
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 6 deletions.
18 changes: 16 additions & 2 deletions pkg/gc/safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,26 @@ import (
"math"
"time"

"github.com/pingcap/errors"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/server/config"
)

var blockGCSafePointErrmsg = "don't allow update gc safe point v1."
var blockServiceSafepointErrmsg = "don't allow update service safe point v1."

// SafePointManager is the manager for safePoint of GC and services.
type SafePointManager struct {
gcLock syncutil.Mutex
serviceGCLock syncutil.Mutex
store endpoint.GCSafePointStorage
cfg config.PDServerConfig
}

// NewSafePointManager creates a SafePointManager of GC and services.
func NewSafePointManager(store endpoint.GCSafePointStorage) *SafePointManager {
return &SafePointManager{store: store}
func NewSafePointManager(store endpoint.GCSafePointStorage, cfg config.PDServerConfig) *SafePointManager {
return &SafePointManager{store: store, cfg: cfg}
}

// LoadGCSafePoint loads current GC safe point from storage.
Expand All @@ -49,6 +55,11 @@ func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafe
if err != nil {
return
}
if manager.cfg.BlockSafePointV1 {
err = errors.Errorf(blockGCSafePointErrmsg)
return
}

if oldSafePoint >= newSafePoint {
return
}
Expand All @@ -58,6 +69,9 @@ func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafe

// UpdateServiceGCSafePoint update the safepoint for a specific service.
func (manager *SafePointManager) UpdateServiceGCSafePoint(serviceID string, newSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) {
if manager.cfg.BlockSafePointV1 {
return nil, false, errors.Errorf(blockServiceSafepointErrmsg)
}
manager.serviceGCLock.Lock()
defer manager.serviceGCLock.Unlock()
minServiceSafePoint, err = manager.store.LoadMinServiceGCSafePoint(now)
Expand Down
26 changes: 23 additions & 3 deletions pkg/gc/safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/server/config"
)

func newGCStorage() endpoint.GCSafePointStorage {
return endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
}

func TestGCSafePointUpdateSequentially(t *testing.T) {
gcSafePointManager := NewSafePointManager(newGCStorage())
gcSafePointManager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
re := require.New(t)
curSafePoint := uint64(0)
// update gc safePoint with asc value.
Expand Down Expand Up @@ -59,7 +60,7 @@ func TestGCSafePointUpdateSequentially(t *testing.T) {
}

func TestGCSafePointUpdateCurrently(t *testing.T) {
gcSafePointManager := NewSafePointManager(newGCStorage())
gcSafePointManager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
maxSafePoint := uint64(1000)
wg := sync.WaitGroup{}
re := require.New(t)
Expand All @@ -83,7 +84,7 @@ func TestGCSafePointUpdateCurrently(t *testing.T) {

func TestServiceGCSafePointUpdate(t *testing.T) {
re := require.New(t)
manager := NewSafePointManager(newGCStorage())
manager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
gcworkerServiceID := "gc_worker"
cdcServiceID := "cdc"
brServiceID := "br"
Expand Down Expand Up @@ -162,3 +163,22 @@ func TestServiceGCSafePointUpdate(t *testing.T) {
re.NoError(err)
re.True(updated)
}

func TestBlockUpdateSafePointV1(t *testing.T) {
re := require.New(t)
manager := NewSafePointManager(newGCStorage(), config.PDServerConfig{BlockSafePointV1: true})
gcworkerServiceID := "gc_worker"
gcWorkerSafePoint := uint64(8)

min, updated, err := manager.UpdateServiceGCSafePoint(gcworkerServiceID, gcWorkerSafePoint, math.MaxInt64, time.Now())
re.Error(err, blockServiceSafepointErrmsg)
re.Equal(err.Error(), blockServiceSafepointErrmsg)
re.False(updated)
re.Nil(min)

oldSafePoint, err := manager.UpdateGCSafePoint(gcWorkerSafePoint)
re.Error(err)
re.Equal(err.Error(), blockGCSafePointErrmsg)

re.Equal(uint64(0), oldSafePoint)
}
2 changes: 2 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,8 @@ type PDServerConfig struct {
EnableGOGCTuner bool `toml:"enable-gogc-tuner" json:"enable-gogc-tuner,string"`
// GCTunerThreshold is the threshold of GC tuner.
GCTunerThreshold float64 `toml:"gc-tuner-threshold" json:"gc-tuner-threshold"`
// BlockSafePointV1 is used to control gc safe point v1 and service safe point v1 can not be updated.
BlockSafePointV1 bool `toml:"block-safe-point-v1" json:"block-safe-point-v1,string"`
}

func (c *PDServerConfig) adjust(meta *configutil.ConfigMetaData) error {
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (s *Server) startServer(ctx context.Context) error {
return err
}

s.gcSafePointManager = gc.NewSafePointManager(s.storage)
s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg)
s.basicCluster = core.NewBasicCluster()
s.cluster = cluster.NewRaftCluster(ctx, s.clusterID, syncer.NewRegionSyncer(s), s.client, s.httpClient)
keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{
Expand Down

0 comments on commit 54f9277

Please sign in to comment.