Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
135027: sql: audit error creation during planning r=yuzefovich a=yuzefovich

This commit audits error creation during the execution planning phase (mostly for vectorized engine) to avoid retrieving the stack trace when creating errors on the hot path. We do so by only wrapping or creating fresh errors when "expensive log is enabled" (so that the detailed errors still show up in trace) and otherwise using the provided or a global error. In many places I also substituted miscellaneous calls to `AssertionFailedf` where I thought we'd be getting into an invalid state, to make it more clear.

Fixes: #134586.

Release note: None

135040: sql: use test rand in tests r=yuzefovich a=yuzefovich

This should make tests more deterministic.

Informs: #134742.
Epic: None

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Nov 13, 2024
3 parents 9443526 + 4463567 + 6694645 commit 2c18247
Show file tree
Hide file tree
Showing 70 changed files with 205 additions and 175 deletions.
6 changes: 3 additions & 3 deletions pkg/sql/colcontainer/diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,13 +620,13 @@ func (d *diskQueue) Enqueue(ctx context.Context, b coldata.Batch) error {
}
if d.state == diskQueueStateDequeueing {
if d.cfg.CacheMode != DiskQueueCacheModeIntertwinedCalls {
return errors.Errorf(
return errors.AssertionFailedf(
"attempted to Enqueue to DiskQueue after Dequeueing "+
"in mode that disallows it: %d", d.cfg.CacheMode,
)
}
if d.rewindable {
return errors.Errorf("attempted to Enqueue to RewindableDiskQueue after Dequeue has been called")
return errors.AssertionFailedf("attempted to Enqueue to RewindableDiskQueue after Dequeue has been called")
}
}
d.state = diskQueueStateEnqueueing
Expand Down Expand Up @@ -752,7 +752,7 @@ func (d *diskQueue) maybeInitDeserializer(ctx context.Context) (bool, error) {
d.cfg.SpilledBytesRead.Inc(int64(n))
}
if n != len(d.writer.scratch.compressedBuf) {
return false, errors.Errorf("expected to read %d bytes but read %d", len(d.writer.scratch.compressedBuf), n)
return false, errors.AssertionFailedf("expected to read %d bytes but read %d", len(d.writer.scratch.compressedBuf), n)
}

blockType := d.writer.scratch.compressedBuf[0]
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colcontainer/partitionedqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (p *PartitionedDiskQueue) Enqueue(
idxToClose, found := p.partitionIdxToIndex[p.lastEnqueuedPartitionIdx]
if !found {
// This would be unexpected.
return errors.New("PartitionerStrategyCloseOnNewPartition unable to find last Enqueued partition")
return errors.AssertionFailedf("PartitionerStrategyCloseOnNewPartition unable to find last Enqueued partition")
}
if p.partitions[idxToClose].state == partitionStateWriting {
// Close the last enqueued partition. No need to release or acquire a new
Expand Down Expand Up @@ -255,9 +255,9 @@ func (p *PartitionedDiskQueue) Enqueue(
}
if state := p.partitions[idx].state; state != partitionStateWriting {
if state == partitionStatePermanentlyClosed {
return errors.Errorf("partition at index %d permanently closed, cannot Enqueue", partitionIdx)
return errors.AssertionFailedf("partition at index %d permanently closed, cannot Enqueue", partitionIdx)
}
return errors.New("Enqueue illegally called after Dequeue or CloseAllOpenWriteFileDescriptors")
return errors.AssertionFailedf("Enqueue illegally called after Dequeue or CloseAllOpenWriteFileDescriptors")
}
p.lastEnqueuedPartitionIdx = partitionIdx
return p.partitions[idx].Enqueue(ctx, batch)
Expand Down Expand Up @@ -290,7 +290,7 @@ func (p *PartitionedDiskQueue) Dequeue(
case partitionStateReading:
// Do nothing.
case partitionStatePermanentlyClosed:
return errors.Errorf("partition at index %d permanently closed, cannot Dequeue", partitionIdx)
return errors.AssertionFailedf("partition at index %d permanently closed, cannot Dequeue", partitionIdx)
default:
colexecerror.InternalError(errors.AssertionFailedf("unhandled state %d", state))
}
Expand Down
75 changes: 46 additions & 29 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import (

func checkNumIn(inputs []colexecargs.OpWithMetaInfo, numIn int) error {
if len(inputs) != numIn {
return errors.Errorf("expected %d input(s), got %d", numIn, len(inputs))
return errors.AssertionFailedf("expected %d input(s), got %d", numIn, len(inputs))
}
return nil
}
Expand Down Expand Up @@ -241,6 +241,22 @@ var (
errWindowFunctionFilterClause = errors.New("window functions with FILTER clause are not supported")
errDefaultAggregateWindowFunction = errors.New("default aggregate window functions not supported")
errStreamIngestionWrap = errors.New("core.StreamIngestion{Data,Frontier} is not supported because of #55758")
errFallbackToRenderWrapping = errors.New("falling back to wrapping a row-by-row processor due to many renders and low estimated row count")
errUnhandledSelectionExpression = errors.New("unhandled selection expression")
errUnhandledProjectionExpression = errors.New("unhandled projection expression")

errBinaryExprWithDatums = unimplemented.NewWithIssue(
49780, "datum-backed arguments on both sides and not datum-backed "+
"output of a binary expression is currently not supported",
)
errMixedTypeBinaryUnsupported = unimplemented.NewWithIssue(
46198, "dates and timestamptz not supported in mixed-type binary "+
"expressions in the vectorized engine",
)
errMixedTypeComparisonUnsupported = unimplemented.NewWithIssue(
44770, "dates and timestamp(tz) not supported in mixed-type "+
"comparison expressions in the vectorized engine",
)
)

func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCoreUnion) error {
Expand Down Expand Up @@ -1199,6 +1215,7 @@ func NewColOperator(
)
unlimitedAllocator := colmem.NewAllocator(ctx, accounts[0], factory)
ehj := colexecdisk.NewExternalHashJoiner(
ctx,
unlimitedAllocator,
flowCtx,
args,
Expand Down Expand Up @@ -1253,7 +1270,7 @@ func NewColOperator(
unlimitedAllocator := colmem.NewAllocator(ctx, accounts[0], factory)
diskAccount := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, spec.ProcessorID)
mj := colexecjoin.NewMergeJoinOp(
unlimitedAllocator, execinfra.GetWorkMemLimit(flowCtx),
ctx, unlimitedAllocator, execinfra.GetWorkMemLimit(flowCtx),
args.DiskQueueCfg, args.FDSemaphore,
joinType, inputs[0].Root, inputs[1].Root,
spec.Input[0].ColumnTypes, spec.Input[1].ColumnTypes,
Expand Down Expand Up @@ -1362,6 +1379,7 @@ func NewColOperator(
// When we spill to disk, we just use a combo of an external
// hash join followed by an external hash aggregation.
ehj := colexecdisk.NewExternalHashJoiner(
ctx,
colmem.NewAllocator(ctx, ehjMemAccount, factory),
flowCtx,
args,
Expand Down Expand Up @@ -1452,7 +1470,7 @@ func NewColOperator(
}
castIdx := len(result.ColumnTypes)
input, err = colexecbase.GetCastOperator(
getStreamingAllocator(ctx, args, flowCtx), input, argIdxs[i],
ctx, getStreamingAllocator(ctx, args, flowCtx), input, argIdxs[i],
castIdx, argTypes[i], typ, flowCtx.EvalCtx,
)
if err != nil {
Expand Down Expand Up @@ -1769,7 +1787,7 @@ func NewColOperator(
if !actual.Identical(expected) {
castedIdx := len(r.ColumnTypes)
r.Root, err = colexecbase.GetCastOperator(
getStreamingAllocator(ctx, args, flowCtx), r.Root, i, castedIdx,
ctx, getStreamingAllocator(ctx, args, flowCtx), r.Root, i, castedIdx,
actual, expected, flowCtx.EvalCtx,
)
if err != nil {
Expand Down Expand Up @@ -1912,8 +1930,6 @@ var renderWrappingRenderCountThreshold = settings.RegisterIntSetting(
settings.NonNegativeInt,
)

var errFallbackToRenderWrapping = errors.New("falling back to wrapping a row-by-row processor due to many renders and low estimated row count")

// planPostProcessSpec plans the post processing stage specified in post on top
// of r.Op.
func (r *postProcessResult) planPostProcessSpec(
Expand Down Expand Up @@ -1961,7 +1977,10 @@ func (r *postProcessResult) planPostProcessSpec(
ctx, flowCtx.EvalCtx, expr, r.ColumnTypes, r.Op, getStreamingAllocator(ctx, args, flowCtx), releasables,
)
if err != nil {
return errors.Wrapf(err, "unable to columnarize render expression %q", expr)
if log.ExpensiveLogEnabled(ctx, 1) {
err = errors.Wrapf(err, "unable to columnarize render expression %q", expr)
}
return err
}
if outputIdx < 0 {
return errors.AssertionFailedf("missing outputIdx")
Expand Down Expand Up @@ -2051,7 +2070,10 @@ func (r opResult) planFilterExpr(
ctx, flowCtx.EvalCtx, expr, r.ColumnTypes, r.Root, allocator, &r.Releasables,
)
if err != nil {
return errors.Wrapf(err, "unable to columnarize filter expression %q", filter)
if log.ExpensiveLogEnabled(ctx, 1) {
err = errors.Wrapf(err, "unable to columnarize filter expression %q", filter)
}
return err
}
r.Root = op
if len(filterColumnTypes) > len(r.ColumnTypes) {
Expand Down Expand Up @@ -2241,14 +2263,19 @@ func planSelectionOperators(
op, err = colexecutils.BoolOrUnknownToSelOp(op, typs, resultIdx)
return op, resultIdx, typs, err
default:
return nil, resultIdx, nil, errors.Errorf("unhandled selection expression type: %s", reflect.TypeOf(t))
err = errUnhandledSelectionExpression
if log.ExpensiveLogEnabled(ctx, 1) {
err = errors.Newf("unhandled selection expression type: %s", reflect.TypeOf(t))
}
return nil, resultIdx, nil, err
}
}

// planCastOperator plans a CAST operator that casts the column at index
// 'inputIdx' coming from input of type 'fromType' into a column of type
// 'toType' that will be output at index 'resultIdx'.
func planCastOperator(
ctx context.Context,
columnTypes []*types.T,
input colexecop.Operator,
inputIdx int,
Expand All @@ -2258,7 +2285,7 @@ func planCastOperator(
evalCtx *eval.Context,
) (op colexecop.Operator, resultIdx int, typs []*types.T, err error) {
outputIdx := len(columnTypes)
op, err = colexecbase.GetCastOperator(allocator, input, inputIdx, outputIdx, fromType, toType, evalCtx)
op, err = colexecbase.GetCastOperator(ctx, allocator, input, inputIdx, outputIdx, fromType, toType, evalCtx)
typs = append(columnTypes, toType)
return op, outputIdx, typs, err
}
Expand Down Expand Up @@ -2315,7 +2342,7 @@ func planProjectionOperators(
leftExpr = tree.NewTypedCastExpr(leftExpr, types.String)
} else {
// This is unexpected.
return op, resultIdx, typs, errors.New("neither LHS or RHS of Concat operation is a STRING")
return op, resultIdx, typs, errors.AssertionFailedf("neither LHS or RHS of Concat operation is a STRING")
}
}
}
Expand Down Expand Up @@ -2394,7 +2421,7 @@ func planProjectionOperators(
// is given). In such case, we need to plan a cast.
fromType, toType := typs[thenIdxs[i]], typs[caseOutputIdx]
caseOps[i], thenIdxs[i], typs, err = planCastOperator(
typs, caseOps[i], thenIdxs[i], fromType, toType, allocator, evalCtx,
ctx, typs, caseOps[i], thenIdxs[i], fromType, toType, allocator, evalCtx,
)
if err != nil {
return nil, resultIdx, typs, err
Expand All @@ -2420,7 +2447,7 @@ func planProjectionOperators(
elseIdx := thenIdxs[len(t.Whens)]
fromType, toType := typs[elseIdx], typs[caseOutputIdx]
elseOp, thenIdxs[len(t.Whens)], typs, err = planCastOperator(
typs, elseOp, elseIdx, fromType, toType, allocator, evalCtx,
ctx, typs, elseOp, elseIdx, fromType, toType, allocator, evalCtx,
)
if err != nil {
return nil, resultIdx, typs, err
Expand All @@ -2438,7 +2465,7 @@ func planProjectionOperators(
if err != nil {
return nil, 0, nil, err
}
op, resultIdx, typs, err = planCastOperator(typs, op, resultIdx, expr.ResolvedType(), t.ResolvedType(), allocator, evalCtx)
op, resultIdx, typs, err = planCastOperator(ctx, typs, op, resultIdx, expr.ResolvedType(), t.ResolvedType(), allocator, evalCtx)
return op, resultIdx, typs, err
case *tree.CoalesceExpr:
// We handle CoalesceExpr by planning the equivalent CASE expression.
Expand Down Expand Up @@ -2592,7 +2619,11 @@ func planProjectionOperators(
typs = append(typs, outputType)
return op, resultIdx, typs, err
default:
return nil, resultIdx, nil, errors.Errorf("unhandled projection expression type: %s", reflect.TypeOf(t))
err = errUnhandledProjectionExpression
if log.ExpensiveLogEnabled(ctx, 1) {
err = errors.Newf("unhandled projection expression type: %s", reflect.TypeOf(t))
}
return nil, resultIdx, nil, err
}
}

Expand Down Expand Up @@ -2620,15 +2651,6 @@ func safeTypesForBinOrCmpExpr(leftTyp, rightTyp *types.T) bool {
return false
}

var errBinaryExprWithDatums = unimplemented.NewWithIssue(
49780, "datum-backed arguments on both sides and not datum-backed "+
"output of a binary expression is currently not supported",
)
var errMixedTypeBinaryUnsupported = unimplemented.NewWithIssue(
46198, "dates and timestamptz not supported in mixed-type binary "+
"expressions in the vectorized engine",
)

func checkSupportedBinaryExpr(left, right tree.TypedExpr, outputType *types.T) error {
leftTyp := left.ResolvedType()
rightTyp := right.ResolvedType()
Expand Down Expand Up @@ -2660,11 +2682,6 @@ func checkSupportedBinaryExpr(left, right tree.TypedExpr, outputType *types.T) e
return nil
}

var errMixedTypeComparisonUnsupported = unimplemented.NewWithIssue(
44770, "dates and timestamp(tz) not supported in mixed-type "+
"comparison expressions in the vectorized engine",
)

func checkSupportedComparisonExpr(left, right tree.TypedExpr) error {
leftTyp := left.ResolvedType()
rightTyp := right.ResolvedType()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecagg/any_not_null_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func newAnyNotNull_AGGKINDAggAlloc(
}
// {{end}}
}
return nil, errors.Errorf("unsupported any not null agg type %s", t.Name())
return nil, errors.AssertionFailedf("unsupported any not null agg type %s", t.Name())
}

// {{range .}}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecagg/avg_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func newAvg_AGGKINDAggAlloc(
}
// {{end}}
}
return nil, errors.Errorf("unsupported avg agg type %s", t.Name())
return nil, errors.AssertionFailedf("unsupported avg agg type %s", t.Name())
}

// {{range .}}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecagg/hash_any_not_null_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecagg/hash_avg_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecagg/hash_sum_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecagg/hash_sum_int_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecagg/ordered_any_not_null_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecagg/ordered_avg_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecagg/ordered_sum_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecagg/ordered_sum_int_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecagg/sum_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func newSum_SUMKIND_AGGKINDAggAlloc(
}
// {{end}}
}
return nil, errors.Errorf("unsupported sum agg type %s", t.Name())
return nil, errors.AssertionFailedf("unsupported sum agg type %s", t.Name())
}

// {{range .Infos}}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecagg/window_avg_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecagg/window_sum_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecagg/window_sum_int_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 2c18247

Please sign in to comment.