Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(secret): add a secret ref column to source #17269

Merged
merged 9 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@
// Options specified by user in the FORMAT ENCODE clause.
map<string, string> format_encode_options = 14;

// Handle the source relies on any sceret. The key is the propertity name and the value is the secret id.
map<string, uint32> secret_ref = 16;
// Handle the source relies on any sceret. The key is the propertity name and the value is the secret id and type.
map<string, SecretRef> secret_ref = 16;

Check failure on line 87 in proto/catalog.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "value" on message "SecretRefEntry" changed type from "uint32" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message Source {
Expand Down Expand Up @@ -180,8 +180,8 @@
// Whether it should use background ddl or block until backfill finishes.
CreateType create_type = 24;

// Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id.
map<string, uint32> secret_ref = 25;
// Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id and type.
map<string, SecretRef> secret_ref = 25;

Check failure on line 184 in proto/catalog.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "value" on message "SecretRefEntry" changed type from "uint32" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message Subscription {
Expand Down Expand Up @@ -450,3 +450,14 @@
uint32 owner = 5;
uint32 schema_id = 6;
}

message SecretRef {
enum RefAsType {
UNSPECIFIED = 0;
TEXT = 1;
// AS FILE
FILE = 2;
}
uint32 secret_id = 1;
RefAsType ref_as = 2;
}
3 changes: 2 additions & 1 deletion src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_common::catalog::{
ColumnCatalog, ConnectionId, CreateType, DatabaseId, SchemaId, TableId, UserId,
};
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::PbSecretRef;
use risingwave_pb::stream_plan::PbSinkDesc;

use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType};
Expand Down Expand Up @@ -83,7 +84,7 @@ impl SinkDesc {
owner: UserId,
connection_id: Option<ConnectionId>,
dependent_relations: Vec<TableId>,
secret_ref: BTreeMap<String, u32>,
secret_ref: BTreeMap<String, PbSecretRef>,
) -> SinkCatalog {
SinkCatalog {
id: self.id,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::catalog::{
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{
PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
PbCreateType, PbSecretRef, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
};

use super::{
Expand Down Expand Up @@ -339,7 +339,7 @@ pub struct SinkCatalog {
pub create_type: CreateType,

/// The secret reference for the sink, mapping from property name to secret id.
pub secret_ref: BTreeMap<String, u32>,
pub secret_ref: BTreeMap<String, PbSecretRef>,
}

impl SinkCatalog {
Expand Down
4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,12 +555,12 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
*id = *connection_rewrite.get(id).unwrap();
}
for secret_id in s.secret_ref.values_mut() {
*secret_id = *secret_rewrite.get(secret_id).unwrap();
secret_id.secret_id = *secret_rewrite.get(&secret_id.secret_id).unwrap();
}
object_dependencies.extend(s.secret_ref.values().map(|id| {
object_dependency::ActiveModel {
id: NotSet,
oid: Set(*id as _),
oid: Set(id.secret_id as _),
used_by: Set(s.id as _),
}
}));
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/utils/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_connector::source::kafka::private_link::{
insert_privatelink_broker_rewrite_map, CONNECTION_NAME_KEY, PRIVATELINK_ENDPOINT_KEY,
};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::PbSecretRef;
use risingwave_sqlparser::ast::{
CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement,
CreateSubscriptionStatement, SqlOption, Statement, Value,
Expand Down Expand Up @@ -119,7 +120,7 @@ impl WithOptions {
pub(crate) fn resolve_secret_in_with_options(
_with_options: &mut WithOptions,
_session: &SessionImpl,
) -> RwResult<BTreeMap<String, u32>> {
) -> RwResult<BTreeMap<String, PbSecretRef>> {
// todo: implement the function and take `resolve_privatelink_in_with_option` as reference

Ok(BTreeMap::new())
Expand Down
28 changes: 26 additions & 2 deletions src/meta/model_v2/migration/src/m20240525_090457_secret.rs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should not modify an existing migration, just create a new one

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yezizp2012 said it's ok since it's not released

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are more to change. secret_ref should not be map<string, u32>. We need more than a secret id like if it's an as file

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are more to change. secret_ref should not be map<string, u32>.

Here is a little diff with my assumption, ref secret <secret-name> means the source/sink relies on the secret binary. Seeing it as a string, or a file, is something more related to the exec itself rather than spec in catalog.
Just share my thoughts, it is okay you impl in your way.

Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,22 @@ impl MigrationTrait for Migration {
)
.await?;

// Add a new column to the table
// Add a new column to the `sink` table
manager
.alter_table(
MigrationTable::alter()
.table(Sink::Table)
.add_column(ColumnDef::new(Sink::SecretRef).json_binary())
.add_column(ColumnDef::new(Sink::SecretRef).binary())
.to_owned(),
)
.await?;

// Add a new column to the `source` table
manager
.alter_table(
MigrationTable::alter()
.table(Source::Table)
.add_column(ColumnDef::new(Source::SecretRef).binary())
.to_owned(),
)
.await?;
Expand All @@ -60,6 +70,14 @@ impl MigrationTrait for Migration {
.to_owned(),
)
.await?;
manager
.alter_table(
MigrationTable::alter()
.table(Source::Table)
.drop_column(Source::SecretRef)
.to_owned(),
)
.await?;
Ok(())
}
}
Expand All @@ -77,3 +95,9 @@ enum Sink {
Table,
SecretRef,
}

#[derive(DeriveIden)]
enum Source {
Table,
SecretRef,
}
53 changes: 51 additions & 2 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::BTreeMap;

use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
use risingwave_pb::catalog::{PbCreateType, PbSecretRef, PbStreamJobStatus};
use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
use risingwave_pb::stream_plan::PbStreamNode;
use sea_orm::entity::prelude::*;
Expand Down Expand Up @@ -258,6 +258,55 @@ macro_rules! derive_array_from_blob {
};
}

macro_rules! derive_btreemap_from_blob {
($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
#[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
pub struct $struct_name(#[sea_orm] Vec<u8>);

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct $field_type {
#[prost(btree_map = "string, message")]
inner: BTreeMap<$key_type, $value_type>,
}
impl Eq for $field_type {}

impl $struct_name {
pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
data.inner
}

fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
}
}

impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
Self::from_protobuf(value)
}
}

impl std::fmt::Debug for $struct_name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.to_protobuf().fmt(f)
}
}

impl Default for $struct_name {
fn default() -> Self {
Self(vec![])
}
}

impl sea_orm::sea_query::Nullable for $struct_name {
fn null() -> Value {
Value::Bytes(None)
}
}
};
}

