diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index f2b40d2329a7..08ea360ba38a 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -2463,6 +2463,7 @@ Request object for issuing a query cancel request. | node_id | [string](#cockroach.server.serverpb.CancelQueryRequest-string) | | ID of gateway node for the query to be canceled.

TODO(itsbilal): use [(gogoproto.customname) = "NodeID"] below. Need to figure out how to teach grpc-gateway about custom names.

node_id is a string so that "local" can be used to specify that no forwarding is necessary. | [reserved](#support-status) | | query_id | [string](#cockroach.server.serverpb.CancelQueryRequest-string) | | ID of query to be canceled (converted to string). | [reserved](#support-status) | | username | [string](#cockroach.server.serverpb.CancelQueryRequest-string) | | Username of the user making this cancellation request. This may be omitted if the user is the same as the one issuing the CancelQueryRequest. The caller is responsible for case-folding and NFC normalization. | [reserved](#support-status) | +| message | [string](#cockroach.server.serverpb.CancelQueryRequest-string) | | The message to be sent to the clients of the canceled query. | [reserved](#support-status) | diff --git a/docs/generated/sql/bnf/cancel_query.bnf b/docs/generated/sql/bnf/cancel_query.bnf index c80b563f75a9..3307f30848f4 100644 --- a/docs/generated/sql/bnf/cancel_query.bnf +++ b/docs/generated/sql/bnf/cancel_query.bnf @@ -1,5 +1,5 @@ cancel_queries_stmt ::= - 'CANCEL' 'QUERY' query_id - | 'CANCEL' 'QUERY' 'IF' 'EXISTS' query_id - | 'CANCEL' 'QUERIES' select_stmt - | 'CANCEL' 'QUERIES' 'IF' 'EXISTS' select_stmt + 'CANCEL' 'QUERY' query_id opt_cancel_message + | 'CANCEL' 'QUERY' 'IF' 'EXISTS' query_id opt_cancel_message + | 'CANCEL' 'QUERIES' select_stmt opt_cancel_message + | 'CANCEL' 'QUERIES' 'IF' 'EXISTS' select_stmt opt_cancel_message diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index a3dd26e247d7..1164b149f4aa 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -569,10 +569,10 @@ cancel_jobs_stmt ::= | 'CANCEL' 'JOBS' for_schedules_clause cancel_queries_stmt ::= - 'CANCEL' 'QUERY' a_expr - | 'CANCEL' 'QUERY' 'IF' 'EXISTS' a_expr - | 'CANCEL' 'QUERIES' select_stmt - | 'CANCEL' 'QUERIES' 'IF' 'EXISTS' select_stmt + 'CANCEL' 'QUERY' a_expr opt_cancel_message + | 'CANCEL' 'QUERY' 'IF' 'EXISTS' a_expr opt_cancel_message + | 'CANCEL' 'QUERIES' select_stmt opt_cancel_message + | 'CANCEL' 'QUERIES' 'IF' 'EXISTS' select_stmt opt_cancel_message cancel_sessions_stmt ::= 'CANCEL' 'SESSION' a_expr @@ -1255,6 +1255,7 @@ unreserved_keyword ::= | 'MATERIALIZED' | 'MAXVALUE' | 'MERGE' + | 'MESSAGE' | 'METHOD' | 'MINUTE' | 'MINVALUE' @@ -1771,6 +1772,10 @@ for_schedules_clause ::= 'FOR' 'SCHEDULES' select_stmt | 'FOR' 'SCHEDULE' a_expr +opt_cancel_message ::= + 'WITH' 'MESSAGE' cancel_message + | + create_database_stmt ::= 'CREATE' 'DATABASE' database_name opt_with opt_template_clause opt_encoding_clause opt_lc_collate_clause opt_lc_ctype_clause opt_connection_limit opt_primary_region_clause opt_regions_list opt_survival_goal_clause opt_placement_clause opt_owner_clause opt_super_region_clause opt_secondary_region_clause | 'CREATE' 'DATABASE' 'IF' 'NOT' 'EXISTS' database_name opt_with opt_template_clause opt_encoding_clause opt_lc_collate_clause opt_lc_ctype_clause opt_connection_limit opt_primary_region_clause opt_regions_list opt_survival_goal_clause opt_placement_clause opt_owner_clause opt_super_region_clause opt_secondary_region_clause @@ -2446,6 +2451,9 @@ sub_type ::= | 'SOME' | 'ALL' +cancel_message ::= + 'SCONST' + opt_template_clause ::= 'TEMPLATE' opt_equal non_reserved_word_or_sconst | @@ -3958,6 +3966,7 @@ bare_label_keywords ::= | 'MATERIALIZED' | 'MAXVALUE' | 'MERGE' + | 'MESSAGE' | 'METHOD' | 'MINVALUE' | 'MODE' diff --git a/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go b/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go index 93053f498433..32d55664b40d 100644 --- a/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go +++ b/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go @@ -354,6 +354,13 @@ func TestTenantLogic_bytes( runLogicTest(t, "bytes") } +func TestTenantLogic_cancel( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "cancel") +} + func TestTenantLogic_cascade( t *testing.T, ) { diff --git a/pkg/ccl/logictestccl/tests/local-read-committed/generated_test.go b/pkg/ccl/logictestccl/tests/local-read-committed/generated_test.go index 8670cf00ce44..1de25ac7dca0 100644 --- a/pkg/ccl/logictestccl/tests/local-read-committed/generated_test.go +++ b/pkg/ccl/logictestccl/tests/local-read-committed/generated_test.go @@ -359,6 +359,13 @@ func TestReadCommittedLogic_bytes( runLogicTest(t, "bytes") } +func TestReadCommittedLogic_cancel( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "cancel") +} + func TestReadCommittedLogic_cascade( t *testing.T, ) { diff --git a/pkg/ccl/logictestccl/tests/local-repeatable-read/generated_test.go b/pkg/ccl/logictestccl/tests/local-repeatable-read/generated_test.go index f27541e2ce02..fefe6e29c4e3 100644 --- a/pkg/ccl/logictestccl/tests/local-repeatable-read/generated_test.go +++ b/pkg/ccl/logictestccl/tests/local-repeatable-read/generated_test.go @@ -359,6 +359,13 @@ func TestRepeatableReadLogic_bytes( runLogicTest(t, "bytes") } +func TestRepeatableReadLogic_cancel( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "cancel") +} + func TestRepeatableReadLogic_cascade( t *testing.T, ) { diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index 2cd5adfec3ac..80e7008b7476 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -1139,6 +1139,8 @@ message CancelQueryRequest { // if the user is the same as the one issuing the CancelQueryRequest. // The caller is responsible for case-folding and NFC normalization. string username = 3; + // The message to be sent to the clients of the canceled query. + string message = 4; } // Response returned by target query's gateway node. diff --git a/pkg/server/status.go b/pkg/server/status.go index b4b13d7c988b..d1657903ba51 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -3515,7 +3515,7 @@ func (s *statusServer) CancelQuery( return nil, err } - isCanceled := session.CancelQuery(queryID) + isCanceled := session.CancelQuery(queryID, req.Message) return &serverpb.CancelQueryResponse{ Canceled: isCanceled, }, nil diff --git a/pkg/sql/cancel_queries.go b/pkg/sql/cancel_queries.go index 6a19212c98fd..27c89602e6d0 100644 --- a/pkg/sql/cancel_queries.go +++ b/pkg/sql/cancel_queries.go @@ -25,6 +25,7 @@ import ( type cancelQueriesNode struct { rows planNode ifExists bool + message string } func (n *cancelQueriesNode) startExec(runParams) error { @@ -62,6 +63,7 @@ func (n *cancelQueriesNode) Next(params runParams) (bool, error) { NodeId: fmt.Sprintf("%d", nodeID), QueryID: string(queryIDString), Username: params.SessionData().User().Normalized(), + Message: n.message, } response, err := params.extendedEvalCtx.SQLStatusServer.CancelQuery(params.ctx, request) @@ -81,3 +83,17 @@ func (*cancelQueriesNode) Values() tree.Datums { return nil } func (n *cancelQueriesNode) Close(ctx context.Context) { n.rows.Close(ctx) } + +// get the cancel message for the given query ID. If no message is found, return +// an empty string. +func getCancelMessage(s *Server, queryID clusterunique.ID) (message string) { + s.mu.Lock() + defer s.mu.Unlock() + + message, ok := s.mu.cancels[queryID] + if ok && message != "" { + delete(s.mu.cancels, queryID) + return message + } + return "" +} diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index fd97d4e30159..e7ba4af848ef 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -372,6 +372,10 @@ type Server struct { syncutil.Mutex connectionCount int64 rootConnectionCount int64 + + // The key is the ID of the query that is being canceled, and the value is + // the cancel message sent from the canceler. + cancels map[clusterunique.ID]string } } @@ -466,6 +470,8 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server { idxRecommendationsCache: idxrecommendations.NewIndexRecommendationsCache(cfg.Settings), } + s.mu.cancels = make(map[clusterunique.ID]string) + telemetryLoggingMetrics := newTelemetryLoggingMetrics(cfg.TelemetryLoggingTestingKnobs, cfg.Settings) s.TelemetryLoggingMetrics = telemetryLoggingMetrics @@ -4141,12 +4147,15 @@ func (ex *connExecutor) hasQuery(queryID clusterunique.ID) bool { } // CancelQuery is part of the RegistrySession interface. -func (ex *connExecutor) CancelQuery(queryID clusterunique.ID) bool { +func (ex *connExecutor) CancelQuery(queryID clusterunique.ID, message string) bool { // RLock can be used because map deletion happens in // connExecutor.removeActiveQuery. ex.mu.RLock() defer ex.mu.RUnlock() if queryMeta, exists := ex.mu.ActiveQueries[queryID]; exists { + if message != "" { + ex.server.mu.cancels[queryID] = message + } queryMeta.cancelQuery() return true } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index cb17230286be..ab7cab06bcc4 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -851,7 +851,15 @@ func (ex *connExecutor) execStmtInOpenState( retEv = eventNonRetriableErr{ IsCommit: fsm.FromBool(isCommit(ast)), } + errToPush := cancelchecker.QueryCanceledError + + // Provide a more detailed error message if the canceler provides it. + message := getCancelMessage(ex.server, queryID) + if message != "" { + errToPush = cancelchecker.QueryCanceledErrorWithMessage(message) + } + // For pausable portal, we can arrive here after encountering a timeout // error and then perform a query-cleanup step. In this case, we don't // want to override the original timeout error with the query-cancelled diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index b679cdfa0b19..b12fdb29618d 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -1164,7 +1164,7 @@ func (e *distSQLSpecExecFactory) ConstructControlSchedules( } func (e *distSQLSpecExecFactory) ConstructCancelQueries( - input exec.Node, ifExists bool, + input exec.Node, ifExists bool, message string, ) (exec.Node, error) { return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: cancel queries") } diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index c2b94bdf8a52..ac027e7c370d 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -2365,7 +2365,7 @@ type RegistrySession interface { SessionUser() username.SQLUsername hasQuery(queryID clusterunique.ID) bool // CancelQuery cancels the query specified by queryID if it exists. - CancelQuery(queryID clusterunique.ID) bool + CancelQuery(queryID clusterunique.ID, message string) bool // CancelActiveQueries cancels all currently active queries. CancelActiveQueries() bool // CancelSession cancels the session. diff --git a/pkg/sql/logictest/testdata/logic_test/cancel b/pkg/sql/logictest/testdata/logic_test/cancel new file mode 100644 index 000000000000..caac60abfa22 --- /dev/null +++ b/pkg/sql/logictest/testdata/logic_test/cancel @@ -0,0 +1,82 @@ +statement ok +CREATE TABLE t (a INT PRIMARY KEY) + +statement ok +GRANT ALL ON t TO testuser + +# insert a row so the following transactions can request x-lock on it, no +# matter the transaction isolation level +statement ok +INSERT INTO t VALUES (1) + +subtest cancel + +# start a transaction and hold a x-lock on the row +statement ok +BEGIN; SELECT * FROM t WHERE a = 1 FOR UPDATE + +# issue the CANCEL QUERY with a 1-second delay +statement async cancelstmt +CANCEL QUERY ( + WITH delay AS ( + SELECT pg_sleep(1) + ), + stmts AS ( + SHOW CLUSTER STATEMENTS + ) + SELECT query_id + FROM stmts + WHERE query = 'SELECT * FROM t WHERE a = 1 FOR UPDATE' +) + +user testuser + +# start a second transaction that blocks on the first transaction that holding the +# x-lock, then this will be killed by the delayed CANCEL QUERY above +query error pgcode 57014 query execution canceled +SELECT * FROM t WHERE a = 1 FOR UPDATE + +user root + +awaitstatement cancelstmt + +statement ok +COMMIT + +subtest end + +subtest cancel-with-message + +# start a transaction and hold a x-lock on the row +statement ok +BEGIN; SELECT * FROM t WHERE a = 1 FOR UPDATE + +# issue the CANCEL QUERY with a 1-second delay +statement async cancelstmt +CANCEL QUERY ( + WITH delay AS ( + SELECT pg_sleep(1) + ), + stmts AS ( + SHOW CLUSTER STATEMENTS + ) + SELECT query_id + FROM stmts + WHERE query = 'SELECT * FROM t WHERE a = 1 FOR UPDATE' +) WITH MESSAGE 'cancel due to cluster maintenance' + +user testuser + +# start a second transaction that blocks on the first transaction that holding the +# x-lock, then this will be killed by the delayed CANCEL QUERY above +query error pgcode 57014 query execution canceled, message from the canceler: "cancel due to cluster maintenance" +SELECT * FROM t WHERE a = 1 FOR UPDATE + +user root + +awaitstatement cancelstmt + +statement ok +COMMIT + +subtest end \ No newline at end of file diff --git a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go index 88b3d55d6de7..ae436b5c4bf4 100644 --- a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go @@ -316,6 +316,13 @@ func TestLogic_bytes( runLogicTest(t, "bytes") } +func TestLogic_cancel( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "cancel") +} + func TestLogic_cascade( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go index 76d0acea01da..f1d7ae9b7b12 100644 --- a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go @@ -316,6 +316,13 @@ func TestLogic_bytes( runLogicTest(t, "bytes") } +func TestLogic_cancel( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "cancel") +} + func TestLogic_cascade( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/fakedist/generated_test.go b/pkg/sql/logictest/tests/fakedist/generated_test.go index a734d41d3290..083b20add0c3 100644 --- a/pkg/sql/logictest/tests/fakedist/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist/generated_test.go @@ -316,6 +316,13 @@ func TestLogic_bytes( runLogicTest(t, "bytes") } +func TestLogic_cancel( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "cancel") +} + func TestLogic_cascade( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go index 9c817e9baa9d..41b0d82eeee8 100644 --- a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go +++ b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go @@ -316,6 +316,13 @@ func TestLogic_bytes( runLogicTest(t, "bytes") } +func TestLogic_cancel( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "cancel") +} + func TestLogic_cascade( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-mixed-24.1/generated_test.go b/pkg/sql/logictest/tests/local-mixed-24.1/generated_test.go index b8aa1df514c3..a80d615e09cc 100644 --- a/pkg/sql/logictest/tests/local-mixed-24.1/generated_test.go +++ b/pkg/sql/logictest/tests/local-mixed-24.1/generated_test.go @@ -316,6 +316,13 @@ func TestLogic_bytes( runLogicTest(t, "bytes") } +func TestLogic_cancel( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "cancel") +} + func TestLogic_cascade( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-mixed-24.2/generated_test.go b/pkg/sql/logictest/tests/local-mixed-24.2/generated_test.go index 2ae275d582df..713dc9726784 100644 --- a/pkg/sql/logictest/tests/local-mixed-24.2/generated_test.go +++ b/pkg/sql/logictest/tests/local-mixed-24.2/generated_test.go @@ -316,6 +316,13 @@ func TestLogic_bytes( runLogicTest(t, "bytes") } +func TestLogic_cancel( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "cancel") +} + func TestLogic_cascade( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-vec-off/generated_test.go b/pkg/sql/logictest/tests/local-vec-off/generated_test.go index 7821e777fd51..88384796d54f 100644 --- a/pkg/sql/logictest/tests/local-vec-off/generated_test.go +++ b/pkg/sql/logictest/tests/local-vec-off/generated_test.go @@ -316,6 +316,13 @@ func TestLogic_bytes( runLogicTest(t, "bytes") } +func TestLogic_cancel( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "cancel") +} + func TestLogic_cascade( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local/generated_test.go b/pkg/sql/logictest/tests/local/generated_test.go index dc681f9bd294..06f6e4eafb97 100644 --- a/pkg/sql/logictest/tests/local/generated_test.go +++ b/pkg/sql/logictest/tests/local/generated_test.go @@ -316,6 +316,13 @@ func TestLogic_bytes( runLogicTest(t, "bytes") } +func TestLogic_cancel( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "cancel") +} + func TestLogic_cascade( t *testing.T, ) { diff --git a/pkg/sql/opt/exec/execbuilder/statement.go b/pkg/sql/opt/exec/execbuilder/statement.go index b4c9681740bf..553c4d3f489a 100644 --- a/pkg/sql/opt/exec/execbuilder/statement.go +++ b/pkg/sql/opt/exec/execbuilder/statement.go @@ -381,7 +381,7 @@ func (b *Builder) buildCancelQueries( return execPlan{}, colOrdMap{}, err } var ep execPlan - ep.root, err = b.factory.ConstructCancelQueries(input.root, cancel.IfExists) + ep.root, err = b.factory.ConstructCancelQueries(input.root, cancel.IfExists, cancel.Message) if err != nil { return execPlan{}, colOrdMap{}, err } diff --git a/pkg/sql/opt/exec/factory.opt b/pkg/sql/opt/exec/factory.opt index 7d323cbbfa1f..41ef904fbb12 100644 --- a/pkg/sql/opt/exec/factory.opt +++ b/pkg/sql/opt/exec/factory.opt @@ -739,6 +739,7 @@ define ControlSchedules { define CancelQueries { Input exec.Node IfExists bool + Message string } # CancelSessions implements CANCEL SESSIONS. diff --git a/pkg/sql/opt/ops/statement.opt b/pkg/sql/opt/ops/statement.opt index 907416494577..a5e3b0bc1252 100644 --- a/pkg/sql/opt/ops/statement.opt +++ b/pkg/sql/opt/ops/statement.opt @@ -272,6 +272,9 @@ define CancelPrivate { # IfExists is set if we should tolerate IDs that don't exist. IfExists bool + + # Message will be sent to the clients of the canceled queries. + Message string } # CancelSessions represents a `CANCEL SESSIONS` statement. diff --git a/pkg/sql/opt/optbuilder/misc_statements.go b/pkg/sql/opt/optbuilder/misc_statements.go index 830b31c719fe..cf11a0cc71c7 100644 --- a/pkg/sql/opt/optbuilder/misc_statements.go +++ b/pkg/sql/opt/optbuilder/misc_statements.go @@ -88,6 +88,7 @@ func (b *Builder) buildCancelQueries(n *tree.CancelQueries, inScope *scope) (out &memo.CancelPrivate{ Props: inputScope.makePhysicalProps(), IfExists: n.IfExists, + Message: n.Message, }, ) return outScope diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 022bc30a22d6..3ce1fb37d27d 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -2093,10 +2093,13 @@ func (ef *execFactory) ConstructShowCompletions(command *tree.ShowCompletions) ( } // ConstructCancelQueries is part of the exec.Factory interface. -func (ef *execFactory) ConstructCancelQueries(input exec.Node, ifExists bool) (exec.Node, error) { +func (ef *execFactory) ConstructCancelQueries( + input exec.Node, ifExists bool, message string, +) (exec.Node, error) { return &cancelQueriesNode{ rows: input.(planNode), ifExists: ifExists, + message: message, }, nil } diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 20f3f29aeefe..4d70ebf9887a 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -58,6 +58,7 @@ func TestParseDataDriven(t *testing.T) { datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "parse": + t.Log(d.Input) return sqlutils.VerifyParseFormat(t, d.Input, d.Pos, false /* plpgsql */) case "parse-no-verify": _, err := parser.Parse(d.Input) diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 6cf99c69b78d..6936806451b1 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -991,7 +991,7 @@ func (u *sqlSymUnion) triggerForEach() tree.TriggerForEach { %token LINESTRING LINESTRINGM LINESTRINGZ LINESTRINGZM %token LIST LOCAL LOCALITY LOCALTIME LOCALTIMESTAMP LOCKED LOGICAL LOGIN LOOKUP LOW LSHIFT -%token MATCH MATERIALIZED MERGE MINVALUE MAXVALUE METHOD MINUTE MODIFYCLUSTERSETTING MODIFYSQLCLUSTERSETTING MODE MONTH MOVE +%token MATCH MATERIALIZED MERGE MESSAGE MINVALUE MAXVALUE METHOD MINUTE MODIFYCLUSTERSETTING MODIFYSQLCLUSTERSETTING MODE MONTH MOVE %token MULTILINESTRING MULTILINESTRINGM MULTILINESTRINGZ MULTILINESTRINGZM %token MULTIPOINT MULTIPOINTM MULTIPOINTZ MULTIPOINTZM %token MULTIPOLYGON MULTIPOLYGONM MULTIPOLYGONZ MULTIPOLYGONZM @@ -1363,6 +1363,9 @@ func (u *sqlSymUnion) triggerForEach() tree.TriggerForEach { %type session_var %type <*string> comment_text +%type cancel_message +%type opt_cancel_message + %type transaction_stmt legacy_transaction_stmt legacy_begin_stmt legacy_end_stmt %type truncate_stmt %type unlisten_stmt @@ -4466,39 +4469,54 @@ cancel_jobs_stmt: // %Help: CANCEL QUERIES - cancel running queries // %Category: Misc // %Text: -// CANCEL QUERIES [IF EXISTS] -// CANCEL QUERY [IF EXISTS] +// CANCEL QUERIES [IF EXISTS] [WITH MESSAGE ] +// CANCEL QUERY [IF EXISTS] [WITH MESSAGE ] // %SeeAlso: SHOW STATEMENTS cancel_queries_stmt: - CANCEL QUERY a_expr + CANCEL QUERY a_expr opt_cancel_message { $$.val = &tree.CancelQueries{ Queries: &tree.Select{ Select: &tree.ValuesClause{Rows: []tree.Exprs{tree.Exprs{$3.expr()}}}, }, IfExists: false, + Message: $4, } } -| CANCEL QUERY IF EXISTS a_expr +| CANCEL QUERY IF EXISTS a_expr opt_cancel_message { $$.val = &tree.CancelQueries{ Queries: &tree.Select{ Select: &tree.ValuesClause{Rows: []tree.Exprs{tree.Exprs{$5.expr()}}}, }, IfExists: true, + Message: $6, } } | CANCEL QUERY error // SHOW HELP: CANCEL QUERIES -| CANCEL QUERIES select_stmt +| CANCEL QUERIES select_stmt opt_cancel_message { - $$.val = &tree.CancelQueries{Queries: $3.slct(), IfExists: false} + $$.val = &tree.CancelQueries{Queries: $3.slct(), IfExists: false, Message: $4} } -| CANCEL QUERIES IF EXISTS select_stmt +| CANCEL QUERIES IF EXISTS select_stmt opt_cancel_message { - $$.val = &tree.CancelQueries{Queries: $5.slct(), IfExists: true} + $$.val = &tree.CancelQueries{Queries: $5.slct(), IfExists: true, Message: $6} } | CANCEL QUERIES error // SHOW HELP: CANCEL QUERIES +cancel_message: + SCONST + +opt_cancel_message: + WITH MESSAGE cancel_message + { + $$ = $3 + } +| /* EMPTY */ + { + $$ = "" + } + // %Help: CANCEL SESSIONS - cancel open sessions // %Category: Misc // %Text: @@ -17690,6 +17708,7 @@ unreserved_keyword: | MATERIALIZED | MAXVALUE | MERGE +| MESSAGE | METHOD | MINUTE | MINVALUE @@ -18248,6 +18267,7 @@ bare_label_keywords: | MATERIALIZED | MAXVALUE | MERGE +| MESSAGE | METHOD | MINVALUE | MODE diff --git a/pkg/sql/schemachanger/comparator_generated_test.go b/pkg/sql/schemachanger/comparator_generated_test.go index 85ebb1234c32..e8e8207d013a 100644 --- a/pkg/sql/schemachanger/comparator_generated_test.go +++ b/pkg/sql/schemachanger/comparator_generated_test.go @@ -188,6 +188,11 @@ func TestSchemaChangeComparator_bytes(t *testing.T) { var logicTestFile = "pkg/sql/logictest/testdata/logic_test/bytes" runSchemaChangeComparatorTest(t, logicTestFile) } +func TestSchemaChangeComparator_cancel(t *testing.T) { + defer leaktest.AfterTest(t)() + var logicTestFile = "pkg/sql/logictest/testdata/logic_test/cancel" + runSchemaChangeComparatorTest(t, logicTestFile) +} func TestSchemaChangeComparator_cascade(t *testing.T) { defer leaktest.AfterTest(t)() var logicTestFile = "pkg/sql/logictest/testdata/logic_test/cascade" diff --git a/pkg/sql/sem/tree/run_control.go b/pkg/sql/sem/tree/run_control.go index 428074de0382..c63deeb6fd4a 100644 --- a/pkg/sql/sem/tree/run_control.go +++ b/pkg/sql/sem/tree/run_control.go @@ -49,6 +49,7 @@ func (n *ControlJobs) Format(ctx *FmtCtx) { type CancelQueries struct { Queries *Select IfExists bool + Message string } // Format implements the NodeFormatter interface. @@ -58,6 +59,10 @@ func (node *CancelQueries) Format(ctx *FmtCtx) { ctx.WriteString("IF EXISTS ") } ctx.FormatNode(node.Queries) + if node.Message != "" { + ctx.WriteString(" WITH MESSAGE ") + ctx.WriteString("'" + node.Message + "'") + } } // CancelSessions represents a CANCEL SESSIONS statement. diff --git a/pkg/util/cancelchecker/cancel_checker.go b/pkg/util/cancelchecker/cancel_checker.go index e3879c2d1047..e3fcd9062371 100644 --- a/pkg/util/cancelchecker/cancel_checker.go +++ b/pkg/util/cancelchecker/cancel_checker.go @@ -79,3 +79,10 @@ func (c *CancelChecker) Reset(ctx context.Context, checkInterval ...uint32) { // QueryCanceledError is an error representing query cancellation. var QueryCanceledError = pgerror.New( pgcode.QueryCanceled, "query execution canceled") + +// QueryCanceledErrorWithMessage is an error representing query cancellation +// with the canceler's information (e.g. user name, session ID, etc). +func QueryCanceledErrorWithMessage(message string) error { + return pgerror.Newf( + pgcode.QueryCanceled, "query execution canceled, message from the canceler: \"%s\"", message) +}