Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su committed Jun 17, 2024
1 parent 7ee917c commit fef9e6b
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 14 deletions.
14 changes: 12 additions & 2 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ message StreamSourceInfo {
map<string, string> format_encode_options = 14;

// Handle the source relies on any sceret. The key is the propertity name and the value is the secret id.
map<string, uint32> secret_ref = 16;
map<string, SecretRef> secret_ref = 16;

Check failure on line 87 in proto/catalog.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "value" on message "SecretRefEntry" changed type from "uint32" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message Source {
Expand Down Expand Up @@ -181,7 +181,7 @@ message Sink {
CreateType create_type = 24;

// Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id.
map<string, uint32> secret_ref = 25;
map<string, SecretRef> secret_ref = 25;

Check failure on line 184 in proto/catalog.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "value" on message "SecretRefEntry" changed type from "uint32" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message Subscription {
Expand Down Expand Up @@ -441,3 +441,13 @@ message Secret {
uint32 owner = 5;
uint32 schema_id = 6;
}

message SecretRef {
enum RefAsType {
BYTES = 0;
// AS FILE
FILE = 1;
}
uint32 secret_id = 1;
RefAsType ref_as = 2;
}
3 changes: 2 additions & 1 deletion src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_common::catalog::{
ColumnCatalog, ConnectionId, CreateType, DatabaseId, SchemaId, TableId, UserId,
};
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::PbSecretRef;
use risingwave_pb::stream_plan::PbSinkDesc;

use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType};
Expand Down Expand Up @@ -83,7 +84,7 @@ impl SinkDesc {
owner: UserId,
connection_id: Option<ConnectionId>,
dependent_relations: Vec<TableId>,
secret_ref: BTreeMap<String, u32>,
secret_ref: BTreeMap<String, PbSecretRef>,
) -> SinkCatalog {
SinkCatalog {
id: self.id,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::catalog::{
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{
PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
PbCreateType, PbSecretRef, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
};

use super::{
Expand Down Expand Up @@ -339,7 +339,7 @@ pub struct SinkCatalog {
pub create_type: CreateType,

/// The secret reference for the sink, mapping from property name to secret id.
pub secret_ref: BTreeMap<String, u32>,
pub secret_ref: BTreeMap<String, PbSecretRef>,
}

impl SinkCatalog {
Expand Down
4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,12 +549,12 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
*id = *connection_rewrite.get(id).unwrap();
}
for secret_id in s.secret_ref.values_mut() {
*secret_id = *secret_rewrite.get(secret_id).unwrap();
secret_id.secret_id = *secret_rewrite.get(&secret_id.secret_id).unwrap();
}
object_dependencies.extend(s.secret_ref.values().map(|id| {
object_dependency::ActiveModel {
id: NotSet,
oid: Set(*id as _),
oid: Set(id.secret_id as _),
used_by: Set(s.id as _),
}
}));
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/utils/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_connector::source::kafka::private_link::{
insert_privatelink_broker_rewrite_map, CONNECTION_NAME_KEY, PRIVATELINK_ENDPOINT_KEY,
};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::PbSecretRef;
use risingwave_sqlparser::ast::{
CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement,
CreateSubscriptionStatement, SqlOption, Statement, Value,
Expand Down Expand Up @@ -119,7 +120,7 @@ impl WithOptions {
pub(crate) fn resolve_secret_in_with_options(
_with_options: &mut WithOptions,
_session: &SessionImpl,
) -> RwResult<BTreeMap<String, u32>> {
) -> RwResult<BTreeMap<String, PbSecretRef>> {
// todo: implement the function and take `resolve_privatelink_in_with_option` as reference

Ok(BTreeMap::new())
Expand Down
4 changes: 2 additions & 2 deletions src/meta/model_v2/migration/src/m20240525_090457_secret.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl MigrationTrait for Migration {
.alter_table(
MigrationTable::alter()
.table(Sink::Table)
.add_column(ColumnDef::new(Sink::SecretRef).json_binary())
.add_column(ColumnDef::new(Sink::SecretRef).blob(BlobSize::Long))
.to_owned(),
)
.await?;
Expand All @@ -52,7 +52,7 @@ impl MigrationTrait for Migration {
.alter_table(
MigrationTable::alter()
.table(Source::Table)
.add_column(ColumnDef::new(Source::SecretRef).json_binary())
.add_column(ColumnDef::new(Source::SecretRef).blob(BlobSize::Long))
.to_owned(),
)
.await?;
Expand Down
53 changes: 51 additions & 2 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::BTreeMap;

use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
use risingwave_pb::catalog::{PbCreateType, PbSecretRef, PbStreamJobStatus};
use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
use risingwave_pb::stream_plan::PbStreamNode;
use sea_orm::entity::prelude::*;
Expand Down Expand Up @@ -258,6 +258,55 @@ macro_rules! derive_array_from_blob {
};
}

macro_rules! derive_btreemap_from_blob {
($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
#[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
pub struct $struct_name(#[sea_orm] Vec<u8>);

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct $field_type {
#[prost(btree_map = "string, message")]
inner: BTreeMap<$key_type, $value_type>,
}
impl Eq for $field_type {}

impl $struct_name {
pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
data.inner
}

fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
}
}

impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
Self::from_protobuf(value)
}
}

impl std::fmt::Debug for $struct_name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.to_protobuf().fmt(f)
}
}

impl Default for $struct_name {
fn default() -> Self {
Self(vec![])
}
}

impl sea_orm::sea_query::Nullable for $struct_name {
fn null() -> Value {
Value::Bytes(None)
}
}
};
}

pub(crate) use {derive_array_from_blob, derive_from_blob};

derive_from_json_struct!(I32Array, Vec<i32>);
Expand Down Expand Up @@ -286,7 +335,7 @@ impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
}
}

derive_from_json_struct!(SecretRef, BTreeMap<String, u32>);
derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);

derive_from_blob!(StreamNode, PbStreamNode);
derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ impl From<ObjectModel<source::Model>> for PbSource {

impl From<ObjectModel<sink::Model>> for PbSink {
fn from(value: ObjectModel<sink::Model>) -> Self {
let mut secret_ref_map: BTreeMap<String, u32> = BTreeMap::new();
let mut secret_ref_map = BTreeMap::new();
if let Some(secret_ref) = value.0.secret_ref {
secret_ref_map = secret_ref.into_inner();
secret_ref_map = secret_ref.to_protobuf();
}
Self {
id: value.0.sink_id as _,
Expand Down
2 changes: 2 additions & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
".plan_common.ExternalTableDesc",
".hummock.CompactTask",
".catalog.StreamSourceInfo",
".catalog.SecretRef",
".catalog.Source",
".catalog.Sink",
".catalog.View",
Expand Down Expand Up @@ -111,6 +112,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// The requirement is from Source node -> SourceCatalog -> WatermarkDesc -> expr
.type_attribute("catalog.WatermarkDesc", "#[derive(Eq, Hash)]")
.type_attribute("catalog.StreamSourceInfo", "#[derive(Eq, Hash)]")
.type_attribute("catalog.SecretRef", "#[derive(Eq, Hash)]")
.type_attribute("expr.ExprNode", "#[derive(Eq, Hash)]")
.type_attribute("data.DataType", "#[derive(Eq, Hash)]")
.type_attribute("expr.ExprNode.rex_node", "#[derive(Eq, Hash)]")
Expand Down

0 comments on commit fef9e6b

Please sign in to comment.