
Commit a3ab827

Merge branch 'release-8.5' into cherry-pick-8764-to-release-8.5
ti-chi-bot[bot] authored Dec 25, 2024
2 parents beb543a + d190c0e commit a3ab827
Showing 8 changed files with 332 additions and 76 deletions.
88 changes: 88 additions & 0 deletions pkg/mock/mockserver/mockserver.go
@@ -0,0 +1,88 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mockserver

import (
"context"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/grpcutil"
)

// MockServer is used to mock Server for test use.
type MockServer struct {
ctx context.Context
member, leader *pdpb.Member
storage storage.Storage
bc *core.BasicCluster
}

// NewMockServer creates a new MockServer.
func NewMockServer(ctx context.Context, member, leader *pdpb.Member, storage storage.Storage, bc *core.BasicCluster) *MockServer {
return &MockServer{
ctx: ctx,
member: member,
leader: leader,
storage: storage,
bc: bc,
}
}

// LoopContext returns the context of the server.
func (s *MockServer) LoopContext() context.Context {
return s.ctx
}

// ClusterID returns the cluster ID of the server.
func (*MockServer) ClusterID() uint64 {
return 1
}

// GetMemberInfo returns the member info of the server.
func (s *MockServer) GetMemberInfo() *pdpb.Member {
return s.member
}

// GetLeader returns the leader of the server.
func (s *MockServer) GetLeader() *pdpb.Member {
return s.leader
}

// GetStorage returns the storage of the server.
func (s *MockServer) GetStorage() storage.Storage {
return s.storage
}

// Name returns the name of the server.
func (*MockServer) Name() string {
return "mock-server"
}

// GetRegions returns the regions of the server.
func (s *MockServer) GetRegions() []*core.RegionInfo {
return s.bc.GetRegions()
}

// GetTLSConfig returns the TLS config of the server.
func (*MockServer) GetTLSConfig() *grpcutil.TLSConfig {
return &grpcutil.TLSConfig{}
}

// GetBasicCluster returns the basic cluster of the server.
func (s *MockServer) GetBasicCluster() *core.BasicCluster {
return s.bc
}
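
A minimal sketch of how the extracted mock can now be reused from another test package (the test name and the syncer import alias are illustrative; the constructors are the ones added or exercised in this commit, mirroring TestLoadRegion below):

// Illustrative only, not part of the commit; assumes a test package that
// imports pkg/syncer as "syncer" alongside the packages used below.
func TestMockServerUsage(t *testing.T) {
	ctx := context.Background()
	rs, err := storage.NewRegionStorageWithLevelDBBackend(ctx, t.TempDir(), nil)
	if err != nil {
		t.Fatal(err)
	}
	server := mockserver.NewMockServer(
		ctx,
		nil, // member: not needed for this sketch
		nil, // leader: not needed for this sketch
		storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs),
		core.NewBasicCluster(),
	)
	// The mock satisfies the server interface expected by the region syncer.
	rc := syncer.NewRegionSyncer(server)
	_ = rc
}
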
19 changes: 15 additions & 4 deletions pkg/syncer/client.go
@@ -43,6 +43,7 @@ const (
keepaliveTime = 10 * time.Second
keepaliveTimeout = 3 * time.Second
msgSize = 8 * units.MiB
retryInterval = time.Second
)

// StopSyncWithLeader stops syncing regions with the leader.
@@ -153,7 +154,12 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
}
}
log.Error("server failed to establish sync stream with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err))
time.Sleep(time.Second)
select {
case <-ctx.Done():
log.Info("stop synchronizing with leader due to context canceled")
return
case <-time.After(retryInterval):
}
continue
}
log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.getNextIndex()))
@@ -165,7 +171,12 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
if err = stream.CloseSend(); err != nil {
log.Error("failed to terminate client stream", errs.ZapError(errs.ErrGRPCCloseSend, err))
}
time.Sleep(time.Second)
select {
case <-ctx.Done():
log.Info("stop synchronizing with leader due to context canceled")
return
case <-time.After(retryInterval):
}
break
}
if s.history.getNextIndex() != resp.GetStartIndex() {
@@ -208,13 +219,13 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
continue
}
ctx := &core.MetaProcessContext{
cctx := &core.MetaProcessContext{
Context: ctx,
TaskRunner: ratelimit.NewSyncRunner(),
Tracer: core.NewNoopHeartbeatProcessTracer(),
// no limit for followers.
}
saveKV, _, _, _ := regionGuide(ctx, region, origin)
saveKV, _, _, _ := regionGuide(cctx, region, origin)
overlaps := bc.PutRegion(region)

if hasBuckets {
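
Both hunks above replace a bare time.Sleep with a select, so the retry loop wakes up as soon as the context is canceled instead of always sleeping for the full retryInterval. A self-contained sketch of the same pattern (names here are illustrative, not taken from the PD code base):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryUntilCanceled retries work until it succeeds or ctx is canceled.
// The select makes the wait interruptible: cancellation is observed
// immediately rather than after the remaining interval elapses.
func retryUntilCanceled(ctx context.Context, interval time.Duration, work func() error) error {
	for {
		if err := work(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(interval):
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	err := retryUntilCanceled(ctx, 10*time.Millisecond, func() error {
		return errors.New("not ready yet")
	})
	fmt.Println(err) // context deadline exceeded
}

time.After is adequate here because at most one timer is outstanding per retry; a reusable time.Timer would only matter in much hotter loops.
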
69 changes: 15 additions & 54 deletions pkg/syncer/client_test.go
@@ -21,9 +21,9 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mock/mockserver"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/grpcutil"
"google.golang.org/grpc/codes"
@@ -37,11 +37,13 @@ func TestLoadRegion(t *testing.T) {
rs, err := storage.NewRegionStorageWithLevelDBBackend(context.Background(), tempDir, nil)
re.NoError(err)

server := &mockServer{
ctx: context.Background(),
storage: storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs),
bc: core.NewBasicCluster(),
}
server := mockserver.NewMockServer(
context.Background(),
nil,
nil,
storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs),
core.NewBasicCluster(),
)
for i := range 30 {
rs.SaveRegion(&metapb.Region{Id: uint64(i) + 1})
}
@@ -64,11 +66,13 @@ func TestErrorCode(t *testing.T) {
tempDir := t.TempDir()
rs, err := storage.NewRegionStorageWithLevelDBBackend(context.Background(), tempDir, nil)
re.NoError(err)
server := &mockServer{
ctx: context.Background(),
storage: storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs),
bc: core.NewBasicCluster(),
}
server := mockserver.NewMockServer(
context.Background(),
nil,
nil,
storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs),
core.NewBasicCluster(),
)
ctx, cancel := context.WithCancel(context.TODO())
rc := NewRegionSyncer(server)
conn, err := grpcutil.GetClientConn(ctx, "http://127.0.0.1", nil)
@@ -79,46 +83,3 @@ func TestErrorCode(t *testing.T) {
re.True(ok)
re.Equal(codes.Canceled, ev.Code())
}

type mockServer struct {
ctx context.Context
member, leader *pdpb.Member
storage storage.Storage
bc *core.BasicCluster
}

func (s *mockServer) LoopContext() context.Context {
return s.ctx
}

func (*mockServer) ClusterID() uint64 {
return 1
}

func (s *mockServer) GetMemberInfo() *pdpb.Member {
return s.member
}

func (s *mockServer) GetLeader() *pdpb.Member {
return s.leader
}

func (s *mockServer) GetStorage() storage.Storage {
return s.storage
}

func (*mockServer) Name() string {
return "mock-server"
}

func (s *mockServer) GetRegions() []*core.RegionInfo {
return s.bc.GetRegions()
}

func (*mockServer) GetTLSConfig() *grpcutil.TLSConfig {
return &grpcutil.TLSConfig{}
}

func (s *mockServer) GetBasicCluster() *core.BasicCluster {
return s.bc
}
75 changes: 65 additions & 10 deletions server/cluster/cluster.go
@@ -318,7 +318,7 @@ func (c *RaftCluster) InitCluster(
}

// Start starts a cluster.
func (c *RaftCluster) Start(s Server) error {
func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
c.Lock()
defer c.Unlock()

@@ -327,11 +327,32 @@
return nil
}
c.isAPIServiceMode = s.IsAPIServiceMode()
err := c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
if err != nil {
return err
}
c.checkTSOService()
// We should not manage the TSO service when bootstrap tries to start the raft cluster.
// It is controlled only by the leader election.
// Ref: https://github.com/tikv/pd/issues/8836
if !bootstrap {
c.checkTSOService()
}
defer func() {
if !bootstrap && err != nil {
if err := c.stopTSOJobsIfNeeded(); err != nil {
log.Error("failed to stop TSO jobs", errs.ZapError(err))
return
}
}
}()
failpoint.Inject("raftClusterReturn", func(val failpoint.Value) {
if val, ok := val.(bool); (ok && val) || !ok {
err = errors.New("raftClusterReturn")
} else {
err = nil
}
failpoint.Return(err)
})
cluster, err := c.LoadClusterInfo()
if err != nil {
return err
@@ -422,12 +443,12 @@ func (c *RaftCluster) checkTSOService() {
log.Info("TSO is provided by PD")
c.UnsetServiceIndependent(constant.TSOServiceName)
} else {
if err := c.startTSOJobsIfNeeded(); err != nil {
if err := c.stopTSOJobsIfNeeded(); err != nil {
log.Error("failed to stop TSO jobs", errs.ZapError(err))
return
}
log.Info("TSO is provided by TSO server")
if !c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("TSO is provided by TSO server")
c.SetServiceIndependent(constant.TSOServiceName)
}
}
@@ -459,9 +480,22 @@ func (c *RaftCluster) runServiceCheckJob() {
log.Info("service check job is stopped")
return
case <-schedulingTicker.C:
c.checkSchedulingService()
// ensure raft cluster is running
// avoid unexpected startSchedulingJobs when raft cluster is stopping
c.RLock()
if c.running {
c.checkSchedulingService()
}
c.RUnlock()
case <-tsoTicker.C:
c.checkTSOService()
// ensure raft cluster is running
// avoid unexpected startTSOJobsIfNeeded when raft cluster is stopping
// ref: https://github.com/tikv/pd/issues/8781
c.RLock()
if c.running {
c.checkTSOService()
}
c.RUnlock()
}
}
}
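
The hunk above wraps both ticker branches in an RLock plus a check of the running flag, so neither checkSchedulingService nor checkTSOService can fire while the raft cluster is stopping. A generic sketch of that guard (the type and method names are illustrative, not PD's):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// component stands in for RaftCluster: Stop is assumed to take the write
// lock before clearing running, so a check that holds the read lock never
// runs against a half-stopped component.
type component struct {
	sync.RWMutex
	running bool
}

func (c *component) serviceCheckLoop(ctx context.Context, interval time.Duration, check func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.RLock()
			if c.running {
				check()
			}
			c.RUnlock()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	c := &component{running: true}
	go c.serviceCheckLoop(ctx, 10*time.Millisecond, func() { fmt.Println("check fired") })
	time.Sleep(35 * time.Millisecond) // let a few ticks fire, then exit
}
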
@@ -478,6 +512,10 @@ func (c *RaftCluster) startTSOJobsIfNeeded() error {
log.Error("failed to initialize the global TSO allocator", errs.ZapError(err))
return err
}
} else if !c.running {
// If the global TSO allocator is already initialized but the running flag is false,
// an unexpected error may have happened earlier.
log.Warn("the global TSO allocator is already initialized before, but the cluster is not running")
}
return nil
}
@@ -489,6 +527,7 @@ func (c *RaftCluster) stopTSOJobsIfNeeded() error {
return err
}
if allocator.IsInitialize() {
log.Info("closing the global TSO allocator")
c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true)
failpoint.Inject("updateAfterResetTSO", func() {
allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
Expand Down Expand Up @@ -841,6 +880,15 @@ func (c *RaftCluster) runReplicationMode() {
// Stop stops the cluster.
func (c *RaftCluster) Stop() {
c.Lock()
// We need to try to stop the TSO jobs whether the cluster is running or not,
// because checkTSOService is called as early as possible while the cluster is starting,
// so the TSO job may already have been started even though the cluster is not running.
// For example, the cluster may hit an error on startup, such as not being bootstrapped yet.
// In that case, `running` in `RaftCluster` is false, but the TSO job has already been started.
// Ref: https://github.com/tikv/pd/issues/8836
if err := c.stopTSOJobsIfNeeded(); err != nil {
log.Error("failed to stop tso jobs", errs.ZapError(err))
}
if !c.running {
c.Unlock()
return
@@ -850,9 +898,6 @@ func (c *RaftCluster) Stop() {
if !c.IsServiceIndependent(constant.SchedulingServiceName) {
c.stopSchedulingJobs()
}
if err := c.stopTSOJobsIfNeeded(); err != nil {
log.Error("failed to stop tso jobs", errs.ZapError(err))
}
c.heartbeatRunner.Stop()
c.miscRunner.Stop()
c.logRunner.Stop()
@@ -2555,3 +2600,13 @@ func (c *RaftCluster) SetServiceIndependent(name string) {
func (c *RaftCluster) UnsetServiceIndependent(name string) {
c.independentServices.Delete(name)
}

// GetGlobalTSOAllocator returns the global TSO allocator.
// It is only used for tests.
func (c *RaftCluster) GetGlobalTSOAllocator() tso.Allocator {
allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err != nil {
return nil
}
return allocator
}
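
The change to Start earlier in this file gives it a named error return so the deferred cleanup can observe whether startup ultimately failed and undo the TSO side effect applied by checkTSOService. A minimal, self-contained illustration of that named-return pattern (a stand-in, not the actual RaftCluster code):

package main

import (
	"errors"
	"fmt"
)

// startLike applies a side effect up front (as Start does via checkTSOService)
// and uses the named return value err so the deferred closure can roll the
// side effect back when a later step fails.
func startLike(fail bool) (err error) {
	fmt.Println("side effect applied") // e.g. starting TSO jobs
	defer func() {
		if err != nil {
			fmt.Println("side effect rolled back") // e.g. stopTSOJobsIfNeeded
		}
	}()
	if fail {
		return errors.New("load cluster info failed")
	}
	return nil
}

func main() {
	_ = startLike(true)  // prints both lines
	_ = startLike(false) // prints only the first line
}
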
(Diffs for the remaining 4 changed files are not shown.)
