diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 20f0ef412824..cc5e670fe078 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -121,6 +121,24 @@ steps: timeout_in_minutes: 8 retry: *auto-retry + - label: "meta backup test" + key: "e2e-meta-backup-test" + command: "ci/scripts/run-meta-backup-test.sh -p ci-dev -m ci-3streaming-2serving-3fe" + if: | + build.pull_request.labels includes "ci/run-e2e-meta-backup-test" + depends_on: + - "build" + - "build-other" + - "docslt" + plugins: + - docker-compose#v5.1.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 45 + retry: *auto-retry + - label: "end-to-end test (parallel)" command: "ci/scripts/e2e-test-parallel.sh -p ci-dev" if: | diff --git a/proto/backup_service.proto b/proto/backup_service.proto index 75420149042d..48fe46ed7eac 100644 --- a/proto/backup_service.proto +++ b/proto/backup_service.proto @@ -49,6 +49,7 @@ message MetaSnapshotMetadata { uint64 safe_epoch = 4; optional uint32 format_version = 5; optional string remarks = 6; + optional string rw_version = 7; } service BackupService { diff --git a/src/meta/model_v2/src/secret.rs b/src/meta/model_v2/src/secret.rs index af3590dd0de5..0d122267bb4b 100644 --- a/src/meta/model_v2/src/secret.rs +++ b/src/meta/model_v2/src/secret.rs @@ -15,8 +15,9 @@ use risingwave_pb::catalog::PbSecret; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "secret")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/src/backup_restore/meta_snapshot_builder_v2.rs b/src/meta/src/backup_restore/meta_snapshot_builder_v2.rs index c2bafcd071c6..4a81ae9d6ad0 100644 --- a/src/meta/src/backup_restore/meta_snapshot_builder_v2.rs +++ b/src/meta/src/backup_restore/meta_snapshot_builder_v2.rs @@ -31,6 +31,25 @@ fn map_db_err(e: DbErr) -> BackupError { BackupError::MetaStorage(e.into()) } +macro_rules! define_set_metadata { + ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => { + pub async fn set_metadata( + metadata: &mut MetadataV2, + txn: &sea_orm::DatabaseTransaction, + ) -> BackupResult<()> { + $( + metadata.$name = $mod_path::$mod_name::Entity::find() + .all(txn) + .await + .map_err(map_db_err)?; + )* + Ok(()) + } + }; +} + +risingwave_backup::for_all_metadata_models_v2!(define_set_metadata); + pub struct MetaSnapshotV2Builder { snapshot: MetaSnapshotV2, meta_store: SqlMetaStore, @@ -91,151 +110,14 @@ impl MetaSnapshotV2Builder { } redo_state }; - let version_stats = model_v2::prelude::HummockVersionStats::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let compaction_configs = model_v2::prelude::CompactionConfig::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let actors = model_v2::prelude::Actor::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let clusters = model_v2::prelude::Cluster::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let actor_dispatchers = model_v2::prelude::ActorDispatcher::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let catalog_versions = model_v2::prelude::CatalogVersion::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let connections = model_v2::prelude::Connection::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let databases = model_v2::prelude::Database::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let fragments = model_v2::prelude::Fragment::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let functions = model_v2::prelude::Function::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let indexes = model_v2::prelude::Index::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let objects = model_v2::prelude::Object::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let object_dependencies = model_v2::prelude::ObjectDependency::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let schemas = model_v2::prelude::Schema::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let sinks = model_v2::prelude::Sink::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let sources = model_v2::prelude::Source::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let streaming_jobs = model_v2::prelude::StreamingJob::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let subscriptions = model_v2::prelude::Subscription::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let system_parameters = model_v2::prelude::SystemParameter::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let tables = model_v2::prelude::Table::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let users = model_v2::prelude::User::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let user_privileges = model_v2::prelude::UserPrivilege::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let views = model_v2::prelude::View::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let workers = model_v2::prelude::Worker::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let worker_properties = model_v2::prelude::WorkerProperty::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let hummock_sequences = model_v2::prelude::HummockSequence::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let seaql_migrations = model_v2::serde_seaql_migration::Entity::find() - .all(&txn) - .await - .map_err(map_db_err)?; - let session_parameters = model_v2::prelude::SessionParameter::find() - .all(&txn) - .await - .map_err(map_db_err)?; - - txn.commit().await.map_err(map_db_err)?; - self.snapshot.metadata = MetadataV2 { - seaql_migrations, + let mut metadata = MetadataV2 { hummock_version, - version_stats, - compaction_configs, - actors, - clusters, - actor_dispatchers, - catalog_versions, - connections, - databases, - fragments, - functions, - indexes, - objects, - object_dependencies, - schemas, - sinks, - sources, - streaming_jobs, - subscriptions, - system_parameters, - tables, - users, - user_privileges, - views, - workers, - worker_properties, - hummock_sequences, - session_parameters, + ..Default::default() }; + set_metadata(&mut metadata, &txn).await?; + + txn.commit().await.map_err(map_db_err)?; + self.snapshot.metadata = metadata; Ok(()) } diff --git a/src/meta/src/backup_restore/restore_impl/v2.rs b/src/meta/src/backup_restore/restore_impl/v2.rs index cccb5c9b641d..13492c56316a 100644 --- a/src/meta/src/backup_restore/restore_impl/v2.rs +++ b/src/meta/src/backup_restore/restore_impl/v2.rs @@ -93,40 +93,28 @@ impl WriterModelV2ToMetaStoreV2 { } } +macro_rules! define_write_model_v2_to_meta_store_v2 { + ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => { + async fn write_model_v2_to_meta_store_v2( + metadata: &risingwave_backup::meta_snapshot_v2::MetadataV2, + db: &sea_orm::DatabaseConnection, + ) -> BackupResult<()> { + $( + insert_models(metadata.$name.clone(), db).await?; + )* + Ok(()) + } + }; +} + +risingwave_backup::for_all_metadata_models_v2!(define_write_model_v2_to_meta_store_v2); + #[async_trait::async_trait] impl Writer for WriterModelV2ToMetaStoreV2 { async fn write(&self, target_snapshot: MetaSnapshot) -> BackupResult<()> { let metadata = target_snapshot.metadata; let db = &self.meta_store.conn; - insert_models(metadata.seaql_migrations.clone(), db).await?; - insert_models(metadata.clusters.clone(), db).await?; - insert_models(metadata.version_stats.clone(), db).await?; - insert_models(metadata.compaction_configs.clone(), db).await?; - insert_models(metadata.hummock_sequences.clone(), db).await?; - insert_models(metadata.workers.clone(), db).await?; - insert_models(metadata.worker_properties.clone(), db).await?; - insert_models(metadata.users.clone(), db).await?; - insert_models(metadata.user_privileges.clone(), db).await?; - insert_models(metadata.objects.clone(), db).await?; - insert_models(metadata.object_dependencies.clone(), db).await?; - insert_models(metadata.databases.clone(), db).await?; - insert_models(metadata.schemas.clone(), db).await?; - insert_models(metadata.streaming_jobs.clone(), db).await?; - insert_models(metadata.fragments.clone(), db).await?; - insert_models(metadata.actors.clone(), db).await?; - insert_models(metadata.actor_dispatchers.clone(), db).await?; - insert_models(metadata.connections.clone(), db).await?; - insert_models(metadata.sources.clone(), db).await?; - insert_models(metadata.tables.clone(), db).await?; - insert_models(metadata.sinks.clone(), db).await?; - insert_models(metadata.views.clone(), db).await?; - insert_models(metadata.indexes.clone(), db).await?; - insert_models(metadata.functions.clone(), db).await?; - insert_models(metadata.system_parameters.clone(), db).await?; - insert_models(metadata.catalog_versions.clone(), db).await?; - insert_models(metadata.subscriptions.clone(), db).await?; - insert_models(metadata.session_parameters.clone(), db).await?; - + write_model_v2_to_meta_store_v2(&metadata, db).await?; // update_auto_inc must be called last. update_auto_inc(&metadata, db).await?; Ok(()) diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 56c3ed551a28..ed8dccf8d1e4 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -36,6 +36,7 @@ use std::collections::HashSet; use std::hash::Hasher; use itertools::Itertools; +use risingwave_common::RW_VERSION; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{HummockSstableObjectId, HummockVersionId}; use risingwave_pb::backup_service::{PbMetaSnapshotManifest, PbMetaSnapshotMetadata}; @@ -57,6 +58,7 @@ pub struct MetaSnapshotMetadata { #[serde(default)] pub format_version: u32, pub remarks: Option, + pub rw_version: Option, } impl MetaSnapshotMetadata { @@ -74,6 +76,7 @@ impl MetaSnapshotMetadata { safe_epoch: v.visible_table_safe_epoch(), format_version, remarks, + rw_version: Some(RW_VERSION.to_owned()), } } } @@ -112,6 +115,7 @@ impl From<&MetaSnapshotMetadata> for PbMetaSnapshotMetadata { safe_epoch: m.safe_epoch, format_version: Some(m.format_version), remarks: m.remarks.clone(), + rw_version: m.rw_version.clone(), } } } diff --git a/src/storage/backup/src/meta_snapshot_v2.rs b/src/storage/backup/src/meta_snapshot_v2.rs index feb5f12540d0..bec07a80cf19 100644 --- a/src/storage/backup/src/meta_snapshot_v2.rs +++ b/src/storage/backup/src/meta_snapshot_v2.rs @@ -16,7 +16,6 @@ use std::fmt::{Display, Formatter}; use bytes::{Buf, BufMut}; use risingwave_hummock_sdk::version::HummockVersion; -use risingwave_meta_model_v2 as model_v2; use serde::{Deserialize, Serialize}; use crate::meta_snapshot::{MetaSnapshot, Metadata}; @@ -29,39 +28,100 @@ impl From for BackupError { } } -#[derive(Default)] -pub struct MetadataV2 { - pub seaql_migrations: Vec, - pub hummock_version: HummockVersion, - pub version_stats: Vec, - pub compaction_configs: Vec, - pub actors: Vec, - pub clusters: Vec, - pub actor_dispatchers: Vec, - pub catalog_versions: Vec, - pub connections: Vec, - pub databases: Vec, - pub fragments: Vec, - pub functions: Vec, - pub indexes: Vec, - pub objects: Vec, - pub object_dependencies: Vec, - pub schemas: Vec, - pub sinks: Vec, - pub sources: Vec, - pub streaming_jobs: Vec, - pub subscriptions: Vec, - pub system_parameters: Vec, - pub tables: Vec, - pub users: Vec, - pub user_privileges: Vec, - pub views: Vec, - pub workers: Vec, - pub worker_properties: Vec, - pub hummock_sequences: Vec, - pub session_parameters: Vec, +/// Add new item in the end. Do not change the order. +#[macro_export] +macro_rules! for_all_metadata_models_v2 { + ($macro:ident) => { + $macro! { + {seaql_migrations, risingwave_meta_model_v2::serde_seaql_migration}, + {version_stats, risingwave_meta_model_v2::hummock_version_stats}, + {compaction_configs, risingwave_meta_model_v2::compaction_config}, + {actors, risingwave_meta_model_v2::actor}, + {clusters, risingwave_meta_model_v2::cluster}, + {actor_dispatchers, risingwave_meta_model_v2::actor_dispatcher}, + {catalog_versions, risingwave_meta_model_v2::catalog_version}, + {connections, risingwave_meta_model_v2::connection}, + {databases, risingwave_meta_model_v2::database}, + {fragments, risingwave_meta_model_v2::fragment}, + {functions, risingwave_meta_model_v2::function}, + {indexes, risingwave_meta_model_v2::index}, + {objects, risingwave_meta_model_v2::object}, + {object_dependencies, risingwave_meta_model_v2::object_dependency}, + {schemas, risingwave_meta_model_v2::schema}, + {sinks, risingwave_meta_model_v2::sink}, + {sources, risingwave_meta_model_v2::source}, + {streaming_jobs, risingwave_meta_model_v2::streaming_job}, + {subscriptions, risingwave_meta_model_v2::subscription}, + {system_parameters, risingwave_meta_model_v2::system_parameter}, + {tables, risingwave_meta_model_v2::table}, + {users, risingwave_meta_model_v2::user}, + {user_privileges, risingwave_meta_model_v2::user_privilege}, + {views, risingwave_meta_model_v2::view}, + {workers, risingwave_meta_model_v2::worker}, + {worker_properties, risingwave_meta_model_v2::worker_property}, + {hummock_sequences, risingwave_meta_model_v2::hummock_sequence}, + {session_parameters, risingwave_meta_model_v2::session_parameter}, + {secrets, risingwave_meta_model_v2::secret} + } + }; } +macro_rules! define_metadata_v2 { + ($({ $name:ident, $mod_path:ident::$mod_name:ident }),*) => { + #[derive(Default)] + pub struct MetadataV2 { + pub hummock_version: HummockVersion, + $( + pub $name: Vec<$mod_path::$mod_name::Model>, + )* + } + }; +} + +for_all_metadata_models_v2!(define_metadata_v2); + +macro_rules! define_encode_metadata { + ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => { + fn encode_metadata( + metadata: &MetadataV2, + buf: &mut Vec, + ) -> BackupResult<()> { + let mut _idx = 0; + $( + if _idx == 1 { + put_1(buf, &metadata.hummock_version.to_protobuf())?; + } + put_n(buf, &metadata.$name)?; + _idx += 1; + )* + Ok(()) + } + }; +} + +for_all_metadata_models_v2!(define_encode_metadata); + +macro_rules! define_decode_metadata { + ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => { + fn decode_metadata( + metadata: &mut MetadataV2, + mut buf: &[u8], + ) -> BackupResult<()> { + let mut _idx = 0; + $( + if _idx == 1 { + metadata.hummock_version = HummockVersion::from_persisted_protobuf(&get_1(&mut buf)?); + } + metadata.$name = get_n(&mut buf)?; + _idx += 1; + )* + Ok(()) + } + }; +} + +for_all_metadata_models_v2!(define_decode_metadata); + impl Display for MetadataV2 { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { writeln!(f, "clusters: {:#?}", self.clusters)?; @@ -77,102 +137,16 @@ impl Display for MetadataV2 { impl Metadata for MetadataV2 { fn encode_to(&self, buf: &mut Vec) -> BackupResult<()> { - put_n(buf, &self.seaql_migrations)?; - put_1(buf, &self.hummock_version.to_protobuf())?; - put_n(buf, &self.version_stats)?; - put_n(buf, &self.compaction_configs)?; - put_n(buf, &self.actors)?; - put_n(buf, &self.clusters)?; - put_n(buf, &self.actor_dispatchers)?; - put_n(buf, &self.catalog_versions)?; - put_n(buf, &self.connections)?; - put_n(buf, &self.databases)?; - put_n(buf, &self.fragments)?; - put_n(buf, &self.functions)?; - put_n(buf, &self.indexes)?; - put_n(buf, &self.objects)?; - put_n(buf, &self.object_dependencies)?; - put_n(buf, &self.schemas)?; - put_n(buf, &self.sinks)?; - put_n(buf, &self.sources)?; - put_n(buf, &self.streaming_jobs)?; - put_n(buf, &self.subscriptions)?; - put_n(buf, &self.system_parameters)?; - put_n(buf, &self.tables)?; - put_n(buf, &self.users)?; - put_n(buf, &self.user_privileges)?; - put_n(buf, &self.views)?; - put_n(buf, &self.workers)?; - put_n(buf, &self.worker_properties)?; - put_n(buf, &self.hummock_sequences)?; - put_n(buf, &self.session_parameters)?; - Ok(()) + encode_metadata(self, buf) } - fn decode(mut buf: &[u8]) -> BackupResult + fn decode(buf: &[u8]) -> BackupResult where Self: Sized, { - let seaql_migrations = get_n(&mut buf)?; - let pb_hummock_version = get_1(&mut buf)?; - let version_stats = get_n(&mut buf)?; - let compaction_configs = get_n(&mut buf)?; - let actors = get_n(&mut buf)?; - let clusters = get_n(&mut buf)?; - let actor_dispatchers = get_n(&mut buf)?; - let catalog_versions = get_n(&mut buf)?; - let connections = get_n(&mut buf)?; - let databases = get_n(&mut buf)?; - let fragments = get_n(&mut buf)?; - let functions = get_n(&mut buf)?; - let indexes = get_n(&mut buf)?; - let objects = get_n(&mut buf)?; - let object_dependencies = get_n(&mut buf)?; - let schemas = get_n(&mut buf)?; - let sinks = get_n(&mut buf)?; - let sources = get_n(&mut buf)?; - let streaming_jobs = get_n(&mut buf)?; - let subscriptions = get_n(&mut buf)?; - let system_parameters = get_n(&mut buf)?; - let tables = get_n(&mut buf)?; - let users = get_n(&mut buf)?; - let user_privileges = get_n(&mut buf)?; - let views = get_n(&mut buf)?; - let workers = get_n(&mut buf)?; - let worker_properties = get_n(&mut buf)?; - let hummock_sequences = get_n(&mut buf)?; - let session_parameters = get_n(&mut buf)?; - Ok(Self { - seaql_migrations, - hummock_version: HummockVersion::from_persisted_protobuf(&pb_hummock_version), - version_stats, - compaction_configs, - actors, - clusters, - actor_dispatchers, - catalog_versions, - connections, - databases, - fragments, - functions, - indexes, - objects, - object_dependencies, - schemas, - sinks, - sources, - streaming_jobs, - subscriptions, - system_parameters, - tables, - users, - user_privileges, - views, - workers, - worker_properties, - hummock_sequences, - session_parameters, - }) + let mut metadata = Self::default(); + decode_metadata(&mut metadata, buf)?; + Ok(metadata) } fn hummock_version_ref(&self) -> &HummockVersion {