Skip to content

Commit

Permalink
fix(secret): add a secret ref column to source (#17269)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su authored Jun 17, 2024
1 parent d555b5c commit d5e10d8
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 19 deletions.
19 changes: 15 additions & 4 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ message StreamSourceInfo {
// Options specified by user in the FORMAT ENCODE clause.
map<string, string> format_encode_options = 14;

// Handle the source relies on any sceret. The key is the propertity name and the value is the secret id.
map<string, uint32> secret_ref = 16;
// Handle the source relies on any sceret. The key is the propertity name and the value is the secret id and type.
map<string, SecretRef> secret_ref = 16;
}

message Source {
Expand Down Expand Up @@ -180,8 +180,8 @@ message Sink {
// Whether it should use background ddl or block until backfill finishes.
CreateType create_type = 24;

// Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id.
map<string, uint32> secret_ref = 25;
// Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id and type.
map<string, SecretRef> secret_ref = 25;
}

message Subscription {
Expand Down Expand Up @@ -450,3 +450,14 @@ message Secret {
uint32 owner = 5;
uint32 schema_id = 6;
}

message SecretRef {
enum RefAsType {
UNSPECIFIED = 0;
TEXT = 1;
// AS FILE
FILE = 2;
}
uint32 secret_id = 1;
RefAsType ref_as = 2;
}
3 changes: 2 additions & 1 deletion src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_common::catalog::{
ColumnCatalog, ConnectionId, CreateType, DatabaseId, SchemaId, TableId, UserId,
};
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::PbSecretRef;
use risingwave_pb::stream_plan::PbSinkDesc;

use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType};
Expand Down Expand Up @@ -83,7 +84,7 @@ impl SinkDesc {
owner: UserId,
connection_id: Option<ConnectionId>,
dependent_relations: Vec<TableId>,
secret_ref: BTreeMap<String, u32>,
secret_ref: BTreeMap<String, PbSecretRef>,
) -> SinkCatalog {
SinkCatalog {
id: self.id,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::catalog::{
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{
PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
PbCreateType, PbSecretRef, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
};

use super::{
Expand Down Expand Up @@ -339,7 +339,7 @@ pub struct SinkCatalog {
pub create_type: CreateType,

/// The secret reference for the sink, mapping from property name to secret id.
pub secret_ref: BTreeMap<String, u32>,
pub secret_ref: BTreeMap<String, PbSecretRef>,
}

impl SinkCatalog {
Expand Down
4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,12 +555,12 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
*id = *connection_rewrite.get(id).unwrap();
}
for secret_id in s.secret_ref.values_mut() {
*secret_id = *secret_rewrite.get(secret_id).unwrap();
secret_id.secret_id = *secret_rewrite.get(&secret_id.secret_id).unwrap();
}
object_dependencies.extend(s.secret_ref.values().map(|id| {
object_dependency::ActiveModel {
id: NotSet,
oid: Set(*id as _),
oid: Set(id.secret_id as _),
used_by: Set(s.id as _),
}
}));
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/utils/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_connector::source::kafka::private_link::{
insert_privatelink_broker_rewrite_map, CONNECTION_NAME_KEY, PRIVATELINK_ENDPOINT_KEY,
};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::PbSecretRef;
use risingwave_sqlparser::ast::{
CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement,
CreateSubscriptionStatement, SqlOption, Statement, Value,
Expand Down Expand Up @@ -119,7 +120,7 @@ impl WithOptions {
pub(crate) fn resolve_secret_in_with_options(
_with_options: &mut WithOptions,
_session: &SessionImpl,
) -> RwResult<BTreeMap<String, u32>> {
) -> RwResult<BTreeMap<String, PbSecretRef>> {
// todo: implement the function and take `resolve_privatelink_in_with_option` as reference

Ok(BTreeMap::new())
Expand Down
28 changes: 26 additions & 2 deletions src/meta/model_v2/migration/src/m20240525_090457_secret.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,22 @@ impl MigrationTrait for Migration {
)
.await?;

// Add a new column to the table
// Add a new column to the `sink` table
manager
.alter_table(
MigrationTable::alter()
.table(Sink::Table)
.add_column(ColumnDef::new(Sink::SecretRef).json_binary())
.add_column(ColumnDef::new(Sink::SecretRef).binary())
.to_owned(),
)
.await?;

// Add a new column to the `source` table
manager
.alter_table(
MigrationTable::alter()
.table(Source::Table)
.add_column(ColumnDef::new(Source::SecretRef).binary())
.to_owned(),
)
.await?;
Expand All @@ -60,6 +70,14 @@ impl MigrationTrait for Migration {
.to_owned(),
)
.await?;
manager
.alter_table(
MigrationTable::alter()
.table(Source::Table)
.drop_column(Source::SecretRef)
.to_owned(),
)
.await?;
Ok(())
}
}
Expand All @@ -77,3 +95,9 @@ enum Sink {
Table,
SecretRef,
}

#[derive(DeriveIden)]
enum Source {
Table,
SecretRef,
}
53 changes: 51 additions & 2 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::BTreeMap;

use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
use risingwave_pb::catalog::{PbCreateType, PbSecretRef, PbStreamJobStatus};
use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
use risingwave_pb::stream_plan::PbStreamNode;
use sea_orm::entity::prelude::*;
Expand Down Expand Up @@ -258,6 +258,55 @@ macro_rules! derive_array_from_blob {
};
}

macro_rules! derive_btreemap_from_blob {
($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
#[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
pub struct $struct_name(#[sea_orm] Vec<u8>);

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct $field_type {
#[prost(btree_map = "string, message")]
inner: BTreeMap<$key_type, $value_type>,
}
impl Eq for $field_type {}

impl $struct_name {
pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
data.inner
}

fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
}
}

impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
Self::from_protobuf(value)
}
}

impl std::fmt::Debug for $struct_name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.to_protobuf().fmt(f)
}
}

impl Default for $struct_name {
fn default() -> Self {
Self(vec![])
}
}

impl sea_orm::sea_query::Nullable for $struct_name {
fn null() -> Value {
Value::Bytes(None)
}
}
};
}

pub(crate) use {derive_array_from_blob, derive_from_blob};

derive_from_json_struct!(I32Array, Vec<i32>);
Expand Down Expand Up @@ -286,7 +335,7 @@ impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
}
}

