Skip to content

Commit

Permalink
*: check raftcluster nil (#7054) (#7069)
Browse files (browse the repository at this point in the history)
close #7053

Signed-off-by: husharp <[email protected]>

Co-authored-by: husharp <[email protected]>
  • Loading branch information
ti-chi-bot and HuSharp authored Sep 19, 2023
1 parent 41a070b commit f4d1cb7
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 23 deletions.
2 changes: 1 addition & 1 deletion server/api/min_resolved_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type minResolvedTS struct {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /min-resolved-ts [get]
func (h *minResolvedTSHandler) GetMinResolvedTS(w http.ResponseWriter, r *http.Request) {
c := h.svr.GetRaftCluster()
c := getCluster(r)
value := c.GetMinResolvedTS()
persistInterval := c.GetOpts().GetPDServerConfig().MinResolvedTSPersistenceInterval
h.rd.JSON(w, http.StatusOK, minResolvedTS{
Expand Down
2 changes: 1 addition & 1 deletion server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {

// min resolved ts API
minResolvedTSHandler := newMinResolvedTSHandler(svr, rd)
registerFunc(apiRouter, "/min-resolved-ts", minResolvedTSHandler.GetMinResolvedTS, setMethods("GET"))
registerFunc(clusterRouter, "/min-resolved-ts", minResolvedTSHandler.GetMinResolvedTS, setMethods("GET"))

// unsafe admin operation API
unsafeOperationHandler := newUnsafeOperationHandler(svr, rd)
Expand Down
4 changes: 3 additions & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,6 @@ func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportB
if rc == nil {
return &pdpb.ReportBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil
}

_, err := rc.HandleBatchReportSplit(request)
if err != nil {
return &pdpb.ReportBatchSplitResponse{
Expand Down Expand Up @@ -1697,6 +1696,9 @@ func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.S
return rsp.(*pdpb.SplitAndScatterRegionsResponse), err
}
rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.SplitAndScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil
}
splitFinishedPercentage, newRegionIDs := rc.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit()))
scatterFinishedPercentage, err := scatterRegions(rc, newRegionIDs, request.GetGroup(), int(request.GetRetryLimit()))
if err != nil {
Expand Down
14 changes: 10 additions & 4 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,14 +925,20 @@ func (h *Handler) ResetTS(ts uint64) error {

// SetStoreLimitScene sets the limit values for different scenes.
// It obtains the raft cluster through h.s.GetRaftCluster() and calls
// ReplaceStoreLimitScene(scene, limitType) on that cluster's store limiter.
// The `rc` statements return without doing anything when the raft cluster
// is nil.
// NOTE(review): this is a diff hunk rendered without +/- markers. The two
// `cluster` statements look like the removed (unguarded) lines and the
// nil-checked `rc` statements like their replacement — confirm against the
// commit before treating this text as a single function body.
func (h *Handler) SetStoreLimitScene(scene *storelimit.Scene, limitType storelimit.Type) {
cluster := h.s.GetRaftCluster()
cluster.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType)
rc := h.s.GetRaftCluster()
if rc == nil {
return
}
rc.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType)
}

// GetStoreLimitScene returns the limit values for different scenes.
// It obtains the raft cluster through h.s.GetRaftCluster() and returns the
// result of StoreLimitScene(limitType) on that cluster's store limiter.
// The `rc` statements return nil when the raft cluster is nil.
// NOTE(review): this is a diff hunk rendered without +/- markers. The two
// `cluster` statements look like the removed (unguarded) lines and the
// nil-checked `rc` statements like their replacement — confirm against the
// commit before treating this text as a single function body.
func (h *Handler) GetStoreLimitScene(limitType storelimit.Type) *storelimit.Scene {
cluster := h.s.GetRaftCluster()
return cluster.GetStoreLimiter().StoreLimitScene(limitType)
rc := h.s.GetRaftCluster()
if rc == nil {
return nil
}
return rc.GetStoreLimiter().StoreLimitScene(limitType)
}

// GetProgressByID returns the progress details for a given store ID.
Expand Down
44 changes: 28 additions & 16 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,18 +891,18 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {
}
old := s.persistOptions.GetReplicationConfig()
if cfg.EnablePlacementRules != old.EnablePlacementRules {
raftCluster := s.GetRaftCluster()
if raftCluster == nil {
rc := s.GetRaftCluster()
if rc == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
}
if cfg.EnablePlacementRules {
// initialize rule manager.
if err := raftCluster.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil {
if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil {
return err
}
} else {
// NOTE: can be removed after placement rules feature is enabled by default.
for _, s := range raftCluster.GetStores() {
for _, s := range rc.GetStores() {
if !s.IsRemoved() && s.IsTiFlash() {
return errors.New("cannot disable placement rules with TiFlash nodes")
}
Expand All @@ -912,8 +912,12 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {

var rule *placement.Rule
if cfg.EnablePlacementRules {
rc := s.GetRaftCluster()
if rc == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
}
// replication.MaxReplicas won't work when placement rule is enabled and not only have one default rule.
defaultRule := s.GetRaftCluster().GetRuleManager().GetRule("pd", "default")
defaultRule := rc.GetRuleManager().GetRule("pd", "default")

CheckInDefaultRule := func() error {
// replication config won't work when placement rule is enabled and exceeds one default rule
Expand All @@ -939,7 +943,11 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {
if rule != nil {
rule.Count = int(cfg.MaxReplicas)
rule.LocationLabels = cfg.LocationLabels
if err := s.GetRaftCluster().GetRuleManager().SetRule(rule); err != nil {
rc := s.GetRaftCluster()
if rc == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
}
if err := rc.GetRuleManager().SetRule(rule); err != nil {
log.Error("failed to update rule count",
errs.ZapError(err))
return err
Expand All @@ -951,7 +959,11 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {
s.persistOptions.SetReplicationConfig(old)
if rule != nil {
rule.Count = int(old.MaxReplicas)
if e := s.GetRaftCluster().GetRuleManager().SetRule(rule); e != nil {
rc := s.GetRaftCluster()
if rc == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
}
if e := rc.GetRuleManager().SetRule(rule); e != nil {
log.Error("failed to roll back count of rule when update replication config", errs.ZapError(e))
}
}
Expand Down Expand Up @@ -1136,18 +1148,18 @@ func (s *Server) GetServerOption() *config.PersistOptions {

// GetMetaRegions gets meta regions from cluster.
func (s *Server) GetMetaRegions() []*metapb.Region {
cluster := s.GetRaftCluster()
if cluster != nil {
return cluster.GetMetaRegions()
rc := s.GetRaftCluster()
if rc != nil {
return rc.GetMetaRegions()
}
return nil
}

// GetRegions gets regions from cluster.
func (s *Server) GetRegions() []*core.RegionInfo {
cluster := s.GetRaftCluster()
if cluster != nil {
return cluster.GetRegions()
rc := s.GetRaftCluster()
if rc != nil {
return rc.GetRegions()
}
return nil
}
Expand Down Expand Up @@ -1253,9 +1265,9 @@ func (s *Server) SetReplicationModeConfig(cfg config.ReplicationModeConfig) erro
}
log.Info("replication mode config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))

cluster := s.GetRaftCluster()
if cluster != nil {
err := cluster.GetReplicationMode().UpdateConfig(cfg)
rc := s.GetRaftCluster()
if rc != nil {
err := rc.GetReplicationMode().UpdateConfig(cfg)
if err != nil {
log.Warn("failed to update replication mode", errs.ZapError(err))
// revert to old config
Expand Down
40 changes: 40 additions & 0 deletions tests/server/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,46 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) {
c.Assert(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"), IsNil)
}

// TestSendApiWhenRestartRaftCluster bootstraps a test cluster, stops the
// leader's raft cluster, and then issues GET /pd/api/v1/min-resolved-ts
// against the leader. The request is sent with http.StatusInternalServerError
// as the status handed to sendRequest, and the response body must contain the
// "not bootstrapped" message. Finally the raft cluster is started again and
// must be non-nil.
func (s *testProgressSuite) TestSendApiWhenRestartRaftCluster(c *C) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Build the test cluster with MaxReplicas forced to 1.
	singleReplica := func(conf *config.Config, serverName string) {
		conf.Replication.MaxReplicas = 1
	}
	tc, err := tests.NewTestCluster(ctx, 3, singleReplica)
	c.Assert(err, IsNil)
	defer tc.Destroy()

	c.Assert(tc.RunInitialServers(), IsNil)
	leader := tc.GetServer(tc.WaitLeader())

	// Bootstrap the cluster through the leader's gRPC endpoint.
	grpcPDClient := testutil.MustNewGrpcClient(c, leader.GetAddr())
	bootstrapReq := &pdpb.BootstrapRequest{
		Header: testutil.NewRequestHeader(leader.GetClusterID()),
		Store:  &metapb.Store{Id: 1, Address: "127.0.0.1:0"},
		Region: &metapb.Region{
			Id:    2,
			Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}},
		},
	}
	bootstrapResp, err := grpcPDClient.Bootstrap(context.Background(), bootstrapReq)
	c.Assert(err, IsNil)
	c.Assert(bootstrapResp.GetHeader().GetError(), IsNil)

	// Mock restart raft cluster
	raftCluster := leader.GetRaftCluster()
	c.Assert(raftCluster, NotNil)
	raftCluster.Stop()

	// Mock client-go will still send request
	endpoint := leader.GetAddr() + "/pd/api/v1/min-resolved-ts"
	body := sendRequest(c, endpoint, http.MethodGet, http.StatusInternalServerError)
	notBootstrapped := strings.Contains(string(body), "TiKV cluster not bootstrapped, please start TiKV first")
	c.Assert(notBootstrapped, IsTrue)

	// Bring the raft cluster back and make sure the server exposes it again.
	c.Assert(raftCluster.Start(leader.GetServer()), IsNil)
	c.Assert(leader.GetRaftCluster(), NotNil)
}

func sendRequest(c *C, url string, method string, statusCode int) []byte {
req, _ := http.NewRequest(method, url, nil)
resp, err := dialClient.Do(req)
Expand Down

0 comments on commit f4d1cb7

Please sign in to comment.