From 6547e147e921bd6e88e8ba1beb2cda6eed183ed3 Mon Sep 17 00:00:00 2001 From: zhihanz Date: Wed, 6 Mar 2024 10:48:42 +0800 Subject: [PATCH] feat: support notification syntax with rpc, and support task error notification integration (#14845) * feat: support notification rpc and task notification * feat: remove external library protoc dependencies * chore: apply comment rename utils.proto to timestamp.proto --------- Co-authored-by: Bohu --- Cargo.lock | 1 + Makefile | 2 + src/common/cloud_control/Cargo.toml | 2 +- .../cloud_control/proto/notification.proto | 132 +++ src/common/cloud_control/proto/task.proto | 3 + .../cloud_control/proto/timestamp.proto | 16 + src/common/cloud_control/src/client_config.rs | 37 + src/common/cloud_control/src/cloud_api.rs | 16 +- src/common/cloud_control/src/lib.rs | 7 + .../cloud_control/src/notification_client.rs | 99 ++ .../cloud_control/src/notification_utils.rs | 127 +++ src/common/cloud_control/src/task_client.rs | 31 +- src/common/cloud_control/src/task_utils.rs | 2 + .../cloud_control/tests/it/task_client.rs | 2 + src/query/ast/src/ast/statements/mod.rs | 2 + .../ast/src/ast/statements/notification.rs | 209 +++++ src/query/ast/src/ast/statements/statement.rs | 10 + src/query/ast/src/ast/statements/task.rs | 20 +- src/query/ast/src/ast/visitors/visitor.rs | 4 + src/query/ast/src/ast/visitors/visitor_mut.rs | 6 + src/query/ast/src/ast/visitors/walk.rs | 4 + src/query/ast/src/ast/visitors/walk_mut.rs | 4 + src/query/ast/src/parser/statement.rs | 173 +++- src/query/ast/src/parser/token.rs | 15 + src/query/ast/tests/it/parser.rs | 11 +- .../ast/tests/it/testdata/statement-error.txt | 4 +- src/query/ast/tests/it/testdata/statement.txt | 194 +++- .../src/databases/system/system_database.rs | 4 + .../interpreters/access/privilege_access.rs | 4 + .../service/src/interpreters/common/mod.rs | 4 +- .../src/interpreters/common/notification.rs | 35 + .../service/src/interpreters/common/task.rs | 12 +- .../src/interpreters/interpreter_factory.rs | 20 + .../interpreter_notification_alter.rs | 104 +++ .../interpreter_notification_create.rs | 101 +++ .../interpreter_notification_desc.rs | 86 ++ .../interpreter_notification_drop.rs | 80 ++ .../interpreters/interpreter_task_alter.rs | 10 +- .../interpreters/interpreter_task_create.rs | 7 +- .../interpreters/interpreter_task_describe.rs | 6 +- .../src/interpreters/interpreter_task_drop.rs | 6 +- .../interpreters/interpreter_task_execute.rs | 6 +- .../interpreters/interpreter_tasks_show.rs | 6 +- src/query/service/src/interpreters/mod.rs | 4 + .../table_functions/cloud/task_dependents.rs | 2 +- .../cloud/task_dependents_enable.rs | 2 +- .../it/servers/http/http_query_handlers.rs | 8 +- .../it/storages/testdata/columns_table.txt | 849 +++++++++--------- src/query/sql/src/planner/binder/binder.rs | 13 +- src/query/sql/src/planner/binder/ddl/mod.rs | 1 + .../src/planner/binder/ddl/notification.rs | 170 ++++ src/query/sql/src/planner/binder/ddl/task.rs | 4 + .../sql/src/planner/format/display_plan.rs | 6 + src/query/sql/src/planner/plans/ddl/mod.rs | 2 + .../sql/src/planner/plans/ddl/notification.rs | 122 +++ src/query/sql/src/planner/plans/ddl/task.rs | 2 + src/query/sql/src/planner/plans/plan.rs | 12 +- src/query/storages/system/src/lib.rs | 5 + .../system/src/notification_history_table.rs | 166 ++++ .../system/src/notifications_table.rs | 147 +++ .../storages/system/src/task_history_table.rs | 2 +- src/query/storages/system/src/tasks_table.rs | 5 +- .../cloud_control_server/notification_pb2.py | 62 ++ .../notification_pb2_grpc.py | 304 +++++++ tests/cloud_control_server/simple_server.py | 147 ++- tests/cloud_control_server/task_pb2.py | 116 +-- .../notification_history/sample1.json | 15 + .../notification_history/sample2.json | 13 + tests/cloud_control_server/timestamp_pb2.py | 26 + .../timestamp_pb2_grpc.py | 3 + .../suites/task/task_notification_test.test | 78 ++ 71 files changed, 3354 insertions(+), 556 deletions(-) create mode 100644 src/common/cloud_control/proto/notification.proto create mode 100644 src/common/cloud_control/proto/timestamp.proto create mode 100644 src/common/cloud_control/src/notification_client.rs create mode 100644 src/common/cloud_control/src/notification_utils.rs create mode 100644 src/query/ast/src/ast/statements/notification.rs create mode 100644 src/query/service/src/interpreters/common/notification.rs create mode 100644 src/query/service/src/interpreters/interpreter_notification_alter.rs create mode 100644 src/query/service/src/interpreters/interpreter_notification_create.rs create mode 100644 src/query/service/src/interpreters/interpreter_notification_desc.rs create mode 100644 src/query/service/src/interpreters/interpreter_notification_drop.rs create mode 100644 src/query/sql/src/planner/binder/ddl/notification.rs create mode 100644 src/query/sql/src/planner/plans/ddl/notification.rs create mode 100644 src/query/storages/system/src/notification_history_table.rs create mode 100644 src/query/storages/system/src/notifications_table.rs create mode 100644 tests/cloud_control_server/notification_pb2.py create mode 100644 tests/cloud_control_server/notification_pb2_grpc.py create mode 100644 tests/cloud_control_server/testdata/notification_history/sample1.json create mode 100644 tests/cloud_control_server/testdata/notification_history/sample2.json create mode 100644 tests/cloud_control_server/timestamp_pb2.py create mode 100644 tests/cloud_control_server/timestamp_pb2_grpc.py create mode 100644 tests/sqllogictests/suites/task/task_notification_test.test diff --git a/Cargo.lock b/Cargo.lock index 65b236a78b76..2e08a780bcaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2745,6 +2745,7 @@ dependencies = [ "prost 0.12.3", "prost-build", "semver", + "serde", "tonic 0.10.2", "tonic-build", "tower", diff --git a/Makefile b/Makefile index d570a882d862..fbf18a5d3353 100644 --- a/Makefile +++ b/Makefile @@ -114,5 +114,7 @@ clean: genproto: python -m grpc_tools.protoc -Isrc/common/cloud_control/proto/ --python_out=tests/cloud_control_server/ --grpc_python_out=tests/cloud_control_server/ src/common/cloud_control/proto/task.proto + python -m grpc_tools.protoc -Isrc/common/cloud_control/proto/ --python_out=tests/cloud_control_server/ --grpc_python_out=tests/cloud_control_server/ src/common/cloud_control/proto/notification.proto + python -m grpc_tools.protoc -Isrc/common/cloud_control/proto/ --python_out=tests/cloud_control_server/ --grpc_python_out=tests/cloud_control_server/ src/common/cloud_control/proto/timestamp.proto .PHONY: setup test run build fmt lint clean docs diff --git a/src/common/cloud_control/Cargo.toml b/src/common/cloud_control/Cargo.toml index 66026a736fe8..be86a378c765 100644 --- a/src/common/cloud_control/Cargo.toml +++ b/src/common/cloud_control/Cargo.toml @@ -16,8 +16,8 @@ chrono = { workspace = true } databend-common-base = { path = "../base" } databend-common-exception = { path = "../exception" } prost = { workspace = true } +serde = { workspace = true } tonic = { workspace = true } - [build-dependencies] lenient_semver = "0.4.2" prost-build = { workspace = true } diff --git a/src/common/cloud_control/proto/notification.proto b/src/common/cloud_control/proto/notification.proto new file mode 100644 index 000000000000..98828ee8347a --- /dev/null +++ b/src/common/cloud_control/proto/notification.proto @@ -0,0 +1,132 @@ +syntax = "proto3"; +option go_package = "databend.com/cloudcontrol/notification/proto"; +import "timestamp.proto"; + +package notificationproto; + +message NotificationError { + string kind = 1; + string message = 2; + int32 code = 3; +} +enum NotificationType { + WEBHOOK = 0; +} +message CreateNotificationRequest { + + string tenant_id = 1; + bool if_not_exists = 2; + string name = 3; + NotificationType notification_type = 4; + bool enabled = 6; + string webhook_url = 7; + optional string webhook_method = 8; + optional string webhook_authorization_header = 9; + + optional string comments = 90; +} + +message CreateNotificationResponse { + optional NotificationError error = 1; + uint64 notification_id = 2; +} + +message DropNotificationRequest { + string tenant_id = 1; + string name = 2; + bool if_exists = 3; +} + +message DropNotificationResponse { + optional NotificationError error = 1; +} + +message Notification { + uint64 notification_id = 1; + string tenant_id = 2; + string name = 3; + NotificationType notification_type = 4; + bool enabled = 5; + string webhook_url = 6; + optional string webhook_method = 7; + optional string webhook_authorization_header = 8; + + optional string comments = 90; + utils.Timestamp created_time = 91; + string created_by = 92; + utils.Timestamp updated_time = 93; + string updated_by = 94; +} + +message ListNotificationRequest { + string tenant_id = 1; +} + +message ListNotificationResponse { + optional NotificationError error = 1; + repeated Notification notifications = 5; +} + +message GetNotificationRequest { + string tenant_id = 1; + string name = 2; +} + +message GetNotificationResponse { + optional NotificationError error = 1; + Notification notification = 5; +} + +message AlterNotificationRequest { + string tenant_id = 1; + string name = 2; + string operation_type = 3; + optional bool enabled = 4; + optional string webhook_url = 5; + optional string webhook_method = 6; + optional string webhook_authorization_header = 7; + optional string comments = 8; +} + +message AlterNotificationResponse { + optional NotificationError error = 1; + uint64 notification_id = 2; +} + +message NotificationHistory { + utils.Timestamp created_time = 1; + utils.Timestamp processed_time = 2; + string message_source = 3; + string name = 4; + string message = 5; + string status = 6; + string error_message = 7; // if notification failed, provide failed message +} + +message ListNotificationHistoryRequest { + string tenant_id = 1; + optional string notification_name = 2; + optional utils.Timestamp start_time = 3; + optional utils.Timestamp end_time = 4; + optional int32 result_limit = 5; + optional int32 page_size = 6; // 100 by default + optional int64 next_page_token = 7; + optional int64 previous_page_token = 8; +} + +message ListNotificationHistoryResponse { + optional NotificationError error = 1; + repeated NotificationHistory notification_histories = 5; + // you can pass the next_page_token to the list request to get the next page. so does previous_page_token. + int64 next_page_token = 6; + int64 previous_page_token = 7; +} + +service NotificationService { + rpc CreateNotification(CreateNotificationRequest) returns (CreateNotificationResponse); + rpc DropNotification(DropNotificationRequest) returns (DropNotificationResponse); + rpc ListNotification(ListNotificationRequest) returns (ListNotificationResponse); + rpc GetNotification(GetNotificationRequest) returns (GetNotificationResponse); + rpc AlterNotification(AlterNotificationRequest) returns (AlterNotificationResponse); + rpc ListNotificationHistory(ListNotificationHistoryRequest) returns (ListNotificationHistoryResponse); +} \ No newline at end of file diff --git a/src/common/cloud_control/proto/task.proto b/src/common/cloud_control/proto/task.proto index dda4f3776405..189bba67f876 100644 --- a/src/common/cloud_control/proto/task.proto +++ b/src/common/cloud_control/proto/task.proto @@ -34,6 +34,7 @@ message CreateTaskRequest { repeated string after = 11; // optional string when_condition = 12; map session_parameters = 13; + optional string error_integration = 14; } message TaskError { @@ -92,6 +93,7 @@ message Task { repeated string after = 17; optional string when_condition = 18; map session_parameters = 19; + optional string error_integration = 20; } @@ -138,6 +140,7 @@ message AlterTaskRequest { repeated string remove_after = 13; bool set_session_parameters = 14; map session_parameters = 15; + optional string error_integration = 16; } message AlterTaskResponse { diff --git a/src/common/cloud_control/proto/timestamp.proto b/src/common/cloud_control/proto/timestamp.proto new file mode 100644 index 000000000000..3c46d69194fd --- /dev/null +++ b/src/common/cloud_control/proto/timestamp.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; +package utils; + + +message Timestamp { + // Represents seconds of UTC time since Unix epoch + // 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to + // 9999-12-31T23:59:59Z inclusive. + int64 seconds = 1; + + // Non-negative fractions of a second at nanosecond resolution. Negative + // second values with fractions must still have non-negative nanos values + // that count forward in time. Must be from 0 to 999,999,999 + // inclusive. + int32 nanos = 2; +} diff --git a/src/common/cloud_control/src/client_config.rs b/src/common/cloud_control/src/client_config.rs index aeb87052962f..7f7c6e6dd62d 100644 --- a/src/common/cloud_control/src/client_config.rs +++ b/src/common/cloud_control/src/client_config.rs @@ -15,6 +15,8 @@ use std::fmt::Debug; use std::time::Duration; +use tonic::Request; + use crate::cloud_api::CLOUD_REQUEST_TIMEOUT_SEC; use crate::cloud_api::QUERY_ID; use crate::cloud_api::REQUESTER; @@ -42,6 +44,19 @@ impl ClientConfig { self.metadata.push((key.into(), value.into())); } + pub fn add_task_version_info(&mut self) { + self.add_metadata( + crate::task_client::TASK_CLIENT_VERSION_NAME, + crate::task_client::TASK_CLIENT_VERSION, + ); + } + + pub fn add_notification_version_info(&mut self) { + self.add_metadata( + crate::notification_client::NOTIFICATION_CLIENT_VERSION_NAME, + crate::notification_client::NOTIFICATION_CLIENT_VERSION, + ); + } pub fn get_metadata(&self) -> &Vec<(String, String)> { &self.metadata } @@ -72,3 +87,25 @@ pub fn build_client_config( config.add_metadata(QUERY_ID, query_id); config } + +// add necessary metadata and client request setup for auditing and tracing purpose +pub fn make_request(t: T, config: ClientConfig) -> Request { + let mut request = Request::new(t); + request.set_timeout(config.get_timeout()); + let metadata = request.metadata_mut(); + let config_meta = config.get_metadata().clone(); + for (k, v) in config_meta { + let key = k + .parse::>() + .unwrap(); + metadata.insert(key, v.parse().unwrap()); + } + // metadata.insert( + // TASK_CLIENT_VERSION_NAME + // .to_string() + // .parse::>() + // .unwrap(), + // TASK_CLIENT_VERSION.to_string().parse().unwrap(), + // ); + request +} diff --git a/src/common/cloud_control/src/cloud_api.rs b/src/common/cloud_control/src/cloud_api.rs index c708eaaa99de..e82b9c169e86 100644 --- a/src/common/cloud_control/src/cloud_api.rs +++ b/src/common/cloud_control/src/cloud_api.rs @@ -19,6 +19,7 @@ use databend_common_base::base::GlobalInstance; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use crate::notification_client::NotificationClient; use crate::task_client::TaskClient; pub const CLOUD_REQUEST_TIMEOUT_SEC: u64 = 5; // 5 seconds @@ -30,6 +31,7 @@ pub const QUERY_ID: &str = "X-DATABEND-QUERY-ID"; pub struct CloudControlApiProvider { pub task_client: Arc, + pub notification_client: Arc, pub timeout: Duration, } @@ -42,10 +44,12 @@ impl CloudControlApiProvider { }; let endpoint = Self::get_endpoint(endpoint, timeout).await?; - let task_client = TaskClient::new(endpoint).await?; - + let channel = endpoint.connect_lazy(); + let task_client = TaskClient::new(channel.clone()).await?; + let notification_client = NotificationClient::new(channel).await?; Ok(Arc::new(CloudControlApiProvider { task_client, + notification_client, timeout, })) } @@ -60,7 +64,9 @@ impl CloudControlApiProvider { "Invalid cloud control Server address: {err}" )) })? - .connect_timeout(timeout); + .connect_timeout(timeout) + .tcp_nodelay(true) + .tcp_keepalive(None); Ok(endpoint) } @@ -79,6 +85,10 @@ impl CloudControlApiProvider { pub fn get_task_client(&self) -> Arc { self.task_client.clone() } + + pub fn get_notification_client(&self) -> Arc { + self.notification_client.clone() + } pub fn get_timeout(&self) -> Duration { self.timeout } diff --git a/src/common/cloud_control/src/lib.rs b/src/common/cloud_control/src/lib.rs index 3676a4128369..be22875de39d 100644 --- a/src/common/cloud_control/src/lib.rs +++ b/src/common/cloud_control/src/lib.rs @@ -14,6 +14,8 @@ pub mod client_config; pub mod cloud_api; +pub mod notification_client; +pub mod notification_utils; pub mod task_client; pub mod task_utils; @@ -24,6 +26,11 @@ pub mod task_utils; pub mod pb { // taskproto is proto package name. tonic::include_proto!("taskproto"); + tonic::include_proto!("notificationproto"); +} + +pub mod utils { + tonic::include_proto!("utils"); } pub use prost; diff --git a/src/common/cloud_control/src/notification_client.rs b/src/common/cloud_control/src/notification_client.rs new file mode 100644 index 000000000000..3490eae6bf3d --- /dev/null +++ b/src/common/cloud_control/src/notification_client.rs @@ -0,0 +1,99 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use tonic::transport::Channel; +use tonic::Request; + +use crate::pb::notification_service_client::NotificationServiceClient; +use crate::pb::CreateNotificationRequest; +use crate::pb::CreateNotificationResponse; +use crate::pb::DropNotificationRequest; +use crate::pb::DropNotificationResponse; +use crate::pb::GetNotificationRequest; +use crate::pb::GetNotificationResponse; +use crate::pb::ListNotificationRequest; +use crate::pb::ListNotificationResponse; + +pub(crate) const NOTIFICATION_CLIENT_VERSION: &str = "v1"; +pub(crate) const NOTIFICATION_CLIENT_VERSION_NAME: &str = "NOTIFICATION_CLIENT_VERSION"; +pub struct NotificationClient { + pub client: NotificationServiceClient, +} + +impl NotificationClient { + // TODO: add auth interceptor + pub async fn new( + channel: Channel, + ) -> databend_common_exception::Result> { + let client = NotificationServiceClient::new(channel); + Ok(Arc::new(NotificationClient { client })) + } + + // TODO: richer error handling on Task Error + pub async fn create_notification( + &self, + req: Request, + ) -> databend_common_exception::Result { + let mut client = self.client.clone(); + let resp = client.create_notification(req).await?; + Ok(resp.into_inner()) + } + + pub async fn drop_notification( + &self, + req: Request, + ) -> databend_common_exception::Result { + let mut client = self.client.clone(); + let resp = client.drop_notification(req).await?; + Ok(resp.into_inner()) + } + + pub async fn desc_notification( + &self, + req: Request, + ) -> databend_common_exception::Result { + let mut client = self.client.clone(); + let resp = client.get_notification(req).await?; + Ok(resp.into_inner()) + } + + pub async fn alter_notification( + &self, + req: Request, + ) -> databend_common_exception::Result { + let mut client = self.client.clone(); + let resp = client.alter_notification(req).await?; + Ok(resp.into_inner()) + } + + pub async fn list_notifications( + &self, + req: Request, + ) -> databend_common_exception::Result { + let mut client = self.client.clone(); + let resp = client.list_notification(req).await?; + Ok(resp.into_inner()) + } + + pub async fn list_notification_histories( + &self, + req: Request, + ) -> databend_common_exception::Result { + let mut client = self.client.clone(); + let resp = client.list_notification_history(req).await?; + Ok(resp.into_inner()) + } +} diff --git a/src/common/cloud_control/src/notification_utils.rs b/src/common/cloud_control/src/notification_utils.rs new file mode 100644 index 000000000000..ef696eaed9e0 --- /dev/null +++ b/src/common/cloud_control/src/notification_utils.rs @@ -0,0 +1,127 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::DateTime; +use chrono::Utc; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use serde::Deserialize; +use serde::Serialize; + +pub fn get_notification_type(raw_type: &str) -> Result { + match raw_type.to_lowercase().as_str() { + "webhook" => Ok(crate::pb::NotificationType::Webhook), + _ => Err(ErrorCode::IllegalCloudControlMessageFormat( + "Invalid notification type", + )), + } +} +pub enum NotificationParams { + Webhook(WebhookNotification), +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct WebhookNotification { + pub url: String, + pub method: Option, + pub authorization_header: Option, +} + +pub struct Notification { + pub tenant_id: String, + pub name: String, + pub id: u64, + pub enabled: bool, + pub params: NotificationParams, + pub comments: Option, + pub created_time: DateTime, + pub updated_time: DateTime, +} + +pub fn parse_timestamp(timestamp: Option) -> Result> { + match timestamp { + Some(ts) => { + let seconds = ts.seconds; + let nanos = ts.nanos; + let dt = DateTime::::from_timestamp(seconds, nanos as u32); + if dt.is_none() { + return Err(ErrorCode::IllegalCloudControlMessageFormat( + "Invalid timestamp, parsed datetime is none", + )); + } + Ok(dt.unwrap()) + } + None => Err(ErrorCode::IllegalCloudControlMessageFormat( + "Invalid timestamp", + )), + } +} + +impl TryFrom for Notification { + type Error = ErrorCode; + + fn try_from(notification: crate::pb::Notification) -> Result { + match crate::pb::NotificationType::try_from(notification.notification_type) { + Ok(crate::pb::NotificationType::Webhook) => { + Ok(Notification { + tenant_id: notification.tenant_id, + name: notification.name, + id: notification.notification_id, + enabled: notification.enabled, + params: NotificationParams::Webhook(WebhookNotification { + url: notification.webhook_url, + method: notification.webhook_method, + authorization_header: notification.webhook_authorization_header, + }), + comments: notification.comments, + // convert timestamp to DateTime + created_time: parse_timestamp(notification.created_time)?, + updated_time: parse_timestamp(notification.updated_time)?, + }) + } + _ => Err(ErrorCode::IllegalCloudControlMessageFormat( + "Unimplemented notification type", + )), + } + } +} + +pub struct NotificationHistory { + pub created_time: DateTime, + pub processed_time: Option>, + pub message_source: String, + pub name: String, + pub message: String, + pub status: String, + pub error_message: String, +} + +impl TryFrom for NotificationHistory { + type Error = ErrorCode; + + fn try_from(history: crate::pb::NotificationHistory) -> Result { + Ok(NotificationHistory { + created_time: parse_timestamp(history.created_time)?, + processed_time: match history.processed_time { + Some(ts) => Some(parse_timestamp(Some(ts))?), + None => None, + }, + message_source: history.message_source, + name: history.name, + message: history.message, + status: history.status, + error_message: history.error_message, + }) + } +} diff --git a/src/common/cloud_control/src/task_client.rs b/src/common/cloud_control/src/task_client.rs index 7df70332f922..cd5a66bb4f33 100644 --- a/src/common/cloud_control/src/task_client.rs +++ b/src/common/cloud_control/src/task_client.rs @@ -16,10 +16,8 @@ use std::sync::Arc; use databend_common_exception::Result; use tonic::transport::Channel; -use tonic::transport::Endpoint; use tonic::Request; -use crate::client_config::ClientConfig; use crate::pb::task_service_client::TaskServiceClient; use crate::pb::AlterTaskRequest; use crate::pb::AlterTaskResponse; @@ -34,39 +32,16 @@ use crate::pb::ExecuteTaskResponse; use crate::pb::ShowTasksRequest; use crate::pb::ShowTasksResponse; -const TASK_CLIENT_VERSION: &str = "v1"; -const TASK_CLIENT_VERSION_NAME: &str = "TASK_CLIENT_VERSION"; +pub(crate) const TASK_CLIENT_VERSION: &str = "v1"; +pub(crate) const TASK_CLIENT_VERSION_NAME: &str = "TASK_CLIENT_VERSION"; pub struct TaskClient { pub task_client: TaskServiceClient, } -// add necessary metadata and client request setup for auditing and tracing purpose -pub fn make_request(t: T, config: ClientConfig) -> Request { - let mut request = Request::new(t); - request.set_timeout(config.get_timeout()); - let metadata = request.metadata_mut(); - let config_meta = config.get_metadata().clone(); - for (k, v) in config_meta { - let key = k - .parse::>() - .unwrap(); - metadata.insert(key, v.parse().unwrap()); - } - metadata.insert( - TASK_CLIENT_VERSION_NAME - .to_string() - .parse::>() - .unwrap(), - TASK_CLIENT_VERSION.to_string().parse().unwrap(), - ); - request -} - impl TaskClient { // TODO: add auth interceptor - pub async fn new(endpoint: Endpoint) -> Result> { - let channel = endpoint.connect_lazy(); + pub async fn new(channel: Channel) -> Result> { let task_client = TaskServiceClient::new(channel); Ok(Arc::new(TaskClient { task_client })) } diff --git a/src/common/cloud_control/src/task_utils.rs b/src/common/cloud_control/src/task_utils.rs index be8b17cfcad5..936aeba013eb 100644 --- a/src/common/cloud_control/src/task_utils.rs +++ b/src/common/cloud_control/src/task_utils.rs @@ -74,6 +74,7 @@ pub struct Task { pub warehouse_options: Option, pub next_scheduled_at: Option>, pub suspend_task_after_num_failures: Option, + pub error_integration: Option, pub status: Status, pub created_at: DateTime, pub updated_at: DateTime, @@ -202,6 +203,7 @@ impl TryFrom for Task { next_scheduled_at, last_suspended_at, suspend_task_after_num_failures: value.suspend_task_after_num_failures, + error_integration: value.error_integration, status, created_at, updated_at, diff --git a/src/common/cloud_control/tests/it/task_client.rs b/src/common/cloud_control/tests/it/task_client.rs index 51f0e0d12f96..3dfcf3bff64e 100644 --- a/src/common/cloud_control/tests/it/task_client.rs +++ b/src/common/cloud_control/tests/it/task_client.rs @@ -81,6 +81,7 @@ impl TaskService for MockTaskService { created_at: Default::default(), updated_at: Default::default(), last_suspended_at: None, + error_integration: None, after: vec![], when_condition: None, session_parameters: Default::default(), @@ -193,6 +194,7 @@ async fn test_task_client_success_cases() -> Result<()> { owner: "".to_string(), comment: None, schedule_options: None, + error_integration: None, warehouse_options: None, suspend_task_after_num_failures: None, if_not_exist: false, diff --git a/src/query/ast/src/ast/statements/mod.rs b/src/query/ast/src/ast/statements/mod.rs index a0482e2f349b..e4a2118d7a55 100644 --- a/src/query/ast/src/ast/statements/mod.rs +++ b/src/query/ast/src/ast/statements/mod.rs @@ -28,6 +28,7 @@ mod kill; mod lock; mod merge_into; mod network_policy; +mod notification; mod password_policy; mod pipe; mod presign; @@ -62,6 +63,7 @@ pub use kill::*; pub use lock::*; pub use merge_into::*; pub use network_policy::*; +pub use notification::*; pub use password_policy::*; pub use pipe::*; pub use presign::*; diff --git a/src/query/ast/src/ast/statements/notification.rs b/src/query/ast/src/ast/statements/notification.rs new file mode 100644 index 000000000000..7844bb3524bb --- /dev/null +++ b/src/query/ast/src/ast/statements/notification.rs @@ -0,0 +1,209 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; +use std::fmt::Formatter; + +use derive_visitor::Drive; +use derive_visitor::DriveMut; + +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub struct CreateNotificationStmt { + #[drive(skip)] + pub if_not_exists: bool, + #[drive(skip)] + pub name: String, + #[drive(skip)] + pub notification_type: String, + #[drive(skip)] + pub enabled: bool, + pub webhook_opts: Option, + #[drive(skip)] + pub comments: String, +} + +impl Display for CreateNotificationStmt { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "CREATE NOTIFICATION INTEGRATION")?; + if self.if_not_exists { + write!(f, " IF NOT EXISTS")?; + } + write!(f, " {}", self.name)?; + write!(f, " TYPE = {}", self.notification_type)?; + if let Some(webhook_opts) = &self.webhook_opts { + write!(f, " {}", webhook_opts)?; + } + if !self.comments.is_empty() { + write!(f, " COMMENTS = '{}'", self.comments)?; + } + Ok(()) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)] +pub struct NotificationWebhookOptions { + #[drive(skip)] + pub url: Option, + #[drive(skip)] + pub method: Option, + #[drive(skip)] + pub authorization_header: Option, +} + +impl Display for NotificationWebhookOptions { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let NotificationWebhookOptions { + url, + method, + authorization_header, + } = self; + { + write!(f, " WEBHOOK = (")?; + if let Some(url) = url { + write!(f, "URL = '{}'", url)?; + } + if let Some(method) = method { + write!(f, " METHOD = '{}'", method)?; + } + if let Some(authorization_header) = authorization_header { + write!(f, " AUTHORIZATION_HEADER = '{}'", authorization_header)?; + } + write!(f, " )")?; + Ok(()) + } + } +} + +impl FromIterator<(String, String)> for NotificationWebhookOptions { + fn from_iter>(iter: T) -> Self { + let mut url = None; + let mut method = None; + let mut authorization_header = None; + for (k, v) in iter { + match k.to_uppercase().as_str() { + "URL" => url = Some(v), + "METHOD" => method = Some(v), + "AUTHORIZATION_HEADER" => authorization_header = Some(v), + _ => {} + } + } + NotificationWebhookOptions { + url, + method, + authorization_header, + } + } +} + +// drop notification +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub struct DropNotificationStmt { + #[drive(skip)] + pub if_exists: bool, + #[drive(skip)] + pub name: String, +} + +impl Display for DropNotificationStmt { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "DROP NOTIFICATION INTEGRATION")?; + if self.if_exists { + write!(f, " IF EXISTS")?; + } + write!(f, " {}", self.name) + } +} + +// alter notification +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub struct AlterNotificationStmt { + #[drive(skip)] + pub if_exists: bool, + #[drive(skip)] + pub name: String, + #[drive(skip)] + pub options: AlterNotificationOptions, +} +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub enum AlterNotificationOptions { + Set(AlterNotificationSetOptions), +} +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub struct AlterNotificationSetOptions { + #[drive(skip)] + pub enabled: Option, + pub webhook_opts: Option, + #[drive(skip)] + pub comments: Option, +} + +impl AlterNotificationSetOptions { + pub fn enabled(enabled: bool) -> Self { + AlterNotificationSetOptions { + enabled: Some(enabled), + webhook_opts: None, + comments: None, + } + } + + pub fn webhook_opts(webhook_opts: NotificationWebhookOptions) -> Self { + AlterNotificationSetOptions { + enabled: None, + webhook_opts: Some(webhook_opts), + comments: None, + } + } + + pub fn comments(comments: String) -> Self { + AlterNotificationSetOptions { + enabled: None, + webhook_opts: None, + comments: Some(comments), + } + } +} + +impl Display for AlterNotificationStmt { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "ALTER NOTIFICATION INTEGRATION {}", self.name)?; + match &self.options { + AlterNotificationOptions::Set(set_opts) => { + write!(f, " SET ")?; + if let Some(enabled) = set_opts.enabled { + write!(f, "ENABLED = {}", enabled)?; + } + if let Some(webhook_opts) = &set_opts.webhook_opts { + write!(f, " {}", webhook_opts)?; + } + if let Some(comments) = &set_opts.comments { + write!(f, " COMMENTS = '{}'", comments)?; + } + } + } + Ok(()) + } +} + +// describe notification +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub struct DescribeNotificationStmt { + #[drive(skip)] + pub name: String, +} + +impl Display for DescribeNotificationStmt { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "DESCRIBE NOTIFICATION INTEGRATION {}", self.name) + } +} diff --git a/src/query/ast/src/ast/statements/statement.rs b/src/query/ast/src/ast/statements/statement.rs index 8b938d7b464a..3fa1945f0b6e 100644 --- a/src/query/ast/src/ast/statements/statement.rs +++ b/src/query/ast/src/ast/statements/statement.rs @@ -314,6 +314,12 @@ pub enum Statement { Begin, Commit, Abort, + + // Notifications + CreateNotification(CreateNotificationStmt), + AlterNotification(AlterNotificationStmt), + DropNotification(DropNotificationStmt), + DescribeNotification(DescribeNotificationStmt), } #[derive(Debug, Clone, PartialEq)] @@ -702,6 +708,10 @@ impl Display for Statement { Statement::Begin => write!(f, "BEGIN")?, Statement::Commit => write!(f, "COMMIT")?, Statement::Abort => write!(f, "ABORT")?, + Statement::CreateNotification(stmt) => write!(f, "{stmt}")?, + Statement::AlterNotification(stmt) => write!(f, "{stmt}")?, + Statement::DropNotification(stmt) => write!(f, "{stmt}")?, + Statement::DescribeNotification(stmt) => write!(f, "{stmt}")?, } Ok(()) } diff --git a/src/query/ast/src/ast/statements/task.rs b/src/query/ast/src/ast/statements/task.rs index 31558ca596a3..582bc56636a8 100644 --- a/src/query/ast/src/ast/statements/task.rs +++ b/src/query/ast/src/ast/statements/task.rs @@ -33,6 +33,9 @@ pub struct CreateTaskStmt { pub session_parameters: BTreeMap, #[drive(skip)] pub suspend_task_after_num_failures: Option, + // notification_integration name for error + #[drive(skip)] + pub error_integration: Option, #[drive(skip)] pub comments: String, #[drive(skip)] @@ -71,11 +74,18 @@ impl Display for CreateTaskStmt { } if !self.after.is_empty() { - write!(f, "AFTER = '{:?}'", self.after)?; + write!(f, " AFTER = '{:?}'", self.after)?; } if self.when_condition.is_some() { - write!(f, "WHEN = '{:?}'", self.when_condition)?; + write!(f, " WHEN = '{:?}'", self.when_condition)?; + } + if self.error_integration.is_some() { + write!( + f, + " ERROR INTEGRATION = '{}'", + self.error_integration.as_ref().unwrap() + )?; } write!(f, " AS {}", self.sql)?; @@ -144,6 +154,8 @@ pub enum AlterTaskOptions { comments: Option, #[drive(skip)] session_parameters: Option>, + #[drive(skip)] + error_integration: Option, }, Unset { #[drive(skip)] @@ -166,6 +178,7 @@ impl Display for AlterTaskOptions { schedule, suspend_task_after_num_failures, session_parameters, + error_integration, comments, } => { if let Some(wh) = warehouse { @@ -174,6 +187,9 @@ impl Display for AlterTaskOptions { if let Some(schedule) = schedule { write!(f, " SET {}", schedule)?; } + if let Some(error_integration) = error_integration { + write!(f, " ERROR INTEGRATION = '{}'", error_integration)?; + } if let Some(num) = suspend_task_after_num_failures { write!(f, " SUSPEND TASK AFTER {} FAILURES", num)?; } diff --git a/src/query/ast/src/ast/visitors/visitor.rs b/src/query/ast/src/ast/visitors/visitor.rs index 408f3ef0088f..5df4293b50aa 100644 --- a/src/query/ast/src/ast/visitors/visitor.rs +++ b/src/query/ast/src/ast/visitors/visitor.rs @@ -708,6 +708,10 @@ pub trait Visitor<'ast>: Sized { fn visit_alter_task(&mut self, _stmt: &'ast AlterTaskStmt) {} + fn visit_create_notification(&mut self, _stmt: &'ast CreateNotificationStmt) {} + fn visit_drop_notification(&mut self, _stmt: &'ast DropNotificationStmt) {} + fn visit_describe_notification(&mut self, _stmt: &'ast DescribeNotificationStmt) {} + fn visit_alter_notification(&mut self, _stmt: &'ast AlterNotificationStmt) {} fn visit_with(&mut self, with: &'ast With) { let With { ctes, .. } = with; for cte in ctes.iter() { diff --git a/src/query/ast/src/ast/visitors/visitor_mut.rs b/src/query/ast/src/ast/visitors/visitor_mut.rs index de73656e2aac..3c8045c54243 100644 --- a/src/query/ast/src/ast/visitors/visitor_mut.rs +++ b/src/query/ast/src/ast/visitors/visitor_mut.rs @@ -721,6 +721,12 @@ pub trait VisitorMut: Sized { fn visit_alter_task(&mut self, _stmt: &mut AlterTaskStmt) {} + // notification + fn visit_create_notification(&mut self, _stmt: &mut CreateNotificationStmt) {} + fn visit_drop_notification(&mut self, _stmt: &mut DropNotificationStmt) {} + fn visit_alter_notification(&mut self, _stmt: &mut AlterNotificationStmt) {} + fn visit_describe_notification(&mut self, _stmt: &mut DescribeNotificationStmt) {} + fn visit_with(&mut self, with: &mut With) { let With { ctes, .. } = with; for cte in ctes.iter_mut() { diff --git a/src/query/ast/src/ast/visitors/walk.rs b/src/query/ast/src/ast/visitors/walk.rs index 00c0f5293b25..7723161d2396 100644 --- a/src/query/ast/src/ast/visitors/walk.rs +++ b/src/query/ast/src/ast/visitors/walk.rs @@ -549,6 +549,10 @@ pub fn walk_statement<'a, V: Visitor<'a>>(visitor: &mut V, statement: &'a Statem Statement::AlterPipe(_) => todo!(), Statement::DropPipe(_) => todo!(), Statement::DescribePipe(_) => todo!(), + Statement::CreateNotification(stmt) => visitor.visit_create_notification(stmt), + Statement::AlterNotification(stmt) => visitor.visit_alter_notification(stmt), + Statement::DropNotification(stmt) => visitor.visit_drop_notification(stmt), + Statement::DescribeNotification(stmt) => visitor.visit_describe_notification(stmt), Statement::Begin => {} Statement::Commit => {} Statement::Abort => {} diff --git a/src/query/ast/src/ast/visitors/walk_mut.rs b/src/query/ast/src/ast/visitors/walk_mut.rs index 28f7edfe3491..9344d54658a1 100644 --- a/src/query/ast/src/ast/visitors/walk_mut.rs +++ b/src/query/ast/src/ast/visitors/walk_mut.rs @@ -550,5 +550,9 @@ pub fn walk_statement_mut(visitor: &mut V, statement: &mut Statem Statement::Begin => {} Statement::Commit => {} Statement::Abort => {} + Statement::CreateNotification(stmt) => visitor.visit_create_notification(stmt), + Statement::AlterNotification(stmt) => visitor.visit_alter_notification(stmt), + Statement::DropNotification(stmt) => visitor.visit_drop_notification(stmt), + Statement::DescribeNotification(stmt) => visitor.visit_describe_notification(stmt), } } diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index 110fe4c296c8..d317fc73f894 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -115,6 +115,7 @@ pub fn statement(i: Input) -> IResult { ~ (AFTER ~ #comma_separated_list0(literal_string))? ~ (WHEN ~ #expr )? ~ (SUSPEND_TASK_AFTER_NUM_FAILURES ~ "=" ~ #literal_u64)? + ~ ( ERROR_INTEGRATION ~ ^"=" ~ ^#literal_string )? ~ ( (COMMENT | COMMENTS) ~ ^"=" ~ ^#literal_string )? ~ (#set_table_option)? ~ AS ~ #statement @@ -129,6 +130,7 @@ pub fn statement(i: Input) -> IResult { after_tasks, when_conditions, suspend_opt, + error_integration, comment_opt, session_opts, _, @@ -147,6 +149,7 @@ pub fn statement(i: Input) -> IResult { Some((_, tasks)) => tasks, None => Vec::new(), }, + error_integration: error_integration.map(|(_, _, name)| name.to_string()), when_condition: when_conditions.map(|(_, cond)| cond.to_string()), sql, session_parameters: session_opts, @@ -1894,6 +1897,79 @@ pub fn statement(i: Input) -> IResult { }) }, ); + let create_notification = map( + rule! { + CREATE ~ NOTIFICATION ~ INTEGRATION ~ ( IF ~ ^NOT ~ ^EXISTS )? + ~ #ident + ~ TYPE ~ "=" ~ #ident + ~ ENABLED ~ "=" ~ #literal_bool + ~ (#notification_webhook_clause)? + ~ ( (COMMENT | COMMENTS) ~ ^"=" ~ ^#literal_string )? + }, + |( + _, + _, + _, + if_not_exists, + name, + _, + _, + notification_type, + _, + _, + enabled, + webhook, + comment, + )| { + Statement::CreateNotification(CreateNotificationStmt { + if_not_exists: if_not_exists.is_some(), + name: name.to_string(), + notification_type: notification_type.to_string(), + enabled, + webhook_opts: webhook, + comments: comment.map(|v| v.2).unwrap_or_default(), + }) + }, + ); + + let drop_notification = map( + rule! { + DROP ~ NOTIFICATION ~ INTEGRATION ~ ( IF ~ ^EXISTS )? + ~ #ident + }, + |(_, _, _, if_exists, name)| { + Statement::DropNotification(DropNotificationStmt { + if_exists: if_exists.is_some(), + name: name.to_string(), + }) + }, + ); + + let alter_notification = map( + rule! { + ALTER ~ NOTIFICATION ~ INTEGRATION ~ ( IF ~ ^EXISTS )? + ~ #ident + ~ #alter_notification_options + }, + |(_, _, _, if_exists, name, options)| { + Statement::AlterNotification(AlterNotificationStmt { + if_exists: if_exists.is_some(), + name: name.to_string(), + options, + }) + }, + ); + + let desc_notification = map( + rule! { + ( DESC | DESCRIBE ) ~ NOTIFICATION ~ INTEGRATION ~ #ident + }, + |(_, _, _, name)| { + Statement::DescribeNotification(DescribeNotificationStmt { + name: name.to_string(), + }) + }, + ); let begin = value(Statement::Begin, rule! { BEGIN }); let commit = value(Statement::Commit, rule! { COMMIT }); @@ -2068,6 +2144,7 @@ pub fn statement(i: Input) -> IResult { [ AFTER , ...] [ WHEN boolean_expr ] [ SUSPEND_TASK_AFTER_NUM_FAILURES = ] + [ ERROR_INTEGRATION = ] [ COMMENT = '' ] AS `" @@ -2086,7 +2163,14 @@ AS | #drop_pipe : "`DROP PIPE [ IF EXISTS ] `" | #alter_pipe : "`ALTER PIPE [ IF EXISTS ] SET