diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index d4c9f6d383d4..c641d8f195e2 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -932,6 +932,9 @@ func applyOverrides(o sessiondata.InternalExecutorOverride, sd *sessiondata.Sess if o.OriginIDForLogicalDataReplication != 0 { sd.OriginIDForLogicalDataReplication = o.OriginIDForLogicalDataReplication } + if o.OriginTimestampForLogicalDataReplication.IsSet() { + sd.OriginTimestampForLogicalDataReplication = o.OriginTimestampForLogicalDataReplication + } if o.PlanCacheMode != nil { sd.PlanCacheMode = *o.PlanCacheMode } diff --git a/pkg/sql/sessiondata/BUILD.bazel b/pkg/sql/sessiondata/BUILD.bazel index 2854b549f8a6..6cfd22a78fe0 100644 --- a/pkg/sql/sessiondata/BUILD.bazel +++ b/pkg/sql/sessiondata/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//pkg/sql/sem/catconstants", "//pkg/sql/sessiondatapb", "//pkg/util/duration", + "//pkg/util/hlc", "//pkg/util/intsets", "//pkg/util/syncutil", "//pkg/util/timeutil", diff --git a/pkg/sql/sessiondata/internal.go b/pkg/sql/sessiondata/internal.go index 0d504a74ff3c..e7575e8703a0 100644 --- a/pkg/sql/sessiondata/internal.go +++ b/pkg/sql/sessiondata/internal.go @@ -8,6 +8,7 @@ package sessiondata import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // InternalExecutorOverride is used by the Executor interface @@ -70,6 +71,12 @@ type InternalExecutorOverride struct { // write of unspecified origin, and 2+ are reserved to identify remote writes // from specific clusters. OriginIDForLogicalDataReplication uint32 + // OriginTimestampForLogicalDataReplication is the mvcc timestamp the data + // written in this session were originally written with before being + // replicated via Logical Data Replication. The creator of this internal + // executor session is responsible for ensuring that every row it writes via + // the internal executor had this origin timestamp. + OriginTimestampForLogicalDataReplication hlc.Timestamp // PlanCacheMode, if set, overrides the plan_cache_mode session variable. PlanCacheMode *sessiondatapb.PlanCacheMode // GrowStackSize, if true, indicates that the connExecutor goroutine stack diff --git a/pkg/sql/sessiondatapb/BUILD.bazel b/pkg/sql/sessiondatapb/BUILD.bazel index 4b204ade394e..55116e7c2565 100644 --- a/pkg/sql/sessiondatapb/BUILD.bazel +++ b/pkg/sql/sessiondatapb/BUILD.bazel @@ -37,6 +37,7 @@ proto_library( deps = [ "//pkg/sql/lex:lex_proto", "//pkg/util/duration:duration_proto", + "//pkg/util/hlc:hlc_proto", "//pkg/util/timeutil/pgdate:pgdate_proto", "@com_github_gogo_protobuf//gogoproto:gogo_proto", "@com_google_protobuf//:duration_proto", @@ -53,6 +54,7 @@ go_proto_library( deps = [ "//pkg/sql/lex", "//pkg/util/duration", + "//pkg/util/hlc", "//pkg/util/timeutil/pgdate", "@com_github_gogo_protobuf//gogoproto", ], diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto index 8782c0a3fe20..76b8103522da 100644 --- a/pkg/sql/sessiondatapb/local_only_session_data.proto +++ b/pkg/sql/sessiondatapb/local_only_session_data.proto @@ -8,6 +8,7 @@ package cockroach.sql.sessiondatapb; option go_package = "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"; import "gogoproto/gogo.proto"; +import "util/hlc/timestamp.proto"; // LocalOnlySessionData contains the serializable components of session // parameters that only influence execution on the gateway nodes. @@ -543,6 +544,10 @@ message LocalOnlySessionData { // OptimizerPushLimitIntoProjectFilteredScan, when true, indicates that the // optimizer should push limit expressions into projects of filtered scans. bool optimizer_push_limit_into_project_filtered_scan = 139; + // OriginTimestampForLogicalDataReplication is the mvcc timestamp the data + // written in this session were originally written with before being + // replicated via Logical Data Replication. + util.hlc.Timestamp origin_timestamp_for_logical_data_replication = 140 [(gogoproto.nullable) = false]; /////////////////////////////////////////////////////////////////////////// // WARNING: consider whether a session parameter you're adding needs to // diff --git a/pkg/sql/tablewriter.go b/pkg/sql/tablewriter.go index 3fb4c198e17e..38822a548262 100644 --- a/pkg/sql/tablewriter.go +++ b/pkg/sql/tablewriter.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -148,6 +149,10 @@ type tableWriterBase struct { // originID is an identifier for the cluster that originally wrote the data // being written by the table writer during Logical Data Replication. originID uint32 + // originTimestamp is the timestamp the data written by this table writer were + // originally written with before being replicated via Logical Data + // Replication. + originTimestamp hlc.Timestamp } var maxBatchBytes = settings.RegisterByteSizeSetting( @@ -168,10 +173,12 @@ func (tb *tableWriterBase) init( tb.lockTimeout = 0 tb.deadlockTimeout = 0 tb.originID = 0 + tb.originTimestamp = hlc.Timestamp{} if evalCtx != nil { tb.lockTimeout = evalCtx.SessionData().LockTimeout tb.deadlockTimeout = evalCtx.SessionData().DeadlockTimeout tb.originID = evalCtx.SessionData().OriginIDForLogicalDataReplication + tb.originTimestamp = evalCtx.SessionData().OriginTimestampForLogicalDataReplication } tb.forceProductionBatchSizes = evalCtx != nil && evalCtx.TestingKnobs.ForceProductionValues tb.maxBatchSize = mutations.MaxBatchSize(tb.forceProductionBatchSizes) @@ -272,7 +279,10 @@ func (tb *tableWriterBase) initNewBatch() { tb.b.Header.LockTimeout = tb.lockTimeout tb.b.Header.DeadlockTimeout = tb.deadlockTimeout if tb.originID != 0 { - tb.b.Header.WriteOptions = &kvpb.WriteOptions{OriginID: tb.originID} + tb.b.Header.WriteOptions = &kvpb.WriteOptions{ + OriginID: tb.originID, + OriginTimestamp: tb.originTimestamp, + } } }