Skip to content

Commit

Permalink
feat: support notification syntax with rpc, and support task error no…
Browse files Browse the repository at this point in the history
…tification integration (databendlabs#14845)

* feat: support notification rpc and task notification

* feat: remove external library protoc dependencies

* chore: apply comment rename utils.proto to timestamp.proto

---------

Co-authored-by: Bohu <[email protected]>
  • Loading branch information
ZhiHanZ and BohuTANG authored Mar 6, 2024
1 parent 134eb9c commit 6547e14
Show file tree
Hide file tree
Showing 71 changed files with 3,354 additions and 556 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,7 @@ clean:

genproto:
python -m grpc_tools.protoc -Isrc/common/cloud_control/proto/ --python_out=tests/cloud_control_server/ --grpc_python_out=tests/cloud_control_server/ src/common/cloud_control/proto/task.proto
python -m grpc_tools.protoc -Isrc/common/cloud_control/proto/ --python_out=tests/cloud_control_server/ --grpc_python_out=tests/cloud_control_server/ src/common/cloud_control/proto/notification.proto
python -m grpc_tools.protoc -Isrc/common/cloud_control/proto/ --python_out=tests/cloud_control_server/ --grpc_python_out=tests/cloud_control_server/ src/common/cloud_control/proto/timestamp.proto

.PHONY: setup test run build fmt lint clean docs
2 changes: 1 addition & 1 deletion src/common/cloud_control/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ chrono = { workspace = true }
databend-common-base = { path = "../base" }
databend-common-exception = { path = "../exception" }
prost = { workspace = true }
serde = { workspace = true }
tonic = { workspace = true }

[build-dependencies]
lenient_semver = "0.4.2"
prost-build = { workspace = true }
Expand Down
132 changes: 132 additions & 0 deletions src/common/cloud_control/proto/notification.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
syntax = "proto3";
option go_package = "databend.com/cloudcontrol/notification/proto";
import "timestamp.proto";

package notificationproto;

message NotificationError {
string kind = 1;
string message = 2;
int32 code = 3;
}
enum NotificationType {
WEBHOOK = 0;
}
message CreateNotificationRequest {

string tenant_id = 1;
bool if_not_exists = 2;
string name = 3;
NotificationType notification_type = 4;
bool enabled = 6;
string webhook_url = 7;
optional string webhook_method = 8;
optional string webhook_authorization_header = 9;

optional string comments = 90;
}

message CreateNotificationResponse {
optional NotificationError error = 1;
uint64 notification_id = 2;
}

message DropNotificationRequest {
string tenant_id = 1;
string name = 2;
bool if_exists = 3;
}

message DropNotificationResponse {
optional NotificationError error = 1;
}

message Notification {
uint64 notification_id = 1;
string tenant_id = 2;
string name = 3;
NotificationType notification_type = 4;
bool enabled = 5;
string webhook_url = 6;
optional string webhook_method = 7;
optional string webhook_authorization_header = 8;

optional string comments = 90;
utils.Timestamp created_time = 91;
string created_by = 92;
utils.Timestamp updated_time = 93;
string updated_by = 94;
}

message ListNotificationRequest {
string tenant_id = 1;
}

message ListNotificationResponse {
optional NotificationError error = 1;
repeated Notification notifications = 5;
}

message GetNotificationRequest {
string tenant_id = 1;
string name = 2;
}

message GetNotificationResponse {
optional NotificationError error = 1;
Notification notification = 5;
}

message AlterNotificationRequest {
string tenant_id = 1;
string name = 2;
string operation_type = 3;
optional bool enabled = 4;
optional string webhook_url = 5;
optional string webhook_method = 6;
optional string webhook_authorization_header = 7;
optional string comments = 8;
}

message AlterNotificationResponse {
optional NotificationError error = 1;
uint64 notification_id = 2;
}

message NotificationHistory {
utils.Timestamp created_time = 1;
utils.Timestamp processed_time = 2;
string message_source = 3;
string name = 4;
string message = 5;
string status = 6;
string error_message = 7; // if notification failed, provide failed message
}

message ListNotificationHistoryRequest {
string tenant_id = 1;
optional string notification_name = 2;
optional utils.Timestamp start_time = 3;
optional utils.Timestamp end_time = 4;
optional int32 result_limit = 5;
optional int32 page_size = 6; // 100 by default
optional int64 next_page_token = 7;
optional int64 previous_page_token = 8;
}

message ListNotificationHistoryResponse {
optional NotificationError error = 1;
repeated NotificationHistory notification_histories = 5;
// you can pass the next_page_token to the list request to get the next page. so does previous_page_token.
int64 next_page_token = 6;
int64 previous_page_token = 7;
}

service NotificationService {
rpc CreateNotification(CreateNotificationRequest) returns (CreateNotificationResponse);
rpc DropNotification(DropNotificationRequest) returns (DropNotificationResponse);
rpc ListNotification(ListNotificationRequest) returns (ListNotificationResponse);
rpc GetNotification(GetNotificationRequest) returns (GetNotificationResponse);
rpc AlterNotification(AlterNotificationRequest) returns (AlterNotificationResponse);
rpc ListNotificationHistory(ListNotificationHistoryRequest) returns (ListNotificationHistoryResponse);
}
3 changes: 3 additions & 0 deletions src/common/cloud_control/proto/task.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ message CreateTaskRequest {
repeated string after = 11; //
optional string when_condition = 12;
map<string, string> session_parameters = 13;
optional string error_integration = 14;
}

message TaskError {
Expand Down Expand Up @@ -92,6 +93,7 @@ message Task {
repeated string after = 17;
optional string when_condition = 18;
map<string, string> session_parameters = 19;
optional string error_integration = 20;
}


Expand Down Expand Up @@ -138,6 +140,7 @@ message AlterTaskRequest {
repeated string remove_after = 13;
bool set_session_parameters = 14;
map<string, string> session_parameters = 15;
optional string error_integration = 16;
}

message AlterTaskResponse {
Expand Down
16 changes: 16 additions & 0 deletions src/common/cloud_control/proto/timestamp.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";
package utils;


message Timestamp {
// Represents seconds of UTC time since Unix epoch
// 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to
// 9999-12-31T23:59:59Z inclusive.
int64 seconds = 1;

// Non-negative fractions of a second at nanosecond resolution. Negative
// second values with fractions must still have non-negative nanos values
// that count forward in time. Must be from 0 to 999,999,999
// inclusive.
int32 nanos = 2;
}
37 changes: 37 additions & 0 deletions src/common/cloud_control/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::fmt::Debug;
use std::time::Duration;

use tonic::Request;

use crate::cloud_api::CLOUD_REQUEST_TIMEOUT_SEC;
use crate::cloud_api::QUERY_ID;
use crate::cloud_api::REQUESTER;
Expand Down Expand Up @@ -42,6 +44,19 @@ impl ClientConfig {
self.metadata.push((key.into(), value.into()));
}

pub fn add_task_version_info(&mut self) {
self.add_metadata(
crate::task_client::TASK_CLIENT_VERSION_NAME,
crate::task_client::TASK_CLIENT_VERSION,
);
}

pub fn add_notification_version_info(&mut self) {
self.add_metadata(
crate::notification_client::NOTIFICATION_CLIENT_VERSION_NAME,
crate::notification_client::NOTIFICATION_CLIENT_VERSION,
);
}
pub fn get_metadata(&self) -> &Vec<(String, String)> {
&self.metadata
}
Expand Down Expand Up @@ -72,3 +87,25 @@ pub fn build_client_config(
config.add_metadata(QUERY_ID, query_id);
config
}

// add necessary metadata and client request setup for auditing and tracing purpose
pub fn make_request<T>(t: T, config: ClientConfig) -> Request<T> {
let mut request = Request::new(t);
request.set_timeout(config.get_timeout());
let metadata = request.metadata_mut();
let config_meta = config.get_metadata().clone();
for (k, v) in config_meta {
let key = k
.parse::<tonic::metadata::MetadataKey<tonic::metadata::Ascii>>()
.unwrap();
metadata.insert(key, v.parse().unwrap());
}
// metadata.insert(
// TASK_CLIENT_VERSION_NAME
// .to_string()
// .parse::<tonic::metadata::MetadataKey<tonic::metadata::Ascii>>()
// .unwrap(),
// TASK_CLIENT_VERSION.to_string().parse().unwrap(),
// );
request
}
16 changes: 13 additions & 3 deletions src/common/cloud_control/src/cloud_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_base::base::GlobalInstance;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;

use crate::notification_client::NotificationClient;
use crate::task_client::TaskClient;

pub const CLOUD_REQUEST_TIMEOUT_SEC: u64 = 5; // 5 seconds
Expand All @@ -30,6 +31,7 @@ pub const QUERY_ID: &str = "X-DATABEND-QUERY-ID";

pub struct CloudControlApiProvider {
pub task_client: Arc<TaskClient>,
pub notification_client: Arc<NotificationClient>,
pub timeout: Duration,
}

Expand All @@ -42,10 +44,12 @@ impl CloudControlApiProvider {
};

let endpoint = Self::get_endpoint(endpoint, timeout).await?;
let task_client = TaskClient::new(endpoint).await?;

let channel = endpoint.connect_lazy();
let task_client = TaskClient::new(channel.clone()).await?;
let notification_client = NotificationClient::new(channel).await?;
Ok(Arc::new(CloudControlApiProvider {
task_client,
notification_client,
timeout,
}))
}
Expand All @@ -60,7 +64,9 @@ impl CloudControlApiProvider {
"Invalid cloud control Server address: {err}"
))
})?
.connect_timeout(timeout);
.connect_timeout(timeout)
.tcp_nodelay(true)
.tcp_keepalive(None);

Ok(endpoint)
}
Expand All @@ -79,6 +85,10 @@ impl CloudControlApiProvider {
pub fn get_task_client(&self) -> Arc<TaskClient> {
self.task_client.clone()
}

pub fn get_notification_client(&self) -> Arc<NotificationClient> {
self.notification_client.clone()
}
pub fn get_timeout(&self) -> Duration {
self.timeout
}
Expand Down
7 changes: 7 additions & 0 deletions src/common/cloud_control/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

pub mod client_config;
pub mod cloud_api;
pub mod notification_client;
pub mod notification_utils;
pub mod task_client;
pub mod task_utils;

Expand All @@ -24,6 +26,11 @@ pub mod task_utils;
pub mod pb {
// taskproto is proto package name.
tonic::include_proto!("taskproto");
tonic::include_proto!("notificationproto");
}

pub mod utils {
tonic::include_proto!("utils");
}

pub use prost;
Loading

0 comments on commit 6547e14

Please sign in to comment.