pub(crate) use {derive_array_from_blob, derive_from_blob};

derive_from_json_struct!(I32Array, Vec<i32>);
Expand Down Expand Up @@ -286,7 +335,7 @@ impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
}
}

derive_from_json_struct!(SecretRef, BTreeMap<String, u32>);
derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);

derive_from_blob!(StreamNode, PbStreamNode);
derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub struct Model {
pub sink_from_name: String,
pub sink_format_desc: Option<SinkFormatDesc>,
pub target_table: Option<TableId>,
// `secret_ref` stores a json string, mapping from property name to secret id.
// `secret_ref` stores the mapping info mapping from property name to secret id and type.
pub secret_ref: Option<SecretRef>,
}

Expand Down
7 changes: 5 additions & 2 deletions src/meta/model_v2/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};

use crate::{
ColumnCatalogArray, ConnectionId, I32Array, Property, SourceId, StreamSourceInfo, TableId,
WatermarkDescArray,
ColumnCatalogArray, ConnectionId, I32Array, Property, SecretRef, SourceId, StreamSourceInfo,
TableId, WatermarkDescArray,
};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
Expand All @@ -39,6 +39,8 @@ pub struct Model {
pub optional_associated_table_id: Option<TableId>,
pub connection_id: Option<ConnectionId>,
pub version: i64,
// `secret_ref` stores the mapping info mapping from property name to secret id and type.
pub secret_ref: Option<SecretRef>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down Expand Up @@ -101,6 +103,7 @@ impl From<PbSource> for ActiveModel {
optional_associated_table_id: Set(optional_associated_table_id),
connection_id: Set(source.connection_id.map(|id| id as _)),
version: Set(source.version as _),
secret_ref: Set(None),
}
}
}
4 changes: 2 additions & 2 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ impl From<ObjectModel<source::Model>> for PbSource {

impl From<ObjectModel<sink::Model>> for PbSink {
fn from(value: ObjectModel<sink::Model>) -> Self {
let mut secret_ref_map: BTreeMap<String, u32> = BTreeMap::new();
let mut secret_ref_map = BTreeMap::new();
if let Some(secret_ref) = value.0.secret_ref {
secret_ref_map = secret_ref.into_inner();
secret_ref_map = secret_ref.to_protobuf();
}
Self {
id: value.0.sink_id as _,
Expand Down
2 changes: 2 additions & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
".plan_common.ExternalTableDesc",
".hummock.CompactTask",
".catalog.StreamSourceInfo",
".catalog.SecretRef",
".catalog.Source",
".catalog.Sink",
".catalog.View",
Expand Down Expand Up @@ -111,6 +112,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// The requirement is from Source node -> SourceCatalog -> WatermarkDesc -> expr
.type_attribute("catalog.WatermarkDesc", "#[derive(Eq, Hash)]")
.type_attribute("catalog.StreamSourceInfo", "#[derive(Eq, Hash)]")
.type_attribute("catalog.SecretRef", "#[derive(Eq, Hash)]")
.type_attribute("catalog.IndexColumnProperties", "#[derive(Eq, Hash)]")
.type_attribute("expr.ExprNode", "#[derive(Eq, Hash)]")
.type_attribute("data.DataType", "#[derive(Eq, Hash)]")
Expand Down
Loading