Skip to content

Commit

Permalink
added some metrics
Browse files Browse the repository at this point in the history
Signed-off-by: hillium <[email protected]>
  • Loading branch information
YuJuncen committed Dec 24, 2024
1 parent 5b262a4 commit 9f64153
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 24 deletions.
24 changes: 24 additions & 0 deletions br/pkg/logutil/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/metric"
"github.com/pingcap/tidb/pkg/util/redact"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -356,3 +358,25 @@ func (b HexBytes) String() string {
func (b HexBytes) MarshalJSON() ([]byte, error) {
return json.Marshal(hex.EncodeToString(b))
}

func MarshalHistogram(m prometheus.Histogram) zapcore.ObjectMarshaler {
return zapcore.ObjectMarshalerFunc(func(mal zapcore.ObjectEncoder) error {
if m == nil {
return nil
}

met := metric.ReadHistogram(m)
if met == nil || met.Histogram == nil {
return nil
}

hist := met.Histogram
for _, b := range hist.GetBucket() {
key := fmt.Sprintf("lt_%f", b.GetUpperBound())
mal.AddUint64(key, b.GetCumulativeCount())
}
mal.AddUint64("count", hist.GetSampleCount())
mal.AddFloat64("total", hist.GetSampleSum())
return nil
})
}
5 changes: 3 additions & 2 deletions br/pkg/restore/restorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ..
m.ectx = opentracing.ContextWithSpan(m.ectx, span1)
}

