132108: raft: add ability to determine whether we can safely de-fortify r=iskettaneh a=arulajmani

This patch starts saving a leaderMaxSupported timestamp on the
FortificationTracker, which corresponds to the largest timestamp that's
ever been returned to callers of LeadSupportUntil. Consequently, it
serves as an upper bound on a leader lease's expiration for a given
term.

We then use this to ensure we don't regress a lease's expiration when
we cease to be the leader. In particular, we use it to compute whether
it is safe to de-fortify. This ensures we don't vote for another peer
when we shouldn't (as we aren't de-fortified), we don't call an
election when we shouldn't (which would be quite bad, given that doing
so de-fortifies the entire raft group), and we don't broadcast a
MsgDefortifyLeader to the entire raft group when we shouldn't.

Informs #129098

Release note: None
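
A minimal Go sketch of the ratcheting idea described above. Only
leaderMaxSupported and LeadSupportUntil are named by this commit;
everything else below (including calling the safety check CanDefortify)
is a simplified stand-in, not the actual CRDB types and signatures:

package tracker

// Timestamp and StateType are simplified stand-ins for hlc.Timestamp
// and raftpb.StateType.
type Timestamp int64

func (t Timestamp) After(o Timestamp) bool { return t > o }

type StateType int

const StateLeader StateType = 0

type FortificationTracker struct {
	// leaderMaxSupported only ratchets upward. It records the largest
	// timestamp ever returned from LeadSupportUntil, and therefore
	// upper-bounds any leader lease handed out for this term.
	leaderMaxSupported Timestamp
}

// LeadSupportUntil reports how long the leader is supported for, never
// letting the answer regress below what earlier callers already saw.
func (ft *FortificationTracker) LeadSupportUntil(state StateType) Timestamp {
	if state != StateLeader {
		// After stepping down, keep reporting the maximum we ever
		// promised; recomputing support here could regress it.
		return ft.leaderMaxSupported
	}
	if s := ft.computeLeadSupportUntil(); s.After(ft.leaderMaxSupported) {
		ft.leaderMaxSupported = s
	}
	return ft.leaderMaxSupported
}

// CanDefortify returns true only once the largest support timestamp
// ever exposed to callers has expired, at which point voting,
// campaigning, or broadcasting de-fortification can no longer regress
// a lease built on that promise.
func (ft *FortificationTracker) CanDefortify(now Timestamp) bool {
	return now.After(ft.leaderMaxSupported)
}

// computeLeadSupportUntil stands in for the real computation over
// per-follower fortification state and store liveness epochs.
func (ft *FortificationTracker) computeLeadSupportUntil() Timestamp { return 0 }

The key property is that leaderMaxSupported never moves backward, so any
promise that escaped to a caller is respected even after a step-down.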

132274: opt: add more tests for new opt rule r=DrewKimball a=DrewKimball

This commit adds a few additional tests for the `TryDecorrelateUnion` decorrelation rule.

Epic: none

Release note: None

132460: kvserver: update the send queue size metrics r=kvoli a=sumeerbhola

This is a tiny bug fix: all the plumbing work was already done, but the update calls were forgotten.

Epic: CRDB-37515

Release note: None

132462: cluster-ui: simplify requests for tmj job status and trigger r=xinhaoz a=xinhaoz

This commit removes some unnecessarily complex refresh
logic for the table metadata job (tmj) status and the
requests to trigger the job. We now request both on a
10s interval.

In addition, when data is being refreshed and the refresh
button is disabled, its tooltip text will now display
"Data is currently refreshing".

Epic: CRDB-37558
Release note (ui change): In the new db and tables pages,
when cached data is being refreshed, the refresh button
will be disabled and its tooltip text will display
"Data is currently refreshing".

132466: rac2: fixes to send-queue metrics maintained by rangeController r=kvoli a=sumeerbhola

Epic: CRDB-37515

Release note: None

132482: ui, tablemetadatacache: disable sorting on auto stats collection columns, expose batch size as a cluster setting r=xinhaoz a=xinhaoz

cluster-ui: disable sorting on auto stats columns

In the table overview page, disable sorting on the
columns pertaining to auto stats collection, since
sorting by these columns is not supported.

Epic: CRDB-37558
Release note: None

------------------

tablemetadatacache: make the update job table batch size a cluster setting

This commit makes the update job batch size adjustable
via a cluster setting.

Epic: none
Release note: None
`@xinhaoz`
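
For context, a hedged sketch of how such a knob is typically registered
with CockroachDB's pkg/settings. The setting name, class, and default
below are illustrative assumptions, not the values this commit chose:

package tablemetadatacache

import "github.com/cockroachdb/cockroach/pkg/settings"

// tableBatchSize makes the update job's per-batch table count tunable
// at runtime. Name and default are assumed for illustration.
var tableBatchSize = settings.RegisterIntSetting(
	settings.ApplicationLevel,
	"obs.tablemetadatacache.update_job.batch_size",
	"number of tables the table metadata update job processes per batch",
	20,
	settings.PositiveInt,
)

The job would then read the live value on each batch, e.g. via
tableBatchSize.Get(&st.SV), so operators can tune it without restarting
nodes.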

Co-authored-by: Arul Ajmani <[email protected]>
Co-authored-by: Drew Kimball <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
Co-authored-by: Xin Hao Zhang <[email protected]>
5 people committed Oct 11, 2024
7 parents 3ec1c38 + d3f4d01 + 5e49c5d + 0b0803b + d01ed4a + 673466e + b899e9f commit 232a855
Showing 25 changed files with 655 additions and 427 deletions.
17 changes: 10 additions & 7 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
@@ -2221,9 +2221,6 @@ func (rss *replicaSendStream) closeLocked(ctx context.Context) {
 	rss.returnSendTokens(ctx, rss.mu.tracker.UntrackAll(), true /* disconnect */)
 	rss.returnAllEvalTokensLocked(ctx)
 	rss.stopAttemptingToEmptySendQueueLocked(ctx, true)
-	if rss.mu.sendQueue.forceFlushScheduled {
-		rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Dec(1)
-	}
 	rss.mu.closed = true
 }

@@ -2487,10 +2484,14 @@ func (rss *replicaSendStream) dequeueFromQueueAndSendLocked(
 	} else {
 		tokensNeeded = 0
 	}
-	sendQueueMetrics := rss.parent.parent.opts.RangeControllerMetrics.SendQueue
 	afterDeductedTokens := rss.mu.sendQueue.deductedForSchedulerTokens
+	if buildutil.CrdbTestBuild && beforeDeductedTokens < afterDeductedTokens {
+		panic(errors.AssertionFailedf("beforeDeductedTokens %s < afterDeductedTokens %s",
+			beforeDeductedTokens, afterDeductedTokens))
+	}
 	if beforeDeductedTokens > afterDeductedTokens {
+		sendQueueMetrics := rss.parent.parent.opts.RangeControllerMetrics.SendQueue
-		sendQueueMetrics.DeductedForSchedulerBytes.Dec(int64(afterDeductedTokens - beforeDeductedTokens))
+		sendQueueMetrics.DeductedForSchedulerBytes.Dec(int64(beforeDeductedTokens - afterDeductedTokens))
 	}
 }
 if tokensNeeded > 0 {
Expand Down Expand Up @@ -2542,8 +2543,10 @@ func (rss *replicaSendStream) changeToProbeLocked(ctx context.Context, now time.
func (rss *replicaSendStream) stopAttemptingToEmptySendQueueLocked(
ctx context.Context, disconnect bool,
) {
rss.mu.sendQueue.forceFlushScheduled = false
rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Dec(1)
if rss.mu.sendQueue.forceFlushScheduled {
rss.mu.sendQueue.forceFlushScheduled = false
rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Dec(1)
}
rss.stopAttemptingToEmptySendQueueViaWatcherLocked(ctx, disconnect)
}

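The shape of this fix is worth noting: the gauge decrement now lives
behind the same flag it pairs with, making the stop path idempotent so
that the explicit stop path and the close path cannot double-decrement
ForceFlushedScheduledCount. A self-contained sketch of the pattern (all
names below are simplified stand-ins, not the rac2 types):

package main

import "fmt"

// gauge stands in for metric.Gauge.
type gauge struct{ v int64 }

func (g *gauge) Inc(d int64) { g.v += d }
func (g *gauge) Dec(d int64) { g.v -= d }

type sendStream struct {
	forceFlushScheduled bool
	scheduledCount      *gauge
}

func (s *sendStream) scheduleForceFlush() {
	if !s.forceFlushScheduled {
		s.forceFlushScheduled = true
		s.scheduledCount.Inc(1)
	}
}

// stop only decrements when the flag is actually set, so calling it
// from both the stop path and the close path cannot drive the gauge
// negative.
func (s *sendStream) stop() {
	if s.forceFlushScheduled {
		s.forceFlushScheduled = false
		s.scheduledCount.Dec(1)
	}
}

func main() {
	g := &gauge{}
	s := &sendStream{scheduledCount: g}
	s.scheduleForceFlush()
	s.stop()
	s.stop()         // second call is a no-op
	fmt.Println(g.v) // 0, not -1
}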
@@ -131,7 +131,7 @@ metrics type=send_queue
 ----
 kvflowcontrol.send_queue.count : 2
 kvflowcontrol.send_queue.bytes : 2097152
-kvflowcontrol.send_queue.scheduled.force_flush : 0
+kvflowcontrol.send_queue.scheduled.force_flush : 1
 kvflowcontrol.send_queue.scheduled.deducted_bytes : 0
 kvflowcontrol.send_queue.prevent.count : 0
 kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue : 0
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/store.go
@@ -3484,6 +3484,8 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
 		nanos := timeutil.Since(minMaxClosedTS.GoTime()).Nanoseconds()
 		s.metrics.ClosedTimestampMaxBehindNanos.Update(nanos)
 	}
+	s.cfg.KVFlowRangeControllerMetrics.SendQueue.SizeCount.Update(kvflowSendQueueSizeCount)
+	s.cfg.KVFlowRangeControllerMetrics.SendQueue.SizeBytes.Update(kvflowSendQueueSizeBytes)

 	if err := s.metrics.RecentReplicaCPUNanosPerSecond.Rotate(); err != nil {
 		return err
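The two added lines are the publish half of an accumulate-then-publish
pattern: updateReplicationGauges already computed the totals across the
store's replicas but never wrote them to the gauges. A simplified
sketch of the shape (stand-in types, not the real kvserver code):

package kvserver

// gauge stands in for metric.Gauge; Update overwrites the current value.
type gauge struct{ v int64 }

func (g *gauge) Update(v int64) { g.v = v }

// sendStreamStats stands in for the per-replica send-queue stats.
type sendStreamStats struct{ count, bytes int64 }

type sendQueueMetrics struct{ SizeCount, SizeBytes *gauge }

// updateSendQueueGauges accumulates per-replica send-queue totals and
// then publishes both in one shot. Before this fix the accumulation
// ran but the two Update calls at the end were missing, so the gauges
// never moved.
func updateSendQueueGauges(m *sendQueueMetrics, perReplica []sendStreamStats) {
	var count, bytes int64
	for _, s := range perReplica {
		count += s.count
		bytes += s.bytes
	}
	m.SizeCount.Update(count)
	m.SizeBytes.Update(bytes)
}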
4 changes: 2 additions & 2 deletions pkg/raft/raft.go
@@ -319,7 +319,7 @@ type raft struct {
 	config               quorum.Config
 	trk                  tracker.ProgressTracker
 	electionTracker      tracker.ElectionTracker
-	fortificationTracker tracker.FortificationTracker
+	fortificationTracker *tracker.FortificationTracker
 	lazyReplication      bool

 	state pb.StateType
@@ -455,7 +455,7 @@ func newRaft(c *Config) *raft {
 	lastID := r.raftLog.lastEntryID()

 	r.electionTracker = tracker.MakeElectionTracker(&r.config)
-	r.fortificationTracker = tracker.MakeFortificationTracker(&r.config, r.storeLiveness)
+	r.fortificationTracker = tracker.NewFortificationTracker(&r.config, r.storeLiveness, r.logger)

 	cfg, progressMap, err := confchange.Restore(confchange.Changer{
 		Config: quorum.MakeEmptyConfig(),
7 changes: 7 additions & 0 deletions pkg/raft/rafttest/interaction_env_handler.go
@@ -292,6 +292,13 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
 		// Explanation:
 		// 1 (from_store) grants support for 2 (for_store) at a higher epoch.
 		err = env.handleGrantSupport(t, d)
+	case "support-expired":
+		// Configures whether a store considers its leader's support to be expired
+		// or not.
+		//
+		// Example:
+		// support-expired 1 [reset]
+		err = env.handleSupportExpired(t, d)
 	case "print-fortification-state":
 		// Prints the fortification state being tracked by a raft leader. Empty on a
 		// follower.
33 changes: 29 additions & 4 deletions pkg/raft/rafttest/interaction_env_handler_store_liveness.go
@@ -35,6 +35,10 @@ type livenessFabric struct {
 	// state is a 2D array, where state[i][j] represents store i's support for
 	// store j. Stores are 1-indexed.
 	state [][]livenessEntry
+
+	// leadSupportExpired tracks whether a store considers its leadSupportUntil
+	// to be expired or not.
+	leadSupportExpired []bool
 }

 // newLivenessFabric initializes and returns a livenessFabric.
@@ -44,8 +48,12 @@ func newLivenessFabric() *livenessFabric {
 	state := make([][]livenessEntry, 1)
 	state[0] = make([]livenessEntry, 1)
 	state[0][0] = initLivenessEntry
+	// Ditto for leadSupportExpired.
+	leadSupportExpired := make([]bool, 1)
+	leadSupportExpired[0] = false
 	return &livenessFabric{
-		state: state,
+		state:              state,
+		leadSupportExpired: leadSupportExpired,
 	}
 }

@@ -71,6 +79,7 @@ func (l *livenessFabric) addNode() {
 	// Finally, initialize the liveness entry for the node we've just added. It'll
 	// start off with epoch 1 and support itself.
 	l.state[len(l.state)-1][len(l.state)-1] = initLivenessEntry
+	l.leadSupportExpired = append(l.leadSupportExpired, false)
 }

 func (l *livenessFabric) String() string {
@@ -140,6 +149,10 @@ func (s *storeLiveness) SupportFromEnabled() bool {

 // SupportExpired implements the StoreLiveness interface.
 func (s *storeLiveness) SupportExpired(ts hlc.Timestamp) bool {
+	if s.livenessFabric.leadSupportExpired[s.nodeID] {
+		return true
+	}
+	// If not configured explicitly, infer from the supplied timestamp.
 	switch ts {
 	case hlc.Timestamp{}:
 		return true
@@ -175,9 +188,9 @@ func (env *InteractionEnv) handleWithdrawSupport(t *testing.T, d datadriven.TestData) error {
 	return err
 }

-// handleGrantSupport handles the case where where a store grants support for
-// another store. To enable this, the store for whom support is being granted
-// must seek support at a higher epoch.
+// handleGrantSupport handles the case where a store grants support for another
+// store. To enable this, the store for whom support is being granted must seek
+// support at a higher epoch.
 func (env *InteractionEnv) handleGrantSupport(t *testing.T, d datadriven.TestData) error {
 	fromStore := nthAsInt(t, d, 0)
 	forStore := nthAsInt(t, d, 1)
@@ -195,3 +208,15 @@ func (env *InteractionEnv) handleGrantSupport(t *testing.T, d datadriven.TestData) error {
 	_, err := env.Output.WriteString(env.Fabric.String())
 	return err
 }
+
+// handleSupportExpired is a testing hook to configure whether a store
+// considers its leadSupportUntil expired or not.
+func (env *InteractionEnv) handleSupportExpired(t *testing.T, d datadriven.TestData) error {
+	idx := firstAsInt(t, d)
+	if d.HasArg("reset") {
+		env.Fabric.leadSupportExpired[idx] = false
+	} else {
+		env.Fabric.leadSupportExpired[idx] = true
+	}
+	return nil
+}
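
Driving the new directive from a datadriven test file follows the same
convention as the other store-liveness hooks. A sketch; the "ok" output
reflects the handler returning no error and writing nothing:

support-expired 1
----
ok

support-expired 1 reset
----
ok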
4 changes: 2 additions & 2 deletions pkg/raft/status.go
@@ -132,7 +132,7 @@ func getStatus(r *raft) Status {
 	// NOTE: we assign to LeadSupportUntil even if RaftState is not currently
 	// StateLeader. The replica may have been the leader and stepped down to a
 	// follower before its lead support ran out.
-	s.LeadSupportUntil = r.fortificationTracker.LeadSupportUntil()
+	s.LeadSupportUntil = r.fortificationTracker.LeadSupportUntil(r.state)
 	return s
 }

@@ -156,7 +156,7 @@ func getSparseStatus(r *raft) SparseStatus {
 func getLeadSupportStatus(r *raft) LeadSupportStatus {
 	var s LeadSupportStatus
 	s.BasicStatus = getBasicStatus(r)
-	s.LeadSupportUntil = r.fortificationTracker.LeadSupportUntil()
+	s.LeadSupportUntil = r.fortificationTracker.LeadSupportUntil(r.state)
 	return s
 }

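This is the call-site half of the ratcheting sketch under the first
commit message: passing r.state lets a stepped-down replica keep
reporting the maximum support it ever promised rather than recomputing
(and possibly regressing) it. Reusing that sketch's stand-in types:

// A replica that promised support until 100 while it was leader keeps
// reporting 100 from any non-leader state.
ft := &FortificationTracker{leaderMaxSupported: 100}
_ = ft.LeadSupportUntil(StateType(2)) // 100, not a fresh computation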