Skip to content

Commit

Permalink
sql: bind OriginTimestamp to table writer based on session var
Browse files Browse the repository at this point in the history
This patch follows the same strategy for plumbing the OriginID from a session
variable to every batch request header in the table writer, seen in #126394.

In a future commit, Logical Data Replication will use the origin timestamp
session variable to ensure all LDR writes are plumbed with their associated
OriginTimestamp.

Epic: none

Release note: none
  • Loading branch information
msbutler committed Sep 27, 2024
1 parent b5c65a0 commit d707369
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion pkg/sql/tablewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -153,6 +154,10 @@ type tableWriterBase struct {
// originID is an identifier for the cluster that originally wrote the data
// being written by the table writer during Logical Data Replication.
originID uint32
// originTimestamp is the timestamp the data written by this table writer were
// originally written with before being replicated via Logical Data
// Replication.
originTimestamp hlc.Timestamp
}

var maxBatchBytes = settings.RegisterByteSizeSetting(
Expand All @@ -173,10 +178,12 @@ func (tb *tableWriterBase) init(
tb.lockTimeout = 0
tb.deadlockTimeout = 0
tb.originID = 0
tb.originTimestamp = hlc.Timestamp{}
if evalCtx != nil {
tb.lockTimeout = evalCtx.SessionData().LockTimeout
tb.deadlockTimeout = evalCtx.SessionData().DeadlockTimeout
tb.originID = evalCtx.SessionData().OriginIDForLogicalDataReplication
tb.originTimestamp = evalCtx.SessionData().OriginTimestampForLogicalDataReplication
}
tb.forceProductionBatchSizes = evalCtx != nil && evalCtx.TestingKnobs.ForceProductionValues
tb.maxBatchSize = mutations.MaxBatchSize(tb.forceProductionBatchSizes)
Expand Down Expand Up @@ -277,7 +284,10 @@ func (tb *tableWriterBase) initNewBatch() {
tb.b.Header.LockTimeout = tb.lockTimeout
tb.b.Header.DeadlockTimeout = tb.deadlockTimeout
if tb.originID != 0 {
tb.b.Header.WriteOptions = &kvpb.WriteOptions{OriginID: tb.originID}
tb.b.Header.WriteOptions = &kvpb.WriteOptions{
OriginID: tb.originID,
OriginTimestamp: tb.originTimestamp,
}
}
}

Expand Down

0 comments on commit d707369

Please sign in to comment.