Skip to content

Commit

Permalink
Add tso delay inject
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Oct 15, 2024
1 parent 982fa22 commit 12653e1
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 2 deletions.
2 changes: 2 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ type Config struct {
// be automatically clamped to the range.
TSOUpdatePhysicalInterval typeutil.Duration `toml:"tso-update-physical-interval" json:"tso-update-physical-interval"`

TSODelayInjection typeutil.Duration `toml:"tso-delay-injection" json:"tso-delay-injection"`

// EnableLocalTSO is used to enable the Local TSO Allocator feature,
// which allows the PD server to generate Local TSO for certain DC-level transactions.
// To make this feature meaningful, user has to set the "zone" label for the PD server
Expand Down
51 changes: 49 additions & 2 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,39 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
)
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()

type respAndTime struct {
startTime time.Time
resp *pdpb.TsoResponse
}

delayInjection := s.GetConfig().TSODelayInjection.Duration

respCh := make(chan respAndTime, 1000)
respErrCh := make(chan error, 2)

if delayInjection > 0 {
defer cancel()
go func() {
for {
select {
case <-ctx.Done():
return
case r := <-respCh:
now := time.Now()
deadline := r.startTime.Add(delayInjection)
if deadline.After(now) {
time.Sleep(deadline.Sub(now))
}
if err := stream.Send(r.resp); err != nil {
respErrCh <- errors.WithStack(err)
return
}
}
}
}()
}

for {
// Prevent unnecessary performance overhead of the channel.
if errCh != nil {
Expand All @@ -543,6 +576,13 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
default:
}
}
if delayInjection > 0 {
select {
case err := <-respErrCh:
return err
default:
}
}
request, err := stream.Recv()
if err == io.EOF {
return nil
Expand Down Expand Up @@ -591,8 +631,15 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
Timestamp: &ts,
Count: count,
}
if err := stream.Send(response); err != nil {
return errors.WithStack(err)
if delayInjection > 0 {
respCh <- respAndTime{
startTime: start,
resp: response,
}
} else {
if err := stream.Send(response); err != nil {
return errors.WithStack(err)
}
}
}
}
Expand Down

0 comments on commit 12653e1

Please sign in to comment.