Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: plumb LDR originTimestamp from session var to TableWriter #131456

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,9 @@ func applyOverrides(o sessiondata.InternalExecutorOverride, sd *sessiondata.Sess
if o.OriginIDForLogicalDataReplication != 0 {
sd.OriginIDForLogicalDataReplication = o.OriginIDForLogicalDataReplication
}
if o.OriginTimestampForLogicalDataReplication.IsSet() {
sd.OriginTimestampForLogicalDataReplication = o.OriginTimestampForLogicalDataReplication
}
if o.PlanCacheMode != nil {
sd.PlanCacheMode = *o.PlanCacheMode
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sessiondata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/sql/sem/catconstants",
"//pkg/sql/sessiondatapb",
"//pkg/util/duration",
"//pkg/util/hlc",
"//pkg/util/intsets",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/sessiondata/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package sessiondata
import (
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// InternalExecutorOverride is used by the Executor interface
Expand Down Expand Up @@ -75,6 +76,12 @@ type InternalExecutorOverride struct {
// write of unspecified origin, and 2+ are reserved to identify remote writes
// from specific clusters.
OriginIDForLogicalDataReplication uint32
// OriginTimestampForLogicalDataReplication is the mvcc timestamp the data
// written in this session were originally written with before being
// replicated via Logical Data Replication. The creator of this internal
// executor session is responsible for ensuring that every row it writes via
// the internal executor had this origin timestamp.
OriginTimestampForLogicalDataReplication hlc.Timestamp
// PlanCacheMode, if set, overrides the plan_cache_mode session variable.
PlanCacheMode *sessiondatapb.PlanCacheMode
// GrowStackSize, if true, indicates that the connExecutor goroutine stack
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sessiondatapb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ proto_library(
deps = [
"//pkg/sql/lex:lex_proto",
"//pkg/util/duration:duration_proto",
"//pkg/util/hlc:hlc_proto",
"//pkg/util/timeutil/pgdate:pgdate_proto",
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
"@com_google_protobuf//:duration_proto",
Expand All @@ -53,6 +54,7 @@ go_proto_library(
deps = [
"//pkg/sql/lex",
"//pkg/util/duration",
"//pkg/util/hlc",
"//pkg/util/timeutil/pgdate",
"@com_github_gogo_protobuf//gogoproto",
],
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/sessiondatapb/local_only_session_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package cockroach.sql.sessiondatapb;
option go_package = "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb";

import "gogoproto/gogo.proto";
import "util/hlc/timestamp.proto";

// LocalOnlySessionData contains the serializable components of session
// parameters that only influence execution on the gateway nodes.
Expand Down Expand Up @@ -548,6 +549,10 @@ message LocalOnlySessionData {
// OptimizerPushLimitIntoProjectFilteredScan, when true, indicates that the
// optimizer should push limit expressions into projects of filtered scans.
bool optimizer_push_limit_into_project_filtered_scan = 139;
// OriginTimestampForLogicalDataReplication is the mvcc timestamp the data
// written in this session were originally written with before being
// replicated via Logical Data Replication.
util.hlc.Timestamp origin_timestamp_for_logical_data_replication = 140 [(gogoproto.nullable) = false];

///////////////////////////////////////////////////////////////////////////
// WARNING: consider whether a session parameter you're adding needs to //
Expand Down
12 changes: 11 additions & 1 deletion pkg/sql/tablewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -153,6 +154,10 @@ type tableWriterBase struct {
// originID is an identifier for the cluster that originally wrote the data
// being written by the table writer during Logical Data Replication.
originID uint32
// originTimestamp is the timestamp the data written by this table writer were
// originally written with before being replicated via Logical Data
// Replication.
originTimestamp hlc.Timestamp
}

var maxBatchBytes = settings.RegisterByteSizeSetting(
Expand All @@ -173,10 +178,12 @@ func (tb *tableWriterBase) init(
tb.lockTimeout = 0
tb.deadlockTimeout = 0
tb.originID = 0
tb.originTimestamp = hlc.Timestamp{}
if evalCtx != nil {
tb.lockTimeout = evalCtx.SessionData().LockTimeout
tb.deadlockTimeout = evalCtx.SessionData().DeadlockTimeout
tb.originID = evalCtx.SessionData().OriginIDForLogicalDataReplication
tb.originTimestamp = evalCtx.SessionData().OriginTimestampForLogicalDataReplication
}
tb.forceProductionBatchSizes = evalCtx != nil && evalCtx.TestingKnobs.ForceProductionValues
tb.maxBatchSize = mutations.MaxBatchSize(tb.forceProductionBatchSizes)
Expand Down Expand Up @@ -277,7 +284,10 @@ func (tb *tableWriterBase) initNewBatch() {
tb.b.Header.LockTimeout = tb.lockTimeout
tb.b.Header.DeadlockTimeout = tb.deadlockTimeout
if tb.originID != 0 {
tb.b.Header.WriteOptions = &kvpb.WriteOptions{OriginID: tb.originID}
tb.b.Header.WriteOptions = &kvpb.WriteOptions{
OriginID: tb.originID,
OriginTimestamp: tb.originTimestamp,
}
}
}

Expand Down
Loading