
133347: crosscluster/logical: add settings/stats to ldr ingest chunking r=dt a=dt



133608: schemachanger: force prod values in expensive test r=rafiss a=rafiss

fixes cockroachdb#133437
Release note: None

133681: roachtest: minor fixes in rebalance/by-load test r=arulajmani a=kvoli

`%` was not escaped, so it was interpreted as a format verb and substituted
with values that were meant for later format arguments.

e.g., from:

```
node 0 has core count normalized CPU utilization ts datapoint not in [0%!,(float64=1.4920845083839689)100{[{{%!](string=cr.node.sys.cpu.combined.percent-normalized) %!]
...
```

To

```
node idx 0 has core count normalized CPU utilization ts datapoint not in [0%,100%]
...
```
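
For illustration, here is a minimal standalone sketch (hypothetical message text,
not the roachtest code itself) of how Go's `fmt` mangles an unescaped `%` and how
`%%` emits a literal percent sign:

```
package main

import "fmt"

func main() {
	cpu := 1.4920845083839689

	// Broken: "%," and "%]" are parsed as (invalid) format verbs, so fmt
	// injects "%!" noise and pairs the argument with the wrong verb.
	fmt.Printf("CPU utilization not in [0%,100%]: %f\n", cpu)

	// Fixed: %% is the escape for a literal percent sign in a format string.
	fmt.Printf("CPU utilization not in [0%%,100%%]: %f\n", cpu)
}
```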

---

The `rebalance/by-load/*` roachtests compare the CPU of nodes and assert
that the distribution of node CPU is bounded within +/- 20%. The previous metric:

```
sys.cpu.combined.percent_normalized
```

would occasionally over-report the CPU as greater than 100% (>1.0), which
is impossible. Use the host CPU metric instead, which looks at the machine's
CPU utilization rather than that of the cockroach process alone:

```
sys.cpu.host.combined.percent_normalized
```

Part of: cockroachdb#133004
Part of: cockroachdb#133054
Part of: cockroachdb#132019
Part of: cockroachdb#133223
Part of: cockroachdb#132633
Release note: None

133683: license: don't hit EnvOrDefaultInt64 in hot path r=fqazi,mgartner a=tbg

Saves 0.3% CPU on sysbench.

Fixes cockroachdb#133088.

Release note: None
Epic: None
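
A minimal sketch of the optimization, with a hypothetical helper and default value
(the real change, shown in the pkg/server/license/enforcer.go hunk below, simply
hoists the `envutil.EnvOrDefaultInt64` call into a package-level variable so the
environment is read once rather than on every check):

```
package license

import (
	"os"
	"strconv"
)

// envOrDefaultInt64 is a hypothetical stand-in for envutil.EnvOrDefaultInt64:
// parse an environment variable, falling back to a default when unset/invalid.
func envOrDefaultInt64(name string, def int64) int64 {
	if v, err := strconv.ParseInt(os.Getenv(name), 10, 64); err == nil {
		return v
	}
	return def
}

// Before: the env var was parsed inside getMaxOpenTransactions, i.e. in the
// hot path. After: it is parsed once at package initialization and reused.
var maxOpenTxnsDuringThrottle = envOrDefaultInt64(
	"COCKROACH_MAX_OPEN_TXNS_DURING_THROTTLE", 100 /* hypothetical default */)

func getMaxOpenTransactions() int64 {
	return maxOpenTxnsDuringThrottle
}
```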


133693: kvserver: deflake TestSnapshotsToDrainingNodes r=kvoli a=arulajmani

This test was making tight assertions about the size of the snapshot that was sent. To do so, it was trying to reimplement the actual snapshot sending logic in `kvBatchSnapshotStrategy.Send()`. So these tight assertions weren't of much use -- they were asserting that we were correctly re-implementing `kvBatchSnapshotStrategy.Send()` in `getExpectedSnapshotSizeBytes`. We weren't, as evidenced by some rare flakes.

This patch loosens assertions to deflake the test.

Closes cockroachdb#133517
Release note: None
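
A simplified, self-contained sketch of the loosened assertion shape (the helper
and the metrics struct here are hypothetical; the actual change is in the
pkg/kv/kvserver/replica_learner_test.go hunk below):

```
package kvserver_test

import (
	"testing"

	"github.com/stretchr/testify/require"
)

type snapshotBytesMetrics struct{ sentBytes, rcvdBytes int64 }

// checkRecoverySnapshotSent asserts only what the test needs: some recovery-
// snapshot bytes were sent, none were counted under rebalancing, and the
// uncategorized total agrees with the recovery-category delta -- rather than
// recomputing an exact expected byte count, which re-implements the sender
// logic and flakes whenever that logic drifts.
func checkRecoverySnapshotSent(t *testing.T, delta map[string]snapshotBytesMetrics) {
	require.Greater(t, delta[".recovery"].sentBytes, int64(0))
	require.Equal(t, int64(0), delta[".rebalancing"].sentBytes)
	require.Equal(t, delta[""], delta[".recovery"])
}
```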

Co-authored-by: David Taylor <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Austen McClernon <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Arul Ajmani <[email protected]>
6 people committed Oct 29, 2024
6 parents c14a15d + 2c3e186 + b2c5ad3 + 2d6c13f + 089eb3c + e38cf40 commit ab2f9cf
Showing 15 changed files with 125 additions and 73 deletions.
@@ -8,6 +8,7 @@ package logical
import (
"context"
"fmt"
"runtime/pprof"
"slices"
"time"

@@ -57,6 +58,22 @@ var flushBatchSize = settings.RegisterIntSetting(
settings.NonNegativeInt,
)

var writerWorkers = settings.RegisterIntSetting(
settings.ApplicationLevel,
"logical_replication.consumer.flush_worker_per_proc",
"the maximum number of workers per processor to use to flush each batch",
32,
settings.NonNegativeInt,
)

var minChunkSize = settings.RegisterIntSetting(
settings.ApplicationLevel,
"logical_replication.consumer.flush_chunk_size",
"the minimum number of row updates to pass to each flush worker",
64,
settings.NonNegativeInt,
)

// logicalReplicationWriterProcessor consumes a cross-cluster replication stream
// by decoding kvs in it to logical changes and applying them by executing DMLs.
type logicalReplicationWriterProcessor struct {
@@ -107,6 +124,13 @@ var (

const logicalReplicationWriterProcessorName = "logical-replication-writer-processor"

var batchSizeSetting = settings.RegisterByteSizeSetting(
settings.ApplicationLevel,
"logical_replication.stream_batch_size",
"target batch size for logical replication stream",
1<<20,
)

func newLogicalReplicationWriterProcessor(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
@@ -138,7 +162,7 @@ func newLogicalReplicationWriterProcessor(
tableID: descpb.ID(dstTableID),
}
}
bhPool := make([]BatchHandler, maxWriterWorkers)
bhPool := make([]BatchHandler, writerWorkers.Get(&flowCtx.Cfg.Settings.SV))
for i := range bhPool {
var rp RowProcessor
var err error
@@ -315,6 +339,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) {
lrw.spec.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes ||
lrw.spec.Discard == jobspb.LogicalReplicationDetails_DiscardAllDeletes),
streamclient.WithDiff(true),
streamclient.WithBatchSize(batchSizeSetting.Get(&lrw.FlowCtx.Cfg.Settings.SV)),
)
if err != nil {
lrw.MoveToDrainingAndLogError(errors.Wrapf(err, "subscribing to partition from %s", redactedAddr))
@@ -336,10 +361,12 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) {
})
lrw.workerGroup.GoCtx(func(ctx context.Context) error {
defer close(lrw.checkpointCh)
if err := lrw.consumeEvents(ctx); err != nil {
log.Infof(lrw.Ctx(), "consumer completed. Error: %s", err)
lrw.sendError(errors.Wrap(err, "consume events"))
}
pprof.Do(ctx, pprof.Labels("proc", fmt.Sprintf("%d", lrw.ProcessorID)), func(ctx context.Context) {
if err := lrw.consumeEvents(ctx); err != nil {
log.Infof(lrw.Ctx(), "consumer completed. Error: %s", err)
lrw.sendError(errors.Wrap(err, "consume events"))
}
})
return nil
})
}
@@ -595,8 +622,6 @@ func filterRemaining(kvs []streampb.StreamEvent_KV) []streampb.StreamEvent_KV {
return remaining[:j]
}

const maxWriterWorkers = 32

// flushBuffer processes some or all of the events in the passed buffer, and
// zeros out each event in the passed buffer for which it successfully completed
// processing either by applying it or by sending it to a DLQ. If mustProcess is
@@ -649,15 +674,20 @@ func (lrw *logicalReplicationWriterProcessor) flushBuffer(
return a.KeyValue.Value.Timestamp.Compare(b.KeyValue.Value.Timestamp)
})

const minChunkSize = 64
chunkSize := max((len(kvs)/len(lrw.bh))+1, minChunkSize)
minChunk := 64
if lrw.FlowCtx != nil {
minChunk = int(minChunkSize.Get(&lrw.FlowCtx.Cfg.Settings.SV))
}

chunkSize := max((len(kvs)/len(lrw.bh))+1, minChunk)

perChunkStats := make([]flushStats, len(lrw.bh))

todo := kvs
g := ctxgroup.WithContext(ctx)
for worker := range lrw.bh {
if len(todo) == 0 {
perChunkStats = perChunkStats[:worker]
break
}
// The chunk should end after the first new key after chunk size.
@@ -779,6 +809,9 @@ func (lrw *logicalReplicationWriterProcessor) flushChunk(
) (flushStats, error) {
batchSize := lrw.getBatchSize()

lrw.debug.RecordChunkStart()
defer lrw.debug.RecordChunkComplete()

var stats flushStats
// TODO: The batching here in production would need to be much
// smarter. Namely, we don't want to include updates to the
4 changes: 3 additions & 1 deletion pkg/ccl/crosscluster/producer/event_stream.go
@@ -508,7 +508,9 @@ func streamPartition(
if len(spec.Spans) == 0 {
return nil, errors.AssertionFailedf("expected at least one span, got none")
}
spec.Config.BatchByteSize = defaultBatchSize
if spec.Config.BatchByteSize == 0 {
spec.Config.BatchByteSize = defaultBatchSize
}
spec.Config.MinCheckpointFrequency = crosscluster.StreamReplicationMinCheckpointFrequency.Get(&evalCtx.Settings.SV)

execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
10 changes: 9 additions & 1 deletion pkg/ccl/crosscluster/producer/replication_manager.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/repstream"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
@@ -172,6 +173,13 @@ func getUDTs(
return typeDescriptors, nil, nil
}

var useStreaksInLDR = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"logical_replication.producer.group_adjacent_spans.enabled",
"controls whether to attempt adjacent spans in the same stream",
true,
)

func (r *replicationStreamManagerImpl) PlanLogicalReplication(
ctx context.Context, req streampb.LogicalReplicationPlanRequest,
) (*streampb.ReplicationStreamSpec, error) {
@@ -199,7 +207,7 @@ func (r *replicationStreamManagerImpl) PlanLogicalReplication(
return nil, err
}
}
spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans)
spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans, useStreaksInLDR.Get(&r.evalCtx.Settings.SV))
if err != nil {
return nil, err
}
13 changes: 9 additions & 4 deletions pkg/ccl/crosscluster/producer/stream_lifetime.go
@@ -283,7 +283,7 @@ func getPhysicalReplicationStreamSpec(
if j.Status() != jobs.StatusRunning {
return nil, jobIsNotRunningError(jobID, j.Status(), "create stream spec")
}
return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans)
return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans, true)

}

@@ -293,16 +293,21 @@ func buildReplicationStreamSpec(
tenantID roachpb.TenantID,
forSpanConfigs bool,
targetSpans roachpb.Spans,
useStreaks bool,
) (*streampb.ReplicationStreamSpec, error) {
jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext)

// Partition the spans with SQLPlanner
dsp := jobExecCtx.DistSQLPlanner()
noLoc := roachpb.Locality{}
oracle := kvfollowerreadsccl.NewBulkOracle(
dsp.ReplicaOracleConfig(evalCtx.Locality), noLoc, kvfollowerreadsccl.StreakConfig{
var streaks kvfollowerreadsccl.StreakConfig
if useStreaks {
streaks = kvfollowerreadsccl.StreakConfig{
Min: 10, SmallPlanMin: 3, SmallPlanThreshold: 3, MaxSkew: 0.95,
},
}
}
oracle := kvfollowerreadsccl.NewBulkOracle(
dsp.ReplicaOracleConfig(evalCtx.Locality), noLoc, streaks,
)

planCtx := dsp.NewPlanningCtxWithOracle(
10 changes: 10 additions & 0 deletions pkg/ccl/crosscluster/streamclient/client.go
@@ -126,6 +126,9 @@ type subscribeConfig struct {
// NB: Callers should note that initial scan results will not
// contain a diff.
withDiff bool

// batchByteSize requests the producer emit batches up to the specified size.
batchByteSize int64
}

type SubscribeOption func(*subscribeConfig)
@@ -150,6 +153,13 @@ func WithDiff(enableDiff bool) SubscribeOption {
}
}

// WithBatchSize requests the producer emit batches up to the specified size.
func WithBatchSize(bytes int64) SubscribeOption {
return func(cfg *subscribeConfig) {
cfg.batchByteSize = bytes
}
}

// Topology is a configuration of stream partitions. These are particular to a
// stream. It specifies the number and addresses of partitions of the stream.
//
@@ -253,6 +253,7 @@ func (p *partitionedStreamClient) Subscribe(
sps.WithDiff = cfg.withDiff
sps.WithFiltering = cfg.withFiltering
sps.Type = streampb.ReplicationType_PHYSICAL
sps.Config.BatchByteSize = cfg.batchByteSize
if p.logical {
sps.Type = streampb.ReplicationType_LOGICAL
}
2 changes: 2 additions & 0 deletions pkg/cli/zip_table_registry.go
@@ -1095,6 +1095,8 @@ var zipInternalTablesPerNode = DebugZipTableRegistry{
"flush_bytes",
"flush_batches",
"last_flush_time",
"chunks_running",
"chunks_done",
"last_kvs_done",
"last_kvs_todo",
"last_batches",
6 changes: 3 additions & 3 deletions pkg/cmd/roachtest/tests/rebalance_load.go
@@ -344,7 +344,7 @@ func makeStoreCPUFn(
tsQueries := make([]tsQuery, numNodes)
for i := range tsQueries {
tsQueries[i] = tsQuery{
name: "cr.node.sys.cpu.combined.percent-normalized",
name: "cr.node.sys.cpu.host.combined.percent-normalized",
queryType: total,
sources: []string{fmt.Sprintf("%d", i+1)},
tenantID: roachpb.SystemTenantID,
@@ -376,8 +376,8 @@
// as much to avoid any surprises.
if cpu < 0 || cpu > 1 {
return nil, errors.Newf(
"node %d has core count normalized CPU utilization ts datapoint "+
"not in [0%,100%] (impossible!): %f [resp=%+v]", node, cpu, resp)
"node idx %d has core count normalized CPU utilization ts datapoint "+
"not in [0\\%,100\\%] (impossible!): %v [resp=%+v]", node, cpu, resp)
}

nodeIdx := node * storesPerNode
62 changes: 15 additions & 47 deletions pkg/kv/kvserver/replica_learner_test.go
@@ -924,19 +924,6 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
// below.
ltk.storeKnobs.DisableRaftSnapshotQueue = true

// Synchronize on the moment before the snapshot gets sent so we can measure
// the state at that time & gather metrics.
blockUntilSnapshotSendCh := make(chan struct{})
blockSnapshotSendCh := make(chan struct{})
ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSendSnapshotRequest) {
close(blockUntilSnapshotSendCh)
select {
case <-blockSnapshotSendCh:
case <-time.After(10 * time.Second):
return
}
}

tc := testcluster.StartTestCluster(
t, 2, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
@@ -1013,46 +1000,27 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
return nil
})

// Wait until the snapshot is about to be sent before calculating what the
// snapshot size should be. This allows our snapshot measurement to account
// for any state changes that happen between calling AddNonVoters and the
// snapshot being sent.
<-blockUntilSnapshotSendCh
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl)
require.NoError(t, err)

close(blockSnapshotSendCh)
// AddNonVoter will return after the snapshot is sent. Wait for it to do so
// before checking asserting on snapshot sent/received metrics.
require.NoError(t, g.Wait())

// Record the snapshot metrics for the sender after the raft snapshot was sent.
// Record metrics.
senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0, metrics)
receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1, metrics)

// Asserts that the raft snapshot (aka recovery snapshot) bytes sent have been
// recorded and that it was not double counted in a different metric.
// Assert that the raft snapshot (aka recovery snapshot) bytes sent have been
// recorded and that they were not double counted in the rebalancing metric.
senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter)
require.Greater(t, senderMapDelta[".recovery"].sentBytes, int64(0))
require.Equal(t, int64(0), senderMapDelta[".rebalancing"].sentBytes)
require.Equal(t, senderMapDelta[""], senderMapDelta[".recovery"])

senderMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: snapshotLength, rcvdBytes: 0},
"": {sentBytes: snapshotLength, rcvdBytes: 0},
}
require.Equal(t, senderMapExpected, senderMapDelta)

// Record the snapshot metrics for the receiver after the raft snapshot was
// received.
receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1, metrics)

// Asserts that the raft snapshot (aka recovery snapshot) bytes received have
// been recorded and that it was not double counted in a different metric.
// Assert that the raft snapshot (aka recovery snapshot) bytes received have
// been recorded and that they were not double counted in the rebalancing
// metric.
receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter)

receiverMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: snapshotLength},
"": {sentBytes: 0, rcvdBytes: snapshotLength},
}
require.Equal(t, receiverMapExpected, receiverMapDelta)
require.Greater(t, receiverMapDelta[".recovery"].rcvdBytes, int64(0))
require.Equal(t, int64(0), receiverMapDelta[".rebalancing"].rcvdBytes)
require.Equal(t, receiverMapDelta[""], receiverMapDelta[".recovery"])
}

func drain(ctx context.Context, t *testing.T, client serverpb.AdminClient, drainingNodeID int) {
22 changes: 16 additions & 6 deletions pkg/repstream/streampb/empty.go
@@ -180,12 +180,11 @@ type DebugLogicalConsumerStats struct {

Flushes struct {
Count, Nanos, KVs, Bytes, Batches int64

Last struct {
CurFlushStart time.Time
LastFlushNanos, ProcessedKVs, TotalKVs, Batches, SlowestBatchNanos int64
// TODO(dt): BatchErrors atomic.Int64
// TODO(dt): LastBatchErr atomic.Value
Last struct {
ProcessedKVs, TotalKVs, Batches int64
CurFlushStart time.Time
LastFlushNanos, SlowestBatchNanos int64
ChunksRunning, ChunksDone int64
}
}
Checkpoints struct {
@@ -247,10 +246,21 @@ func (d *DebugLogicalConsumerStatus) RecordFlushStart(start time.Time, keyCount
d.mu.stats.CurrentState = Flushing
d.mu.stats.Flushes.Last.CurFlushStart = start
d.mu.stats.Flushes.Last.TotalKVs = keyCount
d.mu.stats.Flushes.Last.ChunksRunning = 0
d.mu.stats.Flushes.Last.ChunksDone = 0
failPercent := d.mu.injectFailurePercent
return failPercent
}

func (d *DebugLogicalConsumerStatus) RecordChunkStart() {
atomic.AddInt64(&d.mu.stats.Flushes.Last.ChunksRunning, 1)
}

func (d *DebugLogicalConsumerStatus) RecordChunkComplete() {
atomic.AddInt64(&d.mu.stats.Flushes.Last.ChunksRunning, -1)
atomic.AddInt64(&d.mu.stats.Flushes.Last.ChunksDone, 1)
}

func (d *DebugLogicalConsumerStatus) RecordBatchApplied(t time.Duration, keyCount int64) {
nanos := t.Nanoseconds()
d.mu.Lock()
4 changes: 3 additions & 1 deletion pkg/server/license/enforcer.go
@@ -614,10 +614,12 @@ func (e *Enforcer) getGracePeriodDuration(defaultAndMaxLength time.Duration) tim
return newLength
}

var maxOpenTxnsDuringThrottle = envutil.EnvOrDefaultInt64("COCKROACH_MAX_OPEN_TXNS_DURING_THROTTLE", defaultMaxOpenTransactions)

// getMaxOpenTransactions returns the number of open transactions allowed before
// throttling takes affect.
func (e *Enforcer) getMaxOpenTransactions() int64 {
newLimit := envutil.EnvOrDefaultInt64("COCKROACH_MAX_OPEN_TXNS_DURING_THROTTLE", defaultMaxOpenTransactions)
newLimit := maxOpenTxnsDuringThrottle
if tk := e.GetTestingKnobs(); tk != nil && tk.OverrideMaxOpenTransactions != nil {
newLimit = *tk.OverrideMaxOpenTransactions
}