source/pg: proactively validate schemas
The PostgreSQL logical replication protocol only notifies us that a
table has changed schema if that table participates in a transaction
*after* the schema change has occurred.

This is problematic because, until that table participates in a
transaction, Materialize will keep advancing the upper frontier of the
table, asserting that everything is fine and potentially revealing
invalid data.

This PR puts an upper bound on this window of invalidity by proactively
re-validating the schemas of all tables in an ingestion on a regular
cadence. The interval defaults to 15s and is configurable through LD.
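
In outline: every replication keepalive performs a cheap elapsed-time
check, and the actual validation work runs only once the configured
interval has passed. A minimal sketch of the pattern, where
`run_replication_loop` and `validate_schemas` are stand-ins rather than
the real functions:

use std::time::{Duration, Instant};

/// Sketch only: gate periodic work on an elapsed-time check driven by
/// the event loop. `next_event` stands in for the replication stream.
fn run_replication_loop(mut next_event: impl FnMut() -> Option<()>, interval: Duration) {
    let mut last_validation = Instant::now();
    while let Some(_event) = next_event() {
        // Cheap check on every event; real work at most once per interval.
        if last_validation.elapsed() > interval {
            validate_schemas();
            last_validation = Instant::now();
        }
    }
}

fn validate_schemas() {
    // Stand-in: fetch the upstream catalog and diff it against the
    // schemas this ingestion expects.
}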

Signed-off-by: Petros Angelatos <[email protected]>
petrosagg committed Dec 12, 2024
1 parent bd39bef commit 549d9bd
Showing 5 changed files with 99 additions and 86 deletions.
8 changes: 8 additions & 0 deletions src/storage-types/src/dyncfgs.rs
@@ -139,6 +139,13 @@ pub const PG_OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
"Interval to fetch `offset_known`, from `pg_current_wal_lsn`",
);

/// Interval to re-validate the schemas of ingested tables.
pub const PG_SCHEMA_VALIDATION_INTERVAL: Config<Duration> = Config::new(
"pg_schema_validation_interval",
Duration::from_secs(15),
"Interval to re-validate the schemas of ingested tables.",
);

// Networking

/// Whether or not to enforce that external connection addresses are global
@@ -236,6 +243,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&MYSQL_OFFSET_KNOWN_INTERVAL)
.add(&PG_FETCH_SLOT_RESUME_LSN_INTERVAL)
.add(&PG_OFFSET_KNOWN_INTERVAL)
.add(&PG_SCHEMA_VALIDATION_INTERVAL)
.add(&ENFORCE_EXTERNAL_ADDRESSES)
.add(&STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING)
.add(&STORAGE_ROCKSDB_USE_MERGE_OPERATOR)
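
The new knob follows the dyncfg pattern already present in this file: a
`Config` is declared with a name, a default, and a description, then
registered into the `ConfigSet` so it can be updated at runtime, and
reads via `get` return the default until an override arrives. A rough
sketch of that lifecycle, assuming the `mz_dyncfg` `Config`/`ConfigSet`
API this file builds on (`MY_INTERVAL` is a made-up example, not part of
this commit):

use std::time::Duration;
use mz_dyncfg::{Config, ConfigSet};

// Declared once, with the name used by the control plane to set it.
pub const MY_INTERVAL: Config<Duration> = Config::new(
    "my_interval",
    Duration::from_secs(15),
    "An example interval.",
);

// Registration mirrors `all_dyncfgs` above; unregistered configs cannot
// be updated at runtime.
pub fn register(configs: ConfigSet) -> ConfigSet {
    configs.add(&MY_INTERVAL)
}

// Reads reflect the most recently pushed value.
pub fn current(configs: &ConfigSet) -> Duration {
    MY_INTERVAL.get(configs)
}
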
102 changes: 82 additions & 20 deletions src/storage/src/source/postgres/replication.rs
@@ -76,6 +76,7 @@ use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Instant;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use bytes::Bytes;
@@ -86,10 +87,10 @@ use mz_ore::collections::HashSet;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_postgres_util::tunnel::PostgresFlavor;
use mz_postgres_util::PostgresError;
use mz_postgres_util::{simple_query_opt, Client};
use mz_repr::{Datum, DatumVec, Diff, Row};
use mz_sql_parser::ast::{display::AstDisplay, Ident};
use mz_ssh_util::tunnel_manager::SshTunnelManager;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceTimestamp;
use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
@@ -234,14 +235,15 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
&config.config.connection_context.ssh_tunnel_manager,
)
.await?;
let metadata_client = Arc::new(metadata_client);

while let Some(_) = slot_ready_input.next().await {
// Wait for the slot to be created
}
tracing::info!(%id, "ensuring replication slot {slot} exists");
super::ensure_replication_slot(&replication_client, slot).await?;
let slot_metadata = super::fetch_slot_metadata(
&metadata_client,
&*metadata_client,
slot,
mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
.get(config.config.config_set()),
@@ -366,7 +368,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
let stream_result = raw_stream(
&config,
replication_client,
metadata_client,
Arc::clone(&metadata_client),
&connection.publication_details.slot,
&connection.publication_details.timeline_id,
&connection.publication,
@@ -417,6 +419,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
// compatible with `columnation`, to Vec<u8> data that is.
let mut col_temp: Vec<Vec<u8>> = vec![];
let mut row_temp = vec![];
let mut last_schema_validation = Instant::now();
while let Some(event) = stream.as_mut().next().await {
use LogicalReplicationMessage::*;
use ReplicationMessage::*;
@@ -427,10 +430,9 @@

let mut tx = pin!(extract_transaction(
stream.by_ref(),
&*metadata_client,
commit_lsn,
&table_info,
&connection_config,
&config.config.connection_context.ssh_tunnel_manager,
&metrics,
&connection.publication,
&mut errored
@@ -492,12 +494,75 @@
_ => return Err(TransientError::BareTransactionEvent),
},
Ok(PrimaryKeepAlive(keepalive)) => {
trace!(
%id,
"timely-{worker_id} received \
keepalive lsn={}",
trace!(
%id,
"timely-{worker_id} received keepalive lsn={}",
keepalive.wal_end()
);
let validation_interval = mz_storage_types::dyncfgs::PG_SCHEMA_VALIDATION_INTERVAL.get(config.config.config_set());
if last_schema_validation.elapsed() > validation_interval {
trace!(%id, "timely-{worker_id} validating schemas");
let upstream_info = {
match mz_postgres_util::publication_info(&*metadata_client, &connection.publication)
.await
{
Ok(info) => info.into_iter().map(|t| (t.oid, t)).collect(),
// If the replication stream cannot be obtained in a definite way there is
// nothing else to do. These errors are not retractable.
Err(PostgresError::PublicationMissing(publication)) => {
let err = DefiniteError::PublicationDropped(publication);
// If the publication is missing there is nothing else to
// do. These errors are not retractable.
for (oid, outputs) in table_info.iter() {
for output_index in outputs.keys() {
let update = (
(
*oid,
*output_index,
Err(DataflowError::from(err.clone())),
),
data_cap_set[0].time().clone(),
1,
);
data_output.give_fueled(&data_cap_set[0], update).await;
}
}
definite_error_handle.give(
&definite_error_cap_set[0],
ReplicationError::Definite(Rc::new(err)),
);
return Ok(());
}
Err(e) => Err(TransientError::from(e))?,
}
};
for (&oid, outputs) in table_info.iter() {
for (output_index, info) in outputs {
if errored.contains(output_index) {
trace!(%id, "timely-{worker_id} output index {output_index} \
for oid {oid} skipped");
continue;
}
match verify_schema(oid, &info.desc, &upstream_info, &*info.casts) {
Ok(()) => {
trace!(%id, "timely-{worker_id} schema of output \
index {output_index} for oid {oid} valid");
}
Err(err) => {
trace!(%id, "timely-{worker_id} schema of output \
index {output_index} for oid {oid} invalid");
let update = (
(oid, *output_index, Err(err.into())),
data_cap_set[0].time().clone(),
1,
);
data_output.give_fueled(&data_cap_set[0], update).await;
errored.insert(*output_index);
}
}
}
}
last_schema_validation = Instant::now();
}
data_upper = std::cmp::max(data_upper, keepalive.wal_end().into());
}
Ok(_) => return Err(TransientError::UnknownReplicationMessage),
@@ -570,7 +635,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
async fn raw_stream<'a>(
config: &'a RawSourceCreationConfig,
replication_client: Client,
metadata_client: Client,
metadata_client: Arc<Client>,
slot: &'a str,
timeline_id: &'a Option<u64>,
publication: &'a str,
Expand All @@ -597,7 +662,7 @@ async fn raw_stream<'a>(
>,
TransientError,
> {
if let Err(err) = ensure_publication_exists(&metadata_client, publication).await? {
if let Err(err) = ensure_publication_exists(&*metadata_client, publication).await? {
// If the publication gets deleted there is nothing else to do. These errors
// are not retractable.
return Ok(Err(err));
@@ -628,7 +693,7 @@
// Note: We must use the metadata client here which is NOT in replication mode. Some Aurora
// Postgres versions disallow SHOW commands from within replication connection.
// See: https://github.com/readysettech/readyset/discussions/28#discussioncomment-4405671
let row = simple_query_opt(&metadata_client, "SHOW wal_sender_timeout;")
let row = simple_query_opt(&*metadata_client, "SHOW wal_sender_timeout;")
.await?
.unwrap();
let wal_sender_timeout = match row.get("wal_sender_timeout") {
@@ -680,7 +745,7 @@
// cannot use the replication client to do that because it's already in CopyBoth mode.
// [1] https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION-SLOT-LOGICAL
let slot_metadata = super::fetch_slot_metadata(
&metadata_client,
&*metadata_client,
slot,
mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
.get(config.config.config_set()),
@@ -704,7 +769,7 @@
while !probe_tx.is_closed() {
interval.tick().await;
let probe_ts = mz_repr::Timestamp::try_from((now_fn)()).expect("must fit");
let probe_or_err = super::fetch_max_lsn(&metadata_client)
let probe_or_err = super::fetch_max_lsn(&*metadata_client)
.await
.map(|lsn| Probe {
probe_ts,
@@ -800,10 +865,9 @@
fn extract_transaction<'a>(
stream: impl AsyncStream<Item = Result<ReplicationMessage<LogicalReplicationMessage>, TransientError>>
+ 'a,
metadata_client: &'a Client,
commit_lsn: MzOffset,
table_info: &'a BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
connection_config: &'a mz_postgres_util::Config,
ssh_tunnel_manager: &'a SshTunnelManager,
metrics: &'a PgSourceMetrics,
publication: &'a str,
errored_outputs: &'a mut HashSet<usize>,
@@ -919,11 +983,9 @@ fn extract_transaction<'a>(
// to check the current local schema against the current remote schema to
// ensure e.g. we haven't received a schema update with the same terminal
// column name which is actually a different column.
let client = connection_config
.connect("replication schema verification", ssh_tunnel_manager)
.await?;
let upstream_info =
mz_postgres_util::publication_info(&client, publication).await?;
mz_postgres_util::publication_info(metadata_client, publication)
.await?;
let upstream_info = upstream_info.into_iter().map(|t| (t.oid, t)).collect();

for (output_index, output) in valid_outputs {
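
The heart of the change is the new keepalive arm above, which separates
two failure modes: a dropped publication is a definite, source-wide
error that halts the ingestion, while a schema mismatch only poisons the
affected output and leaves the rest running. A simplified sketch of that
decision with invented stand-in types (the real code emits
`DefiniteError`/`DataflowError` values through per-output capabilities):

use std::collections::HashSet;

/// Invented stand-in for the outcomes the validation pass can reach.
enum Validation {
    /// The publication itself is gone: definite and not retractable.
    PublicationDropped,
    /// Outputs whose upstream schema no longer matches what we ingest.
    InvalidOutputs(Vec<usize>),
    AllValid,
}

/// Returns whether ingestion may continue. A dropped publication halts
/// the source; mismatches only add the affected outputs to `errored`,
/// which subsequent validation rounds skip.
fn apply(outcome: Validation, errored: &mut HashSet<usize>) -> bool {
    match outcome {
        Validation::PublicationDropped => false,
        Validation::InvalidOutputs(outputs) => {
            errored.extend(outputs);
            true
        }
        Validation::AllValid => true,
    }
}
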
34 changes: 4 additions & 30 deletions test/pg-cdc-old-syntax/alter-table-after-source.td
@@ -11,6 +11,9 @@
# Test ALTER TABLE -- source will error out for tables which existed when the source was created
#

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET pg_schema_validation_interval = '2s';

> CREATE SECRET pgpass AS 'postgres'
> CREATE CONNECTION pg TO POSTGRES (
HOST postgres,
@@ -132,7 +135,6 @@ INSERT INTO add_columns VALUES (2, 'ab');

$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER TABLE remove_column DROP COLUMN f2;
INSERT INTO remove_column VALUES (3);

! SELECT * from remove_column;
contains:altered
@@ -146,7 +148,6 @@

$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER TABLE alter_column ALTER COLUMN f2 TYPE CHAR(2);
INSERT INTO alter_column VALUES (3, 'bc');

! SELECT * from alter_column;
contains:altered
@@ -160,7 +161,6 @@

$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER TABLE alter_drop_nullability ALTER COLUMN f1 DROP NOT NULL;
INSERT INTO alter_drop_nullability VALUES (NULL);

! SELECT * FROM alter_drop_nullability WHERE f1 IS NOT NULL;
contains:altered
@@ -193,7 +193,6 @@ INSERT INTO alter_add_nullability VALUES (1);

$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER TABLE alter_drop_pk DROP CONSTRAINT alter_drop_pk_pkey;
INSERT INTO alter_drop_pk VALUES (1);

! SELECT f1 FROM alter_drop_pk;
contains:altered
@@ -223,7 +222,6 @@ INSERT INTO alter_add_pk VALUES (2);
$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER TABLE alter_cycle_pk DROP CONSTRAINT alter_cycle_pk_pkey;
ALTER TABLE alter_cycle_pk ADD PRIMARY KEY(f1);
INSERT INTO alter_cycle_pk VALUES (2);

! SELECT * FROM alter_cycle_pk;
contains:altered
@@ -254,7 +252,6 @@ INSERT INTO alter_cycle_pk_off VALUES (1);

$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER TABLE alter_drop_unique DROP CONSTRAINT alter_drop_unique_f1_key;
INSERT INTO alter_drop_unique VALUES (1);

! SELECT f1 FROM alter_drop_unique;
contains:altered
@@ -283,7 +280,6 @@ ab

$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER TABLE alter_extend_column ALTER COLUMN f1 TYPE VARCHAR(20);
INSERT INTO alter_extend_column VALUES ('abcd');

! SELECT * FROM alter_extend_column;
contains:altered
@@ -296,7 +292,6 @@

$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER TABLE alter_decimal ALTER COLUMN f1 TYPE DECIMAL(6,1);
INSERT INTO alter_decimal VALUES (12345.6);

! SELECT * FROM alter_decimal;
contains:altered
@@ -310,20 +305,12 @@ contains:altered

$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER TABLE alter_table_rename RENAME TO alter_table_renamed;
INSERT INTO alter_table_renamed VALUES (2);

! SELECT * FROM alter_table_rename;
contains:altered

$ postgres-execute connection=postgres://postgres:postgres@postgres
INSERT INTO alter_table_renamed VALUES (3);

! SELECT * FROM alter_table_renamed;
contains:unknown


#
# Alter table rename colum
# Alter table rename column

> SELECT * FROM alter_table_rename_column;
f1_orig f2_orig
@@ -332,7 +319,6 @@ $ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER TABLE alter_table_rename_column RENAME COLUMN f1 TO f3;
ALTER TABLE alter_table_rename_column RENAME COLUMN f2 TO f1;
ALTER TABLE alter_table_rename_column RENAME COLUMN f3 TO f2;
INSERT INTO alter_table_rename_column (f1, f2) VALUES ('f1_renamed', 'f2_renamed');

! SELECT * FROM alter_table_rename_column;
contains:altered
@@ -348,7 +334,6 @@ f1_orig f2_orig
$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER TABLE alter_table_change_attnum DROP COLUMN f2;
ALTER TABLE alter_table_change_attnum ADD COLUMN f2 VARCHAR(10);
INSERT INTO alter_table_change_attnum (f1, f2) VALUES ('f1_changed', 'f2_changed');

! SELECT * FROM alter_table_change_attnum;
contains:altered
@@ -375,7 +360,6 @@ INSERT INTO alter_table_supported (f1, f2) VALUES (3, 3);

$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER TABLE alter_table_supported DROP COLUMN f2;
INSERT INTO alter_table_supported (f1) VALUES (1);

! SELECT * from alter_table_supported;
contains:altered
@@ -389,7 +373,6 @@ contains:altered

$ postgres-execute connection=postgres://postgres:postgres@postgres
TRUNCATE truncate_table;
INSERT INTO truncate_table VALUES (1, 1);

! SELECT * FROM truncate_table;
contains:table was truncated
@@ -400,17 +383,8 @@ contains:table was truncated
> SELECT * from drop_table;
1 1

# Drop a table and then trigger a `Relation` message
$ postgres-execute connection=postgres://postgres:postgres@postgres
DROP TABLE drop_table;
ALTER TABLE add_columns ADD COLUMN f3 int;
INSERT INTO add_columns VALUES (3, 'cd', 1);

# Ensure we've seen the relation message.
> SELECT * from add_columns;
1
2
3

# Table is dropped
! SELECT * FROM drop_table;