Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: check raftcluster nil (#7054) #7070

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/api/min_resolved_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /min-resolved-ts/{store_id} [get]
func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *http.Request) {
c := h.svr.GetRaftCluster()
c := getCluster(r)

Check warning on line 54 in server/api/min_resolved_ts.go

View check run for this annotation

Codecov / codecov/patch

server/api/min_resolved_ts.go#L54

Added line #L54 was not covered by tests
idStr := mux.Vars(r)["store_id"]
storeID, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
Expand All @@ -74,7 +74,7 @@
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /min-resolved-ts [get]
func (h *minResolvedTSHandler) GetMinResolvedTS(w http.ResponseWriter, r *http.Request) {
c := h.svr.GetRaftCluster()
c := getCluster(r)
value := c.GetMinResolvedTS()
persistInterval := c.GetOpts().GetPDServerConfig().MinResolvedTSPersistenceInterval
h.rd.JSON(w, http.StatusOK, minResolvedTS{
Expand Down
6 changes: 5 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2320,7 +2320,11 @@
func (c *RaftCluster) GetStoreMinResolvedTS(storeID uint64) uint64 {
c.RLock()
defer c.RUnlock()
if !c.isInitialized() || !core.IsAvailableForMinResolvedTS(c.GetStore(storeID)) {
store := c.GetStore(storeID)
if store == nil {
return math.MaxUint64

Check warning on line 2325 in server/cluster/cluster.go

View check run for this annotation

Codecov / codecov/patch

server/cluster/cluster.go#L2323-L2325

Added lines #L2323 - L2325 were not covered by tests
}
if !c.isInitialized() || !core.IsAvailableForMinResolvedTS(store) {

Check warning on line 2327 in server/cluster/cluster.go

View check run for this annotation

Codecov / codecov/patch

server/cluster/cluster.go#L2327

Added line #L2327 was not covered by tests
return math.MaxUint64
}
return c.GetStore(storeID).GetMinResolvedTS()
Expand Down
4 changes: 3 additions & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1232,7 +1232,6 @@
if rc == nil {
return &pdpb.ReportBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil
}

_, err := rc.HandleBatchReportSplit(request)
if err != nil {
return &pdpb.ReportBatchSplitResponse{
Expand Down Expand Up @@ -1695,6 +1694,9 @@
return rsp.(*pdpb.SplitAndScatterRegionsResponse), err
}
rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.SplitAndScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil

Check warning on line 1698 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1697-L1698

Added lines #L1697 - L1698 were not covered by tests
}
splitFinishedPercentage, newRegionIDs := rc.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit()))
scatterFinishedPercentage, err := scatterRegions(rc, newRegionIDs, request.GetGroup(), int(request.GetRetryLimit()))
if err != nil {
Expand Down
14 changes: 10 additions & 4 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,14 +930,20 @@

// SetStoreLimitScene sets the limit values for different scenes
func (h *Handler) SetStoreLimitScene(scene *storelimit.Scene, limitType storelimit.Type) {
cluster := h.s.GetRaftCluster()
cluster.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType)
rc := h.s.GetRaftCluster()
if rc == nil {
return

Check warning on line 935 in server/handler.go

View check run for this annotation

Codecov / codecov/patch

server/handler.go#L935

Added line #L935 was not covered by tests
}
rc.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType)
}

// GetStoreLimitScene returns the limit values for different scenes
func (h *Handler) GetStoreLimitScene(limitType storelimit.Type) *storelimit.Scene {
cluster := h.s.GetRaftCluster()
return cluster.GetStoreLimiter().StoreLimitScene(limitType)
rc := h.s.GetRaftCluster()
if rc == nil {
return nil

Check warning on line 944 in server/handler.go

View check run for this annotation

Codecov / codecov/patch

server/handler.go#L944

Added line #L944 was not covered by tests
}
return rc.GetStoreLimiter().StoreLimitScene(limitType)
}

// GetProgressByID returns the progress details for a given store ID.
Expand Down
60 changes: 40 additions & 20 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,18 +949,18 @@
}
old := s.persistOptions.GetReplicationConfig()
if cfg.EnablePlacementRules != old.EnablePlacementRules {
raftCluster := s.GetRaftCluster()
if raftCluster == nil {
rc := s.GetRaftCluster()
if rc == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
}
if cfg.EnablePlacementRules {
// initialize rule manager.
if err := raftCluster.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil {
if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil {
return err
}
} else {
// NOTE: can be removed after placement rules feature is enabled by default.
for _, s := range raftCluster.GetStores() {
for _, s := range rc.GetStores() {
if !s.IsRemoved() && s.IsTiFlash() {
return errors.New("cannot disable placement rules with TiFlash nodes")
}
Expand All @@ -970,8 +970,12 @@

var rule *placement.Rule
if cfg.EnablePlacementRules {
rc := s.GetRaftCluster()
if rc == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()

Check warning on line 975 in server/server.go

View check run for this annotation

Codecov / codecov/patch

server/server.go#L975

Added line #L975 was not covered by tests
}
// replication.MaxReplicas won't work when placement rule is enabled and not only have one default rule.
defaultRule := s.GetRaftCluster().GetRuleManager().GetRule("pd", "default")
defaultRule := rc.GetRuleManager().GetRule("pd", "default")

CheckInDefaultRule := func() error {
// replication config won't work when placement rule is enabled and exceeds one default rule
Expand All @@ -997,7 +1001,11 @@
if rule != nil {
rule.Count = int(cfg.MaxReplicas)
rule.LocationLabels = cfg.LocationLabels
if err := s.GetRaftCluster().GetRuleManager().SetRule(rule); err != nil {
rc := s.GetRaftCluster()
if rc == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()

Check warning on line 1006 in server/server.go

View check run for this annotation

Codecov / codecov/patch

server/server.go#L1006

Added line #L1006 was not covered by tests
}
if err := rc.GetRuleManager().SetRule(rule); err != nil {
log.Error("failed to update rule count",
errs.ZapError(err))
return err
Expand All @@ -1009,7 +1017,11 @@
s.persistOptions.SetReplicationConfig(old)
if rule != nil {
rule.Count = int(old.MaxReplicas)
if e := s.GetRaftCluster().GetRuleManager().SetRule(rule); e != nil {
rc := s.GetRaftCluster()
if rc == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()

Check warning on line 1022 in server/server.go

View check run for this annotation

Codecov / codecov/patch

server/server.go#L1020-L1022

Added lines #L1020 - L1022 were not covered by tests
}
if e := rc.GetRuleManager().SetRule(rule); e != nil {

Check warning on line 1024 in server/server.go

View check run for this annotation

Codecov / codecov/patch

server/server.go#L1024

Added line #L1024 was not covered by tests
log.Error("failed to roll back count of rule when update replication config", errs.ZapError(e))
}
}
Expand Down Expand Up @@ -1243,18 +1255,18 @@

// GetMetaRegions gets meta regions from cluster.
func (s *Server) GetMetaRegions() []*metapb.Region {
cluster := s.GetRaftCluster()
if cluster != nil {
return cluster.GetMetaRegions()
rc := s.GetRaftCluster()
if rc != nil {
return rc.GetMetaRegions()

Check warning on line 1260 in server/server.go

View check run for this annotation

Codecov / codecov/patch

server/server.go#L1258-L1260

Added lines #L1258 - L1260 were not covered by tests
}
return nil
}

// GetRegions gets regions from cluster.
func (s *Server) GetRegions() []*core.RegionInfo {
cluster := s.GetRaftCluster()
if cluster != nil {
return cluster.GetRegions()
rc := s.GetRaftCluster()
if rc != nil {
return rc.GetRegions()
}
return nil
}
Expand Down Expand Up @@ -1375,9 +1387,9 @@
}
log.Info("replication mode config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))

cluster := s.GetRaftCluster()
if cluster != nil {
err := cluster.GetReplicationMode().UpdateConfig(cfg)
rc := s.GetRaftCluster()
if rc != nil {
err := rc.GetReplicationMode().UpdateConfig(cfg)
if err != nil {
log.Warn("failed to update replication mode", errs.ZapError(err))
// revert to old config
Expand Down Expand Up @@ -1747,7 +1759,11 @@

// GetExternalTS returns external timestamp.
func (s *Server) GetExternalTS() uint64 {
return s.GetRaftCluster().GetExternalTS()
rc := s.GetRaftCluster()
if rc == nil {
return 0

Check warning on line 1764 in server/server.go

View check run for this annotation

Codecov / codecov/patch

server/server.go#L1764

Added line #L1764 was not covered by tests
}
return rc.GetExternalTS()
}

// SetExternalTS returns external timestamp.
Expand All @@ -1761,14 +1777,18 @@
log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("global ts", globalTS))
return errors.New(desc)
}
currentExternalTS := s.GetRaftCluster().GetExternalTS()
c := s.GetRaftCluster()
if c == nil {
return errs.ErrNotBootstrapped.FastGenByArgs()

Check warning on line 1782 in server/server.go

View check run for this annotation

Codecov / codecov/patch

server/server.go#L1782

Added line #L1782 was not covered by tests
}
currentExternalTS := c.GetExternalTS()
if tsoutil.CompareTimestampUint64(externalTS, currentExternalTS) != 1 {
desc := "the external timestamp should be larger than now"
log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("current external timestamp", currentExternalTS))
return errors.New(desc)
}
s.GetRaftCluster().SetExternalTS(externalTS)
return nil

return c.SetExternalTS(externalTS)
}

// SetClient sets the etcd client.
Expand Down
40 changes: 40 additions & 0 deletions tests/server/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,46 @@ func TestRemovingProgress(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
}

func TestSendApiWhenRestartRaftCluster(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 3, func(conf *config.Config, serverName string) {
conf.Replication.MaxReplicas = 1
})
re.NoError(err)
defer cluster.Destroy()

err = cluster.RunInitialServers()
re.NoError(err)
leader := cluster.GetServer(cluster.WaitLeader())

grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr())
clusterID := leader.GetClusterID()
req := &pdpb.BootstrapRequest{
Header: testutil.NewRequestHeader(clusterID),
Store: &metapb.Store{Id: 1, Address: "127.0.0.1:0"},
Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}},
}
resp, err := grpcPDClient.Bootstrap(context.Background(), req)
re.NoError(err)
re.Nil(resp.GetHeader().GetError())

// Mock restart raft cluster
rc := leader.GetRaftCluster()
re.NotNil(rc)
rc.Stop()

// Mock client-go will still send request
output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/min-resolved-ts", http.MethodGet, http.StatusInternalServerError)
re.Contains(string(output), "TiKV cluster not bootstrapped, please start TiKV first")

err = rc.Start(leader.GetServer())
re.NoError(err)
rc = leader.GetRaftCluster()
re.NotNil(rc)
}

func TestPreparingProgress(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))
Expand Down
Loading