Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: add optional message to CANCEL commands #130776

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -2463,6 +2463,7 @@ Request object for issuing a query cancel request.
| node_id | [string](#cockroach.server.serverpb.CancelQueryRequest-string) | | ID of gateway node for the query to be canceled.<br><br>TODO(itsbilal): use [(gogoproto.customname) = "NodeID"] below. Need to figure out how to teach grpc-gateway about custom names.<br><br>node_id is a string so that "local" can be used to specify that no forwarding is necessary. | [reserved](#support-status) |
| query_id | [string](#cockroach.server.serverpb.CancelQueryRequest-string) | | ID of query to be canceled (converted to string). | [reserved](#support-status) |
| username | [string](#cockroach.server.serverpb.CancelQueryRequest-string) | | Username of the user making this cancellation request. This may be omitted if the user is the same as the one issuing the CancelQueryRequest. The caller is responsible for case-folding and NFC normalization. | [reserved](#support-status) |
| message | [string](#cockroach.server.serverpb.CancelQueryRequest-string) | | The message to be sent to the clients of the canceled query. | [reserved](#support-status) |



Expand Down
8 changes: 4 additions & 4 deletions docs/generated/sql/bnf/cancel_query.bnf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cancel_queries_stmt ::=
'CANCEL' 'QUERY' query_id
| 'CANCEL' 'QUERY' 'IF' 'EXISTS' query_id
| 'CANCEL' 'QUERIES' select_stmt
| 'CANCEL' 'QUERIES' 'IF' 'EXISTS' select_stmt
'CANCEL' 'QUERY' query_id opt_cancel_message
| 'CANCEL' 'QUERY' 'IF' 'EXISTS' query_id opt_cancel_message
| 'CANCEL' 'QUERIES' select_stmt opt_cancel_message
| 'CANCEL' 'QUERIES' 'IF' 'EXISTS' select_stmt opt_cancel_message
17 changes: 13 additions & 4 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -569,10 +569,10 @@ cancel_jobs_stmt ::=
| 'CANCEL' 'JOBS' for_schedules_clause

cancel_queries_stmt ::=
'CANCEL' 'QUERY' a_expr
| 'CANCEL' 'QUERY' 'IF' 'EXISTS' a_expr
| 'CANCEL' 'QUERIES' select_stmt
| 'CANCEL' 'QUERIES' 'IF' 'EXISTS' select_stmt
'CANCEL' 'QUERY' a_expr opt_cancel_message
| 'CANCEL' 'QUERY' 'IF' 'EXISTS' a_expr opt_cancel_message
| 'CANCEL' 'QUERIES' select_stmt opt_cancel_message
| 'CANCEL' 'QUERIES' 'IF' 'EXISTS' select_stmt opt_cancel_message

cancel_sessions_stmt ::=
'CANCEL' 'SESSION' a_expr
Expand Down Expand Up @@ -1255,6 +1255,7 @@ unreserved_keyword ::=
| 'MATERIALIZED'
| 'MAXVALUE'
| 'MERGE'
| 'MESSAGE'
| 'METHOD'
| 'MINUTE'
| 'MINVALUE'
Expand Down Expand Up @@ -1771,6 +1772,10 @@ for_schedules_clause ::=
'FOR' 'SCHEDULES' select_stmt
| 'FOR' 'SCHEDULE' a_expr

opt_cancel_message ::=
'WITH' 'MESSAGE' cancel_message
|

create_database_stmt ::=
'CREATE' 'DATABASE' database_name opt_with opt_template_clause opt_encoding_clause opt_lc_collate_clause opt_lc_ctype_clause opt_connection_limit opt_primary_region_clause opt_regions_list opt_survival_goal_clause opt_placement_clause opt_owner_clause opt_super_region_clause opt_secondary_region_clause
| 'CREATE' 'DATABASE' 'IF' 'NOT' 'EXISTS' database_name opt_with opt_template_clause opt_encoding_clause opt_lc_collate_clause opt_lc_ctype_clause opt_connection_limit opt_primary_region_clause opt_regions_list opt_survival_goal_clause opt_placement_clause opt_owner_clause opt_super_region_clause opt_secondary_region_clause
Expand Down Expand Up @@ -2446,6 +2451,9 @@ sub_type ::=
| 'SOME'
| 'ALL'

cancel_message ::=
'SCONST'

opt_template_clause ::=
'TEMPLATE' opt_equal non_reserved_word_or_sconst
|
Expand Down Expand Up @@ -3958,6 +3966,7 @@ bare_label_keywords ::=
| 'MATERIALIZED'
| 'MAXVALUE'
| 'MERGE'
| 'MESSAGE'
| 'METHOD'
| 'MINVALUE'
| 'MODE'
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1139,6 +1139,8 @@ message CancelQueryRequest {
// if the user is the same as the one issuing the CancelQueryRequest.
// The caller is responsible for case-folding and NFC normalization.
string username = 3;
// The message to be sent to the clients of the canceled query.
string message = 4;
}

// Response returned by target query's gateway node.
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3515,7 +3515,7 @@ func (s *statusServer) CancelQuery(
return nil, err
}

isCanceled := session.CancelQuery(queryID)
isCanceled := session.CancelQuery(queryID, req.Message)
return &serverpb.CancelQueryResponse{
Canceled: isCanceled,
}, nil
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/cancel_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
type cancelQueriesNode struct {
rows planNode
ifExists bool
message string
}

func (n *cancelQueriesNode) startExec(runParams) error {
Expand Down Expand Up @@ -62,6 +63,7 @@ func (n *cancelQueriesNode) Next(params runParams) (bool, error) {
NodeId: fmt.Sprintf("%d", nodeID),
QueryID: string(queryIDString),
Username: params.SessionData().User().Normalized(),
Message: n.message,
}

response, err := params.extendedEvalCtx.SQLStatusServer.CancelQuery(params.ctx, request)
Expand All @@ -81,3 +83,17 @@ func (*cancelQueriesNode) Values() tree.Datums { return nil }
func (n *cancelQueriesNode) Close(ctx context.Context) {
n.rows.Close(ctx)
}

// get the cancel message for the given query ID. If no message is found, return
// an empty string.
func getCancelMessage(s *Server, queryID clusterunique.ID) (message string) {
s.mu.Lock()
defer s.mu.Unlock()

message, ok := s.mu.cancels[queryID]
if ok && message != "" {
delete(s.mu.cancels, queryID)
return message
}
return ""
}
11 changes: 10 additions & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,10 @@ type Server struct {
syncutil.Mutex
connectionCount int64
rootConnectionCount int64

// The key is the ID of the query that is being canceled, and the value is
// the cancel message sent from the canceler.
cancels map[clusterunique.ID]string
}
}

Expand Down Expand Up @@ -466,6 +470,8 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
idxRecommendationsCache: idxrecommendations.NewIndexRecommendationsCache(cfg.Settings),
}

s.mu.cancels = make(map[clusterunique.ID]string)

telemetryLoggingMetrics := newTelemetryLoggingMetrics(cfg.TelemetryLoggingTestingKnobs, cfg.Settings)
s.TelemetryLoggingMetrics = telemetryLoggingMetrics

Expand Down Expand Up @@ -4141,12 +4147,15 @@ func (ex *connExecutor) hasQuery(queryID clusterunique.ID) bool {
}

// CancelQuery is part of the RegistrySession interface.
func (ex *connExecutor) CancelQuery(queryID clusterunique.ID) bool {
func (ex *connExecutor) CancelQuery(queryID clusterunique.ID, message string) bool {
// RLock can be used because map deletion happens in
// connExecutor.removeActiveQuery.
ex.mu.RLock()
defer ex.mu.RUnlock()
if queryMeta, exists := ex.mu.ActiveQueries[queryID]; exists {
if message != "" {
ex.server.mu.cancels[queryID] = message
}
queryMeta.cancelQuery()
return true
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,15 @@ func (ex *connExecutor) execStmtInOpenState(
retEv = eventNonRetriableErr{
IsCommit: fsm.FromBool(isCommit(ast)),
}

errToPush := cancelchecker.QueryCanceledError

// Provide a more detailed error message if the canceler provides it.
message := getCancelMessage(ex.server, queryID)
if message != "" {
errToPush = cancelchecker.QueryCanceledErrorWithMessage(message)
}

// For pausable portal, we can arrive here after encountering a timeout
// error and then perform a query-cleanup step. In this case, we don't
// want to override the original timeout error with the query-cancelled
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,7 @@ func (e *distSQLSpecExecFactory) ConstructControlSchedules(
}

func (e *distSQLSpecExecFactory) ConstructCancelQueries(
input exec.Node, ifExists bool,
input exec.Node, ifExists bool, message string,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: cancel queries")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2365,7 +2365,7 @@ type RegistrySession interface {
SessionUser() username.SQLUsername
hasQuery(queryID clusterunique.ID) bool
// CancelQuery cancels the query specified by queryID if it exists.
CancelQuery(queryID clusterunique.ID) bool
CancelQuery(queryID clusterunique.ID, message string) bool
// CancelActiveQueries cancels all currently active queries.
CancelActiveQueries() bool
// CancelSession cancels the session.
Expand Down
82 changes: 82 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/cancel
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
statement ok
CREATE TABLE t (a INT PRIMARY KEY)

statement ok
GRANT ALL ON t TO testuser

# insert a row so the following transactions can request x-lock on it, no
# matter the transaction isolation level
statement ok
INSERT INTO t VALUES (1)

subtest cancel

# start a transaction and hold a x-lock on the row
statement ok
BEGIN; SELECT * FROM t WHERE a = 1 FOR UPDATE

# issue the CANCEL QUERY with a 1-second delay
statement async cancelstmt
CANCEL QUERY (
WITH delay AS (
SELECT pg_sleep(1)
),
stmts AS (
SHOW CLUSTER STATEMENTS
)
SELECT query_id
FROM stmts
WHERE query = 'SELECT * FROM t WHERE a = 1 FOR UPDATE'
)

user testuser

# start a second transaction that blocks on the first transaction that holding the
# x-lock, then this will be killed by the delayed CANCEL QUERY above
query error pgcode 57014 query execution canceled
SELECT * FROM t WHERE a = 1 FOR UPDATE

user root

awaitstatement cancelstmt

statement ok
COMMIT

subtest end

subtest cancel-with-message

# start a transaction and hold a x-lock on the row
statement ok
BEGIN; SELECT * FROM t WHERE a = 1 FOR UPDATE

# issue the CANCEL QUERY with a 1-second delay
statement async cancelstmt
CANCEL QUERY (
WITH delay AS (
SELECT pg_sleep(1)
),
stmts AS (
SHOW CLUSTER STATEMENTS
)
SELECT query_id
FROM stmts
WHERE query = 'SELECT * FROM t WHERE a = 1 FOR UPDATE'
) WITH MESSAGE 'cancel due to cluster maintenance'

user testuser

# start a second transaction that blocks on the first transaction that holding the
# x-lock, then this will be killed by the delayed CANCEL QUERY above
query error pgcode 57014 query execution canceled, message from the canceler: "cancel due to cluster maintenance"
SELECT * FROM t WHERE a = 1 FOR UPDATE

user root

awaitstatement cancelstmt

statement ok
COMMIT

subtest end
7 changes: 7 additions & 0 deletions pkg/sql/logictest/tests/fakedist-disk/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/sql/logictest/tests/fakedist/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/sql/logictest/tests/local-mixed-24.1/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading