diff --git a/server/config/config.go b/server/config/config.go index d8bd086225c..62861a2633a 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -96,6 +96,8 @@ type Config struct { // be automatically clamped to the range. TSOUpdatePhysicalInterval typeutil.Duration `toml:"tso-update-physical-interval" json:"tso-update-physical-interval"` + TSODelayInjection typeutil.Duration `toml:"tso-delay-injection" json:"tso-delay-injection"` + // EnableLocalTSO is used to enable the Local TSO Allocator feature, // which allows the PD server to generate Local TSO for certain DC-level transactions. // To make this feature meaningful, user has to set the "zone" label for the PD server diff --git a/server/grpc_service.go b/server/grpc_service.go index 9e892dda161..25c048407f9 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -534,6 +534,39 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { ) ctx, cancel := context.WithCancel(stream.Context()) defer cancel() + + type respAndTime struct { + startTime time.Time + resp *pdpb.TsoResponse + } + + delayInjection := s.GetConfig().TSODelayInjection.Duration + + respCh := make(chan respAndTime, 1000) + respErrCh := make(chan error, 2) + + if delayInjection > 0 { + defer cancel() + go func() { + for { + select { + case <-ctx.Done(): + return + case r := <-respCh: + now := time.Now() + deadline := r.startTime.Add(delayInjection) + if deadline.After(now) { + time.Sleep(deadline.Sub(now)) + } + if err := stream.Send(r.resp); err != nil { + respErrCh <- errors.WithStack(err) + return + } + } + } + }() + } + for { // Prevent unnecessary performance overhead of the channel. if errCh != nil { @@ -543,6 +576,13 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { default: } } + if delayInjection > 0 { + select { + case err := <-respErrCh: + return err + default: + } + } request, err := stream.Recv() if err == io.EOF { return nil @@ -591,8 +631,15 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { Timestamp: &ts, Count: count, } - if err := stream.Send(response); err != nil { - return errors.WithStack(err) + if delayInjection > 0 { + respCh <- respAndTime{ + startTime: start, + resp: response, + } + } else { + if err := stream.Send(response); err != nil { + return errors.WithStack(err) + } } } }