diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index e0fc14fcac9..e5161ad9e3e 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/client_golang/prometheus" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/timerutil" atomicutil "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -291,7 +292,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV()) if err != nil { log.Warn("watch resource group meta failed", zap.Error(err)) - watchRetryTimer.Reset(watchRetryInterval) + timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) }) @@ -301,7 +302,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) if err != nil { log.Warn("watch resource group config failed", zap.Error(err)) - watchRetryTimer.Reset(watchRetryInterval) + timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval) } } case <-emergencyTokenAcquisitionTicker.C: @@ -335,7 +336,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { }) if !ok { watchMetaChannel = nil - watchRetryTimer.Reset(watchRetryInterval) + timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) }) @@ -371,7 +372,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { case resp, ok := <-watchConfigChannel: if !ok { watchConfigChannel = nil - watchRetryTimer.Reset(watchRetryInterval) + timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) }) diff --git a/client/timerpool/pool.go b/client/timerutil/pool.go similarity index 98% rename from client/timerpool/pool.go rename to client/timerutil/pool.go index 28ffacfc629..2d608b09053 100644 --- a/client/timerpool/pool.go +++ b/client/timerutil/pool.go @@ -4,7 +4,7 @@ // Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 -package timerpool +package timerutil import ( "sync" diff --git a/client/timerpool/pool_test.go b/client/timerutil/pool_test.go similarity index 98% rename from client/timerpool/pool_test.go rename to client/timerutil/pool_test.go index d6dffc723a9..f90a305d99f 100644 --- a/client/timerpool/pool_test.go +++ b/client/timerutil/pool_test.go @@ -4,7 +4,7 @@ // Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 -package timerpool +package timerutil import ( "testing" diff --git a/client/timerutil/util.go b/client/timerutil/util.go new file mode 100644 index 00000000000..7e24671a09e --- /dev/null +++ b/client/timerutil/util.go @@ -0,0 +1,32 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package timerutil + +import "time" + +// SafeResetTimer is used to reset timer safely. +// Before Go 1.23, the only safe way to use Reset was to call Timer.Stop and explicitly drain the timer first. +// We need be careful here, see more details in the comments of Timer.Reset. +// https://pkg.go.dev/time@master#Timer.Reset +func SafeResetTimer(t *time.Timer, d time.Duration) { + // Stop the timer if it's not stopped. + if !t.Stop() { + select { + case <-t.C: // try to drain from the channel + default: + } + } + t.Reset(d) +} diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index eb89f892f75..df0ad207138 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -29,7 +29,7 @@ import ( "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/grpcutil" "github.com/tikv/pd/client/retry" - "github.com/tikv/pd/client/timerpool" + "github.com/tikv/pd/client/timerutil" "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" "google.golang.org/grpc" @@ -151,7 +151,7 @@ func newTSDeadline( done chan struct{}, cancel context.CancelFunc, ) *deadline { - timer := timerpool.GlobalTimerPool.Get(timeout) + timer := timerutil.GlobalTimerPool.Get(timeout) return &deadline{ timer: timer, done: done, @@ -197,11 +197,11 @@ func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) { case <-d.timer.C: log.Error("[tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout)) d.cancel() - timerpool.GlobalTimerPool.Put(d.timer) + timerutil.GlobalTimerPool.Put(d.timer) case <-d.done: - timerpool.GlobalTimerPool.Put(d.timer) + timerutil.GlobalTimerPool.Put(d.timer) case <-ctx.Done(): - timerpool.GlobalTimerPool.Put(d.timer) + timerutil.GlobalTimerPool.Put(d.timer) return } case <-ctx.Done(): @@ -432,16 +432,7 @@ tsoBatchLoop: if maxBatchWaitInterval >= 0 { tbc.adjustBestBatchSize() } - // Stop the timer if it's not stopped. - if !streamLoopTimer.Stop() { - select { - case <-streamLoopTimer.C: // try to drain from the channel - default: - } - } - // We need be careful here, see more details in the comments of Timer.Reset. - // https://pkg.go.dev/time@master#Timer.Reset - streamLoopTimer.Reset(c.option.timeout) + timerutil.SafeResetTimer(streamLoopTimer, c.option.timeout) // Choose a stream to send the TSO gRPC request. streamChoosingLoop: for { diff --git a/pkg/election/lease.go b/pkg/election/lease.go index a6b49fb99f8..e781e82ee8f 100644 --- a/pkg/election/lease.go +++ b/pkg/election/lease.go @@ -23,6 +23,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/timerutil" "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" @@ -124,16 +125,7 @@ func (l *lease) KeepAlive(ctx context.Context) { l.expireTime.Store(t) } } - // Stop the timer if it's not stopped. - if !timer.Stop() { - select { - case <-timer.C: // try to drain from the channel - default: - } - } - // We need be careful here, see more details in the comments of Timer.Reset. - // https://pkg.go.dev/time@master#Timer.Reset - timer.Reset(l.leaseTimeout) + timerutil.SafeResetTimer(timer, l.leaseTimeout) case <-timer.C: log.Info("keep alive lease too slow", zap.Duration("timeout-duration", l.leaseTimeout), zap.Time("actual-expire", l.expireTime.Load().(time.Time)), zap.String("purpose", l.Purpose)) return diff --git a/pkg/mock/mockserver/mockserver.go b/pkg/mock/mockserver/mockserver.go new file mode 100644 index 00000000000..d79d79ffa03 --- /dev/null +++ b/pkg/mock/mockserver/mockserver.go @@ -0,0 +1,88 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mockserver + +import ( + "context" + + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/utils/grpcutil" +) + +// MockServer is used to mock Server for test use. +type MockServer struct { + ctx context.Context + member, leader *pdpb.Member + storage storage.Storage + bc *core.BasicCluster +} + +// NewMockServer creates a new MockServer. +func NewMockServer(ctx context.Context, member, leader *pdpb.Member, storage storage.Storage, bc *core.BasicCluster) *MockServer { + return &MockServer{ + ctx: ctx, + member: member, + leader: leader, + storage: storage, + bc: bc, + } +} + +// LoopContext returns the context of the server. +func (s *MockServer) LoopContext() context.Context { + return s.ctx +} + +// ClusterID returns the cluster ID of the server. +func (*MockServer) ClusterID() uint64 { + return 1 +} + +// GetMemberInfo returns the member info of the server. +func (s *MockServer) GetMemberInfo() *pdpb.Member { + return s.member +} + +// GetLeader returns the leader of the server. +func (s *MockServer) GetLeader() *pdpb.Member { + return s.leader +} + +// GetStorage returns the storage of the server. +func (s *MockServer) GetStorage() storage.Storage { + return s.storage +} + +// Name returns the name of the server. +func (*MockServer) Name() string { + return "mock-server" +} + +// GetRegions returns the regions of the server. +func (s *MockServer) GetRegions() []*core.RegionInfo { + return s.bc.GetRegions() +} + +// GetTLSConfig returns the TLS config of the server. +func (*MockServer) GetTLSConfig() *grpcutil.TLSConfig { + return &grpcutil.TLSConfig{} +} + +// GetBasicCluster returns the basic cluster of the server. +func (s *MockServer) GetBasicCluster() *core.BasicCluster { + return s.bc +} diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index ffbd71d2f1e..efe0e22b878 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -41,6 +41,7 @@ const ( keepaliveTime = 10 * time.Second keepaliveTimeout = 3 * time.Second msgSize = 8 * units.MiB + retryInterval = time.Second ) // StopSyncWithLeader stop to sync the region with leader. @@ -150,7 +151,12 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { } } log.Error("server failed to establish sync stream with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err)) - time.Sleep(time.Second) + select { + case <-ctx.Done(): + log.Info("stop synchronizing with leader due to context canceled") + return + case <-time.After(retryInterval): + } continue } log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex())) @@ -162,7 +168,12 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { if err = stream.CloseSend(); err != nil { log.Error("failed to terminate client stream", errs.ZapError(errs.ErrGRPCCloseSend, err)) } - time.Sleep(time.Second) + select { + case <-ctx.Done(): + log.Info("stop synchronizing with leader due to context canceled") + return + case <-time.After(retryInterval): + } break } if s.history.GetNextIndex() != resp.GetStartIndex() { diff --git a/pkg/syncer/client_test.go b/pkg/syncer/client_test.go index 84193ebaffe..76b7687687d 100644 --- a/pkg/syncer/client_test.go +++ b/pkg/syncer/client_test.go @@ -21,9 +21,9 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/mock/mockserver" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/grpcutil" "google.golang.org/grpc/codes" @@ -37,11 +37,13 @@ func TestLoadRegion(t *testing.T) { rs, err := storage.NewRegionStorageWithLevelDBBackend(context.Background(), tempDir, nil) re.NoError(err) - server := &mockServer{ - ctx: context.Background(), - storage: storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs), - bc: core.NewBasicCluster(), - } + server := mockserver.NewMockServer( + context.Background(), + nil, + nil, + storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs), + core.NewBasicCluster(), + ) for i := 0; i < 30; i++ { rs.SaveRegion(&metapb.Region{Id: uint64(i) + 1}) } @@ -64,11 +66,13 @@ func TestErrorCode(t *testing.T) { tempDir := t.TempDir() rs, err := storage.NewRegionStorageWithLevelDBBackend(context.Background(), tempDir, nil) re.NoError(err) - server := &mockServer{ - ctx: context.Background(), - storage: storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs), - bc: core.NewBasicCluster(), - } + server := mockserver.NewMockServer( + context.Background(), + nil, + nil, + storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs), + core.NewBasicCluster(), + ) ctx, cancel := context.WithCancel(context.TODO()) rc := NewRegionSyncer(server) conn, err := grpcutil.GetClientConn(ctx, "http://127.0.0.1", nil) @@ -79,46 +83,3 @@ func TestErrorCode(t *testing.T) { re.True(ok) re.Equal(codes.Canceled, ev.Code()) } - -type mockServer struct { - ctx context.Context - member, leader *pdpb.Member - storage storage.Storage - bc *core.BasicCluster -} - -func (s *mockServer) LoopContext() context.Context { - return s.ctx -} - -func (s *mockServer) ClusterID() uint64 { - return 1 -} - -func (s *mockServer) GetMemberInfo() *pdpb.Member { - return s.member -} - -func (s *mockServer) GetLeader() *pdpb.Member { - return s.leader -} - -func (s *mockServer) GetStorage() storage.Storage { - return s.storage -} - -func (s *mockServer) Name() string { - return "mock-server" -} - -func (s *mockServer) GetRegions() []*core.RegionInfo { - return s.bc.GetRegions() -} - -func (s *mockServer) GetTLSConfig() *grpcutil.TLSConfig { - return &grpcutil.TLSConfig{} -} - -func (s *mockServer) GetBasicCluster() *core.BasicCluster { - return s.bc -} diff --git a/pkg/timerpool/pool.go b/pkg/utils/timerutil/pool.go similarity index 98% rename from pkg/timerpool/pool.go rename to pkg/utils/timerutil/pool.go index 28ffacfc629..2d608b09053 100644 --- a/pkg/timerpool/pool.go +++ b/pkg/utils/timerutil/pool.go @@ -4,7 +4,7 @@ // Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 -package timerpool +package timerutil import ( "sync" diff --git a/pkg/timerpool/pool_test.go b/pkg/utils/timerutil/pool_test.go similarity index 98% rename from pkg/timerpool/pool_test.go rename to pkg/utils/timerutil/pool_test.go index d6dffc723a9..f90a305d99f 100644 --- a/pkg/timerpool/pool_test.go +++ b/pkg/utils/timerutil/pool_test.go @@ -4,7 +4,7 @@ // Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 -package timerpool +package timerutil import ( "testing" diff --git a/pkg/utils/timerutil/util.go b/pkg/utils/timerutil/util.go new file mode 100644 index 00000000000..7e24671a09e --- /dev/null +++ b/pkg/utils/timerutil/util.go @@ -0,0 +1,32 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package timerutil + +import "time" + +// SafeResetTimer is used to reset timer safely. +// Before Go 1.23, the only safe way to use Reset was to call Timer.Stop and explicitly drain the timer first. +// We need be careful here, see more details in the comments of Timer.Reset. +// https://pkg.go.dev/time@master#Timer.Reset +func SafeResetTimer(t *time.Timer, d time.Duration) { + // Stop the timer if it's not stopped. + if !t.Stop() { + select { + case <-t.C: // try to drain from the channel + default: + } + } + t.Reset(d) +} diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go index 6d1ee2ace28..b2e453e45e2 100644 --- a/pkg/utils/tsoutil/tso_dispatcher.go +++ b/pkg/utils/tsoutil/tso_dispatcher.go @@ -24,9 +24,9 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/timerpool" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/timerutil" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -209,7 +209,7 @@ func NewTSDeadline( done chan struct{}, cancel context.CancelFunc, ) *TSDeadline { - timer := timerpool.GlobalTimerPool.Get(timeout) + timer := timerutil.GlobalTimerPool.Get(timeout) return &TSDeadline{ timer: timer, done: done, @@ -230,11 +230,11 @@ func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) { log.Error("tso proxy request processing is canceled due to timeout", errs.ZapError(errs.ErrProxyTSOTimeout)) d.cancel() - timerpool.GlobalTimerPool.Put(d.timer) + timerutil.GlobalTimerPool.Put(d.timer) case <-d.done: - timerpool.GlobalTimerPool.Put(d.timer) + timerutil.GlobalTimerPool.Put(d.timer) case <-ctx.Done(): - timerpool.GlobalTimerPool.Put(d.timer) + timerutil.GlobalTimerPool.Put(d.timer) return } case <-ctx.Done(): diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 7fadddf4532..150e6e0e08e 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -228,11 +228,11 @@ func TestLeaderTransferAndMoveCluster(t *testing.T) { oldServers := cluster.GetServers() oldLeaderName := cluster.WaitLeader() for i := 0; i < 3; i++ { + time.Sleep(5 * time.Second) newPD, err := cluster.Join(ctx) re.NoError(err) re.NoError(newPD.Run()) oldLeaderName = cluster.WaitLeader() - time.Sleep(5 * time.Second) } // ABCDEF->DEF