derive_from_json_struct!(SecretRef, BTreeMap<String, u32>);
derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);

derive_from_blob!(StreamNode, PbStreamNode);
derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub struct Model {
pub sink_from_name: String,
pub sink_format_desc: Option<SinkFormatDesc>,
pub target_table: Option<TableId>,
// `secret_ref` stores a json string, mapping from property name to secret id.
// `secret_ref` stores the mapping info mapping from property name to secret id and type.
pub secret_ref: Option<SecretRef>,
}

Expand Down
7 changes: 5 additions & 2 deletions src/meta/model_v2/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};

use crate::{
ColumnCatalogArray, ConnectionId, I32Array, Property, SourceId, StreamSourceInfo, TableId,
WatermarkDescArray,
ColumnCatalogArray, ConnectionId, I32Array, Property, SecretRef, SourceId, StreamSourceInfo,
TableId, WatermarkDescArray,
};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
Expand All @@ -39,6 +39,8 @@ pub struct Model {
pub optional_associated_table_id: Option<TableId>,
pub connection_id: Option<ConnectionId>,
pub version: i64,
// `secret_ref` stores the mapping info mapping from property name to secret id and type.
pub secret_ref: Option<SecretRef>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down Expand Up @@ -101,6 +103,7 @@ impl From<PbSource> for ActiveModel {
optional_associated_table_id: Set(optional_associated_table_id),
connection_id: Set(source.connection_id.map(|id| id as _)),
version: Set(source.version as _),
secret_ref: Set(None),
}
}
}
4 changes: 2 additions & 2 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ impl From<ObjectModel<source::Model>> for PbSource {

impl From<ObjectModel<sink::Model>> for PbSink {
fn from(value: ObjectModel<sink::Model>) -> Self {
let mut secret_ref_map: BTreeMap<String, u32> = BTreeMap::new();
let mut secret_ref_map = BTreeMap::new();
if let Some(secret_ref) = value.0.secret_ref {
secret_ref_map = secret_ref.into_inner();
secret_ref_map = secret_ref.to_protobuf();
}
Self {
id: value.0.sink_id as _,
Expand Down
2 changes: 2 additions & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
".plan_common.ExternalTableDesc",
".hummock.CompactTask",
".catalog.StreamSourceInfo",
".catalog.SecretRef",
".catalog.Source",
".catalog.Sink",
".catalog.View",
Expand Down Expand Up @@ -111,6 +112,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// The requirement is from Source node -> SourceCatalog -> WatermarkDesc -> expr
.type_attribute("catalog.WatermarkDesc", "#[derive(Eq, Hash)]")
.type_attribute("catalog.StreamSourceInfo", "#[derive(Eq, Hash)]")
.type_attribute("catalog.SecretRef", "#[derive(Eq, Hash)]")
.type_attribute("catalog.IndexColumnProperties", "#[derive(Eq, Hash)]")
.type_attribute("expr.ExprNode", "#[derive(Eq, Hash)]")
.type_attribute("data.DataType", "#[derive(Eq, Hash)]")
Expand Down

0 comments on commit d5e10d8

Please sign in to comment.