From b53e761c3340079942552f471ef5fb1b3d9184d8 Mon Sep 17 00:00:00 2001
From: "Eduardo J. Ortega U." <5791035+ejortegau@users.noreply.github.com>
Date: Thu, 23 Jan 2025 11:34:04 +0100
Subject: [PATCH] Use prefix in all vtorc check and recover logs (#17526)

Signed-off-by: Eduardo J. Ortega U. <5791035+ejortegau@users.noreply.github.com>
---
 go/vt/log/log.go                            |  91 ++++++++++++
 go/vt/vtorc/logic/disable_recovery.go       |   3 +-
 go/vt/vtorc/logic/topology_recovery.go      | 150 ++++++++++++--------
 go/vt/vtorc/logic/topology_recovery_dao.go  |   5 +-
 go/vt/vtorc/logic/topology_recovery_test.go |   3 +-
 5 files changed, 190 insertions(+), 62 deletions(-)

diff --git a/go/vt/log/log.go b/go/vt/log/log.go
index 79be1da464c..fb0c90bbb1c 100644
--- a/go/vt/log/log.go
+++ b/go/vt/log/log.go
@@ -111,3 +111,94 @@ func (lrms *logRotateMaxSize) String() string {
 func (lrms *logRotateMaxSize) Type() string {
 	return "uint64"
 }
+
+type PrefixedLogger struct {
+	prefix string
+}
+
+func NewPrefixedLogger(prefix string) *PrefixedLogger {
+	return &PrefixedLogger{prefix: prefix + ": "}
+}
+
+func (pl *PrefixedLogger) V(level glog.Level) glog.Verbose {
+	return V(level)
+}
+
+func (pl *PrefixedLogger) Flush() {
+	Flush()
+}
+
+func (pl *PrefixedLogger) Info(args ...any) {
+	args = append([]interface{}{pl.prefix}, args...)
+	Info(args...)
+}
+
+func (pl *PrefixedLogger) Infof(format string, args ...any) {
+	args = append([]interface{}{pl.prefix}, args...)
+	Infof("%s"+format, args...)
+}
+
+func (pl *PrefixedLogger) InfoDepth(depth int, args ...any) {
+	args = append([]interface{}{pl.prefix}, args...)
+	InfoDepth(depth, args...)
+}
+
+func (pl *PrefixedLogger) Warning(args ...any) {
+	args = append([]interface{}{pl.prefix}, args...)
+	Warning(args...)
+}
+
+func (pl *PrefixedLogger) Warningf(format string, args ...any) {
+	args = append([]interface{}{pl.prefix}, args...)
+	Warningf("%s"+format, args...)
+}
+
+func (pl *PrefixedLogger) WarningDepth(depth int, args ...any) {
+	args = append([]interface{}{pl.prefix}, args...)
+	WarningDepth(depth, args...)
+}
+
+func (pl *PrefixedLogger) Error(args ...any) {
+	args = append([]interface{}{pl.prefix}, args...)
+	Error(args...)
+}
+
+func (pl *PrefixedLogger) Errorf(format string, args ...any) {
+	args = append([]interface{}{pl.prefix}, args...)
+	Errorf("%s"+format, args...)
+}
+
+func (pl *PrefixedLogger) ErrorDepth(depth int, args ...any) {
+	args = append([]interface{}{pl.prefix}, args...)
+	ErrorDepth(depth, args...)
+}
+
+func (pl *PrefixedLogger) Exit(args ...any) {
+	args = append([]interface{}{pl.prefix}, args...)
+	Exit(args...)
+}
+
+func (pl *PrefixedLogger) Exitf(format string, args ...any) {
+	args = append([]interface{}{pl.prefix}, args...)
+	Exitf("%s"+format, args...)
+}
+
+func (pl *PrefixedLogger) ExitDepth(depth int, args ...any) {
+	args = append([]interface{}{pl.prefix}, args...)
+	ExitDepth(depth, args...)
+}
+
+func (pl *PrefixedLogger) Fatal(args ...any) {
+	args = append([]interface{}{pl.prefix}, args...)
+	Fatal(args...)
+}
+
+func (pl *PrefixedLogger) Fatalf(format string, args ...any) {
+	args = append([]interface{}{pl.prefix}, args...)
+	Fatalf("%s"+format, args...)
+}
+
+func (pl *PrefixedLogger) FatalDepth(depth int, args ...any) {
+	args = append([]interface{}{pl.prefix}, args...)
+	FatalDepth(depth, args...)
+}
diff --git a/go/vt/vtorc/logic/disable_recovery.go b/go/vt/vtorc/logic/disable_recovery.go
index 44e4f5a66ff..60650798876 100644
--- a/go/vt/vtorc/logic/disable_recovery.go
+++ b/go/vt/vtorc/logic/disable_recovery.go
@@ -30,6 +30,7 @@ package logic
 // go to the database each time.
 
 import (
+	"errors"
 	"fmt"
 
 	"vitess.io/vitess/go/vt/external/golib/sqlutils"
@@ -55,7 +56,7 @@ func IsRecoveryDisabled() (disabled bool, err error) {
 	if err != nil {
 		errMsg := fmt.Sprintf("recovery.IsRecoveryDisabled(): %v", err)
 		log.Errorf(errMsg)
-		err = fmt.Errorf(errMsg)
+		err = errors.New(errMsg)
 	}
 	return disabled, err
 }
diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go
index 6e4f82e6980..d6ac04d9414 100644
--- a/go/vt/vtorc/logic/topology_recovery.go
+++ b/go/vt/vtorc/logic/topology_recovery.go
@@ -202,13 +202,15 @@ func resolveRecovery(topologyRecovery *TopologyRecovery, successorInstance *inst
 }
 
 // recoverPrimaryHasPrimary resets the replication on the primary instance
-func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
+func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, logger *log.PrefixedLogger) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
 	topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true)
 	if topologyRecovery == nil {
-		_ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixPrimaryHasPrimary.", analysisEntry.AnalyzedInstanceAlias))
+		message := fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixPrimaryHasPrimary.", analysisEntry.AnalyzedInstanceAlias)
+		logger.Warning(message)
+		_ = AuditTopologyRecovery(topologyRecovery, message)
 		return false, nil, err
 	}
-	log.Infof("Analysis: %v, will fix incorrect primaryship on %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
+	logger.Infof("Analysis: %v, will fix incorrect primaryship on %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
 	// This has to be done in the end; whether successful or not, we should mark that the recovery is done.
 	// So that after the active period passes, we are able to run other recoveries.
 	defer func() {
@@ -218,6 +220,7 @@ func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry *inst.Replicati
 	// Read the tablet information from the database to find the shard and keyspace of the tablet
 	analyzedTablet, err := inst.ReadTablet(analysisEntry.AnalyzedInstanceAlias)
 	if err != nil {
+		logger.Errorf("Failed to read instance %s, aborting recovery", analysisEntry.AnalyzedInstanceAlias)
 		return false, nil, err
 	}
 
@@ -228,7 +231,7 @@ func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry *inst.Replicati
 
 // runEmergencyReparentOp runs a recovery for which we have to run ERS. Here waitForAllTablets is a boolean telling ERS whether it should wait for all the tablets
 // or is it okay to skip 1.
-func runEmergencyReparentOp(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, recoveryName string, waitForAllTablets bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
+func runEmergencyReparentOp(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, recoveryName string, waitForAllTablets bool, logger *log.PrefixedLogger) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
 	if !analysisEntry.ClusterDetails.HasAutomatedPrimaryRecovery {
 		return false, nil, nil
 	}
 
@@ -236,15 +239,18 @@ func runEmergencyReparentOp(ctx context.Context, analysisEntry *inst.Replication
 	// Read the tablet information from the database to find the shard and keyspace of the tablet
 	tablet, err := inst.ReadTablet(analysisEntry.AnalyzedInstanceAlias)
 	if err != nil {
+		logger.Errorf("Failed to read instance %s, aborting recovery", analysisEntry.AnalyzedInstanceAlias)
 		return false, nil, err
 	}
 
 	topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, true, true)
 	if topologyRecovery == nil {
-		_ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another %v.", analysisEntry.AnalyzedInstanceAlias, recoveryName))
+		message := fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another %v.", analysisEntry.AnalyzedInstanceAlias, recoveryName)
+		logger.Warning(message)
+		_ = AuditTopologyRecovery(topologyRecovery, message)
 		return false, nil, err
 	}
-	log.Infof("Analysis: %v, %v %+v", analysisEntry.Analysis, recoveryName, analysisEntry.AnalyzedInstanceAlias)
+	logger.Infof("Analysis: %v, %v %+v", analysisEntry.Analysis, recoveryName, analysisEntry.AnalyzedInstanceAlias)
 	var promotedReplica *inst.Instance
 	// This has to be done in the end; whether successful or not, we should mark that the recovery is done.
 	// So that after the active period passes, we are able to run other recoveries.
@@ -258,11 +264,11 @@ func runEmergencyReparentOp(ctx context.Context, analysisEntry *inst.Replication
 		// we only log the warnings and errors explicitly, everything gets logged as an information message anyways in auditing topology recovery
 		switch level {
 		case logutilpb.Level_WARNING:
-			log.Warningf("ERS - %s", value)
+			logger.Warningf("ERS - %s", value)
 		case logutilpb.Level_ERROR:
-			log.Errorf("ERS - %s", value)
+			logger.Errorf("ERS - %s", value)
 		default:
-			log.Infof("ERS - %s", value)
+			logger.Infof("ERS - %s", value)
 		}
 		_ = AuditTopologyRecovery(topologyRecovery, value)
 	})).ReparentShard(ctx,
@@ -276,7 +282,7 @@ func runEmergencyReparentOp(ctx context.Context, analysisEntry *inst.Replication
 		},
 	)
 	if err != nil {
-		log.Errorf("Error running ERS - %v", err)
+		logger.Errorf("Error running ERS - %v", err)
 	}
 
 	if ev != nil && ev.NewPrimary != nil {
@@ -288,13 +294,13 @@ func runEmergencyReparentOp(ctx context.Context, analysisEntry *inst.Replication
 
 // recoverDeadPrimary checks a given analysis, decides whether to take action, and possibly takes action
 // Returns true when action was taken.
-func recoverDeadPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
-	return runEmergencyReparentOp(ctx, analysisEntry, "RecoverDeadPrimary", false)
+func recoverDeadPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, logger *log.PrefixedLogger) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
+	return runEmergencyReparentOp(ctx, analysisEntry, "RecoverDeadPrimary", false, logger)
 }
 
 // recoverPrimaryTabletDeleted tries to run a recovery for the case where the primary tablet has been deleted.
-func recoverPrimaryTabletDeleted(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
-	return runEmergencyReparentOp(ctx, analysisEntry, "PrimaryTabletDeleted", true)
+func recoverPrimaryTabletDeleted(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, logger *log.PrefixedLogger) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
+	return runEmergencyReparentOp(ctx, analysisEntry, "PrimaryTabletDeleted", true, logger)
 }
 
 func postErsCompletion(topologyRecovery *TopologyRecovery, analysisEntry *inst.ReplicationAnalysis, recoveryName string, promotedReplica *inst.Instance) {
@@ -308,12 +314,14 @@ func postErsCompletion(topologyRecovery *TopologyRecovery, analysisEntry *inst.R
 }
 
 // checkAndRecoverGenericProblem is a general-purpose recovery function
-func checkAndRecoverLockedSemiSyncPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
+func checkAndRecoverLockedSemiSyncPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, logger *log.PrefixedLogger) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
+	logger.Warning("No actions in checkAndRecoverLockedSemiSyncPrimary")
 	return false, nil, nil
 }
 
 // checkAndRecoverGenericProblem is a general-purpose recovery function
-func checkAndRecoverGenericProblem(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (bool, *TopologyRecovery, error) {
+func checkAndRecoverGenericProblem(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, logger *log.PrefixedLogger) (bool, *TopologyRecovery, error) {
+	logger.Warning("No actions in checkAndRecoverGenericProblem")
 	return false, nil, nil
 }
 
@@ -499,7 +507,7 @@ func hasActionableRecovery(recoveryFunctionCode recoveryFunction) bool {
 
 // getCheckAndRecoverFunction gets the recovery function for the given code.
 func getCheckAndRecoverFunction(recoveryFunctionCode recoveryFunction) (
-	checkAndRecoverFunction func(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error),
+	checkAndRecoverFunction func(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, logger *log.PrefixedLogger) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error),
 ) {
 	switch recoveryFunctionCode {
 	case noRecoveryFunc:
@@ -597,17 +605,22 @@ func runEmergentOperations(analysisEntry *inst.ReplicationAnalysis) {
 func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (err error) {
 	countPendingRecoveries.Add(1)
 	defer countPendingRecoveries.Add(-1)
+
+	logger := log.NewPrefixedLogger(fmt.Sprintf("Recovery for %s on %s/%s", analysisEntry.Analysis, analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard))
+	logger.Info("Starting checkAndRecover")
+
 	checkAndRecoverFunctionCode := getCheckAndRecoverFunctionCode(analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
 	isActionableRecovery := hasActionableRecovery(checkAndRecoverFunctionCode)
 	analysisEntry.IsActionableRecovery = isActionableRecovery
 	runEmergentOperations(analysisEntry)
 
 	if checkAndRecoverFunctionCode == noRecoveryFunc {
+		logger.Warning("No recovery strategies for problem, aborting recovery")
 		// Unhandled problem type
 		if analysisEntry.Analysis != inst.NoProblem {
 			if util.ClearToLog("executeCheckAndRecoverFunction", analysisEntry.AnalyzedInstanceAlias) {
-				log.Warningf("executeCheckAndRecoverFunction: ignoring analysisEntry that has no action plan: %+v; tablet: %+v",
-					analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
+				logger.Warningf("executeCheckAndRecoverFunction: ignoring analysisEntry that has no action plan: tablet: %+v",
+					analysisEntry.AnalyzedInstanceAlias)
 			}
 		}
@@ -616,7 +629,7 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
 
 	// we have a recovery function; its execution still depends on filters if not disabled.
 	if isActionableRecovery || util.ClearToLog("executeCheckAndRecoverFunction: detection", analysisEntry.AnalyzedInstanceAlias) {
-		log.Infof("executeCheckAndRecoverFunction: proceeding with %+v detection on %+v; isActionable?: %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, isActionableRecovery)
+		logger.Infof("executeCheckAndRecoverFunction: proceeding with %+v detection on %+v; isActionable?: %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, isActionableRecovery)
 	}
 
 	// At this point we have validated there's a failure scenario for which we have a recovery path.
@@ -624,7 +637,7 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
 	// Initiate detection:
 	_, _, err = checkAndExecuteFailureDetectionProcesses(analysisEntry)
 	if err != nil {
-		log.Errorf("executeCheckAndRecoverFunction: error on failure detection: %+v", err)
+		logger.Errorf("executeCheckAndRecoverFunction: error on failure detection: %+v", err)
 		return err
 	}
 	// We don't mind whether detection really executed the processes or not
@@ -635,10 +648,10 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
 	// Check for recovery being disabled globally
 	if recoveryDisabledGlobally, err := IsRecoveryDisabled(); err != nil {
 		// Unexpected. Shouldn't get this
-		log.Errorf("Unable to determine if recovery is disabled globally: %v", err)
+		logger.Errorf("Unable to determine if recovery is disabled globally, still attempting to recover: %v", err)
 	} else if recoveryDisabledGlobally {
-		log.Infof("CheckAndRecover: Analysis: %+v, Tablet: %+v: NOT Recovering host (disabled globally)",
-			analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
+		logger.Infof("CheckAndRecover: Tablet: %+v: NOT Recovering host (disabled globally)",
+			analysisEntry.AnalyzedInstanceAlias)
 		return err
 	}
@@ -646,6 +659,7 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
 	// We lock the shard here and then refresh the tablets information
 	ctx, unlock, err := LockShard(context.Background(), analysisEntry.AnalyzedInstanceAlias, getLockAction(analysisEntry.AnalyzedInstanceAlias, analysisEntry.Analysis))
 	if err != nil {
+		logger.Errorf("Failed to lock shard, aborting recovery: %v", err)
 		return err
 	}
 	defer unlock(&err)
@@ -655,7 +669,7 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
 	// changes, we should be checking that this failure is indeed needed to be fixed. We do this after locking the shard to be sure
 	// that the data that we use now is up-to-date.
 	if isActionableRecovery {
-		log.Errorf("executeCheckAndRecoverFunction: Proceeding with %v recovery on %v validation after acquiring shard lock.", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
+		logger.Infof("executeCheckAndRecoverFunction: Proceeding with %v recovery on %v validation after acquiring shard lock.", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
 		// The first step we have to do is refresh the keyspace and shard information
 		// This is required to know if the durability policies have changed or not
 		// If they have, then recoveries like ReplicaSemiSyncMustNotBeSet, etc won't be valid anymore.
@@ -663,6 +677,7 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
 		// a change in the recovery we run.
 		err = RefreshKeyspaceAndShard(analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard)
 		if err != nil {
+			logger.Errorf("Failed to refresh keyspace and shard, aborting recovery: %v", err)
 			return err
 		}
 		// If we are about to run a cluster-wide recovery, it is imperative to first refresh all the tablets
@@ -675,6 +690,7 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
 			}
 			// We ignore the dead primary tablet because it is going to be unreachable. If all the other tablets aren't able to reach this tablet either,
 			// we can proceed with the dead primary recovery. We don't need to refresh the information for this dead tablet.
+			logger.Info("Force refreshing all shard tablets")
 			forceRefreshAllTabletsInShard(ctx, analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard, tabletsToIgnore)
 		} else {
 			// If we are not running a cluster-wide recovery, then it is only concerned with the specific tablet
 			// against which the recovery was run. It could have lost its replication due to a network issue between it
 			// and the host-port set on the tablet in question.
 			// So, we only need to refresh the tablet info records (to know if the primary tablet has changed),
 			// and the replication data of the new primary and this tablet.
+ logger.Info("Refreshing shard tablet info") refreshTabletInfoOfShard(ctx, analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard) + logger.Info("Discovering analysis instance") DiscoverInstance(analysisEntry.AnalyzedInstanceAlias, true) + logger.Info("Getting shard primary") primaryTablet, err := shardPrimary(analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard) if err != nil { - log.Errorf("executeCheckAndRecoverFunction: Analysis: %+v, Tablet: %+v: error while finding the shard primary: %v", - analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, err) + logger.Errorf("executeCheckAndRecoverFunction: Tablet: %+v: error while finding the shard primary: %v", + analysisEntry.AnalyzedInstanceAlias, err) return err } primaryTabletAlias := topoproto.TabletAliasString(primaryTablet.Alias) // We can skip the refresh if we know the tablet we are looking at is the primary tablet. // This would be the case for PrimaryHasPrimary recovery. We don't need to refresh the same tablet twice. if analysisEntry.AnalyzedInstanceAlias != primaryTabletAlias { + logger.Info("Discovering primary instance") DiscoverInstance(primaryTabletAlias, true) } } alreadyFixed, err := checkIfAlreadyFixed(analysisEntry) if err != nil { - log.Errorf("executeCheckAndRecoverFunction: Analysis: %+v, Tablet: %+v: error while trying to find if the problem is already fixed: %v", - analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, err) + logger.Errorf("executeCheckAndRecoverFunction: Tablet: %+v: error while trying to find if the problem is already fixed: %v", + analysisEntry.AnalyzedInstanceAlias, err) return err } if alreadyFixed { - log.Infof("Analysis: %v on tablet %v - No longer valid, some other agent must have fixed the problem.", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias) + logger.Infof("Analysis: %v on tablet %v - No longer valid, some other agent must have fixed the problem.", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias) return nil } } // Actually attempt recovery: if isActionableRecovery || util.ClearToLog("executeCheckAndRecoverFunction: recovery", analysisEntry.AnalyzedInstanceAlias) { - log.Infof("executeCheckAndRecoverFunction: proceeding with %+v recovery on %+v; isRecoverable?: %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, isActionableRecovery) + logger.Infof("executeCheckAndRecoverFunction: proceeding with recovery on %+v; isRecoverable?: %+v", analysisEntry.AnalyzedInstanceAlias, isActionableRecovery) } - - recoveryAttempted, topologyRecovery, err := getCheckAndRecoverFunction(checkAndRecoverFunctionCode)(ctx, analysisEntry) + recoveryAttempted, topologyRecovery, err := getCheckAndRecoverFunction(checkAndRecoverFunctionCode)(ctx, analysisEntry, logger) if !recoveryAttempted { - log.Infof("No recovery attempted on %s for problem %s.", analysisEntry.AnalyzedInstanceHostname, analysisEntry.Analysis) + logger.Errorf("Recovery not attempted: %+v", err) return err } recoveryName := getRecoverFunctionName(checkAndRecoverFunctionCode) recoveriesCounter.Add(recoveryName, 1) if err != nil { message := fmt.Sprintf("Recovery failed on %s for problem %s. 
Error: %s", analysisEntry.AnalyzedInstanceHostname, analysisEntry.Analysis, err.Error()) - log.Info(message) vtopsExec.SendSlackMessage(message, vtopsSlackChannel) + logger.Errorf(message) recoveriesFailureCounter.Add(recoveryName, 1) } else { message := fmt.Sprintf("Recovery succeeded on %s for problem %s.", analysisEntry.AnalyzedInstanceHostname, analysisEntry.Analysis) - log.Info(message) + logger.Info(message) vtopsExec.SendSlackMessage(message, vtopsSlackChannel) recoveriesSuccessfulCounter.Add(recoveryName, 1) } if topologyRecovery == nil { + logger.Error("Topology recovery is nil - recovery might have failed") return err } if b, err := json.Marshal(topologyRecovery); err == nil { - log.Infof("Topology recovery: %+v", string(b)) + logger.Infof("Topology recovery: %+v", string(b)) } else { - log.Infof("Topology recovery: %+v", topologyRecovery) + logger.Infof("Topology recovery: %+v", topologyRecovery) } // If we ran a cluster wide recovery and actually attempted it, then we know that the replication state for all the tablets in this cluster // would have changed. So we can go ahead and pre-emptively refresh them. // For this refresh we don't use the same context that we used for the recovery, since that context might have expired or could expire soon // Instead we pass the background context. The call forceRefreshAllTabletsInShard handles adding a timeout to it for us. if isClusterWideRecovery(checkAndRecoverFunctionCode) { + logger.Info("Forcing refresh of all tablets post recovery") forceRefreshAllTabletsInShard(context.Background(), analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard, nil) } else { // For all other recoveries, we would have changed the replication status of the analyzed tablet // so it doesn't hurt to re-read the information of this tablet, otherwise we'll requeue the same recovery // that we just completed because we would be using stale data. + logger.Info("Force discovering problem instance %s post recovery", analysisEntry.AnalyzedInstanceAlias) DiscoverInstance(analysisEntry.AnalyzedInstanceAlias, true) } return err @@ -832,13 +854,15 @@ func postPrsCompletion(topologyRecovery *TopologyRecovery, analysisEntry *inst.R } // electNewPrimary elects a new primary while none were present before. -func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { +func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, logger *log.PrefixedLogger) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false /*failIfFailedInstanceInActiveRecovery*/, true /*failIfClusterInActiveRecovery*/) if topologyRecovery == nil || err != nil { - _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another electNewPrimary.", analysisEntry.AnalyzedInstanceAlias)) + message := fmt.Sprintf("found an active or recent recovery on %+v. 
Will not issue another electNewPrimary.", analysisEntry.AnalyzedInstanceAlias) + logger.Warning(message) + _ = AuditTopologyRecovery(topologyRecovery, message) return false, nil, err } - log.Infof("Analysis: %v, will elect a new primary for %v:%v", analysisEntry.Analysis, analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard) + logger.Infof("Analysis: %v, will elect a new primary for %v:%v", analysisEntry.Analysis, analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard) var promotedReplica *inst.Instance // This has to be done in the end; whether successful or not, we should mark that the recovery is done. @@ -849,6 +873,7 @@ func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysi analyzedTablet, err := inst.ReadTablet(analysisEntry.AnalyzedInstanceAlias) if err != nil { + logger.Errorf("Failed to read instance %s, aborting recovery", analysisEntry.AnalyzedInstanceAlias) return false, topologyRecovery, err } _ = AuditTopologyRecovery(topologyRecovery, "starting PlannedReparentShard for electing new primary.") @@ -859,9 +884,9 @@ func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysi // we only log the warnings and errors explicitly, everything gets logged as an information message anyways in auditing topology recovery switch level { case logutilpb.Level_WARNING: - log.Warningf("PRS - %s", value) + logger.Warningf("PRS - %s", value) case logutilpb.Level_ERROR: - log.Errorf("PRS - %s", value) + logger.Errorf("PRS - %s", value) } _ = AuditTopologyRecovery(topologyRecovery, value) })).ReparentShard(ctx, @@ -881,13 +906,16 @@ func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysi } // fixPrimary sets the primary as read-write. -func fixPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { +func fixPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, logger *log.PrefixedLogger) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true) + if topologyRecovery == nil { - _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixPrimary.", analysisEntry.AnalyzedInstanceAlias)) + message := fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixPrimary.", analysisEntry.AnalyzedInstanceAlias) + logger.Warning(message) + _ = AuditTopologyRecovery(topologyRecovery, message) return false, nil, err } - log.Infof("Analysis: %v, will fix primary to read-write %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias) + logger.Infof("Analysis: %v, will fix primary to read-write %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias) // This has to be done in the end; whether successful or not, we should mark that the recovery is done. // So that after the active period passes, we are able to run other recoveries. 
 	defer func() {
@@ -896,12 +924,13 @@ func fixPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (r
 
 	analyzedTablet, err := inst.ReadTablet(analysisEntry.AnalyzedInstanceAlias)
 	if err != nil {
+		logger.Errorf("Failed to read instance %s, aborting recovery", analysisEntry.AnalyzedInstanceAlias)
 		return false, topologyRecovery, err
 	}
 
 	durabilityPolicy, err := inst.GetDurabilityPolicy(analyzedTablet.Keyspace)
 	if err != nil {
-		log.Info("Could not read the durability policy for %v/%v", analyzedTablet.Keyspace, analyzedTablet.Shard)
+		logger.Infof("Could not read the durability policy for %v/%v", analyzedTablet.Keyspace, analyzedTablet.Shard)
 		return false, topologyRecovery, err
 	}
 
@@ -912,13 +941,15 @@ func fixPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (r
 }
 
 // fixReplica sets the replica as read-only and points it at the current primary.
-func fixReplica(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
+func fixReplica(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, logger *log.PrefixedLogger) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
 	topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true)
 	if topologyRecovery == nil {
-		_ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixReplica.", analysisEntry.AnalyzedInstanceAlias))
+		message := fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixReplica.", analysisEntry.AnalyzedInstanceAlias)
+		logger.Warning(message)
+		_ = AuditTopologyRecovery(topologyRecovery, message)
 		return false, nil, err
 	}
-	log.Infof("Analysis: %v, will fix replica %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
+	logger.Infof("Analysis: %v, will fix replica %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
 	// This has to be done in the end; whether successful or not, we should mark that the recovery is done.
 	// So that after the active period passes, we are able to run other recoveries.
 	defer func() {
@@ -927,24 +958,25 @@ func fixReplica(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (r
 
 	analyzedTablet, err := inst.ReadTablet(analysisEntry.AnalyzedInstanceAlias)
 	if err != nil {
+		logger.Errorf("Failed to read instance %s, aborting recovery", analysisEntry.AnalyzedInstanceAlias)
 		return false, topologyRecovery, err
 	}
 
 	primaryTablet, err := shardPrimary(analyzedTablet.Keyspace, analyzedTablet.Shard)
 	if err != nil {
-		log.Info("Could not compute primary for %v/%v", analyzedTablet.Keyspace, analyzedTablet.Shard)
+		logger.Infof("Could not compute primary for %v/%v", analyzedTablet.Keyspace, analyzedTablet.Shard)
 		return false, topologyRecovery, err
 	}
 
 	durabilityPolicy, err := inst.GetDurabilityPolicy(analyzedTablet.Keyspace)
 	if err != nil {
-		log.Info("Could not read the durability policy for %v/%v", analyzedTablet.Keyspace, analyzedTablet.Shard)
+		logger.Infof("Could not read the durability policy for %v/%v", analyzedTablet.Keyspace, analyzedTablet.Shard)
 		return false, topologyRecovery, err
 	}
 
 	err = setReadOnly(ctx, analyzedTablet)
 	if err != nil {
-		log.Info("Could not set the tablet %v to readonly - %v", analysisEntry.AnalyzedInstanceAlias, err)
+		logger.Infof("Could not set the tablet %v to readonly - %v", analysisEntry.AnalyzedInstanceAlias, err)
 		return true, topologyRecovery, err
 	}
 
@@ -953,13 +985,15 @@ func fixReplica(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (r
 }
 
 // recoverErrantGTIDDetected changes the tablet type of a replica tablet that has errant GTIDs.
-func recoverErrantGTIDDetected(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
+func recoverErrantGTIDDetected(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, logger *log.PrefixedLogger) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
 	topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true)
 	if topologyRecovery == nil {
-		_ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another recoverErrantGTIDDetected.", analysisEntry.AnalyzedInstanceAlias))
+		message := fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another recoverErrantGTIDDetected.", analysisEntry.AnalyzedInstanceAlias)
+		logger.Warning(message)
+		_ = AuditTopologyRecovery(topologyRecovery, message)
 		return false, nil, err
 	}
-	log.Infof("Analysis: %v, will fix tablet %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
+	logger.Infof("Analysis: %v, will fix tablet %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
 	// This has to be done in the end; whether successful or not, we should mark that the recovery is done.
 	// So that after the active period passes, we are able to run other recoveries.
 	defer func() {
@@ -973,13 +1007,13 @@ func recoverErrantGTIDDetected(ctx context.Context, analysisEntry *inst.Replicat
 
 	primaryTablet, err := shardPrimary(analyzedTablet.Keyspace, analyzedTablet.Shard)
 	if err != nil {
-		log.Info("Could not compute primary for %v/%v", analyzedTablet.Keyspace, analyzedTablet.Shard)
+		logger.Infof("Could not compute primary for %v/%v", analyzedTablet.Keyspace, analyzedTablet.Shard)
 		return false, topologyRecovery, err
 	}
 
 	durabilityPolicy, err := inst.GetDurabilityPolicy(analyzedTablet.Keyspace)
 	if err != nil {
-		log.Info("Could not read the durability policy for %v/%v", analyzedTablet.Keyspace, analyzedTablet.Shard)
+		logger.Infof("Could not read the durability policy for %v/%v", analyzedTablet.Keyspace, analyzedTablet.Shard)
 		return false, topologyRecovery, err
 	}
 
diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go
index a08a436fc81..bfa006ae205 100644
--- a/go/vt/vtorc/logic/topology_recovery_dao.go
+++ b/go/vt/vtorc/logic/topology_recovery_dao.go
@@ -17,6 +17,7 @@ package logic
 
 import (
+	"errors"
 	"fmt"
 	"strings"
 
@@ -184,7 +185,7 @@ func AttemptRecoveryRegistration(analysisEntry *inst.ReplicationAnalysis, failIf
 			_ = RegisterBlockedRecoveries(analysisEntry, recoveries)
 			errMsg := fmt.Sprintf("AttemptRecoveryRegistration: tablet %+v has recently been promoted (by failover of %+v) and is in active period. It will not be failed over. You may acknowledge the failure on %+v (-c ack-instance-recoveries) to remove this blockage", analysisEntry.AnalyzedInstanceAlias, recoveries[0].AnalysisEntry.AnalyzedInstanceAlias, recoveries[0].AnalysisEntry.AnalyzedInstanceAlias)
 			log.Errorf(errMsg)
-			return nil, fmt.Errorf(errMsg)
+			return nil, errors.New(errMsg)
 		}
 	}
 	if failIfClusterInActiveRecovery {
@@ -199,7 +200,7 @@ func AttemptRecoveryRegistration(analysisEntry *inst.ReplicationAnalysis, failIf
 			_ = RegisterBlockedRecoveries(analysisEntry, recoveries)
 			errMsg := fmt.Sprintf("AttemptRecoveryRegistration: keyspace %+v shard %+v has recently experienced a failover (of %+v) and is in active period. It will not be failed over again. You may acknowledge the failure on this cluster (-c ack-cluster-recoveries) or on %+v (-c ack-instance-recoveries) to remove this blockage", analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard, recoveries[0].AnalysisEntry.AnalyzedInstanceAlias, recoveries[0].AnalysisEntry.AnalyzedInstanceAlias)
 			log.Errorf(errMsg)
-			return nil, fmt.Errorf(errMsg)
+			return nil, errors.New(errMsg)
 		}
 	}
 	if !failIfFailedInstanceInActiveRecovery {
diff --git a/go/vt/vtorc/logic/topology_recovery_test.go b/go/vt/vtorc/logic/topology_recovery_test.go
index d517649fd13..26aa0aca384 100644
--- a/go/vt/vtorc/logic/topology_recovery_test.go
+++ b/go/vt/vtorc/logic/topology_recovery_test.go
@@ -24,6 +24,7 @@ import (
 	"github.com/patrickmn/go-cache"
 	"github.com/stretchr/testify/require"
 
+	"vitess.io/vitess/go/vt/log"
 	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
 	"vitess.io/vitess/go/vt/topo/memorytopo"
 	"vitess.io/vitess/go/vt/topo/topoproto"
@@ -129,7 +130,7 @@ func TestElectNewPrimaryPanic(t *testing.T) {
 	defer cancel()
 	ts = memorytopo.NewServer(ctx, "zone1")
 
-	recoveryAttempted, _, err := electNewPrimary(context.Background(), analysisEntry)
+	recoveryAttempted, _, err := electNewPrimary(context.Background(), analysisEntry, log.NewPrefixedLogger("prefix"))
 	require.True(t, recoveryAttempted)
 	require.Error(t, err)
 }
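-- 

Usage sketch (below the signature separator, so it is outside what git am applies): a minimal illustration of how the PrefixedLogger added in go/vt/log/log.go is meant to be used, mirroring what executeCheckAndRecoverFunction now does. The analysis name and keyspace/shard values ("DeadPrimary", "commerce", "0") are made up; the import path and the NewPrefixedLogger/Info/Infof API are the ones introduced by this patch.

package main

import (
	"fmt"

	"vitess.io/vitess/go/vt/log"
)

func main() {
	// One logger per recovery: the prefix names the analysis and the
	// keyspace/shard, so every check-and-recover line for an incident
	// carries a common, greppable prefix. NewPrefixedLogger appends ": ".
	logger := log.NewPrefixedLogger(fmt.Sprintf("Recovery for %s on %s/%s",
		"DeadPrimary", "commerce", "0")) // hypothetical values

	// Plain variants prepend the prefix as a leading Sprint-style argument.
	// Message portion: "Recovery for DeadPrimary on commerce/0: Starting checkAndRecover"
	logger.Info("Starting checkAndRecover")

	// Formatted variants prepend a "%s" verb bound to the prefix.
	// Message portion: "Recovery for DeadPrimary on commerce/0: Analysis: DeadPrimary, will elect a new primary for commerce:0"
	logger.Infof("Analysis: %v, will elect a new primary for %v:%v", "DeadPrimary", "commerce", "0")
}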