Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syncer: exit watch leader immediately (#8824) #8830

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions pkg/mock/mockserver/mockserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mockserver

import (
"context"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/grpcutil"
)

// MockServer is used to mock Server for test use.
type MockServer struct {
ctx context.Context
member, leader *pdpb.Member
storage storage.Storage
bc *core.BasicCluster
}

// NewMockServer creates a new MockServer.
func NewMockServer(ctx context.Context, member, leader *pdpb.Member, storage storage.Storage, bc *core.BasicCluster) *MockServer {
return &MockServer{
ctx: ctx,
member: member,
leader: leader,
storage: storage,
bc: bc,
}
}

// LoopContext returns the context of the server.
func (s *MockServer) LoopContext() context.Context {
return s.ctx
}

// ClusterID returns the cluster ID of the server.
func (*MockServer) ClusterID() uint64 {
return 1
}

// GetMemberInfo returns the member info of the server.
func (s *MockServer) GetMemberInfo() *pdpb.Member {
return s.member
}

// GetLeader returns the leader of the server.
func (s *MockServer) GetLeader() *pdpb.Member {
return s.leader
}

// GetStorage returns the storage of the server.
func (s *MockServer) GetStorage() storage.Storage {
return s.storage
}

// Name returns the name of the server.
func (*MockServer) Name() string {
return "mock-server"
}

// GetRegions returns the regions of the server.
func (s *MockServer) GetRegions() []*core.RegionInfo {
return s.bc.GetRegions()
}

// GetTLSConfig returns the TLS config of the server.
func (*MockServer) GetTLSConfig() *grpcutil.TLSConfig {
return &grpcutil.TLSConfig{}
}

// GetBasicCluster returns the basic cluster of the server.
func (s *MockServer) GetBasicCluster() *core.BasicCluster {
return s.bc
}
15 changes: 13 additions & 2 deletions pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
keepaliveTime = 10 * time.Second
keepaliveTimeout = 3 * time.Second
msgSize = 8 * units.MiB
retryInterval = time.Second
)

// StopSyncWithLeader stop to sync the region with leader.
Expand Down Expand Up @@ -150,7 +151,12 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
}
}
log.Error("server failed to establish sync stream with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err))
time.Sleep(time.Second)
select {
case <-ctx.Done():
log.Info("stop synchronizing with leader due to context canceled")
return
case <-time.After(retryInterval):
}
continue
}
log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex()))
Expand All @@ -162,7 +168,12 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
if err = stream.CloseSend(); err != nil {
log.Error("failed to terminate client stream", errs.ZapError(errs.ErrGRPCCloseSend, err))
}
time.Sleep(time.Second)
select {
case <-ctx.Done():
log.Info("stop synchronizing with leader due to context canceled")
return
case <-time.After(retryInterval):
}
break
}
if s.history.GetNextIndex() != resp.GetStartIndex() {
Expand Down
69 changes: 15 additions & 54 deletions pkg/syncer/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mock/mockserver"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/grpcutil"
"google.golang.org/grpc/codes"
Expand All @@ -37,11 +37,13 @@ func TestLoadRegion(t *testing.T) {
rs, err := storage.NewRegionStorageWithLevelDBBackend(context.Background(), tempDir, nil)
re.NoError(err)

server := &mockServer{
ctx: context.Background(),
storage: storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs),
bc: core.NewBasicCluster(),
}
server := mockserver.NewMockServer(
context.Background(),
nil,
nil,
storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs),
core.NewBasicCluster(),
)
for i := 0; i < 30; i++ {
rs.SaveRegion(&metapb.Region{Id: uint64(i) + 1})
}
Expand All @@ -64,11 +66,13 @@ func TestErrorCode(t *testing.T) {
tempDir := t.TempDir()
rs, err := storage.NewRegionStorageWithLevelDBBackend(context.Background(), tempDir, nil)
re.NoError(err)
server := &mockServer{
ctx: context.Background(),
storage: storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs),
bc: core.NewBasicCluster(),
}
server := mockserver.NewMockServer(
context.Background(),
nil,
nil,
storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs),
core.NewBasicCluster(),
)
ctx, cancel := context.WithCancel(context.TODO())
rc := NewRegionSyncer(server)
conn, err := grpcutil.GetClientConn(ctx, "http://127.0.0.1", nil)
Expand All @@ -79,46 +83,3 @@ func TestErrorCode(t *testing.T) {
re.True(ok)
re.Equal(codes.Canceled, ev.Code())
}

type mockServer struct {
ctx context.Context
member, leader *pdpb.Member
storage storage.Storage
bc *core.BasicCluster
}

func (s *mockServer) LoopContext() context.Context {
return s.ctx
}

func (s *mockServer) ClusterID() uint64 {
return 1
}

func (s *mockServer) GetMemberInfo() *pdpb.Member {
return s.member
}

func (s *mockServer) GetLeader() *pdpb.Member {
return s.leader
}

func (s *mockServer) GetStorage() storage.Storage {
return s.storage
}

func (s *mockServer) Name() string {
return "mock-server"
}

func (s *mockServer) GetRegions() []*core.RegionInfo {
return s.bc.GetRegions()
}

func (s *mockServer) GetTLSConfig() *grpcutil.TLSConfig {
return &grpcutil.TLSConfig{}
}

func (s *mockServer) GetBasicCluster() *core.BasicCluster {
return s.bc
}
2 changes: 1 addition & 1 deletion tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,11 @@ func TestLeaderTransferAndMoveCluster(t *testing.T) {
oldServers := cluster.GetServers()
oldLeaderName := cluster.WaitLeader()
for i := 0; i < 3; i++ {
time.Sleep(5 * time.Second)
newPD, err := cluster.Join(ctx)
re.NoError(err)
re.NoError(newPD.Run())
oldLeaderName = cluster.WaitLeader()
time.Sleep(5 * time.Second)
}

// ABCDEF->DEF
Expand Down
Loading