diff --git a/br/pkg/logutil/logging.go b/br/pkg/logutil/logging.go index 353ca6622e896..22480e7b12d48 100644 --- a/br/pkg/logutil/logging.go +++ b/br/pkg/logutil/logging.go @@ -15,7 +15,9 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/lightning/metric" "github.com/pingcap/tidb/pkg/util/redact" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -356,3 +358,25 @@ func (b HexBytes) String() string { func (b HexBytes) MarshalJSON() ([]byte, error) { return json.Marshal(hex.EncodeToString(b)) } + +func MarshalHistogram(m prometheus.Histogram) zapcore.ObjectMarshaler { + return zapcore.ObjectMarshalerFunc(func(mal zapcore.ObjectEncoder) error { + if m == nil { + return nil + } + + met := metric.ReadHistogram(m) + if met == nil || met.Histogram == nil { + return nil + } + + hist := met.Histogram + for _, b := range hist.GetBucket() { + key := fmt.Sprintf("lt_%f", b.GetUpperBound()) + mal.AddUint64(key, b.GetCumulativeCount()) + } + mal.AddUint64("count", hist.GetSampleCount()) + mal.AddFloat64("total", hist.GetSampleSum()) + return nil + }) +} diff --git a/br/pkg/restore/restorer.go b/br/pkg/restore/restorer.go index 9d999af9c09fc..cecbe359bfe5d 100644 --- a/br/pkg/restore/restorer.go +++ b/br/pkg/restore/restorer.go @@ -276,7 +276,7 @@ func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets .. m.ectx = opentracing.ContextWithSpan(m.ectx, span1) } - for _, batchFileSet := range batchFileSets { + for i, batchFileSet := range batchFileSets { if m.ectx.Err() != nil { log.Warn("Restoring encountered error and already stopped, give up remained files.", logutil.ShortError(m.ectx.Err())) @@ -295,7 +295,8 @@ func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets .. onProgress(int64(len(filesReplica))) } }() - if importErr := m.fileImporter.Import(m.ectx, filesReplica...); importErr != nil { + cx := logutil.ContextWithField(m.ectx, zap.Int("sn", i)) + if importErr := m.fileImporter.Import(cx, filesReplica...); importErr != nil { return errors.Trace(importErr) } diff --git a/br/pkg/restore/snap_client/import.go b/br/pkg/restore/snap_client/import.go index fed9b2101c559..eabb5be7ede0f 100644 --- a/br/pkg/restore/snap_client/import.go +++ b/br/pkg/restore/snap_client/import.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/util/codec" kvutil "github.com/tikv/client-go/v2/util" "go.uber.org/zap" @@ -142,7 +143,7 @@ type SnapFileImporter struct { ingestTokensMap *storeTokenChannelMap closeCallbacks []func(*SnapFileImporter) error - beforeIngestCallbacks []func(context.Context, restore.BatchBackupFileSet) error + beforeIngestCallbacks []func(context.Context, restore.BatchBackupFileSet) (afterIngest func() error, err error) concurrencyPerStore uint @@ -373,12 +374,18 @@ func (importer *SnapFileImporter) Import( ctx context.Context, backupFileSets ...restore.BackupFileSet, ) error { + delayCbs := []func() error{} for i, cb := range importer.beforeIngestCallbacks { - if err := cb(ctx, backupFileSets); err != nil { + d, err := cb(ctx, backupFileSets) + if err != nil { return errors.Annotatef(err, "failed to executing the callback #%d", i) } + if d != nil { + delayCbs = append(delayCbs, d) + } } + importBegin := time.Now() // Rewrite the start key and end key of file to scan regions startKey, endKey, err := importer.getKeyRangeForFiles(backupFileSets) if err != nil { @@ -393,7 +400,7 @@ func (importer *SnapFileImporter) Import( return errors.Trace(errScanRegion) } - log.Debug("scan regions", logutil.Key("start key", startKey), logutil.Key("end key", endKey), zap.Int("count", len(regionInfos))) + logutil.CL(ctx).Debug("scan regions", logutil.Key("start key", startKey), logutil.Key("end key", endKey), zap.Int("count", len(regionInfos))) start := time.Now() // Try to download and ingest the file in every region for _, regionInfo := range regionInfos { @@ -401,18 +408,18 @@ func (importer *SnapFileImporter) Import( // Try to download file. downloadMetas, errDownload := importer.download(ctx, info, backupFileSets, importer.cipher, importer.apiVersion) if errDownload != nil { - log.Warn("download file failed, retry later", + logutil.CL(ctx).Warn("download file failed, retry later", logutil.Region(info.Region), logutil.Key("startKey", startKey), logutil.Key("endKey", endKey), logutil.ShortError(errDownload)) return errors.Trace(errDownload) } - log.Debug("download file done", zap.Stringer("take", time.Since(start)), + logutil.CL(ctx).Debug("download file done", zap.Stringer("take", time.Since(start)), logutil.Key("start", startKey), logutil.Key("end", endKey)) start = time.Now() if errIngest := importer.ingest(ctx, info, downloadMetas); errIngest != nil { - log.Warn("ingest file failed, retry later", + logutil.CL(ctx).Warn("ingest file failed, retry later", logutil.Key("start", startKey), logutil.Key("end", endKey), logutil.SSTMetas(downloadMetas), @@ -420,14 +427,22 @@ func (importer *SnapFileImporter) Import( zap.Error(errIngest)) return errors.Trace(errIngest) } - log.Debug("ingest file done", logutil.Key("start", startKey), logutil.Key("end", endKey), zap.Stringer("take", time.Since(start))) + logutil.CL(ctx).Debug("ingest file done", logutil.Key("start", startKey), logutil.Key("end", endKey), zap.Stringer("take", time.Since(start))) } return nil }, utils.NewImportSSTBackoffStrategy()) if err != nil { - log.Error("import sst file failed after retry, stop the whole progress", restore.ZapBatchBackupFileSet(backupFileSets), zap.Error(err)) + logutil.CL(ctx).Error("import sst file failed after retry, stop the whole progress", restore.ZapBatchBackupFileSet(backupFileSets), zap.Error(err)) return errors.Trace(err) } + metrics.RestoreImportFileSeconds.Observe(time.Since(importBegin).Seconds()) + + for i, cb := range delayCbs { + if err := cb(); err != nil { + return errors.Annotatef(err, "failed to execute the delaied callback #%d", i) + } + } + for _, files := range backupFileSets { for _, f := range files.SSTFiles { summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs) @@ -538,15 +553,15 @@ func (importer *SnapFileImporter) download( failpoint.Inject("restore-storage-error", func(val failpoint.Value) { msg := val.(string) - log.Debug("failpoint restore-storage-error injected.", zap.String("msg", msg)) + logutil.CL(ctx).Debug("failpoint restore-storage-error injected.", zap.String("msg", msg)) e = errors.Annotate(e, msg) }) failpoint.Inject("restore-gRPC-error", func(_ failpoint.Value) { - log.Warn("the connection to TiKV has been cut by a neko, meow :3") + logutil.CL(ctx).Warn("the connection to TiKV has been cut by a neko, meow :3") e = status.Error(codes.Unavailable, "the connection to TiKV has been cut by a neko, meow :3") }) if isDecryptSstErr(e) { - log.Info("fail to decrypt when download sst, try again with no-crypt") + logutil.CL(ctx).Info("fail to decrypt when download sst, try again with no-crypt") if importer.kvMode == Raw || importer.kvMode == Txn { downloadMetas, e = importer.downloadRawKVSST(ctx, regionInfo, filesGroup, nil, apiVersion) } else { @@ -847,7 +862,7 @@ func (importer *SnapFileImporter) ingest( break } // do not get region info, wait a second and GetRegion() again. - log.Warn("ingest get region by key return nil", logutil.Region(info.Region), + logutil.CL(ctx).Warn("ingest get region by key return nil", logutil.Region(info.Region), logutil.SSTMetas(downloadMetas), ) time.Sleep(time.Second) @@ -857,7 +872,7 @@ func (importer *SnapFileImporter) ingest( if !split.CheckRegionEpoch(newInfo, info) { return errors.Trace(berrors.ErrKVEpochNotMatch) } - log.Debug("ingest sst returns not leader error, retry it", + logutil.CL(ctx).Debug("ingest sst returns not leader error, retry it", logutil.SSTMetas(downloadMetas), logutil.Region(info.Region), zap.Stringer("newLeader", newInfo.Leader)) @@ -900,7 +915,7 @@ func (importer *SnapFileImporter) ingestSSTs( Context: reqCtx, Ssts: sstMetas, } - log.Debug("ingest SSTs", logutil.SSTMetas(sstMetas), logutil.Leader(leader)) + logutil.CL(ctx).Debug("ingest SSTs", logutil.SSTMetas(sstMetas), logutil.Leader(leader)) resp, err := importer.importClient.MultiIngest(ctx, leader.GetStoreId(), req) return resp, errors.Trace(err) } diff --git a/br/pkg/restore/snap_client/pitr_collector.go b/br/pkg/restore/snap_client/pitr_collector.go index fe22df4119029..b1d5f008c8363 100644 --- a/br/pkg/restore/snap_client/pitr_collector.go +++ b/br/pkg/restore/snap_client/pitr_collector.go @@ -12,11 +12,13 @@ import ( pb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/stream" "github.com/pingcap/tidb/br/pkg/streamhelper" "github.com/pingcap/tidb/br/pkg/summary" + "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/util" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" @@ -79,15 +81,16 @@ func (c *pitrCollector) close() error { } -func (c *pitrCollector) onBatch(ctx context.Context, fileSets restore.BatchBackupFileSet) error { +func (c *pitrCollector) onBatch(ctx context.Context, fileSets restore.BatchBackupFileSet) (func() error, error) { if !c.enabled { - return nil + return nil, nil } if err := c.prepareMigIfNeeded(ctx); err != nil { - return err + return nil, err } + begin := time.Now() eg, ectx := errgroup.WithContext(ctx) for _, fileSet := range fileSets { for _, file := range fileSet.SSTFiles { @@ -110,12 +113,22 @@ func (c *pitrCollector) onBatch(ctx context.Context, fileSets restore.BatchBacku } } - err := eg.Wait() + err := c.persistExtraBackupMeta(ctx) if err != nil { - return err + return nil, errors.Annotatef(err, "failed to persist backup meta when finishing batch") } - return errors.Annotatef(c.persistExtraBackupMeta(ctx), "failed to persist backup meta when finishing batch") + waitDone := func() error { + err := eg.Wait() + if err != nil { + logutil.CL(ctx).Warn("Failed to upload SSTs for future PiTR.", logutil.ShortError(err)) + return err + } + + logutil.CL(ctx).Info("Uploaded a batch of SSTs for future PiTR.", zap.Duration("take", time.Since(begin))) + return nil + } + return waitDone, nil } func (c *pitrCollector) doWithExtraBackupMetaLock(f func()) { @@ -143,6 +156,8 @@ func (c *pitrCollector) putSST(ctx context.Context, f *pb.File) error { return nil } + begin := time.Now() + f = util.ProtoV1Clone(f) out := c.sstPath(f.Name) @@ -160,6 +175,8 @@ func (c *pitrCollector) putSST(ctx context.Context, f *pb.File) error { f.Name = out c.doWithExtraBackupMetaLock(func() { c.extraBackupMeta.msg.Files = append(c.extraBackupMeta.msg.Files, f) }) + + metrics.RestoreUploadSSTForPiTRSeconds.Observe(time.Since(begin).Seconds()) return nil } @@ -189,7 +206,7 @@ func (c *pitrCollector) persistExtraBackupMeta(ctx context.Context) (err error) c.extraBackupMetaLock.Lock() defer c.extraBackupMetaLock.Unlock() - log.Info("Persisting extra backup meta.", zap.Stringer("uuid", c.restoreUUID), zap.String("path", c.metaPath())) + logutil.CL(ctx).Info("Persisting extra backup meta.", zap.Stringer("uuid", c.restoreUUID), zap.String("path", c.metaPath())) msg := c.extraBackupMeta.genMsg() bs, err := msg.Marshal() if err != nil { @@ -300,7 +317,6 @@ func newPiTRColl(ctx context.Context, deps PiTRCollDep) (*pitrCollector, error) return nil, errors.Trace(err) } coll.restoreStorage = restoreStrg - coll.resetCommitting() return coll, nil } diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 26d94ccbfc079..ace31ccf42606 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/metrics" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/collate" @@ -671,6 +672,11 @@ func DefaultRestoreConfig(commonConfig Config) RestoreConfig { return cfg } +func printRestoreMetrics() { + log.Info("Metric: import_file_seconds", zap.Object("metric", logutil.MarshalHistogram(metrics.RestoreImportFileSeconds))) + log.Info("Metric: upload_sst_for_pitr_seconds", zap.Object("metric", logutil.MarshalHistogram(metrics.RestoreUploadSSTForPiTRSeconds))) +} + // RunRestore starts a restore task inside the current goroutine. func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { etcdCLI, err := dialEtcdWithCfg(c, cfg.Config) @@ -704,6 +710,8 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } defer mgr.Close() + defer printRestoreMetrics() + var restoreError error if IsStreamRestore(cmdName) { if err := version.CheckClusterVersion(c, mgr.GetPDClient(), version.CheckVersionForBRPiTR); err != nil { diff --git a/pkg/metrics/br.go b/pkg/metrics/br.go new file mode 100644 index 0000000000000..1754bd190a5a9 --- /dev/null +++ b/pkg/metrics/br.go @@ -0,0 +1,30 @@ +package metrics + +import "github.com/prometheus/client_golang/prometheus" + +var ( + RestoreImportFileSeconds prometheus.Histogram + RestoreUploadSSTForPiTRSeconds prometheus.Histogram +) + +func InitBRMetrics() { + RestoreImportFileSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "tidb", + Subsystem: "br", + Name: "restore_import_file_seconds", + + Help: "The time cost for importing a file (send it to a file's all peers and put metadata)", + + Buckets: prometheus.ExponentialBuckets(0.01, 2, 14), + }) + + RestoreUploadSSTForPiTRSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "tidb", + Subsystem: "br", + Name: "restore_upload_sst_for_pitr_seconds", + + Help: "The time cost for uploading SST files for point-in-time recovery", + + Buckets: prometheus.DefBuckets, + }) +} diff --git a/pkg/util/metricsutil/common.go b/pkg/util/metricsutil/common.go index 0964c7a36ee92..5d15399282754 100644 --- a/pkg/util/metricsutil/common.go +++ b/pkg/util/metricsutil/common.go @@ -92,6 +92,9 @@ func RegisterMetricsForBR(pdAddrs []string, keyspaceName string) error { return err } + // For now, those metrics are not registered. + // They will be printed to log during restoring... + metrics.InitBRMetrics() return registerMetrics(keyspaceMeta) }