Skip to content

Commit

Permalink
client: refactor batch client send loop
Browse files Browse the repository at this point in the history
Signed-off-by: zyguan <[email protected]>
  • Loading branch information
zyguan committed Jan 23, 2025
1 parent 61e09c6 commit eed5293
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 27 deletions.
21 changes: 21 additions & 0 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,27 @@ func (c *RPCClient) closeConns() {
c.Unlock()
}

func (c *RPCClient) recycleIdleConnArray() {
start := time.Now()

var addrs []string
var vers []uint64
c.RLock()
for _, conn := range c.conns {
if conn.batchConn != nil && conn.isIdle() {
addrs = append(addrs, conn.target)
vers = append(vers, conn.ver)
}
}
c.RUnlock()

for i, addr := range addrs {
c.CloseAddrVer(addr, vers[i])
}

metrics.TiKVBatchClientRecycle.Observe(time.Since(start).Seconds())
}

func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
tikvrpc.AttachContext(req, req.Context)

Expand Down
32 changes: 5 additions & 27 deletions internal/client/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,10 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
a.reqBuilder.reset()

headRecvTime, headArrivalInterval := a.fetchAllPendingRequests(int(cfg.MaxBatchSize))
if a.reqBuilder.len() == 0 {
// the conn is closed or recycled.
return
}

// curl -X PUT -d 'return(true)' http://0.0.0.0:10080/fail/tikvclient/mockBlockOnBatchClient
if val, err := util.EvalFailpoint("mockBlockOnBatchClient"); err == nil {
Expand All @@ -558,13 +562,8 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
}
}
length := a.reqBuilder.len()
avgBatchWaitSize = 0.2*float64(length) + 0.8*avgBatchWaitSize
a.metrics.pendingRequests.Observe(float64(len(a.batchCommandsCh) + length))
if uint(length) == 0 {
// The batch command channel is closed.
return
} else {
avgBatchWaitSize = 0.2*float64(length) + 0.8*avgBatchWaitSize
}
a.metrics.bestBatchSize.Observe(avgBatchWaitSize)
a.metrics.headArrivalInterval.Observe(headArrivalInterval.Seconds())
a.metrics.sendLoopWaitHeadDur.Observe(headRecvTime.Sub(sendLoopStartTime).Seconds())
Expand Down Expand Up @@ -1164,24 +1163,3 @@ func sendBatchRequest(
return nil, errors.WithMessage(context.DeadlineExceeded, reason)
}
}

func (c *RPCClient) recycleIdleConnArray() {
start := time.Now()

var addrs []string
var vers []uint64
c.RLock()
for _, conn := range c.conns {
if conn.batchConn != nil && conn.isIdle() {
addrs = append(addrs, conn.target)
vers = append(vers, conn.ver)
}
}
c.RUnlock()

for i, addr := range addrs {
c.CloseAddrVer(addr, vers[i])
}

metrics.TiKVBatchClientRecycle.Observe(time.Since(start).Seconds())
}

0 comments on commit eed5293

Please sign in to comment.