Skip to content

Commit

Permalink
Merge pull request #134920 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.3-134888

release-24.3: crosscluster/logical: drop leases at each checkpoint
  • Loading branch information
msbutler authored Nov 12, 2024
2 parents a2408f7 + 5f58c3d commit 9ab9138
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1452,6 +1452,7 @@ func (m mockBatchHandler) GetLastRow() cdcevent.Row { return cdcevent
func (m mockBatchHandler) SetSyntheticFailurePercent(_ uint32) {}
func (m mockBatchHandler) Close(context.Context) {}
func (m mockBatchHandler) ReportMutations(_ *stats.Refresher) {}
func (m mockBatchHandler) ReleaseLeases(_ context.Context) {}

type mockDLQ int

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,10 @@ func (lrw *logicalReplicationWriterProcessor) checkpoint(

for _, p := range lrw.bh {
p.ReportMutations(lrw.FlowCtx.Cfg.StatsRefresher)
// We should drop our leases and re-acquire new ones at next flush, to avoid
// holding leases continually until they expire; re-acquire is cheap when it
// can be served from the cache so we can just stop these every checkpoint.
p.ReleaseLeases(ctx)
}
lrw.metrics.CheckpointEvents.Inc(1)
lrw.debug.RecordCheckpoint(lrw.frontier.Frontier().GoTime())
Expand Down Expand Up @@ -1130,6 +1134,7 @@ type BatchHandler interface {
GetLastRow() cdcevent.Row
SetSyntheticFailurePercent(uint32)
ReportMutations(*stats.Refresher)
ReleaseLeases(context.Context)
Close(context.Context)
}

Expand Down
31 changes: 22 additions & 9 deletions pkg/ccl/crosscluster/logical/lww_kv_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,16 @@ func (p *kvRowProcessor) ReportMutations(refresher *stats.Refresher) {
}
}

// ReleaseLeases releases all held leases.
func (p *kvRowProcessor) ReleaseLeases(ctx context.Context) {
for _, w := range p.writers {
if w.leased != nil {
w.leased.Release(ctx)
w.leased = nil
}
}
}

// maxRefreshCount is the maximum number of times we will retry a KV batch that has failed with a
// ConditionFailedError with HadNewerOriginTimetamp=true.
const maxRefreshCount = 10
Expand Down Expand Up @@ -319,22 +329,23 @@ func (p *kvRowProcessor) SetSyntheticFailurePercent(rate uint32) {
}

func (p *kvRowProcessor) Close(ctx context.Context) {
for _, w := range p.writers {
w.leased.Release(ctx)
}
p.ReleaseLeases(ctx)
}

func (p *kvRowProcessor) getWriter(
ctx context.Context, id descpb.ID, ts hlc.Timestamp,
) (*kvTableWriter, error) {
w, ok := p.writers[id]
if ok {
// If the lease is still valid, just use the writer.
if w.leased.Expiration(ctx).After(ts) {
return w, nil
if w.leased != nil {
// If the lease is still valid, just use the writer.
if w.leased.Expiration(ctx).After(ts) {
return w, nil
}
// The lease is invalid; we'll be getting a new one so release this one.
w.leased.Release(ctx)
w.leased = nil
}
// The lease is invalid; we'll be getting a new one so release this one.
w.leased.Release(ctx)
}

l, err := p.cfg.LeaseManager.(*lease.Manager).Acquire(ctx, ts, id)
Expand All @@ -344,7 +355,7 @@ func (p *kvRowProcessor) getWriter(

// If the new lease just so happened to be the same version, we can just swap
// the lease in the existing writer.
if ok && l.Underlying().GetVersion() == w.leased.Underlying().GetVersion() {
if ok && l.Underlying().GetVersion() == w.v {
w.leased = l
return w, nil
}
Expand All @@ -365,6 +376,7 @@ func (p *kvRowProcessor) getWriter(
// it populates should commit no later than the expiration of said lease.
type kvTableWriter struct {
leased lease.LeasedDescriptor
v descpb.DescriptorVersion
newVals, oldVals []tree.Datum
ru row.Updater
ri row.Inserter
Expand Down Expand Up @@ -406,6 +418,7 @@ func newKVTableWriter(

return &kvTableWriter{
leased: leased,
v: leased.Underlying().GetVersion(),
oldVals: make([]tree.Datum, len(readCols)),
newVals: make([]tree.Datum, len(writeCols)),
ri: ri,
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/crosscluster/logical/lww_row_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ type RowProcessor interface {
ProcessRow(context.Context, isql.Txn, roachpb.KeyValue, roachpb.Value) (batchStats, error)
GetLastRow() cdcevent.Row
SetSyntheticFailurePercent(uint32)
ReportMutations(*stats.Refresher)
Close(context.Context)
}

Expand Down Expand Up @@ -258,11 +257,15 @@ func makeSQLProcessorFromQuerier(
}, nil
}

// ReportMutations implements the RowProcessor interface, but is a no-op for
// ReportMutations implements the BatchHandler interface, but is a no-op for
// sqlRowProcessor because its mutations are already reported by the queries it
// runs when they are run.
func (sqlRowProcessor) ReportMutations(_ *stats.Refresher) {}

// ReleaseLeases implements the BatchHandler interface but is a no-op since each
// query does this itself.
func (sqlRowProcessor) ReleaseLeases(_ context.Context) {}

func (*sqlRowProcessor) Close(ctx context.Context) {}

var errInjected = errors.New("injected synthetic error")
Expand Down

0 comments on commit 9ab9138

Please sign in to comment.