for _, batchFileSet := range batchFileSets {
for i, batchFileSet := range batchFileSets {
if m.ectx.Err() != nil {
log.Warn("Restoring encountered error and already stopped, give up remained files.",
logutil.ShortError(m.ectx.Err()))
Expand All @@ -295,7 +295,8 @@ func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ..
onProgress(int64(len(filesReplica)))
}
}()
if importErr := m.fileImporter.Import(m.ectx, filesReplica...); importErr != nil {
cx := logutil.ContextWithField(m.ectx, zap.Int("sn", i))
if importErr := m.fileImporter.Import(cx, filesReplica...); importErr != nil {
return errors.Trace(importErr)
}

Expand Down
43 changes: 29 additions & 14 deletions br/pkg/restore/snap_client/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/util/codec"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
Expand Down Expand Up @@ -142,7 +143,7 @@ type SnapFileImporter struct {
ingestTokensMap *storeTokenChannelMap

closeCallbacks []func(*SnapFileImporter) error
beforeIngestCallbacks []func(context.Context, restore.BatchBackupFileSet) error
beforeIngestCallbacks []func(context.Context, restore.BatchBackupFileSet) (afterIngest func() error, err error)

concurrencyPerStore uint

Expand Down Expand Up @@ -373,12 +374,18 @@ func (importer *SnapFileImporter) Import(
ctx context.Context,
backupFileSets ...restore.BackupFileSet,
) error {
delayCbs := []func() error{}
for i, cb := range importer.beforeIngestCallbacks {
if err := cb(ctx, backupFileSets); err != nil {
d, err := cb(ctx, backupFileSets)
if err != nil {
return errors.Annotatef(err, "failed to executing the callback #%d", i)
}
if d != nil {
delayCbs = append(delayCbs, d)
}
}

importBegin := time.Now()
// Rewrite the start key and end key of file to scan regions
startKey, endKey, err := importer.getKeyRangeForFiles(backupFileSets)
if err != nil {
Expand All @@ -393,41 +400,49 @@ func (importer *SnapFileImporter) Import(
return errors.Trace(errScanRegion)
}

log.Debug("scan regions", logutil.Key("start key", startKey), logutil.Key("end key", endKey), zap.Int("count", len(regionInfos)))
logutil.CL(ctx).Debug("scan regions", logutil.Key("start key", startKey), logutil.Key("end key", endKey), zap.Int("count", len(regionInfos)))
start := time.Now()
// Try to download and ingest the file in every region
for _, regionInfo := range regionInfos {
info := regionInfo
// Try to download file.
downloadMetas, errDownload := importer.download(ctx, info, backupFileSets, importer.cipher, importer.apiVersion)
if errDownload != nil {
log.Warn("download file failed, retry later",
logutil.CL(ctx).Warn("download file failed, retry later",
logutil.Region(info.Region),
logutil.Key("startKey", startKey),
logutil.Key("endKey", endKey),
logutil.ShortError(errDownload))
return errors.Trace(errDownload)
}
log.Debug("download file done", zap.Stringer("take", time.Since(start)),
logutil.CL(ctx).Debug("download file done", zap.Stringer("take", time.Since(start)),
logutil.Key("start", startKey), logutil.Key("end", endKey))
start = time.Now()
if errIngest := importer.ingest(ctx, info, downloadMetas); errIngest != nil {
log.Warn("ingest file failed, retry later",
logutil.CL(ctx).Warn("ingest file failed, retry later",
logutil.Key("start", startKey),
logutil.Key("end", endKey),
logutil.SSTMetas(downloadMetas),
logutil.Region(info.Region),
zap.Error(errIngest))
return errors.Trace(errIngest)
}
log.Debug("ingest file done", logutil.Key("start", startKey), logutil.Key("end", endKey), zap.Stringer("take", time.Since(start)))
logutil.CL(ctx).Debug("ingest file done", logutil.Key("start", startKey), logutil.Key("end", endKey), zap.Stringer("take", time.Since(start)))
}
return nil
}, utils.NewImportSSTBackoffStrategy())
if err != nil {
log.Error("import sst file failed after retry, stop the whole progress", restore.ZapBatchBackupFileSet(backupFileSets), zap.Error(err))
logutil.CL(ctx).Error("import sst file failed after retry, stop the whole progress", restore.ZapBatchBackupFileSet(backupFileSets), zap.Error(err))
return errors.Trace(err)
}
metrics.RestoreImportFileSeconds.Observe(time.Since(importBegin).Seconds())

for i, cb := range delayCbs {
if err := cb(); err != nil {
return errors.Annotatef(err, "failed to execute the delaied callback #%d", i)
}
}

for _, files := range backupFileSets {
for _, f := range files.SSTFiles {
summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs)
Expand Down Expand Up @@ -538,15 +553,15 @@ func (importer *SnapFileImporter) download(

failpoint.Inject("restore-storage-error", func(val failpoint.Value) {
msg := val.(string)
log.Debug("failpoint restore-storage-error injected.", zap.String("msg", msg))
logutil.CL(ctx).Debug("failpoint restore-storage-error injected.", zap.String("msg", msg))
e = errors.Annotate(e, msg)
})
failpoint.Inject("restore-gRPC-error", func(_ failpoint.Value) {
log.Warn("the connection to TiKV has been cut by a neko, meow :3")
logutil.CL(ctx).Warn("the connection to TiKV has been cut by a neko, meow :3")
e = status.Error(codes.Unavailable, "the connection to TiKV has been cut by a neko, meow :3")
})
if isDecryptSstErr(e) {
log.Info("fail to decrypt when download sst, try again with no-crypt")
logutil.CL(ctx).Info("fail to decrypt when download sst, try again with no-crypt")
if importer.kvMode == Raw || importer.kvMode == Txn {
downloadMetas, e = importer.downloadRawKVSST(ctx, regionInfo, filesGroup, nil, apiVersion)
} else {
Expand Down Expand Up @@ -847,7 +862,7 @@ func (importer *SnapFileImporter) ingest(
break
}
// do not get region info, wait a second and GetRegion() again.
log.Warn("ingest get region by key return nil", logutil.Region(info.Region),
logutil.CL(ctx).Warn("ingest get region by key return nil", logutil.Region(info.Region),
logutil.SSTMetas(downloadMetas),
)
time.Sleep(time.Second)
Expand All @@ -857,7 +872,7 @@ func (importer *SnapFileImporter) ingest(
if !split.CheckRegionEpoch(newInfo, info) {
return errors.Trace(berrors.ErrKVEpochNotMatch)
}
log.Debug("ingest sst returns not leader error, retry it",
logutil.CL(ctx).Debug("ingest sst returns not leader error, retry it",
logutil.SSTMetas(downloadMetas),
logutil.Region(info.Region),
zap.Stringer("newLeader", newInfo.Leader))
Expand Down Expand Up @@ -900,7 +915,7 @@ func (importer *SnapFileImporter) ingestSSTs(
Context: reqCtx,
Ssts: sstMetas,
}
log.Debug("ingest SSTs", logutil.SSTMetas(sstMetas), logutil.Leader(leader))
logutil.CL(ctx).Debug("ingest SSTs", logutil.SSTMetas(sstMetas), logutil.Leader(leader))
resp, err := importer.importClient.MultiIngest(ctx, leader.GetStoreId(), req)
return resp, errors.Trace(err)
}
Expand Down
32 changes: 24 additions & 8 deletions br/pkg/restore/snap_client/pitr_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
pb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/util"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -79,15 +81,16 @@ func (c *pitrCollector) close() error {

}

func (c *pitrCollector) onBatch(ctx context.Context, fileSets restore.BatchBackupFileSet) error {
func (c *pitrCollector) onBatch(ctx context.Context, fileSets restore.BatchBackupFileSet) (func() error, error) {
if !c.enabled {
return nil
return nil, nil
}

if err := c.prepareMigIfNeeded(ctx); err != nil {
return err
return nil, err
}

begin := time.Now()
eg, ectx := errgroup.WithContext(ctx)
for _, fileSet := range fileSets {
for _, file := range fileSet.SSTFiles {
Expand All @@ -110,12 +113,22 @@ func (c *pitrCollector) onBatch(ctx context.Context, fileSets restore.BatchBacku
}
}

err := eg.Wait()
err := c.persistExtraBackupMeta(ctx)
if err != nil {
return err
return nil, errors.Annotatef(err, "failed to persist backup meta when finishing batch")
}

return errors.Annotatef(c.persistExtraBackupMeta(ctx), "failed to persist backup meta when finishing batch")
waitDone := func() error {
err := eg.Wait()
if err != nil {
logutil.CL(ctx).Warn("Failed to upload SSTs for future PiTR.", logutil.ShortError(err))
return err
}

logutil.CL(ctx).Info("Uploaded a batch of SSTs for future PiTR.", zap.Duration("take", time.Since(begin)))
return nil
}
return waitDone, nil
}

func (c *pitrCollector) doWithExtraBackupMetaLock(f func()) {
Expand Down Expand Up @@ -143,6 +156,8 @@ func (c *pitrCollector) putSST(ctx context.Context, f *pb.File) error {
return nil
}

begin := time.Now()

f = util.ProtoV1Clone(f)
out := c.sstPath(f.Name)

Expand All @@ -160,6 +175,8 @@ func (c *pitrCollector) putSST(ctx context.Context, f *pb.File) error {

f.Name = out
c.doWithExtraBackupMetaLock(func() { c.extraBackupMeta.msg.Files = append(c.extraBackupMeta.msg.Files, f) })

metrics.RestoreUploadSSTForPiTRSeconds.Observe(time.Since(begin).Seconds())
return nil
}

Expand Down Expand Up @@ -189,7 +206,7 @@ func (c *pitrCollector) persistExtraBackupMeta(ctx context.Context) (err error)
c.extraBackupMetaLock.Lock()
defer c.extraBackupMetaLock.Unlock()

log.Info("Persisting extra backup meta.", zap.Stringer("uuid", c.restoreUUID), zap.String("path", c.metaPath()))
logutil.CL(ctx).Info("Persisting extra backup meta.", zap.Stringer("uuid", c.restoreUUID), zap.String("path", c.metaPath()))
msg := c.extraBackupMeta.genMsg()
bs, err := msg.Marshal()
if err != nil {
Expand Down Expand Up @@ -300,7 +317,6 @@ func newPiTRColl(ctx context.Context, deps PiTRCollDep) (*pitrCollector, error)
return nil, errors.Trace(err)
}
coll.restoreStorage = restoreStrg

coll.resetCommitting()
return coll, nil
}
8 changes: 8 additions & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/collate"
Expand Down Expand Up @@ -671,6 +672,11 @@ func DefaultRestoreConfig(commonConfig Config) RestoreConfig {
return cfg
}

func printRestoreMetrics() {
log.Info("Metric: import_file_seconds", zap.Object("metric", logutil.MarshalHistogram(metrics.RestoreImportFileSeconds)))
log.Info("Metric: upload_sst_for_pitr_seconds", zap.Object("metric", logutil.MarshalHistogram(metrics.RestoreUploadSSTForPiTRSeconds)))
}

// RunRestore starts a restore task inside the current goroutine.
func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
etcdCLI, err := dialEtcdWithCfg(c, cfg.Config)
Expand Down Expand Up @@ -704,6 +710,8 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
}
defer mgr.Close()

defer printRestoreMetrics()

var restoreError error
if IsStreamRestore(cmdName) {
if err := version.CheckClusterVersion(c, mgr.GetPDClient(), version.CheckVersionForBRPiTR); err != nil {
Expand Down
30 changes: 30 additions & 0 deletions pkg/metrics/br.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package metrics

import "github.com/prometheus/client_golang/prometheus"

var (
RestoreImportFileSeconds prometheus.Histogram
RestoreUploadSSTForPiTRSeconds prometheus.Histogram
)

func InitBRMetrics() {
RestoreImportFileSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "br",
Name: "restore_import_file_seconds",

Help: "The time cost for importing a file (send it to a file's all peers and put metadata)",

Buckets: prometheus.ExponentialBuckets(0.01, 2, 14),
})

RestoreUploadSSTForPiTRSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "br",
Name: "restore_upload_sst_for_pitr_seconds",

Help: "The time cost for uploading SST files for point-in-time recovery",

Buckets: prometheus.DefBuckets,
})
}
3 changes: 3 additions & 0 deletions pkg/util/metricsutil/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func RegisterMetricsForBR(pdAddrs []string, keyspaceName string) error {
return err
}

// For now, those metrics are not registered.
// They will be printed to log during restoring...
metrics.InitBRMetrics()
return registerMetrics(keyspaceMeta)
}

Expand Down

0 comments on commit 9f64153

Please sign in to comment.