Skip to content

Commit

Permalink
fix(meta): include secrets in metadata backup (#17225)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Jun 13, 2024
1 parent 51fccb7 commit 3e1ee0d
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 296 deletions.
18 changes: 18 additions & 0 deletions ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ steps:
timeout_in_minutes: 8
retry: *auto-retry

- label: "meta backup test"
key: "e2e-meta-backup-test"
command: "ci/scripts/run-meta-backup-test.sh -p ci-dev -m ci-3streaming-2serving-3fe"
if: |
build.pull_request.labels includes "ci/run-e2e-meta-backup-test"
depends_on:
- "build"
- "build-other"
- "docslt"
plugins:
- docker-compose#v5.1.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 45
retry: *auto-retry

- label: "end-to-end test (parallel)"
command: "ci/scripts/e2e-test-parallel.sh -p ci-dev"
if: |
Expand Down
1 change: 1 addition & 0 deletions proto/backup_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ message MetaSnapshotMetadata {
uint64 safe_epoch = 4;
optional uint32 format_version = 5;
optional string remarks = 6;
optional string rw_version = 7;
}

service BackupService {
Expand Down
3 changes: 2 additions & 1 deletion src/meta/model_v2/src/secret.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
use risingwave_pb::catalog::PbSecret;
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "secret")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
Expand Down
168 changes: 25 additions & 143 deletions src/meta/src/backup_restore/meta_snapshot_builder_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,25 @@ fn map_db_err(e: DbErr) -> BackupError {
BackupError::MetaStorage(e.into())
}

macro_rules! define_set_metadata {
($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
pub async fn set_metadata(
metadata: &mut MetadataV2,
txn: &sea_orm::DatabaseTransaction,
) -> BackupResult<()> {
$(
metadata.$name = $mod_path::$mod_name::Entity::find()
.all(txn)
.await
.map_err(map_db_err)?;
)*
Ok(())
}
};
}

risingwave_backup::for_all_metadata_models_v2!(define_set_metadata);

pub struct MetaSnapshotV2Builder {
snapshot: MetaSnapshotV2,
meta_store: SqlMetaStore,
Expand Down Expand Up @@ -91,151 +110,14 @@ impl MetaSnapshotV2Builder {
}
redo_state
};
let version_stats = model_v2::prelude::HummockVersionStats::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let compaction_configs = model_v2::prelude::CompactionConfig::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let actors = model_v2::prelude::Actor::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let clusters = model_v2::prelude::Cluster::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let actor_dispatchers = model_v2::prelude::ActorDispatcher::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let catalog_versions = model_v2::prelude::CatalogVersion::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let connections = model_v2::prelude::Connection::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let databases = model_v2::prelude::Database::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let fragments = model_v2::prelude::Fragment::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let functions = model_v2::prelude::Function::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let indexes = model_v2::prelude::Index::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let objects = model_v2::prelude::Object::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let object_dependencies = model_v2::prelude::ObjectDependency::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let schemas = model_v2::prelude::Schema::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let sinks = model_v2::prelude::Sink::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let sources = model_v2::prelude::Source::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let streaming_jobs = model_v2::prelude::StreamingJob::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let subscriptions = model_v2::prelude::Subscription::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let system_parameters = model_v2::prelude::SystemParameter::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let tables = model_v2::prelude::Table::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let users = model_v2::prelude::User::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let user_privileges = model_v2::prelude::UserPrivilege::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let views = model_v2::prelude::View::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let workers = model_v2::prelude::Worker::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let worker_properties = model_v2::prelude::WorkerProperty::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let hummock_sequences = model_v2::prelude::HummockSequence::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let seaql_migrations = model_v2::serde_seaql_migration::Entity::find()
.all(&txn)
.await
.map_err(map_db_err)?;
let session_parameters = model_v2::prelude::SessionParameter::find()
.all(&txn)
.await
.map_err(map_db_err)?;

txn.commit().await.map_err(map_db_err)?;
self.snapshot.metadata = MetadataV2 {
seaql_migrations,
let mut metadata = MetadataV2 {
hummock_version,
version_stats,
compaction_configs,
actors,
clusters,
actor_dispatchers,
catalog_versions,
connections,
databases,
fragments,
functions,
indexes,
objects,
object_dependencies,
schemas,
sinks,
sources,
streaming_jobs,
subscriptions,
system_parameters,
tables,
users,
user_privileges,
views,
workers,
worker_properties,
hummock_sequences,
session_parameters,
..Default::default()
};
set_metadata(&mut metadata, &txn).await?;

txn.commit().await.map_err(map_db_err)?;
self.snapshot.metadata = metadata;
Ok(())
}

Expand Down
46 changes: 17 additions & 29 deletions src/meta/src/backup_restore/restore_impl/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,40 +93,28 @@ impl WriterModelV2ToMetaStoreV2 {
}
}

