diff --git a/pkg/gc/safepoint.go b/pkg/gc/safepoint.go index 167f0319daa7..3a8f14ff7873 100644 --- a/pkg/gc/safepoint.go +++ b/pkg/gc/safepoint.go @@ -18,20 +18,26 @@ import ( "math" "time" + "github.com/pingcap/errors" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/syncutil" + "github.com/tikv/pd/server/config" ) +var blockGCSafePointErrmsg = "don't allow update gc safe point v1." +var blockServiceSafepointErrmsg = "don't allow update service safe point v1." + // SafePointManager is the manager for safePoint of GC and services. type SafePointManager struct { gcLock syncutil.Mutex serviceGCLock syncutil.Mutex store endpoint.GCSafePointStorage + cfg config.PDServerConfig } // NewSafePointManager creates a SafePointManager of GC and services. -func NewSafePointManager(store endpoint.GCSafePointStorage) *SafePointManager { - return &SafePointManager{store: store} +func NewSafePointManager(store endpoint.GCSafePointStorage, cfg config.PDServerConfig) *SafePointManager { + return &SafePointManager{store: store, cfg: cfg} } // LoadGCSafePoint loads current GC safe point from storage. @@ -49,6 +55,11 @@ func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafe if err != nil { return } + if manager.cfg.BlockSafePointV1 { + err = errors.Errorf(blockGCSafePointErrmsg) + return + } + if oldSafePoint >= newSafePoint { return } @@ -58,6 +69,9 @@ func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafe // UpdateServiceGCSafePoint update the safepoint for a specific service. func (manager *SafePointManager) UpdateServiceGCSafePoint(serviceID string, newSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) { + if manager.cfg.BlockSafePointV1 { + return nil, false, errors.Errorf(blockServiceSafepointErrmsg) + } manager.serviceGCLock.Lock() defer manager.serviceGCLock.Unlock() minServiceSafePoint, err = manager.store.LoadMinServiceGCSafePoint(now) diff --git a/pkg/gc/safepoint_test.go b/pkg/gc/safepoint_test.go index 41ce02639fd6..39cd3660b2b2 100644 --- a/pkg/gc/safepoint_test.go +++ b/pkg/gc/safepoint_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/server/config" ) func newGCStorage() endpoint.GCSafePointStorage { @@ -30,7 +31,7 @@ func newGCStorage() endpoint.GCSafePointStorage { } func TestGCSafePointUpdateSequentially(t *testing.T) { - gcSafePointManager := NewSafePointManager(newGCStorage()) + gcSafePointManager := NewSafePointManager(newGCStorage(), config.PDServerConfig{}) re := require.New(t) curSafePoint := uint64(0) // update gc safePoint with asc value. @@ -59,7 +60,7 @@ func TestGCSafePointUpdateSequentially(t *testing.T) { } func TestGCSafePointUpdateCurrently(t *testing.T) { - gcSafePointManager := NewSafePointManager(newGCStorage()) + gcSafePointManager := NewSafePointManager(newGCStorage(), config.PDServerConfig{}) maxSafePoint := uint64(1000) wg := sync.WaitGroup{} re := require.New(t) @@ -83,7 +84,7 @@ func TestGCSafePointUpdateCurrently(t *testing.T) { func TestServiceGCSafePointUpdate(t *testing.T) { re := require.New(t) - manager := NewSafePointManager(newGCStorage()) + manager := NewSafePointManager(newGCStorage(), config.PDServerConfig{}) gcworkerServiceID := "gc_worker" cdcServiceID := "cdc" brServiceID := "br" @@ -162,3 +163,22 @@ func TestServiceGCSafePointUpdate(t *testing.T) { re.NoError(err) re.True(updated) } + +func TestBlockUpdateSafePointV1(t *testing.T) { + re := require.New(t) + manager := NewSafePointManager(newGCStorage(), config.PDServerConfig{BlockSafePointV1: true}) + gcworkerServiceID := "gc_worker" + gcWorkerSafePoint := uint64(8) + + min, updated, err := manager.UpdateServiceGCSafePoint(gcworkerServiceID, gcWorkerSafePoint, math.MaxInt64, time.Now()) + re.Error(err, blockServiceSafepointErrmsg) + re.Equal(err.Error(), blockServiceSafepointErrmsg) + re.False(updated) + re.Nil(min) + + oldSafePoint, err := manager.UpdateGCSafePoint(gcWorkerSafePoint) + re.Error(err) + re.Equal(err.Error(), blockGCSafePointErrmsg) + + re.Equal(uint64(0), oldSafePoint) +} diff --git a/server/config/config.go b/server/config/config.go index 5bda5bdc9f93..9c9be8ad2b3f 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -1106,6 +1106,8 @@ type PDServerConfig struct { EnableGOGCTuner bool `toml:"enable-gogc-tuner" json:"enable-gogc-tuner,string"` // GCTunerThreshold is the threshold of GC tuner. GCTunerThreshold float64 `toml:"gc-tuner-threshold" json:"gc-tuner-threshold"` + // BlockSafePointV1 is used to control gc safe point v1 and service safe point v1 can not be updated. + BlockSafePointV1 bool `toml:"block-safe-point-v1" json:"block-safe-point-v1,string"` } func (c *PDServerConfig) adjust(meta *configutil.ConfigMetaData) error { diff --git a/server/server.go b/server/server.go index a6926f1a9d9d..63a22ad7c393 100644 --- a/server/server.go +++ b/server/server.go @@ -452,7 +452,7 @@ func (s *Server) startServer(ctx context.Context) error { return err } - s.gcSafePointManager = gc.NewSafePointManager(s.storage) + s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg) s.basicCluster = core.NewBasicCluster() s.cluster = cluster.NewRaftCluster(ctx, s.clusterID, syncer.NewRegionSyncer(s), s.client, s.httpClient) keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{