*: check raftcluster nil (tikv#7054) (tikv#7068)
close tikv#7053

Signed-off-by: husharp <[email protected]>

Co-authored-by: husharp <[email protected]>
Co-authored-by: Hu# <[email protected]>
ti-chi-bot and HuSharp authored Sep 14, 2024
1 parent 41c6bc2 commit 9365b94
Showing 3 changed files with 38 additions and 21 deletions.
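
Every hunk below applies the same defensive pattern: fetch the raft cluster once, bail out if it is nil (the server is not yet bootstrapped, or the cluster has been stopped), and only then dereference it. A minimal, self-contained Go sketch of that guard, using illustrative stand-in types rather than PD's real definitions:

package main

import (
	"errors"
	"fmt"
)

// RaftCluster and Server are stand-ins that mirror the names in the diff;
// their definitions here are assumptions for illustration, not PD's code.
type RaftCluster struct{}

func (rc *RaftCluster) GetMetaRegionCount() int { return 42 }

var errNotBootstrapped = errors.New("the raft cluster is not bootstrapped")

type Server struct {
	cluster *RaftCluster // nil until the cluster is bootstrapped
}

// GetRaftCluster mirrors the real accessor: it can return nil, so every
// caller must check before dereferencing.
func (s *Server) GetRaftCluster() *RaftCluster { return s.cluster }

func (s *Server) metaRegionCount() (int, error) {
	rc := s.GetRaftCluster()
	if rc == nil { // the guard this commit adds at each call site
		return 0, errNotBootstrapped
	}
	return rc.GetMetaRegionCount(), nil
}

func main() {
	s := &Server{} // not bootstrapped: the cluster pointer is nil
	if _, err := s.metaRegionCount(); err != nil {
		fmt.Println("handled gracefully:", err) // instead of a nil-pointer panic
	}
}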
1 change: 0 additions & 1 deletion server/grpc_service.go
@@ -1072,7 +1072,6 @@ func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportBatchSplitRequest) (*pdpb.ReportBatchSplitResponse, error) {
 	if rc == nil {
 		return &pdpb.ReportBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil
 	}
-
 	_, err := rc.HandleBatchReportSplit(request)
 	if err != nil {
 		return nil, status.Errorf(codes.Unknown, err.Error())
14 changes: 10 additions & 4 deletions server/handler.go
@@ -901,14 +901,20 @@ func (h *Handler) ResetTS(ts uint64) error {
 
 // SetStoreLimitScene sets the limit values for different scenes
 func (h *Handler) SetStoreLimitScene(scene *storelimit.Scene, limitType storelimit.Type) {
-	cluster := h.s.GetRaftCluster()
-	cluster.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType)
+	rc := h.s.GetRaftCluster()
+	if rc == nil {
+		return
+	}
+	rc.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType)
 }
 
 // GetStoreLimitScene returns the limit values for different scenes
 func (h *Handler) GetStoreLimitScene(limitType storelimit.Type) *storelimit.Scene {
-	cluster := h.s.GetRaftCluster()
-	return cluster.GetStoreLimiter().StoreLimitScene(limitType)
+	rc := h.s.GetRaftCluster()
+	if rc == nil {
+		return nil
+	}
+	return rc.GetStoreLimiter().StoreLimitScene(limitType)
 }
 
 // PluginLoad loads the plugin referenced by the pluginPath
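
Note the two nil-handling choices above: SetStoreLimitScene degrades to a no-op when the cluster is unavailable, while GetStoreLimitScene propagates the condition by returning a nil *storelimit.Scene, which its callers must in turn be prepared to receive.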
44 changes: 28 additions & 16 deletions server/server.go
@@ -853,18 +853,18 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {
 	}
 	old := s.persistOptions.GetReplicationConfig()
 	if cfg.EnablePlacementRules != old.EnablePlacementRules {
-		raftCluster := s.GetRaftCluster()
-		if raftCluster == nil {
+		rc := s.GetRaftCluster()
+		if rc == nil {
 			return errs.ErrNotBootstrapped.GenWithStackByArgs()
 		}
 		if cfg.EnablePlacementRules {
 			// initialize rule manager.
-			if err := raftCluster.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel); err != nil {
+			if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel); err != nil {
 				return err
 			}
 		} else {
 			// NOTE: can be removed after placement rules feature is enabled by default.
-			for _, s := range raftCluster.GetStores() {
+			for _, s := range rc.GetStores() {
 				if !s.IsTombstone() && core.IsStoreContainLabel(s.GetMeta(), core.EngineKey, core.EngineTiFlash) {
 					return errors.New("cannot disable placement rules with TiFlash nodes")
 				}
@@ -874,8 +874,12 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {
 
 	var rule *placement.Rule
 	if cfg.EnablePlacementRules {
+		rc := s.GetRaftCluster()
+		if rc == nil {
+			return errs.ErrNotBootstrapped.GenWithStackByArgs()
+		}
 		// replication.MaxReplicas won't work when placement rule is enabled and not only have one default rule.
-		defaultRule := s.GetRaftCluster().GetRuleManager().GetRule("pd", "default")
+		defaultRule := rc.GetRuleManager().GetRule("pd", "default")
 
 		CheckInDefaultRule := func() error {
 			// replication config won't work when placement rule is enabled and exceeds one default rule
@@ -902,7 +906,11 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {
 		rule.Count = int(cfg.MaxReplicas)
 		rule.LocationLabels = cfg.LocationLabels
 		rule.IsolationLevel = cfg.IsolationLevel
-		if err := s.GetRaftCluster().GetRuleManager().SetRule(rule); err != nil {
+		rc := s.GetRaftCluster()
+		if rc == nil {
+			return errs.ErrNotBootstrapped.GenWithStackByArgs()
+		}
+		if err := rc.GetRuleManager().SetRule(rule); err != nil {
 			log.Error("failed to update rule count",
 				errs.ZapError(err))
 			return err
@@ -914,7 +922,11 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {
 		s.persistOptions.SetReplicationConfig(old)
 		if rule != nil {
 			rule.Count = int(old.MaxReplicas)
-			if e := s.GetRaftCluster().GetRuleManager().SetRule(rule); e != nil {
+			rc := s.GetRaftCluster()
+			if rc == nil {
+				return errs.ErrNotBootstrapped.GenWithStackByArgs()
+			}
+			if e := rc.GetRuleManager().SetRule(rule); e != nil {
 				log.Error("failed to roll back count of rule when update replication config", errs.ZapError(e))
 			}
 		}
@@ -1088,18 +1100,18 @@ func (s *Server) GetServerOption() *config.PersistOptions {
 
 // GetMetaRegions gets meta regions from cluster.
 func (s *Server) GetMetaRegions() []*metapb.Region {
-	cluster := s.GetRaftCluster()
-	if cluster != nil {
-		return cluster.GetMetaRegions()
+	rc := s.GetRaftCluster()
+	if rc != nil {
+		return rc.GetMetaRegions()
 	}
 	return nil
 }
 
 // GetRegions gets regions from cluster.
 func (s *Server) GetRegions() []*core.RegionInfo {
-	cluster := s.GetRaftCluster()
-	if cluster != nil {
-		return cluster.GetRegions()
+	rc := s.GetRaftCluster()
+	if rc != nil {
+		return rc.GetRegions()
 	}
 	return nil
 }
@@ -1154,9 +1166,9 @@ func (s *Server) SetReplicationModeConfig(cfg config.ReplicationModeConfig) error {
 	}
 	log.Info("replication mode config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
 
-	cluster := s.GetRaftCluster()
-	if cluster != nil {
-		err := cluster.GetReplicationMode().UpdateConfig(cfg)
+	rc := s.GetRaftCluster()
+	if rc != nil {
+		err := rc.GetReplicationMode().UpdateConfig(cfg)
 		if err != nil {
 			log.Warn("failed to update replication mode", errs.ZapError(err))
 			// revert to old config
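
Two details worth noting in server.go: SetReplicationConfig re-fetches and re-checks the cluster in every branch that touches the rule manager, including the rollback path, rather than hoisting a single check to the top of the function; GetMetaRegions, GetRegions, and SetReplicationModeConfig already tolerated a nil cluster and only pick up the rc rename for consistency.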