macro_rules! define_write_model_v2_to_meta_store_v2 {
($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
async fn write_model_v2_to_meta_store_v2(
metadata: &risingwave_backup::meta_snapshot_v2::MetadataV2,
db: &sea_orm::DatabaseConnection,
) -> BackupResult<()> {
$(
insert_models(metadata.$name.clone(), db).await?;
)*
Ok(())
}
};
}

risingwave_backup::for_all_metadata_models_v2!(define_write_model_v2_to_meta_store_v2);

#[async_trait::async_trait]
impl Writer<MetadataV2> for WriterModelV2ToMetaStoreV2 {
async fn write(&self, target_snapshot: MetaSnapshot<MetadataV2>) -> BackupResult<()> {
let metadata = target_snapshot.metadata;
let db = &self.meta_store.conn;
insert_models(metadata.seaql_migrations.clone(), db).await?;
insert_models(metadata.clusters.clone(), db).await?;
insert_models(metadata.version_stats.clone(), db).await?;
insert_models(metadata.compaction_configs.clone(), db).await?;
insert_models(metadata.hummock_sequences.clone(), db).await?;
insert_models(metadata.workers.clone(), db).await?;
insert_models(metadata.worker_properties.clone(), db).await?;
insert_models(metadata.users.clone(), db).await?;
insert_models(metadata.user_privileges.clone(), db).await?;
insert_models(metadata.objects.clone(), db).await?;
insert_models(metadata.object_dependencies.clone(), db).await?;
insert_models(metadata.databases.clone(), db).await?;
insert_models(metadata.schemas.clone(), db).await?;
insert_models(metadata.streaming_jobs.clone(), db).await?;
insert_models(metadata.fragments.clone(), db).await?;
insert_models(metadata.actors.clone(), db).await?;
insert_models(metadata.actor_dispatchers.clone(), db).await?;
insert_models(metadata.connections.clone(), db).await?;
insert_models(metadata.sources.clone(), db).await?;
insert_models(metadata.tables.clone(), db).await?;
insert_models(metadata.sinks.clone(), db).await?;
insert_models(metadata.views.clone(), db).await?;
insert_models(metadata.indexes.clone(), db).await?;
insert_models(metadata.functions.clone(), db).await?;
insert_models(metadata.system_parameters.clone(), db).await?;
insert_models(metadata.catalog_versions.clone(), db).await?;
insert_models(metadata.subscriptions.clone(), db).await?;
insert_models(metadata.session_parameters.clone(), db).await?;

write_model_v2_to_meta_store_v2(&metadata, db).await?;
// update_auto_inc must be called last.
update_auto_inc(&metadata, db).await?;
Ok(())
Expand Down
4 changes: 4 additions & 0 deletions src/storage/backup/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use std::collections::HashSet;
use std::hash::Hasher;

use itertools::Itertools;
use risingwave_common::RW_VERSION;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{HummockSstableObjectId, HummockVersionId};
use risingwave_pb::backup_service::{PbMetaSnapshotManifest, PbMetaSnapshotMetadata};
Expand All @@ -57,6 +58,7 @@ pub struct MetaSnapshotMetadata {
#[serde(default)]
pub format_version: u32,
pub remarks: Option<String>,
pub rw_version: Option<String>,
}

impl MetaSnapshotMetadata {
Expand All @@ -74,6 +76,7 @@ impl MetaSnapshotMetadata {
safe_epoch: v.visible_table_safe_epoch(),
format_version,
remarks,
rw_version: Some(RW_VERSION.to_owned()),
}
}
}
Expand Down Expand Up @@ -112,6 +115,7 @@ impl From<&MetaSnapshotMetadata> for PbMetaSnapshotMetadata {
safe_epoch: m.safe_epoch,
format_version: Some(m.format_version),
remarks: m.remarks.clone(),
rw_version: m.rw_version.clone(),
}
}
}
Expand Down
Loading

0 comments on commit 3e1ee0d

Please sign in to comment.