diff --git a/src/storage-types/src/dyncfgs.rs b/src/storage-types/src/dyncfgs.rs
index 9b2fc851d6033..85cbc916f2135 100644
--- a/src/storage-types/src/dyncfgs.rs
+++ b/src/storage-types/src/dyncfgs.rs
@@ -139,6 +139,13 @@ pub const PG_OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
     "Interval to fetch `offset_known`, from `pg_current_wal_lsn`",
 );
 
+/// Interval to re-validate the schemas of ingested tables.
+pub const PG_SCHEMA_VALIDATION_INTERVAL: Config<Duration> = Config::new(
+    "pg_schema_validation_interval",
+    Duration::from_secs(15),
+    "Interval to re-validate the schemas of ingested tables.",
+);
+
 // Networking
 
 /// Whether or not to enforce that external connection addresses are global
@@ -236,6 +243,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
         .add(&MYSQL_OFFSET_KNOWN_INTERVAL)
         .add(&PG_FETCH_SLOT_RESUME_LSN_INTERVAL)
         .add(&PG_OFFSET_KNOWN_INTERVAL)
+        .add(&PG_SCHEMA_VALIDATION_INTERVAL)
         .add(&ENFORCE_EXTERNAL_ADDRESSES)
         .add(&STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING)
         .add(&STORAGE_ROCKSDB_USE_MERGE_OPERATOR)
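The new knob follows the existing `Config<Duration>` pattern above it, so the interval can be retuned at runtime rather than at compile time. A minimal sketch of the read side, assuming a `ConfigSet` assembled by `all_dyncfgs` (the wiring in `main` is hypothetical; only the config name and its 15s default come from this diff):

```rust
use std::time::Duration;

use mz_dyncfg::ConfigSet;
use mz_storage_types::dyncfgs::{all_dyncfgs, PG_SCHEMA_VALIDATION_INTERVAL};

fn main() {
    // Hypothetical wiring: register every storage dyncfg on a fresh set.
    let configs: ConfigSet = all_dyncfgs(ConfigSet::default());

    // `get` reads the current value: the compiled-in 15s default, or whatever
    // `ALTER SYSTEM SET pg_schema_validation_interval = ...` last applied.
    let interval: Duration = PG_SCHEMA_VALIDATION_INTERVAL.get(&configs);
    assert_eq!(interval, Duration::from_secs(15));
}
```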
diff --git a/src/storage/src/source/postgres/replication.rs b/src/storage/src/source/postgres/replication.rs
index 9d7d5a9fc9108..0346c27619372 100644
--- a/src/storage/src/source/postgres/replication.rs
+++ b/src/storage/src/source/postgres/replication.rs
@@ -76,6 +76,7 @@ use std::rc::Rc;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::sync::LazyLock;
+use std::time::Instant;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 use bytes::Bytes;
@@ -86,10 +87,10 @@ use mz_ore::collections::HashSet;
 use mz_ore::future::InTask;
 use mz_ore::iter::IteratorExt;
 use mz_postgres_util::tunnel::PostgresFlavor;
+use mz_postgres_util::PostgresError;
 use mz_postgres_util::{simple_query_opt, Client};
 use mz_repr::{Datum, DatumVec, Diff, Row};
 use mz_sql_parser::ast::{display::AstDisplay, Ident};
-use mz_ssh_util::tunnel_manager::SshTunnelManager;
 use mz_storage_types::errors::DataflowError;
 use mz_storage_types::sources::SourceTimestamp;
 use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
@@ -234,6 +235,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
             &config.config.connection_context.ssh_tunnel_manager,
         )
         .await?;
+        let metadata_client = Arc::new(metadata_client);
 
         while let Some(_) = slot_ready_input.next().await {
             // Wait for the slot to be created
@@ -241,7 +243,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
         tracing::info!(%id, "ensuring replication slot {slot} exists");
         super::ensure_replication_slot(&replication_client, slot).await?;
         let slot_metadata = super::fetch_slot_metadata(
-            &metadata_client,
+            &*metadata_client,
             slot,
             mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
                 .get(config.config.config_set()),
@@ -366,7 +368,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
             let stream_result = raw_stream(
                 &config,
                 replication_client,
-                metadata_client,
+                Arc::clone(&metadata_client),
                 &connection.publication_details.slot,
                 &connection.publication_details.timeline_id,
                 &connection.publication,
@@ -417,6 +419,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
             // compatible with `columnation`, to Vec<u8> data that is.
             let mut col_temp: Vec<Vec<u8>> = vec![];
             let mut row_temp = vec![];
+            let mut last_schema_validation = Instant::now();
             while let Some(event) = stream.as_mut().next().await {
                 use LogicalReplicationMessage::*;
                 use ReplicationMessage::*;
@@ -427,10 +430,9 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
                         let mut tx = pin!(extract_transaction(
                             stream.by_ref(),
+                            &*metadata_client,
                             commit_lsn,
                             &table_info,
-                            &connection_config,
-                            &config.config.connection_context.ssh_tunnel_manager,
                             &metrics,
                             &connection.publication,
                             &mut errored
@@ -492,12 +494,75 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
                         _ => return Err(TransientError::BareTransactionEvent),
                     },
                     Ok(PrimaryKeepAlive(keepalive)) => {
-                        trace!(
-                            %id,
-                            "timely-{worker_id} received \
-                             keepalive lsn={}",
+                        trace!( %id,
+                            "timely-{worker_id} received keepalive lsn={}",
                             keepalive.wal_end()
                         );
+                        let validation_interval = mz_storage_types::dyncfgs::PG_SCHEMA_VALIDATION_INTERVAL.get(config.config.config_set());
+                        if last_schema_validation.elapsed() > validation_interval {
+                            trace!(%id, "timely-{worker_id} validating schemas");
+                            let upstream_info = {
+                                match mz_postgres_util::publication_info(&*metadata_client, &connection.publication)
+                                    .await
+                                {
+                                    Ok(info) => info.into_iter().map(|t| (t.oid, t)).collect(),
+                                    // If the replication stream cannot be obtained in a definite way there is
+                                    // nothing else to do. These errors are not retractable.
+                                    Err(PostgresError::PublicationMissing(publication)) => {
+                                        let err = DefiniteError::PublicationDropped(publication);
+                                        // If the publication is missing there is nothing else to
+                                        // do. These errors are not retractable.
+                                        for (oid, outputs) in table_info.iter() {
+                                            for output_index in outputs.keys() {
+                                                let update = (
+                                                    (
+                                                        *oid,
+                                                        *output_index,
+                                                        Err(DataflowError::from(err.clone())),
+                                                    ),
+                                                    data_cap_set[0].time().clone(),
+                                                    1,
+                                                );
+                                                data_output.give_fueled(&data_cap_set[0], update).await;
+                                            }
+                                        }
+                                        definite_error_handle.give(
+                                            &definite_error_cap_set[0],
+                                            ReplicationError::Definite(Rc::new(err)),
+                                        );
+                                        return Ok(());
+                                    }
+                                    Err(e) => Err(TransientError::from(e))?,
+                                }
+                            };
+                            for (&oid, outputs) in table_info.iter() {
+                                for (output_index, info) in outputs {
+                                    if errored.contains(output_index) {
+                                        trace!(%id, "timely-{worker_id} output index {output_index} \
+                                            for oid {oid} skipped");
+                                        continue;
+                                    }
+                                    match verify_schema(oid, &info.desc, &upstream_info, &*info.casts) {
+                                        Ok(()) => {
+                                            trace!(%id, "timely-{worker_id} schema of output \
+                                                index {output_index} for oid {oid} valid");
+                                        }
+                                        Err(err) => {
+                                            trace!(%id, "timely-{worker_id} schema of output \
+                                                index {output_index} for oid {oid} invalid");
+                                            let update = (
+                                                (oid, *output_index, Err(err.into())),
+                                                data_cap_set[0].time().clone(),
+                                                1,
+                                            );
+                                            data_output.give_fueled(&data_cap_set[0], update).await;
+                                            errored.insert(*output_index);
+                                        }
+                                    }
+                                }
+                            }
+                            last_schema_validation = Instant::now();
+                        }
                         data_upper = std::cmp::max(data_upper, keepalive.wal_end().into());
                     }
                     Ok(_) => return Err(TransientError::UnknownReplicationMessage),
@@ -570,7 +635,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
 async fn raw_stream<'a>(
     config: &'a RawSourceCreationConfig,
     replication_client: Client,
-    metadata_client: Client,
+    metadata_client: Arc<Client>,
     slot: &'a str,
     timeline_id: &'a Option<u64>,
     publication: &'a str,
@@ -597,7 +662,7 @@ async fn raw_stream<'a>(
     >,
     TransientError,
 > {
-    if let Err(err) = ensure_publication_exists(&metadata_client, publication).await? {
+    if let Err(err) = ensure_publication_exists(&*metadata_client, publication).await? {
         // If the publication gets deleted there is nothing else to do. These errors
         // are not retractable.
         return Ok(Err(err));
     }
@@ -628,7 +693,7 @@ async fn raw_stream<'a>(
     // Note: We must use the metadata client here which is NOT in replication mode. Some Aurora
     // Postgres versions disallow SHOW commands from within replication connection.
     // See: https://github.com/readysettech/readyset/discussions/28#discussioncomment-4405671
-    let row = simple_query_opt(&metadata_client, "SHOW wal_sender_timeout;")
+    let row = simple_query_opt(&*metadata_client, "SHOW wal_sender_timeout;")
         .await?
         .unwrap();
     let wal_sender_timeout = match row.get("wal_sender_timeout") {
@@ -680,7 +745,7 @@ async fn raw_stream<'a>(
     // cannot use the replication client to do that because it's already in CopyBoth mode.
     // [1] https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION-SLOT-LOGICAL
     let slot_metadata = super::fetch_slot_metadata(
-        &metadata_client,
+        &*metadata_client,
         slot,
         mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
             .get(config.config.config_set()),
@@ -704,7 +769,7 @@ async fn raw_stream<'a>(
     while !probe_tx.is_closed() {
         interval.tick().await;
         let probe_ts = mz_repr::Timestamp::try_from((now_fn)()).expect("must fit");
-        let probe_or_err = super::fetch_max_lsn(&metadata_client)
+        let probe_or_err = super::fetch_max_lsn(&*metadata_client)
             .await
             .map(|lsn| Probe {
                 probe_ts,
@@ -800,10 +865,9 @@
 fn extract_transaction<'a>(
     stream: impl AsyncStream<Item = Result<ReplicationMessage<LogicalReplicationMessage>, TransientError>> + 'a,
+    metadata_client: &'a Client,
     commit_lsn: MzOffset,
     table_info: &'a BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
-    connection_config: &'a mz_postgres_util::Config,
-    ssh_tunnel_manager: &'a SshTunnelManager,
     metrics: &'a PgSourceMetrics,
     publication: &'a str,
     errored_outputs: &'a mut HashSet<usize>,
@@ -919,11 +983,9 @@
                     // to check the current local schema against the current remote schema to
                     // ensure e.g. we haven't received a schema update with the same terminal
                     // column name which is actually a different column.
-                    let client = connection_config
-                        .connect("replication schema verification", ssh_tunnel_manager)
-                        .await?;
                     let upstream_info =
-                        mz_postgres_util::publication_info(&client, publication).await?;
+                        mz_postgres_util::publication_info(metadata_client, publication)
+                            .await?;
                     let upstream_info = upstream_info.into_iter().map(|t| (t.oid, t)).collect();
 
                     for (output_index, output) in valid_outputs {
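Stripped of the dataflow plumbing, the new keepalive arm is a time-gated revalidation loop: do nothing until the configured interval has elapsed, then recheck every non-errored output against fresh publication metadata. The sketch below is illustrative only; `TableDesc`, the `verify_schema` stand-in, and the error handling are invented, whereas the real operator emits definite `DataflowError` updates per output index:

```rust
use std::collections::BTreeSet;
use std::time::{Duration, Instant};

// Invented stand-ins for the real per-output metadata and validation routine.
struct TableDesc;
fn verify_schema(_oid: u32, _desc: &TableDesc) -> Result<(), String> {
    Ok(())
}

/// Illustrative keepalive handler: validation runs at most once per `interval`,
/// piggybacking on keepalives instead of waiting for table data to arrive.
fn on_keepalive(
    tables: &[(u32, TableDesc)],
    errored: &mut BTreeSet<u32>,
    last_validation: &mut Instant,
    interval: Duration,
) {
    if last_validation.elapsed() <= interval {
        return;
    }
    for (oid, desc) in tables {
        // Outputs that already errored stay errored; rechecking buys nothing.
        if errored.contains(oid) {
            continue;
        }
        if verify_schema(*oid, desc).is_err() {
            // The real operator emits a definite error update for this output
            // here, so downstream reads fail instead of silently going stale.
            errored.insert(*oid);
        }
    }
    *last_validation = Instant::now();
}

fn main() {
    let tables = vec![(16384u32, TableDesc)];
    let mut errored = BTreeSet::new();
    // Pretend the last validation happened long ago so the gate opens.
    let mut last = Instant::now()
        .checked_sub(Duration::from_secs(60))
        .unwrap_or_else(Instant::now);
    on_keepalive(&tables, &mut errored, &mut last, Duration::from_secs(15));
}
```

Keying the check off `PrimaryKeepAlive` messages is what lets idle publications notice schema drift: Postgres sends keepalives even when no WAL traffic is flowing.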
diff --git a/test/pg-cdc-old-syntax/alter-table-after-source.td b/test/pg-cdc-old-syntax/alter-table-after-source.td
index ff7b0e83627c3..578b06a93bd0a 100644
--- a/test/pg-cdc-old-syntax/alter-table-after-source.td
+++ b/test/pg-cdc-old-syntax/alter-table-after-source.td
@@ -11,6 +11,9 @@
 # Test ALTER TABLE -- source will error out for tables which existed when the source was created
 #
 
+$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
+ALTER SYSTEM SET pg_schema_validation_interval = '2s';
+
 > CREATE SECRET pgpass AS 'postgres'
 > CREATE CONNECTION pg TO POSTGRES (
     HOST postgres,
@@ -132,7 +135,6 @@ INSERT INTO add_columns VALUES (2, 'ab');
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE remove_column DROP COLUMN f2;
-INSERT INTO remove_column VALUES (3);
 
 ! SELECT * from remove_column;
 contains:altered
@@ -146,7 +148,6 @@ contains:altered
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_column ALTER COLUMN f2 TYPE CHAR(2);
-INSERT INTO alter_column VALUES (3, 'bc');
 
 ! SELECT * from alter_column;
 contains:altered
@@ -160,7 +161,6 @@ contains:altered
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_drop_nullability ALTER COLUMN f1 DROP NOT NULL;
-INSERT INTO alter_drop_nullability VALUES (NULL);
 
 ! SELECT * FROM alter_drop_nullability WHERE f1 IS NOT NULL;
 contains:altered
@@ -193,7 +193,6 @@ INSERT INTO alter_add_nullability VALUES (1);
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_drop_pk DROP CONSTRAINT alter_drop_pk_pkey;
-INSERT INTO alter_drop_pk VALUES (1);
 
 ! SELECT f1 FROM alter_drop_pk;
 contains:altered
@@ -223,7 +222,6 @@ INSERT INTO alter_add_pk VALUES (2);
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_cycle_pk DROP CONSTRAINT alter_cycle_pk_pkey;
 ALTER TABLE alter_cycle_pk ADD PRIMARY KEY(f1);
-INSERT INTO alter_cycle_pk VALUES (2);
 
 ! SELECT * FROM alter_cycle_pk;
 contains:altered
@@ -254,7 +252,6 @@ INSERT INTO alter_cycle_pk_off VALUES (1);
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_drop_unique DROP CONSTRAINT alter_drop_unique_f1_key;
-INSERT INTO alter_drop_unique VALUES (1);
 
 ! SELECT f1 FROM alter_drop_unique;
 contains:altered
@@ -283,7 +280,6 @@ ab
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_extend_column ALTER COLUMN f1 TYPE VARCHAR(20);
-INSERT INTO alter_extend_column VALUES ('abcd');
 
 ! SELECT * FROM alter_extend_column;
 contains:altered
@@ -296,7 +292,6 @@ contains:altered
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_decimal ALTER COLUMN f1 TYPE DECIMAL(6,1);
-INSERT INTO alter_decimal VALUES (12345.6);
 
 ! SELECT * FROM alter_decimal;
 contains:altered
@@ -310,20 +305,12 @@ contains:altered
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_table_rename RENAME TO alter_table_renamed;
-INSERT INTO alter_table_renamed VALUES (2);
 
 ! SELECT * FROM alter_table_rename;
 contains:altered
 
-$ postgres-execute connection=postgres://postgres:postgres@postgres
-INSERT INTO alter_table_renamed VALUES (3);
-
-! SELECT * FROM alter_table_renamed;
-contains:unknown
-
-
 #
-# Alter table rename colum
+# Alter table rename column
 
 > SELECT * FROM alter_table_rename_column;
 f1_orig f2_orig
@@ -332,7 +319,6 @@ $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_table_rename_column RENAME COLUMN f1 TO f3;
 ALTER TABLE alter_table_rename_column RENAME COLUMN f2 TO f1;
 ALTER TABLE alter_table_rename_column RENAME COLUMN f3 TO f2;
-INSERT INTO alter_table_rename_column (f1, f2) VALUES ('f1_renamed', 'f2_renamed');
 
 ! SELECT * FROM alter_table_rename_column;
 contains:altered
@@ -348,7 +334,6 @@ f1_orig f2_orig
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_table_change_attnum DROP COLUMN f2;
 ALTER TABLE alter_table_change_attnum ADD COLUMN f2 VARCHAR(10);
-INSERT INTO alter_table_change_attnum (f1, f2) VALUES ('f1_changed', 'f2_changed');
 
 ! SELECT * FROM alter_table_change_attnum;
 contains:altered
@@ -375,7 +360,6 @@ INSERT INTO alter_table_supported (f1, f2) VALUES (3, 3);
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_table_supported DROP COLUMN f2;
-INSERT INTO alter_table_supported (f1) VALUES (1);
 
 ! SELECT * from alter_table_supported;
 contains:altered
@@ -389,7 +373,6 @@ contains:altered
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 TRUNCATE truncate_table;
-INSERT INTO truncate_table VALUES (1, 1);
 
 ! SELECT * FROM truncate_table;
 contains:table was truncated
@@ -400,17 +383,8 @@ contains:table was truncated
 > SELECT * from drop_table;
 1 1
 
-# Drop a table and then trigger a `Relation` message
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 DROP TABLE drop_table;
-ALTER TABLE add_columns ADD COLUMN f3 int;
-INSERT INTO add_columns VALUES (3, 'cd', 1);
-
-# Ensure we've seen the relation message.
-> SELECT * from add_columns;
-1
-2
-3
 
 # Table is dropped
 ! SELECT * FROM drop_table;
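The deleted `INSERT`s are the point of these test edits: breakage now surfaces on the keepalive timer (lowered to 2s above) rather than on the next `Relation` message, so no DML is needed to trip the error. Roughly what testdrive's `!` assertions do while they wait, rendered as a hedged Rust sketch (names and timings are illustrative, not testdrive internals):

```rust
use std::time::{Duration, Instant};

/// Illustrative retry loop: rerun a check until it passes or a deadline hits.
fn eventually(timeout: Duration, mut check: impl FnMut() -> bool) -> bool {
    let start = Instant::now();
    while start.elapsed() < timeout {
        if check() {
            return true;
        }
        std::thread::sleep(Duration::from_millis(500));
    }
    false
}

fn main() {
    // Hypothetical usage: with a 2s validation interval, a query against a
    // broken subsource starts returning the "altered" error within a couple
    // of polls, even though no rows were inserted upstream.
    let saw_error = eventually(Duration::from_secs(10), || {
        true // stand-in for: run `SELECT ...` and match on the error text
    });
    assert!(saw_error);
}
```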
diff --git a/test/pg-cdc/alter-source.td b/test/pg-cdc/alter-source.td
index 6a1d2e3a02208..760dd43ef3549 100644
--- a/test/pg-cdc/alter-source.td
+++ b/test/pg-cdc/alter-source.td
@@ -63,11 +63,8 @@ CREATE TABLE table_a (pk INTEGER PRIMARY KEY, f2 TEXT);
 ALTER TABLE table_a REPLICA IDENTITY FULL;
 INSERT INTO table_a VALUES (9, 'nine');
 
-# Current table_a is not new table_a. Note that this only works right now
-# because we are bad at detecting dropped tables.
-> SELECT * FROM table_a;
-1 one
-2 two
+! SELECT * FROM table_a;
+contains:table was dropped
 
 # We are not aware that the new table_a is different
 ! CREATE TABLE table_a FROM SOURCE mz_source (REFERENCE table_a);
@@ -285,7 +282,6 @@ tb ""
 # If your schema change breaks the subsource, you can fix it.
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE table_a DROP COLUMN f2;
-INSERT INTO table_a VALUES (3);
 
 ! SELECT * FROM table_a;
 contains:incompatible schema change
@@ -303,7 +299,6 @@ true
 > SELECT * FROM table_a;
 1
 2
-3
 
 # If you add columns you can re-ingest them
 $ postgres-execute connection=postgres://postgres:postgres@postgres
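The flipped `table was dropped` expectation comes from the same validation pass: a dropped table vanishes from the publication metadata, so any ingested OID that upstream no longer reports can be turned into a definite error instead of serving stale rows forever. A hypothetical distillation of that lookup (the types and the error enum are invented for the example):

```rust
use std::collections::BTreeMap;

// Invented stand-ins for upstream publication metadata and the surfaced error.
struct UpstreamTable;
#[derive(Debug, PartialEq)]
enum DefiniteError {
    TableDropped(u32),
}

/// Illustrative check: an ingested oid missing from the refreshed publication
/// info is a definite, non-retractable error for that output.
fn dropped_tables(
    ingested_oids: &[u32],
    upstream_info: &BTreeMap<u32, UpstreamTable>,
) -> Vec<DefiniteError> {
    ingested_oids
        .iter()
        .copied()
        .filter(|oid| !upstream_info.contains_key(oid))
        .map(DefiniteError::TableDropped)
        .collect()
}

fn main() {
    let upstream = BTreeMap::from([(16384u32, UpstreamTable)]);
    let errs = dropped_tables(&[16384, 16500], &upstream);
    assert_eq!(errs, vec![DefiniteError::TableDropped(16500)]);
}
```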
diff --git a/test/pg-cdc/alter-table-after-source.td b/test/pg-cdc/alter-table-after-source.td
index cb35eae4a9551..56dfa291ef31f 100644
--- a/test/pg-cdc/alter-table-after-source.td
+++ b/test/pg-cdc/alter-table-after-source.td
@@ -11,6 +11,9 @@
 # Test ALTER TABLE -- source will error out for tables which existed when the source was created
 #
 
+$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
+ALTER SYSTEM SET pg_schema_validation_interval = '2s';
+
 > CREATE SECRET pgpass AS 'postgres'
 > CREATE CONNECTION pg TO POSTGRES (
     HOST postgres,
@@ -150,7 +153,6 @@ INSERT INTO add_columns VALUES (2, 'ab');
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE remove_column DROP COLUMN f2;
-INSERT INTO remove_column VALUES (3);
 
 ! SELECT * from remove_column;
 contains:altered
@@ -164,7 +166,6 @@ contains:altered
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_column ALTER COLUMN f2 TYPE CHAR(2);
-INSERT INTO alter_column VALUES (3, 'bc');
 
 ! SELECT * from alter_column;
 contains:altered
@@ -178,7 +179,6 @@ contains:altered
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_drop_nullability ALTER COLUMN f1 DROP NOT NULL;
-INSERT INTO alter_drop_nullability VALUES (NULL);
 
 ! SELECT * FROM alter_drop_nullability WHERE f1 IS NOT NULL;
 contains:altered
@@ -211,7 +211,6 @@ INSERT INTO alter_add_nullability VALUES (1);
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_drop_pk DROP CONSTRAINT alter_drop_pk_pkey;
-INSERT INTO alter_drop_pk VALUES (1);
 
 ! SELECT f1 FROM alter_drop_pk;
 contains:altered
@@ -241,7 +240,6 @@ INSERT INTO alter_add_pk VALUES (2);
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_cycle_pk DROP CONSTRAINT alter_cycle_pk_pkey;
 ALTER TABLE alter_cycle_pk ADD PRIMARY KEY(f1);
-INSERT INTO alter_cycle_pk VALUES (2);
 
 ! SELECT * FROM alter_cycle_pk;
 contains:altered
@@ -272,7 +270,6 @@ INSERT INTO alter_cycle_pk_off VALUES (1);
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_drop_unique DROP CONSTRAINT alter_drop_unique_f1_key;
-INSERT INTO alter_drop_unique VALUES (1);
 
 ! SELECT f1 FROM alter_drop_unique;
 contains:altered
@@ -301,7 +298,6 @@ ab
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_extend_column ALTER COLUMN f1 TYPE VARCHAR(20);
-INSERT INTO alter_extend_column VALUES ('abcd');
 
 ! SELECT * FROM alter_extend_column;
 contains:altered
@@ -314,7 +310,6 @@ contains:altered
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_decimal ALTER COLUMN f1 TYPE DECIMAL(6,1);
-INSERT INTO alter_decimal VALUES (12345.6);
 
 ! SELECT * FROM alter_decimal;
 contains:altered
@@ -328,18 +323,10 @@ contains:altered
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_table_rename RENAME TO alter_table_renamed;
-INSERT INTO alter_table_renamed VALUES (2);
 
 ! SELECT * FROM alter_table_rename;
 contains:altered
 
-$ postgres-execute connection=postgres://postgres:postgres@postgres
-INSERT INTO alter_table_renamed VALUES (3);
-
-! SELECT * FROM alter_table_renamed;
-contains:unknown
-
-
 #
 # Alter table rename colum
 
@@ -350,7 +337,6 @@ $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_table_rename_column RENAME COLUMN f1 TO f3;
 ALTER TABLE alter_table_rename_column RENAME COLUMN f2 TO f1;
 ALTER TABLE alter_table_rename_column RENAME COLUMN f3 TO f2;
-INSERT INTO alter_table_rename_column (f1, f2) VALUES ('f1_renamed', 'f2_renamed');
 
 ! SELECT * FROM alter_table_rename_column;
 contains:altered
@@ -366,7 +352,6 @@ f1_orig f2_orig
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_table_change_attnum DROP COLUMN f2;
 ALTER TABLE alter_table_change_attnum ADD COLUMN f2 VARCHAR(10);
-INSERT INTO alter_table_change_attnum (f1, f2) VALUES ('f1_changed', 'f2_changed');
 
 ! SELECT * FROM alter_table_change_attnum;
 contains:altered
@@ -393,7 +378,6 @@ INSERT INTO alter_table_supported (f1, f2) VALUES (3, 3);
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_table_supported DROP COLUMN f2;
-INSERT INTO alter_table_supported (f1) VALUES (1);
 
 ! SELECT * from alter_table_supported;
 contains:altered
@@ -407,7 +391,6 @@ contains:altered
 
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 TRUNCATE truncate_table;
-INSERT INTO truncate_table VALUES (1, 1);
 
 ! SELECT * FROM truncate_table;
 contains:table was truncated
@@ -418,17 +401,8 @@ contains:table was truncated
 > SELECT * from drop_table;
 1 1
 
-# Drop a table and then trigger a `Relation` message
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 DROP TABLE drop_table;
-ALTER TABLE add_columns ADD COLUMN f3 int;
-INSERT INTO add_columns VALUES (3, 'cd', 1);
-
-# Ensure we've seen the relation message.
-> SELECT * from add_columns;
-1
-2
-3
 
 # Table is dropped
 ! SELECT * FROM drop_table;
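For reference, the `contains:altered` assertions throughout these files boil down to a column-by-column equivalence check between the locally recorded description and the refreshed publication info. The comparison below is an invented simplification (the real `verify_schema` also consults attnums and the configured casts), but it captures why renames, retypes, nullability flips, and attnum churn all trip it while appending new columns does not:

```rust
/// Invented, simplified notion of column compatibility: same name, type, and
/// nullability, in the same position. Upstream may gain trailing columns, but
/// every column we already ingest must still match exactly.
#[derive(Clone, PartialEq)]
struct Col {
    name: String,
    type_oid: u32,
    nullable: bool,
}

fn still_compatible(local: &[Col], upstream: &[Col]) -> bool {
    local.len() <= upstream.len() && local.iter().zip(upstream).all(|(l, u)| l == u)
}

fn main() {
    let local = vec![Col { name: "f1".into(), type_oid: 23, nullable: false }];

    // Upstream appended a column: still compatible, ingestion keeps working.
    let mut upstream = local.clone();
    upstream.push(Col { name: "f2".into(), type_oid: 25, nullable: true });
    assert!(still_compatible(&local, &upstream));

    // Upstream retyped f1 (int4 -> int8): the subsource errors with "altered".
    upstream[0].type_oid = 20;
    assert!(!still_compatible(&local, &upstream));
}
```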