From 549d9bd861aea566f04daf11cf8e6d90fcf50f86 Mon Sep 17 00:00:00 2001
From: Petros Angelatos
Date: Thu, 12 Dec 2024 11:01:43 -0500
Subject: [PATCH] source/pg: proactively validate schemas

The PostgreSQL logical replication protocol only notifies us that a
table has changed schema if that table participates in a transaction
*after* the schema change has occurred. This is problematic because,
until that table participates in a transaction, Materialize will keep
advancing the upper frontier of the table, asserting that everything is
fine and potentially revealing invalid data.

This PR puts an upper bound on this window of invalidity by proactively
re-validating the schema of all tables in an ingestion on a regular
cadence. The interval is set to 15s by default but is configurable
through LD.

Signed-off-by: Petros Angelatos
---
 src/storage-types/src/dyncfgs.rs | 8 ++
 .../src/source/postgres/replication.rs | 102 ++++++++++++++----
 .../alter-table-after-source.td | 34 +-----
 test/pg-cdc/alter-source.td | 9 +-
 test/pg-cdc/alter-table-after-source.td | 32 +-----
 5 files changed, 99 insertions(+), 86 deletions(-)

diff --git a/src/storage-types/src/dyncfgs.rs b/src/storage-types/src/dyncfgs.rs
index 9b2fc851d6033..85cbc916f2135 100644
--- a/src/storage-types/src/dyncfgs.rs
+++ b/src/storage-types/src/dyncfgs.rs
@@ -139,6 +139,13 @@ pub const PG_OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
 "Interval to fetch `offset_known`, from `pg_current_wal_lsn`",
 );
+/// Interval to re-validate the schemas of ingested tables.
+pub const PG_SCHEMA_VALIDATION_INTERVAL: Config<Duration> = Config::new(
+ "pg_schema_validation_interval",
+ Duration::from_secs(15),
+ "Interval to re-validate the schemas of ingested tables.",
+);
+
 // Networking
 /// Whether or not to enforce that external connection addresses are global
@@ -236,6 +243,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
 .add(&MYSQL_OFFSET_KNOWN_INTERVAL)
 .add(&PG_FETCH_SLOT_RESUME_LSN_INTERVAL)
 .add(&PG_OFFSET_KNOWN_INTERVAL)
+ .add(&PG_SCHEMA_VALIDATION_INTERVAL)
 .add(&ENFORCE_EXTERNAL_ADDRESSES)
 .add(&STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING)
 .add(&STORAGE_ROCKSDB_USE_MERGE_OPERATOR)
diff --git a/src/storage/src/source/postgres/replication.rs b/src/storage/src/source/postgres/replication.rs
index 9d7d5a9fc9108..0346c27619372 100644
--- a/src/storage/src/source/postgres/replication.rs
+++ b/src/storage/src/source/postgres/replication.rs
@@ -76,6 +76,7 @@ use std::rc::Rc;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::sync::LazyLock;
+use std::time::Instant;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use bytes::Bytes;
@@ -86,10 +87,10 @@ use mz_ore::collections::HashSet;
 use mz_ore::future::InTask;
 use mz_ore::iter::IteratorExt;
 use mz_postgres_util::tunnel::PostgresFlavor;
+use mz_postgres_util::PostgresError;
 use mz_postgres_util::{simple_query_opt, Client};
 use mz_repr::{Datum, DatumVec, Diff, Row};
 use mz_sql_parser::ast::{display::AstDisplay, Ident};
-use mz_ssh_util::tunnel_manager::SshTunnelManager;
 use mz_storage_types::errors::DataflowError;
 use mz_storage_types::sources::SourceTimestamp;
 use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
@@ -234,6 +235,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
 &config.config.connection_context.ssh_tunnel_manager,
 )
 .await?;
+ let metadata_client = Arc::new(metadata_client);
 while let Some(_) = slot_ready_input.next().await {
 // Wait for the slot to be created
 }
 tracing::info!(%id, "ensuring replication slot {slot} exists");
 super::ensure_replication_slot(&replication_client, slot).await?;
 let slot_metadata = super::fetch_slot_metadata(
- &metadata_client,
+ &*metadata_client,
 slot,
 mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
 .get(config.config.config_set()),
@@ -366,7 +368,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
 let stream_result = raw_stream(
 &config,
 replication_client,
- metadata_client,
+ Arc::clone(&metadata_client),
 &connection.publication_details.slot,
 &connection.publication_details.timeline_id,
 &connection.publication,
@@ -417,6 +419,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
 // compatible with `columnation`, to Vec<u8> data that is.
 let mut col_temp: Vec<Vec<u8>> = vec![];
 let mut row_temp = vec![];
+ let mut last_schema_validation = Instant::now();
 while let Some(event) = stream.as_mut().next().await {
 use LogicalReplicationMessage::*;
 use ReplicationMessage::*;
@@ -427,10 +430,9 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
 let mut tx = pin!(extract_transaction(
 stream.by_ref(),
+ &*metadata_client,
 commit_lsn,
 &table_info,
- &connection_config,
- &config.config.connection_context.ssh_tunnel_manager,
 &metrics,
 &connection.publication,
 &mut errored
@@ -492,12 +494,75 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
 _ => return Err(TransientError::BareTransactionEvent),
 },
 Ok(PrimaryKeepAlive(keepalive)) => {
- trace!(
- %id,
- "timely-{worker_id} received \
- keepalive lsn={}",
+ trace!( %id,
+ "timely-{worker_id} received keepalive lsn={}",
 keepalive.wal_end()
 );
+ let validation_interval = mz_storage_types::dyncfgs::PG_SCHEMA_VALIDATION_INTERVAL.get(config.config.config_set());
+ if last_schema_validation.elapsed() > validation_interval {
+ trace!(%id, "timely-{worker_id} validating schemas");
+ let upstream_info = {
+ match mz_postgres_util::publication_info(&*metadata_client, &connection.publication)
+ .await
+ {
+ Ok(info) => info.into_iter().map(|t| (t.oid, t)).collect(),
+ // If the publication is missing there is nothing else to
+ // do. These errors are not retractable.
+ Err(PostgresError::PublicationMissing(publication)) => {
+ let err = DefiniteError::PublicationDropped(publication);
+ for (oid, outputs) in table_info.iter() {
+ for output_index in outputs.keys() {
+ let update = (
+ (
+ *oid,
+ *output_index,
+ Err(DataflowError::from(err.clone())),
+ ),
+ data_cap_set[0].time().clone(),
+ 1,
+ );
+ data_output.give_fueled(&data_cap_set[0], update).await;
+ }
+ }
+ definite_error_handle.give(
+ &definite_error_cap_set[0],
+ ReplicationError::Definite(Rc::new(err)),
+ );
+ return Ok(());
+ }
+ Err(e) => Err(TransientError::from(e))?,
+ }
+ };
+ for (&oid, outputs) in table_info.iter() {
+ for (output_index, info) in outputs {
+ if errored.contains(output_index) {
+ trace!(%id, "timely-{worker_id} output index {output_index} \
+ for oid {oid} skipped");
+ continue;
+ }
+ match verify_schema(oid, &info.desc, &upstream_info, &*info.casts) {
+ Ok(()) => {
+ trace!(%id, "timely-{worker_id} schema of output \
+ index {output_index} for oid {oid} valid");
+ }
+ Err(err) => {
+ trace!(%id, "timely-{worker_id} schema of output \
+ index {output_index} for oid {oid} invalid");
+ let update = (
+ (oid, *output_index, Err(err.into())),
+ data_cap_set[0].time().clone(),
+ 1,
+ );
+ data_output.give_fueled(&data_cap_set[0], update).await;
+ errored.insert(*output_index);
+ }
+ }
+ }
+ }
+ last_schema_validation = Instant::now();
+ }
 data_upper = std::cmp::max(data_upper, keepalive.wal_end().into());
 }
 Ok(_) => return Err(TransientError::UnknownReplicationMessage),
@@ -570,7 +635,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
 async fn raw_stream<'a>(
 config: &'a RawSourceCreationConfig,
 replication_client: Client,
- metadata_client: Client,
+ metadata_client: Arc<Client>,
 slot: &'a str,
 timeline_id: &'a Option<u64>,
 publication: &'a str,
@@ -597,7 +662,7 @@ async fn raw_stream<'a>(
 >,
 TransientError,
> {
- if let Err(err) = ensure_publication_exists(&metadata_client, publication).await? {
+ if let Err(err) = ensure_publication_exists(&*metadata_client, publication).await? {
 // If the publication gets deleted there is nothing else to do. These errors
 // are not retractable.
 return Ok(Err(err));
 }
@@ -628,7 +693,7 @@ async fn raw_stream<'a>(
 // Note: We must use the metadata client here which is NOT in replication mode. Some Aurora
 // Postgres versions disallow SHOW commands from within replication connection.
 // See: https://github.com/readysettech/readyset/discussions/28#discussioncomment-4405671
- let row = simple_query_opt(&metadata_client, "SHOW wal_sender_timeout;")
+ let row = simple_query_opt(&*metadata_client, "SHOW wal_sender_timeout;")
 .await?
 .unwrap();
 let wal_sender_timeout = match row.get("wal_sender_timeout") {
@@ -680,7 +745,7 @@ async fn raw_stream<'a>(
 // cannot use the replication client to do that because it's already in CopyBoth mode.
 // [1] https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION-SLOT-LOGICAL
 let slot_metadata = super::fetch_slot_metadata(
- &metadata_client,
+ &*metadata_client,
 slot,
 mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
 .get(config.config.config_set()),
@@ -704,7 +769,7 @@ async fn raw_stream<'a>(
 while !probe_tx.is_closed() {
 interval.tick().await;
 let probe_ts = mz_repr::Timestamp::try_from((now_fn)()).expect("must fit");
- let probe_or_err = super::fetch_max_lsn(&metadata_client)
+ let probe_or_err = super::fetch_max_lsn(&*metadata_client)
 .await
 .map(|lsn| Probe {
 probe_ts,
@@ -800,10 +865,9 @@ fn extract_transaction<'a>(
 stream: impl AsyncStream<Item = Result<ReplicationMessage<LogicalReplicationMessage>, TransientError>> + 'a,
+ metadata_client: &'a Client,
 commit_lsn: MzOffset,
 table_info: &'a BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
- connection_config: &'a mz_postgres_util::Config,
- ssh_tunnel_manager: &'a SshTunnelManager,
 metrics: &'a PgSourceMetrics,
 publication: &'a str,
 errored_outputs: &'a mut HashSet<usize>,
@@ -919,11 +983,9 @@ fn extract_transaction<'a>(
 // to check the current local schema against the current remote schema to
 // ensure e.g. we haven't received a schema update with the same terminal
 // column name which is actually a different column.
- let client = connection_config
- .connect("replication schema verification", ssh_tunnel_manager)
- .await?;
 let upstream_info =
- mz_postgres_util::publication_info(&client, publication).await?;
+ mz_postgres_util::publication_info(metadata_client, publication)
+ .await?;
 let upstream_info = upstream_info.into_iter().map(|t| (t.oid, t)).collect();
 for (output_index, output) in valid_outputs {
diff --git a/test/pg-cdc-old-syntax/alter-table-after-source.td b/test/pg-cdc-old-syntax/alter-table-after-source.td
index ff7b0e83627c3..578b06a93bd0a 100644
--- a/test/pg-cdc-old-syntax/alter-table-after-source.td
+++ b/test/pg-cdc-old-syntax/alter-table-after-source.td
@@ -11,6 +11,9 @@
 # Test ALTER TABLE -- source will error out for tables which existed when the source was created
 #
+$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
+ALTER SYSTEM SET pg_schema_validation_interval = '2s';
+
 > CREATE SECRET pgpass AS 'postgres'
 > CREATE CONNECTION pg TO POSTGRES (
 HOST postgres,
@@ -132,7 +135,6 @@ INSERT INTO add_columns VALUES (2, 'ab');
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE remove_column DROP COLUMN f2;
-INSERT INTO remove_column VALUES (3);
 ! SELECT * from remove_column;
 contains:altered
@@ -146,7 +148,6 @@ contains:altered
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_column ALTER COLUMN f2 TYPE CHAR(2);
-INSERT INTO alter_column VALUES (3, 'bc');
 ! SELECT * from alter_column;
 contains:altered
@@ -160,7 +161,6 @@ contains:altered
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_drop_nullability ALTER COLUMN f1 DROP NOT NULL;
-INSERT INTO alter_drop_nullability VALUES (NULL);
 ! SELECT * FROM alter_drop_nullability WHERE f1 IS NOT NULL;
 contains:altered
@@ -193,7 +193,6 @@ INSERT INTO alter_add_nullability VALUES (1);
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_drop_pk DROP CONSTRAINT alter_drop_pk_pkey;
-INSERT INTO alter_drop_pk VALUES (1);
 !
SELECT f1 FROM alter_drop_pk; contains:altered @@ -223,7 +222,6 @@ INSERT INTO alter_add_pk VALUES (2); $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER TABLE alter_cycle_pk DROP CONSTRAINT alter_cycle_pk_pkey; ALTER TABLE alter_cycle_pk ADD PRIMARY KEY(f1); -INSERT INTO alter_cycle_pk VALUES (2); ! SELECT * FROM alter_cycle_pk; contains:altered @@ -254,7 +252,6 @@ INSERT INTO alter_cycle_pk_off VALUES (1); $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER TABLE alter_drop_unique DROP CONSTRAINT alter_drop_unique_f1_key; -INSERT INTO alter_drop_unique VALUES (1); ! SELECT f1 FROM alter_drop_unique; contains:altered @@ -283,7 +280,6 @@ ab $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER TABLE alter_extend_column ALTER COLUMN f1 TYPE VARCHAR(20); -INSERT INTO alter_extend_column VALUES ('abcd'); ! SELECT * FROM alter_extend_column; contains:altered @@ -296,7 +292,6 @@ contains:altered $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER TABLE alter_decimal ALTER COLUMN f1 TYPE DECIMAL(6,1); -INSERT INTO alter_decimal VALUES (12345.6); ! SELECT * FROM alter_decimal; contains:altered @@ -310,20 +305,12 @@ contains:altered $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER TABLE alter_table_rename RENAME TO alter_table_renamed; -INSERT INTO alter_table_renamed VALUES (2); ! SELECT * FROM alter_table_rename; contains:altered -$ postgres-execute connection=postgres://postgres:postgres@postgres -INSERT INTO alter_table_renamed VALUES (3); - -! SELECT * FROM alter_table_renamed; -contains:unknown - - # -# Alter table rename colum +# Alter table rename column > SELECT * FROM alter_table_rename_column; f1_orig f2_orig @@ -332,7 +319,6 @@ $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER TABLE alter_table_rename_column RENAME COLUMN f1 TO f3; ALTER TABLE alter_table_rename_column RENAME COLUMN f2 TO f1; ALTER TABLE alter_table_rename_column RENAME COLUMN f3 TO f2; -INSERT INTO alter_table_rename_column (f1, f2) VALUES ('f1_renamed', 'f2_renamed'); ! SELECT * FROM alter_table_rename_column; contains:altered @@ -348,7 +334,6 @@ f1_orig f2_orig $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER TABLE alter_table_change_attnum DROP COLUMN f2; ALTER TABLE alter_table_change_attnum ADD COLUMN f2 VARCHAR(10); -INSERT INTO alter_table_change_attnum (f1, f2) VALUES ('f1_changed', 'f2_changed'); ! SELECT * FROM alter_table_change_attnum; contains:altered @@ -375,7 +360,6 @@ INSERT INTO alter_table_supported (f1, f2) VALUES (3, 3); $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER TABLE alter_table_supported DROP COLUMN f2; -INSERT INTO alter_table_supported (f1) VALUES (1); ! SELECT * from alter_table_supported; contains:altered @@ -389,7 +373,6 @@ contains:altered $ postgres-execute connection=postgres://postgres:postgres@postgres TRUNCATE truncate_table; -INSERT INTO truncate_table VALUES (1, 1); ! SELECT * FROM truncate_table; contains:table was truncated @@ -400,17 +383,8 @@ contains:table was truncated > SELECT * from drop_table; 1 1 -# Drop a table and then trigger a `Relation` message $ postgres-execute connection=postgres://postgres:postgres@postgres DROP TABLE drop_table; -ALTER TABLE add_columns ADD COLUMN f3 int; -INSERT INTO add_columns VALUES (3, 'cd', 1); - -# Ensure we've seen the relation message. -> SELECT * from add_columns; -1 -2 -3 # Table is dropped ! 
SELECT * FROM drop_table; diff --git a/test/pg-cdc/alter-source.td b/test/pg-cdc/alter-source.td index 6a1d2e3a02208..760dd43ef3549 100644 --- a/test/pg-cdc/alter-source.td +++ b/test/pg-cdc/alter-source.td @@ -63,11 +63,8 @@ CREATE TABLE table_a (pk INTEGER PRIMARY KEY, f2 TEXT); ALTER TABLE table_a REPLICA IDENTITY FULL; INSERT INTO table_a VALUES (9, 'nine'); -# Current table_a is not new table_a. Note that this only works right now -# because we are bad at detecting dropped tables. -> SELECT * FROM table_a; -1 one -2 two +! SELECT * FROM table_a; +contains:table was dropped # We are not aware that the new table_a is different ! CREATE TABLE table_a FROM SOURCE mz_source (REFERENCE table_a); @@ -285,7 +282,6 @@ tb "" # If your schema change breaks the subsource, you can fix it. $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER TABLE table_a DROP COLUMN f2; -INSERT INTO table_a VALUES (3); ! SELECT * FROM table_a; contains:incompatible schema change @@ -303,7 +299,6 @@ true > SELECT * FROM table_a; 1 2 -3 # If you add columns you can re-ingest them $ postgres-execute connection=postgres://postgres:postgres@postgres diff --git a/test/pg-cdc/alter-table-after-source.td b/test/pg-cdc/alter-table-after-source.td index cb35eae4a9551..56dfa291ef31f 100644 --- a/test/pg-cdc/alter-table-after-source.td +++ b/test/pg-cdc/alter-table-after-source.td @@ -11,6 +11,9 @@ # Test ALTER TABLE -- source will error out for tables which existed when the source was created # +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET pg_schema_validation_interval = '2s'; + > CREATE SECRET pgpass AS 'postgres' > CREATE CONNECTION pg TO POSTGRES ( HOST postgres, @@ -150,7 +153,6 @@ INSERT INTO add_columns VALUES (2, 'ab'); $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER TABLE remove_column DROP COLUMN f2; -INSERT INTO remove_column VALUES (3); ! SELECT * from remove_column; contains:altered @@ -164,7 +166,6 @@ contains:altered $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER TABLE alter_column ALTER COLUMN f2 TYPE CHAR(2); -INSERT INTO alter_column VALUES (3, 'bc'); ! SELECT * from alter_column; contains:altered @@ -178,7 +179,6 @@ contains:altered $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER TABLE alter_drop_nullability ALTER COLUMN f1 DROP NOT NULL; -INSERT INTO alter_drop_nullability VALUES (NULL); ! SELECT * FROM alter_drop_nullability WHERE f1 IS NOT NULL; contains:altered @@ -211,7 +211,6 @@ INSERT INTO alter_add_nullability VALUES (1); $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER TABLE alter_drop_pk DROP CONSTRAINT alter_drop_pk_pkey; -INSERT INTO alter_drop_pk VALUES (1); ! SELECT f1 FROM alter_drop_pk; contains:altered @@ -241,7 +240,6 @@ INSERT INTO alter_add_pk VALUES (2); $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER TABLE alter_cycle_pk DROP CONSTRAINT alter_cycle_pk_pkey; ALTER TABLE alter_cycle_pk ADD PRIMARY KEY(f1); -INSERT INTO alter_cycle_pk VALUES (2); ! SELECT * FROM alter_cycle_pk; contains:altered @@ -272,7 +270,6 @@ INSERT INTO alter_cycle_pk_off VALUES (1); $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER TABLE alter_drop_unique DROP CONSTRAINT alter_drop_unique_f1_key; -INSERT INTO alter_drop_unique VALUES (1); ! 
SELECT f1 FROM alter_drop_unique;
 contains:altered
@@ -301,7 +298,6 @@ ab
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_extend_column ALTER COLUMN f1 TYPE VARCHAR(20);
-INSERT INTO alter_extend_column VALUES ('abcd');
 ! SELECT * FROM alter_extend_column;
 contains:altered
@@ -314,7 +310,6 @@ contains:altered
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_decimal ALTER COLUMN f1 TYPE DECIMAL(6,1);
-INSERT INTO alter_decimal VALUES (12345.6);
 ! SELECT * FROM alter_decimal;
 contains:altered
@@ -328,18 +323,10 @@ contains:altered
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_table_rename RENAME TO alter_table_renamed;
-INSERT INTO alter_table_renamed VALUES (2);
 ! SELECT * FROM alter_table_rename;
 contains:altered
-$ postgres-execute connection=postgres://postgres:postgres@postgres
-INSERT INTO alter_table_renamed VALUES (3);
-
-! SELECT * FROM alter_table_renamed;
-contains:unknown
-
-
 #
-# Alter table rename colum
+# Alter table rename column
 > SELECT * FROM alter_table_rename_column;
 f1_orig f2_orig
@@ -350,7 +337,6 @@
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_table_rename_column RENAME COLUMN f1 TO f3;
 ALTER TABLE alter_table_rename_column RENAME COLUMN f2 TO f1;
 ALTER TABLE alter_table_rename_column RENAME COLUMN f3 TO f2;
-INSERT INTO alter_table_rename_column (f1, f2) VALUES ('f1_renamed', 'f2_renamed');
 ! SELECT * FROM alter_table_rename_column;
 contains:altered
@@ -366,7 +352,6 @@ f1_orig f2_orig
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_table_change_attnum DROP COLUMN f2;
 ALTER TABLE alter_table_change_attnum ADD COLUMN f2 VARCHAR(10);
-INSERT INTO alter_table_change_attnum (f1, f2) VALUES ('f1_changed', 'f2_changed');
 ! SELECT * FROM alter_table_change_attnum;
 contains:altered
@@ -393,7 +378,6 @@ INSERT INTO alter_table_supported (f1, f2) VALUES (3, 3);
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 ALTER TABLE alter_table_supported DROP COLUMN f2;
-INSERT INTO alter_table_supported (f1) VALUES (1);
 ! SELECT * from alter_table_supported;
 contains:altered
@@ -407,7 +391,6 @@ contains:altered
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 TRUNCATE truncate_table;
-INSERT INTO truncate_table VALUES (1, 1);
 ! SELECT * FROM truncate_table;
 contains:table was truncated
@@ -418,17 +401,8 @@ contains:table was truncated
 > SELECT * from drop_table;
 1 1
-# Drop a table and then trigger a `Relation` message
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 DROP TABLE drop_table;
-ALTER TABLE add_columns ADD COLUMN f3 int;
-INSERT INTO add_columns VALUES (3, 'cd', 1);
-
-# Ensure we've seen the relation message.
-> SELECT * from add_columns;
-1
-2
-3
 # Table is dropped
 ! SELECT * FROM drop_table;
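
Note on the mechanism: rather than spawning a dedicated timer task, the new
code in replication.rs piggybacks on the PrimaryKeepAlive messages that the
upstream server sends even when no data is flowing. Each keepalive checks
whether pg_schema_validation_interval has elapsed since the last validation
and, only then, re-fetches the publication info and re-runs verify_schema for
every output that has not already errored. The sketch below is a minimal,
self-contained illustration of that cadence pattern, not the patch itself:
poll_keepalive and validate_schemas are hypothetical stand-ins for the
replication stream and the verify_schema loop above.

    use std::time::{Duration, Instant};

    /// Hypothetical stand-in for receiving one keepalive message.
    fn poll_keepalive() -> bool {
        true
    }

    /// Hypothetical stand-in for re-fetching the publication info and
    /// running `verify_schema` against every output.
    fn validate_schemas() {
        println!("re-validating ingested table schemas");
    }

    fn main() {
        // Mirrors the `pg_schema_validation_interval` default of 15s.
        let validation_interval = Duration::from_secs(15);
        let mut last_schema_validation = Instant::now();

        // Stand-in for the replication event loop: the steady-state cost of
        // the check is a single clock read, and validation runs at most once
        // per elapsed interval no matter how many keepalives arrive.
        for _ in 0..5 {
            if poll_keepalive() && last_schema_validation.elapsed() > validation_interval {
                validate_schemas();
                last_schema_validation = Instant::now();
            }
        }
    }

Because the check only fires when a keepalive arrives, the worst-case
detection latency is roughly the validation interval plus the keepalive
cadence; this is why the tests above lower the interval to 2s via ALTER
SYSTEM SET pg_schema_validation_interval = '2s' instead of waiting out the
15s default.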