diff --git a/.golangci.yml b/.golangci.yml index e09011c45be..e5b5fe1ab56 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -226,5 +226,4 @@ issues: linters: - errcheck include: - # remove the comment after the path is ready - # - EXC0012 + - EXC0012 diff --git a/tools/pd-api-bench/cases/cases.go b/tools/pd-api-bench/cases/cases.go index d81e7e0fa9d..f863d3248c6 100644 --- a/tools/pd-api-bench/cases/cases.go +++ b/tools/pd-api-bench/cases/cases.go @@ -80,12 +80,12 @@ func (c *Config) Clone() *Config { // Case is the interface for all cases. type Case interface { - Name() string - SetQPS(int64) - GetQPS() int64 - SetBurst(int64) - GetBurst() int64 - GetConfig() *Config + getName() string + setQPS(int64) + getQPS() int64 + setBurst(int64) + getBurst() int64 + getConfig() *Config } type baseCase struct { @@ -93,35 +93,35 @@ type baseCase struct { cfg *Config } -func (c *baseCase) Name() string { +func (c *baseCase) getName() string { return c.name } -func (c *baseCase) SetQPS(qps int64) { +func (c *baseCase) setQPS(qps int64) { c.cfg.QPS = qps } -func (c *baseCase) GetQPS() int64 { +func (c *baseCase) getQPS() int64 { return c.cfg.QPS } -func (c *baseCase) SetBurst(burst int64) { +func (c *baseCase) setBurst(burst int64) { c.cfg.Burst = burst } -func (c *baseCase) GetBurst() int64 { +func (c *baseCase) getBurst() int64 { return c.cfg.Burst } -func (c *baseCase) GetConfig() *Config { +func (c *baseCase) getConfig() *Config { return c.cfg.Clone() } // EtcdCase is the interface for all etcd api cases. type EtcdCase interface { Case - Init(context.Context, *clientv3.Client) error - Unary(context.Context, *clientv3.Client) error + init(context.Context, *clientv3.Client) error + unary(context.Context, *clientv3.Client) error } // EtcdCreateFn is function type to create EtcdCase. @@ -138,7 +138,7 @@ var EtcdCaseFnMap = map[string]EtcdCreateFn{ // GRPCCase is the interface for all gRPC cases. type GRPCCase interface { Case - Unary(context.Context, pd.Client) error + unary(context.Context, pd.Client) error } // GRPCCreateFn is function type to create GRPCCase. @@ -159,7 +159,7 @@ var GRPCCaseFnMap = map[string]GRPCCreateFn{ // HTTPCase is the interface for all HTTP cases. type HTTPCase interface { Case - Do(context.Context, pdHttp.Client) error + do(context.Context, pdHttp.Client) error } // HTTPCreateFn is function type to create HTTPCase. @@ -186,7 +186,7 @@ func newMinResolvedTS() func() HTTPCase { } } -func (c *minResolvedTS) Do(ctx context.Context, cli pdHttp.Client) error { +func (c *minResolvedTS) do(ctx context.Context, cli pdHttp.Client) error { minResolvedTS, storesMinResolvedTS, err := cli.GetMinResolvedTSByStoresIDs(ctx, storesID) if Debug { log.Info("do HTTP case", zap.String("case", c.name), zap.Uint64("min-resolved-ts", minResolvedTS), zap.Any("store-min-resolved-ts", storesMinResolvedTS), zap.Error(err)) @@ -214,7 +214,7 @@ func newRegionStats() func() HTTPCase { } } -func (c *regionsStats) Do(ctx context.Context, cli pdHttp.Client) error { +func (c *regionsStats) do(ctx context.Context, cli pdHttp.Client) error { upperBound := totalRegion / c.regionSample if upperBound < 1 { upperBound = 1 @@ -248,7 +248,7 @@ func newUpdateGCSafePoint() func() GRPCCase { } } -func (*updateGCSafePoint) Unary(ctx context.Context, cli pd.Client) error { +func (*updateGCSafePoint) unary(ctx context.Context, cli pd.Client) error { s := time.Now().Unix() _, err := cli.UpdateGCSafePoint(ctx, uint64(s)) if err != nil { @@ -272,7 +272,7 @@ func newUpdateServiceGCSafePoint() func() GRPCCase { } } -func (*updateServiceGCSafePoint) Unary(ctx context.Context, cli pd.Client) error { +func (*updateServiceGCSafePoint) unary(ctx context.Context, cli pd.Client) error { s := time.Now().Unix() id := rand.Int63n(100) + 1 _, err := cli.UpdateServiceGCSafePoint(ctx, strconv.FormatInt(id, 10), id, uint64(s)) @@ -297,7 +297,7 @@ func newGetRegion() func() GRPCCase { } } -func (*getRegion) Unary(ctx context.Context, cli pd.Client) error { +func (*getRegion) unary(ctx context.Context, cli pd.Client) error { id := rand.Intn(totalRegion)*4 + 1 _, err := cli.GetRegion(ctx, generateKeyForSimulator(id)) if err != nil { @@ -321,7 +321,7 @@ func newGetRegionEnableFollower() func() GRPCCase { } } -func (*getRegionEnableFollower) Unary(ctx context.Context, cli pd.Client) error { +func (*getRegionEnableFollower) unary(ctx context.Context, cli pd.Client) error { id := rand.Intn(totalRegion)*4 + 1 _, err := cli.GetRegion(ctx, generateKeyForSimulator(id), pd.WithAllowFollowerHandle()) if err != nil { @@ -347,7 +347,7 @@ func newScanRegions() func() GRPCCase { } } -func (c *scanRegions) Unary(ctx context.Context, cli pd.Client) error { +func (c *scanRegions) unary(ctx context.Context, cli pd.Client) error { upperBound := totalRegion / c.regionSample random := rand.Intn(upperBound) startID := c.regionSample*random*4 + 1 @@ -375,7 +375,7 @@ func newTso() func() GRPCCase { } } -func (*tso) Unary(ctx context.Context, cli pd.Client) error { +func (*tso) unary(ctx context.Context, cli pd.Client) error { _, _, err := cli.GetTS(ctx) if err != nil { return err @@ -398,7 +398,7 @@ func newGetStore() func() GRPCCase { } } -func (*getStore) Unary(ctx context.Context, cli pd.Client) error { +func (*getStore) unary(ctx context.Context, cli pd.Client) error { storeIdx := rand.Intn(totalStore) _, err := cli.GetStore(ctx, storesID[storeIdx]) if err != nil { @@ -422,7 +422,7 @@ func newGetStores() func() GRPCCase { } } -func (*getStores) Unary(ctx context.Context, cli pd.Client) error { +func (*getStores) unary(ctx context.Context, cli pd.Client) error { _, err := cli.GetAllStores(ctx) if err != nil { return err @@ -451,7 +451,7 @@ func newGetKV() func() EtcdCase { } } -func (*getKV) Init(ctx context.Context, cli *clientv3.Client) error { +func (*getKV) init(ctx context.Context, cli *clientv3.Client) error { for i := 0; i < 100; i++ { _, err := cli.Put(ctx, fmt.Sprintf("/test/0001/%4d", i), fmt.Sprintf("%4d", i)) if err != nil { @@ -461,7 +461,7 @@ func (*getKV) Init(ctx context.Context, cli *clientv3.Client) error { return nil } -func (*getKV) Unary(ctx context.Context, cli *clientv3.Client) error { +func (*getKV) unary(ctx context.Context, cli *clientv3.Client) error { _, err := cli.Get(ctx, "/test/0001", clientv3.WithPrefix()) return err } @@ -481,9 +481,9 @@ func newPutKV() func() EtcdCase { } } -func (*putKV) Init(context.Context, *clientv3.Client) error { return nil } +func (*putKV) init(context.Context, *clientv3.Client) error { return nil } -func (*putKV) Unary(ctx context.Context, cli *clientv3.Client) error { +func (*putKV) unary(ctx context.Context, cli *clientv3.Client) error { _, err := cli.Put(ctx, "/test/0001/0000", "test") return err } @@ -503,9 +503,9 @@ func newDeleteKV() func() EtcdCase { } } -func (*deleteKV) Init(context.Context, *clientv3.Client) error { return nil } +func (*deleteKV) init(context.Context, *clientv3.Client) error { return nil } -func (*deleteKV) Unary(ctx context.Context, cli *clientv3.Client) error { +func (*deleteKV) unary(ctx context.Context, cli *clientv3.Client) error { _, err := cli.Delete(ctx, "/test/0001/0000") return err } @@ -525,9 +525,9 @@ func newTxnKV() func() EtcdCase { } } -func (*txnKV) Init(context.Context, *clientv3.Client) error { return nil } +func (*txnKV) init(context.Context, *clientv3.Client) error { return nil } -func (*txnKV) Unary(ctx context.Context, cli *clientv3.Client) error { +func (*txnKV) unary(ctx context.Context, cli *clientv3.Client) error { txn := cli.Txn(ctx) txn = txn.If(clientv3.Compare(clientv3.Value("/test/0001/0000"), "=", "test")) txn = txn.Then(clientv3.OpPut("/test/0001/0000", "test2")) diff --git a/tools/pd-api-bench/cases/controller.go b/tools/pd-api-bench/cases/controller.go index e19c79c0f3e..a77474db3a7 100644 --- a/tools/pd-api-bench/cases/controller.go +++ b/tools/pd-api-bench/cases/controller.go @@ -62,7 +62,7 @@ func (c *Coordinator) GetHTTPCase(name string) (*Config, error) { c.mu.RLock() defer c.mu.RUnlock() if controller, ok := c.http[name]; ok { - return controller.GetConfig(), nil + return controller.getConfig(), nil } return nil, errors.Errorf("case %v does not exist", name) } @@ -72,7 +72,7 @@ func (c *Coordinator) GetGRPCCase(name string) (*Config, error) { c.mu.RLock() defer c.mu.RUnlock() if controller, ok := c.grpc[name]; ok { - return controller.GetConfig(), nil + return controller.getConfig(), nil } return nil, errors.Errorf("case %v does not exist", name) } @@ -82,7 +82,7 @@ func (c *Coordinator) GetEtcdCase(name string) (*Config, error) { c.mu.RLock() defer c.mu.RUnlock() if controller, ok := c.etcd[name]; ok { - return controller.GetConfig(), nil + return controller.getConfig(), nil } return nil, errors.Errorf("case %v does not exist", name) } @@ -93,7 +93,7 @@ func (c *Coordinator) GetAllHTTPCases() map[string]*Config { defer c.mu.RUnlock() ret := make(map[string]*Config) for name, c := range c.http { - ret[name] = c.GetConfig() + ret[name] = c.getConfig() } return ret } @@ -104,7 +104,7 @@ func (c *Coordinator) GetAllGRPCCases() map[string]*Config { defer c.mu.RUnlock() ret := make(map[string]*Config) for name, c := range c.grpc { - ret[name] = c.GetConfig() + ret[name] = c.getConfig() } return ret } @@ -115,7 +115,7 @@ func (c *Coordinator) GetAllEtcdCases() map[string]*Config { defer c.mu.RUnlock() ret := make(map[string]*Config) for name, c := range c.etcd { - ret[name] = c.GetConfig() + ret[name] = c.getConfig() } return ret } @@ -131,9 +131,9 @@ func (c *Coordinator) SetHTTPCase(name string, cfg *Config) error { c.http[name] = controller } controller.stop() - controller.SetQPS(cfg.QPS) + controller.setQPS(cfg.QPS) if cfg.Burst > 0 { - controller.SetBurst(cfg.Burst) + controller.setBurst(cfg.Burst) } controller.run() } else { @@ -153,9 +153,9 @@ func (c *Coordinator) SetGRPCCase(name string, cfg *Config) error { c.grpc[name] = controller } controller.stop() - controller.SetQPS(cfg.QPS) + controller.setQPS(cfg.QPS) if cfg.Burst > 0 { - controller.SetBurst(cfg.Burst) + controller.setBurst(cfg.Burst) } controller.run() } else { @@ -175,9 +175,9 @@ func (c *Coordinator) SetEtcdCase(name string, cfg *Config) error { c.etcd[name] = controller } controller.stop() - controller.SetQPS(cfg.QPS) + controller.setQPS(cfg.QPS) if cfg.Burst > 0 { - controller.SetBurst(cfg.Burst) + controller.setBurst(cfg.Burst) } controller.run() } else { @@ -207,15 +207,15 @@ func newHTTPController(ctx context.Context, clis []pdHttp.Client, fn HTTPCreateF // run tries to run the HTTP api bench. func (c *httpController) run() { - if c.GetQPS() <= 0 || c.cancel != nil { + if c.getQPS() <= 0 || c.cancel != nil { return } c.ctx, c.cancel = context.WithCancel(c.pctx) - qps := c.GetQPS() - burst := c.GetBurst() + qps := c.getQPS() + burst := c.getBurst() cliNum := int64(len(c.clients)) tt := time.Duration(base*burst*cliNum/qps) * time.Microsecond - log.Info("begin to run http case", zap.String("case", c.Name()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt)) + log.Info("begin to run http case", zap.String("case", c.getName()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt)) for _, hCli := range c.clients { c.wg.Add(1) go func(hCli pdHttp.Client) { @@ -229,9 +229,9 @@ func (c *httpController) run() { for { select { case <-ticker.C: - err := c.Do(c.ctx, hCli) + err := c.do(c.ctx, hCli) if err != nil { - log.Error("meet error when doing HTTP request", zap.String("case", c.Name()), zap.Error(err)) + log.Error("meet error when doing HTTP request", zap.String("case", c.getName()), zap.Error(err)) } case <-c.ctx.Done(): log.Info("got signal to exit running HTTP case") @@ -276,15 +276,15 @@ func newGRPCController(ctx context.Context, clis []pd.Client, fn GRPCCreateFn) * // run tries to run the gRPC api bench. func (c *gRPCController) run() { - if c.GetQPS() <= 0 || c.cancel != nil { + if c.getQPS() <= 0 || c.cancel != nil { return } c.ctx, c.cancel = context.WithCancel(c.pctx) - qps := c.GetQPS() - burst := c.GetBurst() + qps := c.getQPS() + burst := c.getBurst() cliNum := int64(len(c.clients)) tt := time.Duration(base*burst*cliNum/qps) * time.Microsecond - log.Info("begin to run gRPC case", zap.String("case", c.Name()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt)) + log.Info("begin to run gRPC case", zap.String("case", c.getName()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt)) for _, cli := range c.clients { c.wg.Add(1) go func(cli pd.Client) { @@ -298,9 +298,9 @@ func (c *gRPCController) run() { for { select { case <-ticker.C: - err := c.Unary(c.ctx, cli) + err := c.unary(c.ctx, cli) if err != nil { - log.Error("meet error when doing gRPC request", zap.String("case", c.Name()), zap.Error(err)) + log.Error("meet error when doing gRPC request", zap.String("case", c.getName()), zap.Error(err)) } case <-c.ctx.Done(): log.Info("got signal to exit running gRPC case") @@ -345,18 +345,18 @@ func newEtcdController(ctx context.Context, clis []*clientv3.Client, fn EtcdCrea // run tries to run the gRPC api bench. func (c *etcdController) run() { - if c.GetQPS() <= 0 || c.cancel != nil { + if c.getQPS() <= 0 || c.cancel != nil { return } c.ctx, c.cancel = context.WithCancel(c.pctx) - qps := c.GetQPS() - burst := c.GetBurst() + qps := c.getQPS() + burst := c.getBurst() cliNum := int64(len(c.clients)) tt := time.Duration(base*burst*cliNum/qps) * time.Microsecond - log.Info("begin to run etcd case", zap.String("case", c.Name()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt)) - err := c.Init(c.ctx, c.clients[0]) + log.Info("begin to run etcd case", zap.String("case", c.getName()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt)) + err := c.init(c.ctx, c.clients[0]) if err != nil { - log.Error("init error", zap.String("case", c.Name()), zap.Error(err)) + log.Error("init error", zap.String("case", c.getName()), zap.Error(err)) return } for _, cli := range c.clients { @@ -372,9 +372,9 @@ func (c *etcdController) run() { for { select { case <-ticker.C: - err := c.Unary(c.ctx, cli) + err := c.unary(c.ctx, cli) if err != nil { - log.Error("meet error when doing etcd request", zap.String("case", c.Name()), zap.Error(err)) + log.Error("meet error when doing etcd request", zap.String("case", c.getName()), zap.Error(err)) } case <-c.ctx.Done(): log.Info("got signal to exit running etcd case") diff --git a/tools/pd-ctl/pdctl/command/global.go b/tools/pd-ctl/pdctl/command/global.go index c5c633de217..c7e0cfd691d 100644 --- a/tools/pd-ctl/pdctl/command/global.go +++ b/tools/pd-ctl/pdctl/command/global.go @@ -33,6 +33,7 @@ import ( ) const ( + // PDControlCallerID is used to set the caller ID for PD client PDControlCallerID = "pd-ctl" clusterPrefix = "pd/api/v1/cluster" ) diff --git a/tools/pd-heartbeat-bench/metrics/util.go b/tools/pd-heartbeat-bench/metrics/util.go index e409cf10815..f747e5b507f 100644 --- a/tools/pd-heartbeat-bench/metrics/util.go +++ b/tools/pd-heartbeat-bench/metrics/util.go @@ -32,12 +32,12 @@ import ( var ( prometheusCli api.Client - finalMetrics2Collect []Metric + finalMetrics2Collect []metric avgRegionStats report.Stats avgStoreTime float64 collectRound = 1.0 - metrics2Collect = []Metric{ + metrics2Collect = []metric{ {promSQL: cpuMetric, name: "max cpu usage(%)", max: true}, {promSQL: memoryMetric, name: "max memory usage(G)", max: true}, {promSQL: goRoutineMetric, name: "max go routines", max: true}, @@ -69,7 +69,7 @@ var ( } ) -type Metric struct { +type metric struct { promSQL string name string value float64 @@ -77,9 +77,10 @@ type Metric struct { max bool } +// InitMetric2Collect initializes the metrics to collect func InitMetric2Collect(endpoint string) (withMetric bool) { for _, name := range breakdownNames { - metrics2Collect = append(metrics2Collect, Metric{ + metrics2Collect = append(metrics2Collect, metric{ promSQL: hbBreakdownMetricByName(name), name: name, }) @@ -94,7 +95,7 @@ func InitMetric2Collect(endpoint string) (withMetric bool) { log.Error("parse prometheus url error", zap.Error(err)) return false } - prometheusCli, err = NewPrometheusClient(*cu) + prometheusCli, err = newPrometheusClient(*cu) if err != nil { log.Error("create prometheus client error", zap.Error(err)) return false @@ -108,7 +109,7 @@ func InitMetric2Collect(endpoint string) (withMetric bool) { return true } -func NewPrometheusClient(prometheusURL url.URL) (api.Client, error) { +func newPrometheusClient(prometheusURL url.URL) (api.Client, error) { client, err := api.NewClient(api.Config{ Address: prometheusURL.String(), }) @@ -122,6 +123,7 @@ func NewPrometheusClient(prometheusURL url.URL) (api.Client, error) { // WarmUpRound wait for the first round to warm up const WarmUpRound = 1 +// CollectMetrics collects the metrics func CollectMetrics(curRound int, wait time.Duration) { if curRound < WarmUpRound { return @@ -183,7 +185,7 @@ func getMetric(cli api.Client, query string, ts time.Time) ([]float64, error) { return value, nil } -func formatMetrics(ms []Metric) string { +func formatMetrics(ms []metric) string { res := "" for _, m := range ms { res += "[" + m.name + "]" + " " + fmt.Sprintf("%.10f", m.value) + " " @@ -191,6 +193,7 @@ func formatMetrics(ms []Metric) string { return res } +// CollectRegionAndStoreStats collects the region and store stats func CollectRegionAndStoreStats(regionStats *report.Stats, storeTime *float64) { if regionStats != nil && storeTime != nil { collect(*regionStats, *storeTime) @@ -211,6 +214,7 @@ func collect(regionStats report.Stats, storeTime float64) { avgStoreTime = average(avgStoreTime, storeTime) } +// OutputConclusion outputs the final conclusion func OutputConclusion() { logFields := RegionFields(avgRegionStats, zap.Float64("avg store time", avgStoreTime), @@ -219,6 +223,7 @@ func OutputConclusion() { log.Info("final metrics collected", logFields...) } +// RegionFields returns the fields for region stats func RegionFields(stats report.Stats, fields ...zap.Field) []zap.Field { return append([]zap.Field{ zap.String("total", fmt.Sprintf("%.4fs", stats.Total.Seconds())), diff --git a/tools/pd-simulator/simulator/client.go b/tools/pd-simulator/simulator/client.go index 8acf3ccd9ab..224d54b05d6 100644 --- a/tools/pd-simulator/simulator/client.go +++ b/tools/pd-simulator/simulator/client.go @@ -40,13 +40,13 @@ import ( // Client is a PD (Placement Driver) client. // It should not be used after calling Close(). type Client interface { - AllocID(context.Context) (uint64, error) + allocID(context.Context) (uint64, error) PutStore(context.Context, *metapb.Store) error StoreHeartbeat(context.Context, *pdpb.StoreStats) error RegionHeartbeat(context.Context, *core.RegionInfo) error - HeartbeatStreamLoop() - ChangeConn(*grpc.ClientConn) error + heartbeatStreamLoop() + changeConn(*grpc.ClientConn) error Close() } @@ -61,15 +61,17 @@ const ( var ( // errFailInitClusterID is returned when failed to load clusterID from all supplied PD addresses. errFailInitClusterID = errors.New("[pd] failed to get cluster id") - PDHTTPClient pdHttp.Client - SD pd.ServiceDiscovery - ClusterID atomic.Uint64 + // PDHTTPClient is a client for PD HTTP API. + PDHTTPClient pdHttp.Client + // SD is a service discovery for PD. + SD pd.ServiceDiscovery + clusterID atomic.Uint64 ) // requestHeader returns a header for fixed ClusterID. func requestHeader() *pdpb.RequestHeader { return &pdpb.RequestHeader{ - ClusterId: ClusterID.Load(), + ClusterId: clusterID.Load(), } } @@ -110,7 +112,7 @@ func createConn(url string) (*grpc.ClientConn, error) { return cc, nil } -func (c *client) ChangeConn(cc *grpc.ClientConn) error { +func (c *client) changeConn(cc *grpc.ClientConn) error { c.clientConn = cc simutil.Logger.Info("change pd client with endpoints", zap.String("tag", c.tag), zap.String("pd-address", cc.Target())) return nil @@ -143,7 +145,7 @@ func (c *client) createHeartbeatStream() (pdpb.PD_RegionHeartbeatClient, context return stream, ctx, cancel } -func (c *client) HeartbeatStreamLoop() { +func (c *client) heartbeatStreamLoop() { c.wg.Add(1) defer c.wg.Done() for { @@ -173,10 +175,10 @@ func (c *client) HeartbeatStreamLoop() { if client := SD.GetServiceClient(); client != nil { _, conn, err := getLeaderURL(ctx, client.GetClientConn()) if err != nil { - simutil.Logger.Error("[HeartbeatStreamLoop] failed to get leader URL", zap.Error(err)) + simutil.Logger.Error("[heartbeatStreamLoop] failed to get leader URL", zap.Error(err)) continue } - if err = c.ChangeConn(conn); err == nil { + if err = c.changeConn(conn); err == nil { break } } @@ -234,6 +236,7 @@ func (c *client) reportRegionHeartbeat(ctx context.Context, stream pdpb.PD_Regio } } +// Close closes the client. func (c *client) Close() { if c.cancel == nil { simutil.Logger.Info("pd client has been closed", zap.String("tag", c.tag)) @@ -248,7 +251,7 @@ func (c *client) Close() { } } -func (c *client) AllocID(ctx context.Context) (uint64, error) { +func (c *client) allocID(ctx context.Context) (uint64, error) { ctx, cancel := context.WithTimeout(ctx, pdTimeout) resp, err := c.pdClient().AllocID(ctx, &pdpb.AllocIDRequest{ Header: requestHeader(), @@ -263,6 +266,7 @@ func (c *client) AllocID(ctx context.Context) (uint64, error) { return resp.GetId(), nil } +// PutStore sends PutStore to PD. func (c *client) PutStore(ctx context.Context, store *metapb.Store) error { ctx, cancel := context.WithTimeout(ctx, pdTimeout) newStore := typeutil.DeepClone(store, core.StoreFactory) @@ -281,6 +285,7 @@ func (c *client) PutStore(ctx context.Context, store *metapb.Store) error { return nil } +// StoreHeartbeat sends a StoreHeartbeat to PD. func (c *client) StoreHeartbeat(ctx context.Context, newStats *pdpb.StoreStats) error { ctx, cancel := context.WithTimeout(ctx, pdTimeout) resp, err := c.pdClient().StoreHeartbeat(ctx, &pdpb.StoreHeartbeatRequest{ @@ -298,17 +303,18 @@ func (c *client) StoreHeartbeat(ctx context.Context, newStats *pdpb.StoreStats) return nil } +// RegionHeartbeat sends a RegionHeartbeat to PD. func (c *client) RegionHeartbeat(_ context.Context, region *core.RegionInfo) error { c.reportRegionHeartbeatCh <- region return nil } -type RetryClient struct { +type retryClient struct { client Client retryCount int } -func NewRetryClient(node *Node) *RetryClient { +func newRetryClient(node *Node) *retryClient { // Init PD client and putting it into node. tag := fmt.Sprintf("store %d", node.Store.Id) var ( @@ -331,8 +337,8 @@ func NewRetryClient(node *Node) *RetryClient { } node.client = client - // Init RetryClient - retryClient := &RetryClient{ + // Init retryClient + retryClient := &retryClient{ client: client, retryCount: retryTimes, } @@ -342,12 +348,12 @@ func NewRetryClient(node *Node) *RetryClient { }) // start heartbeat stream node.receiveRegionHeartbeatCh = receiveRegionHeartbeatCh - go client.HeartbeatStreamLoop() + go client.heartbeatStreamLoop() return retryClient } -func (rc *RetryClient) requestWithRetry(f func() (any, error)) (any, error) { +func (rc *retryClient) requestWithRetry(f func() (any, error)) (any, error) { // execute the function directly if res, err := f(); err == nil { return res, nil @@ -362,7 +368,7 @@ func (rc *RetryClient) requestWithRetry(f func() (any, error)) (any, error) { simutil.Logger.Error("[retry] failed to get leader URL", zap.Error(err)) return nil, err } - if err = rc.client.ChangeConn(conn); err != nil { + if err = rc.client.changeConn(conn); err != nil { simutil.Logger.Error("failed to change connection", zap.Error(err)) return nil, err } @@ -381,8 +387,8 @@ func getLeaderURL(ctx context.Context, conn *grpc.ClientConn) (string, *grpc.Cli if members.GetHeader().GetError() != nil { return "", nil, errors.New(members.GetHeader().GetError().String()) } - ClusterID.Store(members.GetHeader().GetClusterId()) - if ClusterID.Load() == 0 { + clusterID.Store(members.GetHeader().GetClusterId()) + if clusterID.Load() == 0 { return "", nil, errors.New("cluster id is 0") } if members.GetLeader() == nil { @@ -393,9 +399,9 @@ func getLeaderURL(ctx context.Context, conn *grpc.ClientConn) (string, *grpc.Cli return leaderURL, conn, err } -func (rc *RetryClient) AllocID(ctx context.Context) (uint64, error) { +func (rc *retryClient) allocID(ctx context.Context) (uint64, error) { res, err := rc.requestWithRetry(func() (any, error) { - id, err := rc.client.AllocID(ctx) + id, err := rc.client.allocID(ctx) return id, err }) if err != nil { @@ -404,7 +410,8 @@ func (rc *RetryClient) AllocID(ctx context.Context) (uint64, error) { return res.(uint64), nil } -func (rc *RetryClient) PutStore(ctx context.Context, store *metapb.Store) error { +// PutStore sends PutStore to PD. +func (rc *retryClient) PutStore(ctx context.Context, store *metapb.Store) error { _, err := rc.requestWithRetry(func() (any, error) { err := rc.client.PutStore(ctx, store) return nil, err @@ -412,7 +419,8 @@ func (rc *RetryClient) PutStore(ctx context.Context, store *metapb.Store) error return err } -func (rc *RetryClient) StoreHeartbeat(ctx context.Context, newStats *pdpb.StoreStats) error { +// StoreHeartbeat sends a StoreHeartbeat to PD. +func (rc *retryClient) StoreHeartbeat(ctx context.Context, newStats *pdpb.StoreStats) error { _, err := rc.requestWithRetry(func() (any, error) { err := rc.client.StoreHeartbeat(ctx, newStats) return nil, err @@ -420,7 +428,8 @@ func (rc *RetryClient) StoreHeartbeat(ctx context.Context, newStats *pdpb.StoreS return err } -func (rc *RetryClient) RegionHeartbeat(ctx context.Context, region *core.RegionInfo) error { +// RegionHeartbeat sends a RegionHeartbeat to PD. +func (rc *retryClient) RegionHeartbeat(ctx context.Context, region *core.RegionInfo) error { _, err := rc.requestWithRetry(func() (any, error) { err := rc.client.RegionHeartbeat(ctx, region) return nil, err @@ -428,15 +437,16 @@ func (rc *RetryClient) RegionHeartbeat(ctx context.Context, region *core.RegionI return err } -func (*RetryClient) ChangeConn(_ *grpc.ClientConn) error { +func (*retryClient) changeConn(_ *grpc.ClientConn) error { panic("unImplement") } -func (rc *RetryClient) HeartbeatStreamLoop() { - rc.client.HeartbeatStreamLoop() +func (rc *retryClient) heartbeatStreamLoop() { + rc.client.heartbeatStreamLoop() } -func (rc *RetryClient) Close() { +// Close closes the client. +func (rc *retryClient) Close() { rc.client.Close() } @@ -465,10 +475,10 @@ retry: break retry } } - if ClusterID.Load() == 0 { + if clusterID.Load() == 0 { return "", nil, errors.WithStack(errFailInitClusterID) } - simutil.Logger.Info("get cluster id successfully", zap.Uint64("cluster-id", ClusterID.Load())) + simutil.Logger.Info("get cluster id successfully", zap.Uint64("cluster-id", clusterID.Load())) // Check if the cluster is already bootstrapped. ctx, cancel := context.WithTimeout(ctx, pdTimeout) @@ -513,7 +523,6 @@ retry: } /* PDHTTPClient is a client for PD HTTP API, these are the functions that are used in the simulator */ - func PutPDConfig(config *sc.PDConfig) error { if len(config.PlacementRules) > 0 { ruleOps := make([]*pdHttp.RuleOp, 0) @@ -541,8 +550,9 @@ func PutPDConfig(config *sc.PDConfig) error { return nil } +// ChooseToHaltPDSchedule is used to choose whether to halt the PD schedule func ChooseToHaltPDSchedule(halt bool) { - HaltSchedule.Store(halt) + haltSchedule.Store(halt) PDHTTPClient.SetConfig(context.Background(), map[string]any{ "schedule.halt-scheduling": strconv.FormatBool(halt), }) diff --git a/tools/pd-simulator/simulator/config/config.go b/tools/pd-simulator/simulator/config/config.go index 4d182a2a03f..32c483ceba7 100644 --- a/tools/pd-simulator/simulator/config/config.go +++ b/tools/pd-simulator/simulator/config/config.go @@ -130,6 +130,8 @@ func (sc *SimConfig) Adjust(meta *toml.MetaData) error { return sc.ServerConfig.Adjust(meta, false) } + +// Speed returns the tick speed of the simulator. func (sc *SimConfig) Speed() uint64 { return uint64(time.Second / sc.SimTickInterval.Duration) } diff --git a/tools/pd-simulator/simulator/drive.go b/tools/pd-simulator/simulator/drive.go index e602ba2df65..947d0664755 100644 --- a/tools/pd-simulator/simulator/drive.go +++ b/tools/pd-simulator/simulator/drive.go @@ -135,7 +135,7 @@ func (d *Driver) allocID() error { return err } ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - rootPath := path.Join("/pd", strconv.FormatUint(ClusterID.Load(), 10)) + rootPath := path.Join("/pd", strconv.FormatUint(clusterID.Load(), 10)) allocIDPath := path.Join(rootPath, "alloc_id") _, err = etcdClient.Put(ctx, allocIDPath, string(typeutil.Uint64ToBytes(maxID+1000))) if err != nil { @@ -171,7 +171,7 @@ func (d *Driver) updateNodesClient() error { PDHTTPClient = pdHttp.NewClientWithServiceDiscovery("pd-simulator", SD) for _, node := range d.conn.Nodes { - node.client = NewRetryClient(node) + node.client = newRetryClient(node) } return nil } @@ -191,6 +191,7 @@ func (d *Driver) Tick() { }() } +// StepRegions steps regions. func (d *Driver) StepRegions(ctx context.Context) { for { select { @@ -209,6 +210,7 @@ func (d *Driver) StepRegions(ctx context.Context) { } } +// StoresHeartbeat sends heartbeat to all stores. func (d *Driver) StoresHeartbeat(ctx context.Context) { config := d.raftEngine.storeConfig storeInterval := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration) @@ -229,6 +231,7 @@ func (d *Driver) StoresHeartbeat(ctx context.Context) { } } +// RegionsHeartbeat sends heartbeat to all regions. func (d *Driver) RegionsHeartbeat(ctx context.Context) { // ensure only wait for the first time heartbeat done firstReport := true @@ -281,9 +284,9 @@ func (d *Driver) RegionsHeartbeat(ctx context.Context) { } } - // Only set HaltSchedule to false when the leader count is 80% of the total region count. + // Only set haltSchedule to false when the leader count is 80% of the total region count. // using firstReport to avoid the haltSchedule set to true manually. - if HaltSchedule.Load() && firstReport { + if haltSchedule.Load() && firstReport { storeInterval := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration) ticker := time.NewTicker(time.Duration(storeInterval)) for range ticker.C { @@ -294,7 +297,7 @@ func (d *Driver) RegionsHeartbeat(ctx context.Context) { leaderCount += store.Status.LeaderCount } // Add halt schedule check to avoid the situation that the leader count is always less than 80%. - if leaderCount > int64(float64(d.simConfig.TotalRegion)*0.8) || !HaltSchedule.Load() { + if leaderCount > int64(float64(d.simConfig.TotalRegion)*0.8) || !haltSchedule.Load() { ChooseToHaltPDSchedule(false) firstReport = false ticker.Stop() @@ -310,11 +313,11 @@ func (d *Driver) RegionsHeartbeat(ctx context.Context) { } } -var HaltSchedule atomic.Bool +var haltSchedule atomic.Bool // Check checks if the simulation is completed. func (d *Driver) Check() bool { - if !HaltSchedule.Load() { + if !haltSchedule.Load() { return false } var stats []info.StoreStats diff --git a/tools/pd-simulator/simulator/event.go b/tools/pd-simulator/simulator/event.go index 86da86ed20d..d22f35756ef 100644 --- a/tools/pd-simulator/simulator/event.go +++ b/tools/pd-simulator/simulator/event.go @@ -177,7 +177,7 @@ type AddNode struct{} func (*AddNode) Run(raft *RaftEngine, _ int64) bool { config := raft.storeConfig nodes := raft.conn.getNodes() - id, err := nodes[0].client.AllocID(context.TODO()) + id, err := nodes[0].client.allocID(context.TODO()) if err != nil { simutil.Logger.Error("alloc node id failed", zap.Error(err)) return false @@ -196,7 +196,7 @@ func (*AddNode) Run(raft *RaftEngine, _ int64) bool { raft.conn.Nodes[s.ID] = n n.raftEngine = raft - n.client = NewRetryClient(n) + n.client = newRetryClient(n) err = n.Start() if err != nil { diff --git a/tools/pd-simulator/simulator/raft.go b/tools/pd-simulator/simulator/raft.go index 9a219d32f9f..7f3bf78622f 100644 --- a/tools/pd-simulator/simulator/raft.go +++ b/tools/pd-simulator/simulator/raft.go @@ -294,6 +294,6 @@ func (r *RaftEngine) allocID(storeID uint64) (uint64, error) { if !ok { return 0, errors.Errorf("node %d not found", storeID) } - id, err := node.client.AllocID(context.Background()) + id, err := node.client.allocID(context.Background()) return id, errors.WithStack(err) } diff --git a/tools/pd-tso-bench/main.go b/tools/pd-tso-bench/main.go index 1e59dfd2a2a..96ff2a51d0a 100644 --- a/tools/pd-tso-bench/main.go +++ b/tools/pd-tso-bench/main.go @@ -176,7 +176,7 @@ func showStats(ctx context.Context, durCh chan time.Duration) { case <-ticker.C: // runtime.GC() if *verbose { - fmt.Println(s.Counter()) + fmt.Println(s.counter()) } total.merge(s) s = newStats() @@ -184,8 +184,8 @@ func showStats(ctx context.Context, durCh chan time.Duration) { s.update(d) case <-statCtx.Done(): fmt.Println("\nTotal:") - fmt.Println(total.Counter()) - fmt.Println(total.Percentage()) + fmt.Println(total.counter()) + fmt.Println(total.percentage()) // Calculate the percentiles by using the tDigest algorithm. fmt.Printf("P0.5: %.4fms, P0.8: %.4fms, P0.9: %.4fms, P0.99: %.4fms\n\n", latencyTDigest.Quantile(0.5), latencyTDigest.Quantile(0.8), latencyTDigest.Quantile(0.9), latencyTDigest.Quantile(0.99)) if *verbose { @@ -329,7 +329,7 @@ func (s *stats) merge(other *stats) { s.oneThousandCnt += other.oneThousandCnt } -func (s *stats) Counter() string { +func (s *stats) counter() string { return fmt.Sprintf( "count: %d, max: %.4fms, min: %.4fms, avg: %.4fms\n<1ms: %d, >1ms: %d, >2ms: %d, >5ms: %d, >10ms: %d, >30ms: %d, >50ms: %d, >100ms: %d, >200ms: %d, >400ms: %d, >800ms: %d, >1s: %d", s.count, float64(s.maxDur.Nanoseconds())/float64(time.Millisecond), float64(s.minDur.Nanoseconds())/float64(time.Millisecond), float64(s.totalDur.Nanoseconds())/float64(s.count)/float64(time.Millisecond), @@ -337,7 +337,7 @@ func (s *stats) Counter() string { s.eightHundredCnt, s.oneThousandCnt) } -func (s *stats) Percentage() string { +func (s *stats) percentage() string { return fmt.Sprintf( "count: %d, <1ms: %2.2f%%, >1ms: %2.2f%%, >2ms: %2.2f%%, >5ms: %2.2f%%, >10ms: %2.2f%%, >30ms: %2.2f%%, >50ms: %2.2f%%, >100ms: %2.2f%%, >200ms: %2.2f%%, >400ms: %2.2f%%, >800ms: %2.2f%%, >1s: %2.2f%%", s.count, s.calculate(s.submilliCnt), s.calculate(s.milliCnt), s.calculate(s.twoMilliCnt), s.calculate(s.fiveMilliCnt), s.calculate(s.tenMSCnt), s.calculate(s.thirtyCnt), s.calculate(s.fiftyCnt), diff --git a/tools/pd-ut/alloc/server.go b/tools/pd-ut/alloc/server.go index a5643001142..ffa3bce0aa5 100644 --- a/tools/pd-ut/alloc/server.go +++ b/tools/pd-ut/alloc/server.go @@ -30,6 +30,7 @@ import ( var statusAddress = flag.String("status-addr", "0.0.0.0:0", "status address") +// RunHTTPServer runs a HTTP server to provide alloc address. func RunHTTPServer() *http.Server { err := os.Setenv(tempurl.AllocURLFromUT, fmt.Sprintf("http://%s/alloc", *statusAddress)) if err != nil {