Skip to content

Commit

Permalink
improve api bench
Browse files Browse the repository at this point in the history
Signed-off-by: Cabinfever_B <[email protected]>
  • Loading branch information
CabinfeverB committed Nov 10, 2023
1 parent 2c07c24 commit dbf8a35
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 14 deletions.
3 changes: 3 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ type Client interface {
// SetExternalTimestamp sets external timestamp
SetExternalTimestamp(ctx context.Context, timestamp uint64) error

// GetServiceDiscovery returns ServiceDiscovery
GetServiceDiscovery() ServiceDiscovery

// TSOClient is the TSO client.
TSOClient
// MetaStorageClient is the meta storage client.
Expand Down
52 changes: 48 additions & 4 deletions tools/pd-api-bench/cases/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"net/http"
"net/url"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pkg/errors"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/utils/apiutil"
Expand All @@ -33,6 +35,8 @@ var (
PDAddress string
// Debug is the flag to print the output of api response for debug.
Debug bool
// ClusterID is the ID of cluster.
ClusterID uint64
)

var (
Expand All @@ -43,6 +47,7 @@ var (

// InitCluster initializes the cluster.
func InitCluster(ctx context.Context, cli pd.Client, httpClit *http.Client) error {
ClusterID = cli.GetClusterID(ctx)
req, _ := http.NewRequestWithContext(ctx, http.MethodGet,
PDAddress+"/pd/api/v1/stats/region?start_key=&end_key=&count", nil)
resp, err := httpClit.Do(req)
Expand Down Expand Up @@ -113,10 +118,11 @@ type GRPCCase interface {

// GRPCCaseMap is the map for all gRPC cases.
var GRPCCaseMap = map[string]GRPCCase{
"GetRegion": newGetRegion(),
"GetStore": newGetStore(),
"GetStores": newGetStores(),
"ScanRegions": newScanRegions(),
"StoreHeartbeat": newStoreHeartbeat(),
"GetRegion": newGetRegion(),
"GetStore": newGetStore(),
"GetStores": newGetStores(),
"ScanRegions": newScanRegions(),
}

// HTTPCase is the interface for all HTTP cases.
Expand Down Expand Up @@ -254,6 +260,44 @@ func (c *getRegion) Unary(ctx context.Context, cli pd.Client) error {
return nil
}

type storeHeartbeat struct {
*baseCase
}

func newStoreHeartbeat() *storeHeartbeat {
return &storeHeartbeat{
baseCase: &baseCase{
name: "StoreHeartbeat",
qps: 10000,
burst: 1,
},
}
}

func (c *storeHeartbeat) Unary(ctx context.Context, cli pd.Client) error {
sd := cli.GetServiceDiscovery()
conn := sd.GetServingEndpointClientConn()
client := pdpb.NewPDClient(conn)
req := &pdpb.StoreHeartbeatRequest{
Header: &pdpb.RequestHeader{
ClusterId: ClusterID,
},
Stats: &pdpb.StoreStats{
StoreId: 1,
},
}
resp, err := client.StoreHeartbeat(ctx, req)
if err != nil {
return err
}
if resp != nil {
if resp.Header.Error != nil {
return errors.Errorf(resp.Header.Error.Message)
}
}
return nil
}

type scanRegions struct {
*baseCase
regionSample int
Expand Down
3 changes: 2 additions & 1 deletion tools/pd-api-bench/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module github.com/tools/pd-api-bench
go 1.21

require (
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c
github.com/pkg/errors v0.9.1
github.com/tikv/pd v0.0.0-00010101000000-000000000000
github.com/tikv/pd/client v0.0.0-00010101000000-000000000000
go.uber.org/zap v1.24.0
Expand Down Expand Up @@ -69,7 +71,6 @@ require (
github.com/pingcap/errcode v0.3.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.11.1 // indirect
Expand Down
122 changes: 113 additions & 9 deletions tools/pd-api-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand All @@ -48,6 +49,8 @@ var (
qps = flag.Int64("qps", 1000, "qps")
burst = flag.Int64("burst", 1, "burst")

wait = flag.Bool("wait", true, "wait for a round")

// http params
httpParams = flag.String("params", "", "http params")

Expand Down Expand Up @@ -141,11 +144,52 @@ func main() {
}
gcaseStr := strings.Split(*gRPCCases, ",")
for _, str := range gcaseStr {
if len(str) == 0 {
caseQPS := int64(0)
caseBurst := int64(0)
cStr := ""

strs := strings.Split(str, "-")
fmt.Println(strs)
// to get case name
strsa := strings.Split(strs[0], "+")
cStr = strsa[0]
// to get case Burst
if len(strsa) > 1 {
caseBurst, err = strconv.ParseInt(strsa[1], 10, 64)
if err != nil {
log.Printf("parse burst failed for case %s", str)
}
}
// to get case qps
if len(strs) > 1 {
strsb := strings.Split(strs[1], "+")
caseQPS, err = strconv.ParseInt(strsb[0], 10, 64)
if err != nil {
log.Printf("parse qps failed for case %s", str)
}
// to get case Burst
if len(strsb) > 1 {
caseBurst, err = strconv.ParseInt(strsb[1], 10, 64)
if err != nil {
log.Printf("parse burst failed for case %s", str)
}
}
}
if len(cStr) == 0 {
continue
}
if cas, ok := cases.GRPCCaseMap[str]; ok {
if cas, ok := cases.GRPCCaseMap[cStr]; ok {
gcases = append(gcases, cas)
if caseBurst > 0 {
cas.SetBurst(caseBurst)
} else if *burst > 0 {
cas.SetBurst(*burst)
}
if caseQPS > 0 {
cas.SetQPS(caseQPS)
} else if *qps > 0 {
cas.SetQPS(*qps)
}
} else {
log.Println("no this case", str)
}
Expand Down Expand Up @@ -188,6 +232,9 @@ func main() {
}

func handleGRPCCase(ctx context.Context, gcase cases.GRPCCase, clients []pd.Client) {
startCnt := 0
endCnt := 0
var cntMu sync.Mutex
qps := gcase.GetQPS()
burst := gcase.GetBurst()
tt := time.Duration(base/qps*burst*int64(*client)) * time.Microsecond
Expand All @@ -200,9 +247,36 @@ func handleGRPCCase(ctx context.Context, gcase cases.GRPCCase, clients []pd.Clie
select {
case <-ticker.C:
for i := int64(0); i < burst; i++ {
err := gcase.Unary(ctx, cli)
if err != nil {
log.Println(err)
cntMu.Lock()
startCnt++
if startCnt%1000 == 0 {
log.Printf("case grpc %s has sent query %d", gcase.Name(), startCnt)
}
cntMu.Unlock()
if *wait {
err := gcase.Unary(ctx, cli)
if err != nil {
log.Println(err)
}
cntMu.Lock()
endCnt++
if endCnt%1000 == 0 {
log.Printf("case grpc %s has finished query %d", gcase.Name(), endCnt)
}
cntMu.Unlock()
} else {
go func() {
err := gcase.Unary(ctx, cli)
if err != nil {
log.Println(err)
}
cntMu.Lock()
endCnt++
if endCnt%1000 == 0 {
log.Printf("case grpc %s has finished query %d", gcase.Name(), endCnt)
}
cntMu.Unlock()
}()
}
}
case <-ctx.Done():
Expand All @@ -215,6 +289,9 @@ func handleGRPCCase(ctx context.Context, gcase cases.GRPCCase, clients []pd.Clie
}

func handleHTTPCase(ctx context.Context, hcase cases.HTTPCase, httpClis []*http.Client) {
startCnt := 0
endCnt := 0
var cntMu sync.Mutex
qps := hcase.GetQPS()
burst := hcase.GetBurst()
tt := time.Duration(base/qps*burst*int64(*client)) * time.Microsecond
Expand All @@ -230,9 +307,36 @@ func handleHTTPCase(ctx context.Context, hcase cases.HTTPCase, httpClis []*http.
select {
case <-ticker.C:
for i := int64(0); i < burst; i++ {
err := hcase.Do(ctx, hCli)
if err != nil {
log.Println(err)
cntMu.Lock()
startCnt++
if startCnt%1000 == 0 {
log.Printf("case http %s has done query %d", hcase.Name(), startCnt)
}
cntMu.Unlock()
if *wait {
err := hcase.Do(ctx, hCli)
if err != nil {
log.Println(err)
}
cntMu.Lock()
endCnt++
if endCnt%1000 == 0 {
log.Printf("case http %s has finished query %d", hcase.Name(), endCnt)
}
cntMu.Unlock()
} else {
go func() {
err := hcase.Do(ctx, hCli)
if err != nil {
log.Println(err)
}
cntMu.Lock()
endCnt++
if endCnt%1000 == 0 {
log.Printf("case http %s has finished query %d", hcase.Name(), endCnt)
}
cntMu.Unlock()
}()
}
}
case <-ctx.Done():
Expand All @@ -252,7 +356,7 @@ func exit(code int) {
func newHTTPClient() *http.Client {
// defaultTimeout for non-context requests.
const defaultTimeout = 30 * time.Second
cli := &http.Client{Timeout: defaultTimeout}
cli := &http.Client{Timeout: defaultTimeout, Transport: http.DefaultTransport.(*http.Transport).Clone()}
tlsConf := loadTLSConfig()
if tlsConf != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
Expand Down

0 comments on commit dbf8a35

Please sign in to comment.