Skip to content

Commit

Permalink
Abstract tsostream
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Jul 23, 2024
1 parent 624b6f3 commit a9b312b
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 74 deletions.
4 changes: 2 additions & 2 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ type tsoConnectionContext struct {
// Current stream to send gRPC requests.
// - `pdpb.PD_TsoClient` for a leader/follower in the PD cluster.
// - `tsopb.TSO_TsoClient` for a primary/secondary in the TSO cluster.
stream tsoStream
stream *tsoStream
}

// updateConnectionCtxs will choose the proper way to update the connections for the given dc-location.
Expand Down Expand Up @@ -382,7 +382,7 @@ func (c *tsoClient) tryConnectToTSO(
var (
networkErrNum uint64
err error
stream tsoStream
stream *tsoStream
url string
cc *grpc.ClientConn
updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) {
Expand Down
4 changes: 2 additions & 2 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) {
streamCtx context.Context
cancel context.CancelFunc
streamURL string
stream tsoStream
stream *tsoStream
)
// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(option.timeout)
Expand Down Expand Up @@ -393,7 +393,7 @@ func chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext
}

func (td *tsoDispatcher) processRequests(
stream tsoStream, dcLocation string, tbc *tsoBatchController,
stream *tsoStream, dcLocation string, tbc *tsoBatchController,
) error {
var (
requests = tbc.getCollectedRequests()
Expand Down
262 changes: 192 additions & 70 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,22 @@ func (*tsoTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBui
// TSO Stream Builder

type tsoStreamBuilder interface {
build(context.Context, context.CancelFunc, time.Duration) (tsoStream, error)
build(context.Context, context.CancelFunc, time.Duration) (*tsoStream, error)
}

type pdTSOStreamBuilder struct {
serverURL string
client pdpb.PDClient
}

func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (tsoStream, error) {
func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (*tsoStream, error) {
done := make(chan struct{})
// TODO: we need to handle a conner case that this goroutine is timeout while the stream is successfully created.
go checkStreamTimeout(ctx, cancel, done, timeout)
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &pdTSOStream{stream: stream, serverURL: b.serverURL}, nil
return &tsoStream{stream: pdTSOStreamAdapter{stream}, serverURL: b.serverURL}, nil
}
return nil, err
}
Expand All @@ -74,14 +74,14 @@ type tsoTSOStreamBuilder struct {

func (b *tsoTSOStreamBuilder) build(
ctx context.Context, cancel context.CancelFunc, timeout time.Duration,
) (tsoStream, error) {
) (*tsoStream, error) {
done := make(chan struct{})
// TODO: we need to handle a conner case that this goroutine is timeout while the stream is successfully created.
go checkStreamTimeout(ctx, cancel, done, timeout)
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &tsoTSOStream{stream: stream, serverURL: b.serverURL}, nil
return &tsoStream{stream: tsoTSOStreamAdapter{stream}, serverURL: b.serverURL}, nil
}
return nil, err
}
Expand All @@ -99,86 +99,53 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha
<-done
}

// TSO Stream

type tsoStream interface {
getServerURL() string
// processRequests processes TSO requests in streaming mode to get timestamps
processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error)
type tsoRequestResult struct {
physical, logical int64
count uint32
suffixBits uint32
respKeyspaceGroupID uint32
}

type pdTSOStream struct {
serverURL string
stream pdpb.PD_TsoClient
type grpcTSOStreamAdapter interface {
Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
count int64) error
Recv() (tsoRequestResult, error)
}

func (s *pdTSOStream) getServerURL() string {
return s.serverURL
type pdTSOStreamAdapter struct {
stream pdpb.PD_TsoClient
}

func (s *pdTSOStream) processRequests(
clusterID uint64, _, _ uint32, dcLocation string, count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
func (s pdTSOStreamAdapter) Send(clusterID uint64, _, _ uint32, dcLocation string, count int64) error {
req := &pdpb.TsoRequest{
Header: &pdpb.RequestHeader{
ClusterId: clusterID,
},
Count: uint32(count),
DcLocation: dcLocation,
}
return s.stream.Send(req)
}

if err = s.stream.Send(req); err != nil {
if err == io.EOF {
err = errs.ErrClientTSOStreamClosed
} else {
err = errors.WithStack(err)
}
return
}
tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds())
func (s pdTSOStreamAdapter) Recv() (tsoRequestResult, error) {
resp, err := s.stream.Recv()
duration := time.Since(start).Seconds()
if err != nil {
requestFailedDurationTSO.Observe(duration)
if err == io.EOF {
err = errs.ErrClientTSOStreamClosed
} else {
err = errors.WithStack(err)
}
return
return tsoRequestResult{}, err
}
requestDurationTSO.Observe(duration)
tsoBatchSize.Observe(float64(count))

if resp.GetCount() != uint32(count) {
err = errors.WithStack(errTSOLength)
return
}

ts := resp.GetTimestamp()
respKeyspaceGroupID = defaultKeySpaceGroupID
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
return
return tsoRequestResult{
physical: resp.GetTimestamp().GetPhysical(),
logical: resp.GetTimestamp().GetLogical(),
count: resp.GetCount(),
suffixBits: resp.GetTimestamp().GetSuffixBits(),
respKeyspaceGroupID: defaultKeySpaceGroupID,
}, nil
}

type tsoTSOStream struct {
serverURL string
stream tsopb.TSO_TsoClient
}

func (s *tsoTSOStream) getServerURL() string {
return s.serverURL
type tsoTSOStreamAdapter struct {
stream tsopb.TSO_TsoClient
}

func (s *tsoTSOStream) processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
func (s tsoTSOStreamAdapter) Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64) error {
req := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: clusterID,
Expand All @@ -188,8 +155,54 @@ func (s *tsoTSOStream) processRequests(
Count: uint32(count),
DcLocation: dcLocation,
}
return s.stream.Send(req)
}

func (s tsoTSOStreamAdapter) Recv() (tsoRequestResult, error) {
resp, err := s.stream.Recv()
if err != nil {
return tsoRequestResult{}, err
}
return tsoRequestResult{
physical: resp.GetTimestamp().GetPhysical(),
logical: resp.GetTimestamp().GetLogical(),
count: resp.GetCount(),
suffixBits: resp.GetTimestamp().GetSuffixBits(),
respKeyspaceGroupID: resp.GetHeader().GetKeyspaceGroupId(),
}, nil
}

//// TSO Stream
//
//type tsoStream interface {
// getServerURL() string
// //// processRequests processes TSO requests in streaming mode to get timestamps
// //processRequests(
// // clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
// // count int64, batchStartTime time.Time,
// //) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error)
// processRequestsAsync(
// clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
// count int64, batchStartTime time.Time, resultCh chan<- tsoRequestResult,
// )
//
// Close()
//}

type tsoStream struct {
serverURL string
stream grpcTSOStreamAdapter
}

func (s *tsoStream) getServerURL() string {
return s.serverURL
}

if err = s.stream.Send(req); err != nil {
func (s *tsoStream) processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
if err = s.stream.Send(clusterID, keyspaceID, keyspaceGroupID, dcLocation, count); err != nil {
if err == io.EOF {
err = errs.ErrClientTSOStreamClosed
} else {
Expand All @@ -198,7 +211,7 @@ func (s *tsoTSOStream) processRequests(
return
}
tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds())
resp, err := s.stream.Recv()
res, err := s.stream.Recv()
duration := time.Since(start).Seconds()
if err != nil {
requestFailedDurationTSO.Observe(duration)
Expand All @@ -212,13 +225,122 @@ func (s *tsoTSOStream) processRequests(
requestDurationTSO.Observe(duration)
tsoBatchSize.Observe(float64(count))

if resp.GetCount() != uint32(count) {
if res.count != uint32(count) {
err = errors.WithStack(errTSOLength)
return
}

ts := resp.GetTimestamp()
respKeyspaceGroupID = resp.GetHeader().GetKeyspaceGroupId()
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
respKeyspaceGroupID = res.respKeyspaceGroupID
physical, logical, suffixBits = res.physical, res.logical, res.suffixBits
return
}

//
//type pdTSOStream struct {
// serverURL string
// stream pdpb.PD_TsoClient
//}
//
//func (s *pdTSOStream) getServerURL() string {
// return s.serverURL
//}
//
//func (s *pdTSOStream) processRequests(
// clusterID uint64, _, _ uint32, dcLocation string, count int64, batchStartTime time.Time,
//) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
// start := time.Now()
// req := &pdpb.TsoRequest{
// Header: &pdpb.RequestHeader{
// ClusterId: clusterID,
// },
// Count: uint32(count),
// DcLocation: dcLocation,
// }
//
// if err = s.stream.Send(req); err != nil {
// if err == io.EOF {
// err = errs.ErrClientTSOStreamClosed
// } else {
// err = errors.WithStack(err)
// }
// return
// }
// tsoBatchSendLatency.Observe(float64(time.Since(batchStartTime)))
// resp, err := s.stream.Recv()
// if err != nil {
// if err == io.EOF {
// err = errs.ErrClientTSOStreamClosed
// } else {
// err = errors.WithStack(err)
// }
// return
// }
// requestDurationTSO.Observe(time.Since(start).Seconds())
// tsoBatchSize.Observe(float64(count))
//
// if resp.GetCount() != uint32(count) {
// err = errors.WithStack(errTSOLength)
// return
// }
//
// ts := resp.GetTimestamp()
// respKeyspaceGroupID = defaultKeySpaceGroupID
// physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
// return
//}
//
//type tsoTSOStream struct {
// serverURL string
// stream tsopb.TSO_TsoClient
//}
//
//func (s *tsoTSOStream) getServerURL() string {
// return s.serverURL
//}
//
//func (s *tsoTSOStream) processRequests(
// clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
// count int64, batchStartTime time.Time,
//) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
// start := time.Now()
// req := &tsopb.TsoRequest{
// Header: &tsopb.RequestHeader{
// ClusterId: clusterID,
// KeyspaceId: keyspaceID,
// KeyspaceGroupId: keyspaceGroupID,
// },
// Count: uint32(count),
// DcLocation: dcLocation,
// }
//
// if err = s.stream.Send(req); err != nil {
// if err == io.EOF {
// err = errs.ErrClientTSOStreamClosed
// } else {
// err = errors.WithStack(err)
// }
// return
// }
// tsoBatchSendLatency.Observe(float64(time.Since(batchStartTime)))
// resp, err := s.stream.Recv()
// if err != nil {
// if err == io.EOF {
// err = errs.ErrClientTSOStreamClosed
// } else {
// err = errors.WithStack(err)
// }
// return
// }
// requestDurationTSO.Observe(time.Since(start).Seconds())
// tsoBatchSize.Observe(float64(count))
//
// if resp.GetCount() != uint32(count) {
// err = errors.WithStack(errTSOLength)
// return
// }
//
// ts := resp.GetTimestamp()
// respKeyspaceGroupID = resp.GetHeader().GetKeyspaceGroupId()
// physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
// return
//}

0 comments on commit a9b312b

Please sign in to comment.