diff --git a/pkg/sql/tablewriter.go b/pkg/sql/tablewriter.go index 2c004d91e89e..6328aec91c75 100644 --- a/pkg/sql/tablewriter.go +++ b/pkg/sql/tablewriter.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -153,6 +154,10 @@ type tableWriterBase struct { // originID is an identifier for the cluster that originally wrote the data // being written by the table writer during Logical Data Replication. originID uint32 + // originTimestamp is the timestamp the data written by this table writer were + // originally written with before being replicated via Logical Data + // Replication. + originTimestamp hlc.Timestamp } var maxBatchBytes = settings.RegisterByteSizeSetting( @@ -173,10 +178,12 @@ func (tb *tableWriterBase) init( tb.lockTimeout = 0 tb.deadlockTimeout = 0 tb.originID = 0 + tb.originTimestamp = hlc.Timestamp{} if evalCtx != nil { tb.lockTimeout = evalCtx.SessionData().LockTimeout tb.deadlockTimeout = evalCtx.SessionData().DeadlockTimeout tb.originID = evalCtx.SessionData().OriginIDForLogicalDataReplication + tb.originTimestamp = evalCtx.SessionData().OriginTimestampForLogicalDataReplication } tb.forceProductionBatchSizes = evalCtx != nil && evalCtx.TestingKnobs.ForceProductionValues tb.maxBatchSize = mutations.MaxBatchSize(tb.forceProductionBatchSizes) @@ -277,7 +284,10 @@ func (tb *tableWriterBase) initNewBatch() { tb.b.Header.LockTimeout = tb.lockTimeout tb.b.Header.DeadlockTimeout = tb.deadlockTimeout if tb.originID != 0 { - tb.b.Header.WriteOptions = &kvpb.WriteOptions{OriginID: tb.originID} + tb.b.Header.WriteOptions = &kvpb.WriteOptions{ + OriginID: tb.originID, + OriginTimestamp: tb.originTimestamp, + } } }