Fix deadlock in drop_subscriptions_before_start (#10806)
ALTER SUBSCRIPTION requires an AccessExclusive lock, which conflicts with the
iteration over pg_subscription when multiple databases are present and the
operations are applied concurrently.

Fix by explicitly locking pg_subscription at the beginning of the transaction
in each database.
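
To make the failure mode concrete, here is a minimal sketch of the interleaving and of the fix, assuming two such transactions run concurrently against different databases. The subscription and database names (db0, db1, mysub0, mysub1) are illustrative, and the lock levels follow the description above rather than a verified trace.

```sql
-- Session 1, connected to db0:
BEGIN;
SELECT subname FROM pg_subscription
 WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = 'db0');
-- the iteration leaves a weaker lock on the shared catalog pg_subscription,
-- held until commit

-- Session 2, connected to db1, runs the same SELECT concurrently and now
-- also holds a weaker lock on pg_subscription

-- Session 1:
ALTER SUBSCRIPTION mysub0 DISABLE;   -- needs AccessExclusive, waits on session 2

-- Session 2:
ALTER SUBSCRIPTION mysub1 DISABLE;   -- needs AccessExclusive, waits on session 1: deadlock

-- The fix: take the strongest lock up front, so concurrent transactions
-- serialize instead of deadlocking on a lock upgrade.
BEGIN;
LOCK TABLE pg_subscription IN ACCESS EXCLUSIVE MODE;
-- ... iterate over pg_subscription and ALTER SUBSCRIPTION as in the diff below ...
COMMIT;
```

The LOCK TABLE line is the one-line change to compute_tools/src/sql/drop_subscriptions.sql shown below.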

## Problem
neondatabase/cloud#24292
lubennikovaav authored Feb 20, 2025
1 parent 07bee60 commit 7c7180a
Showing 4 changed files with 220 additions and 8 deletions.
1 change: 1 addition & 0 deletions compute_tools/src/sql/drop_subscriptions.sql
@@ -2,6 +2,7 @@ DO $$
DECLARE
subname TEXT;
BEGIN
LOCK TABLE pg_subscription IN ACCESS EXCLUSIVE MODE;
FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP
EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname);
EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
46 changes: 43 additions & 3 deletions control_plane/src/endpoint.rs
@@ -59,6 +59,7 @@ use nix::sys::signal::Signal;
use pageserver_api::shard::ShardStripeSize;
use reqwest::header::CONTENT_TYPE;
use serde::{Deserialize, Serialize};
use tracing::debug;
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};

@@ -81,8 +82,10 @@ pub struct EndpointConf {
internal_http_port: u16,
pg_version: u32,
skip_pg_catalog_updates: bool,
reconfigure_concurrency: usize,
drop_subscriptions_before_start: bool,
features: Vec<ComputeFeature>,
cluster: Option<Cluster>,
}

//
@@ -179,7 +182,9 @@ impl ComputeControlPlane {
// we also skip catalog updates in the cloud.
skip_pg_catalog_updates,
drop_subscriptions_before_start,
reconfigure_concurrency: 1,
features: vec![],
cluster: None,
});

ep.create_endpoint_dir()?;
@@ -196,7 +201,9 @@ impl ComputeControlPlane {
pg_version,
skip_pg_catalog_updates,
drop_subscriptions_before_start,
reconfigure_concurrency: 1,
features: vec![],
cluster: None,
})?,
)?;
std::fs::write(
@@ -261,8 +268,11 @@ pub struct Endpoint {
skip_pg_catalog_updates: bool,

drop_subscriptions_before_start: bool,
reconfigure_concurrency: usize,
// Feature flags
features: Vec<ComputeFeature>,
// Cluster settings
cluster: Option<Cluster>,
}

#[derive(PartialEq, Eq)]
@@ -302,6 +312,8 @@ impl Endpoint {
let conf: EndpointConf =
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;

debug!("serialized endpoint conf: {:?}", conf);

Ok(Endpoint {
pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port),
external_http_address: SocketAddr::new(
@@ -319,8 +331,10 @@
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
reconfigure_concurrency: conf.reconfigure_concurrency,
drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
features: conf.features,
cluster: conf.cluster,
})
}

@@ -607,7 +621,7 @@ impl Endpoint {
};

// Create spec file
let spec = ComputeSpec {
let mut spec = ComputeSpec {
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
format_version: 1.0,
operation_uuid: None,
@@ -640,7 +654,7 @@
Vec::new()
},
settings: None,
postgresql_conf: Some(postgresql_conf),
postgresql_conf: Some(postgresql_conf.clone()),
},
delta_operations: None,
tenant_id: Some(self.tenant_id),
@@ -653,9 +667,35 @@
pgbouncer_settings: None,
shard_stripe_size: Some(shard_stripe_size),
local_proxy_config: None,
reconfigure_concurrency: 1,
reconfigure_concurrency: self.reconfigure_concurrency,
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
};

// this strange code is needed to support respec() in tests
if self.cluster.is_some() {
debug!("Cluster is already set in the endpoint spec, using it");
spec.cluster = self.cluster.clone().unwrap();

debug!("spec.cluster {:?}", spec.cluster);

// fill missing fields again
if create_test_user {
spec.cluster.roles.push(Role {
name: PgIdent::from_str("test").unwrap(),
encrypted_password: None,
options: None,
});
spec.cluster.databases.push(Database {
name: PgIdent::from_str("neondb").unwrap(),
owner: PgIdent::from_str("test").unwrap(),
options: None,
restrict_conn: false,
invalid: false,
});
}
spec.cluster.postgresql_conf = Some(postgresql_conf);
}

let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;

8 changes: 4 additions & 4 deletions libs/compute_api/src/spec.rs
@@ -252,7 +252,7 @@ pub enum ComputeMode {
Replica,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct Cluster {
pub cluster_id: Option<String>,
pub name: Option<String>,
@@ -283,7 +283,7 @@ pub struct DeltaOp {

/// Rust representation of Postgres role info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
@@ -292,7 +292,7 @@ pub struct Role {

/// Rust representation of Postgres database info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct Database {
pub name: PgIdent,
pub owner: PgIdent,
@@ -308,7 +308,7 @@ pub struct Database {
/// Common type representing both SQL statement params with or without value,
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
/// options like `wal_level = logical`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct GenericOption {
pub name: String,
pub value: Option<String>,
173 changes: 172 additions & 1 deletion test_runner/regress/test_subscriber_branching.py
@@ -1,9 +1,10 @@
from __future__ import annotations

import threading
import time

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
from fixtures.utils import query_scalar, wait_until


@@ -239,3 +240,173 @@ def insert_data(pub, start):
res = scur_postgres.fetchall()
assert len(res) == 1
assert str(sub_child_2_timeline_id) == res[0][0]


def test_multiple_subscription_branching(neon_simple_env: NeonEnv):
"""
Test that compute_ctl can handle concurrent deletion of subscriptions in multiple databases
"""
env = neon_simple_env

NUMBER_OF_DBS = 5

# Create and start the endpoint so that neon_local puts all the generated
# configuration into the spec.json file.
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"max_replication_slots = 10",
"max_logical_replication_workers=10",
"max_worker_processes=10",
],
)

TEST_DB_NAMES = [
{
"name": "neondb",
"owner": "cloud_admin",
},
{
"name": "publisher_db",
"owner": "cloud_admin",
},
]

for i in range(NUMBER_OF_DBS):
TEST_DB_NAMES.append(
{
"name": f"db{i}",
"owner": "cloud_admin",
}
)

# Update the spec.json file to create the databases
# and reconfigure the endpoint to apply the changes.
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"cluster": {
"databases": TEST_DB_NAMES,
},
}
)
endpoint.reconfigure()

connstr = endpoint.connstr(dbname="publisher_db").replace("'", "''")

# create table, replication and subscription for each of the databases
with endpoint.cursor(dbname="publisher_db") as publisher_cursor:
for i in range(NUMBER_OF_DBS):
publisher_cursor.execute(f"CREATE TABLE t{i}(a int)")
publisher_cursor.execute(f"CREATE PUBLICATION mypub{i} FOR TABLE t{i}")
publisher_cursor.execute(
f"select pg_catalog.pg_create_logical_replication_slot('mysub{i}', 'pgoutput');"
)
publisher_cursor.execute(f"INSERT INTO t{i} VALUES ({i})")

with endpoint.cursor(dbname=f"db{i}") as cursor:
cursor.execute(f"CREATE TABLE t{i}(a int)")
cursor.execute(
f"CREATE SUBSCRIPTION mysub{i} CONNECTION '{connstr}' PUBLICATION mypub{i} WITH (create_slot = false) "
)

# wait for the subscription to be active
for i in range(NUMBER_OF_DBS):
logical_replication_sync(
endpoint,
endpoint,
f"mysub{i}",
sub_dbname=f"db{i}",
pub_dbname="publisher_db",
)

# Check that replication is working
for i in range(NUMBER_OF_DBS):
with endpoint.cursor(dbname=f"db{i}") as cursor:
cursor.execute(f"SELECT * FROM t{i}")
rows = cursor.fetchall()
assert len(rows) == 1
assert rows[0][0] == i

last_insert_lsn = query_scalar(cursor, "select pg_current_wal_insert_lsn();")

def start_publisher_workload(table_num: int, duration: int):
start = time.time()
with endpoint.cursor(dbname="publisher_db") as cur:
while time.time() - start < duration:
cur.execute(f"INSERT INTO t{i} SELECT FROM generate_series(1,1000)")

LOAD_DURATION = 5
threads = [
threading.Thread(target=start_publisher_workload, args=(i, LOAD_DURATION))
for i in range(NUMBER_OF_DBS)
]

for thread in threads:
thread.start()

sub_child_1_timeline_id = env.create_branch(
"subscriber_child_1",
ancestor_branch_name="main",
ancestor_start_lsn=last_insert_lsn,
)

sub_child_1 = env.endpoints.create("subscriber_child_1")

sub_child_1.respec(
skip_pg_catalog_updates=False,
reconfigure_concurrency=5,
drop_subscriptions_before_start=True,
cluster={
"databases": TEST_DB_NAMES,
"roles": [],
},
)

sub_child_1.start()

# ensure that subscription deletion happened on this timeline
with sub_child_1.cursor() as scur_postgres:
scur_postgres.execute("SELECT timeline_id from neon.drop_subscriptions_done")
res = scur_postgres.fetchall()
log.info(f"res = {res}")
assert len(res) == 1
assert str(sub_child_1_timeline_id) == res[0][0]

# ensure that there are no subscriptions in the databases
for i in range(NUMBER_OF_DBS):
with sub_child_1.cursor(dbname=f"db{i}") as cursor:
cursor.execute("SELECT * FROM pg_catalog.pg_subscription")
res = cursor.fetchall()
assert len(res) == 0

# ensure that there are no unexpected rows in the tables
cursor.execute(f"SELECT * FROM t{i}")
rows = cursor.fetchall()
assert len(rows) == 1
assert rows[0][0] == i

for thread in threads:
thread.join()

# ensure that logical replication is still working in main endpoint
# wait for it to catch up
for i in range(NUMBER_OF_DBS):
logical_replication_sync(
endpoint,
endpoint,
f"mysub{i}",
sub_dbname=f"db{i}",
pub_dbname="publisher_db",
)

# verify that the data is the same in publisher and subscriber tables
with endpoint.cursor(dbname="publisher_db") as publisher_cursor:
for i in range(NUMBER_OF_DBS):
with endpoint.cursor(dbname=f"db{i}") as cursor:
publisher_cursor.execute(f"SELECT count(*) FROM t{i}")
cursor.execute(f"SELECT count(*) FROM t{i}")
pub_res = publisher_cursor.fetchone()
sub_res = cursor.fetchone()
log.info(f"for table t{i}: pub_res = {pub_res}, sub_res = {sub_res}")
assert pub_res == sub_res

1 comment on commit 7c7180a

@github-actions


7031 tests run: 6657 passed, 1 failed, 373 skipped (full report)


Failures on Postgres 16

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_branch_creation_many[release-pg16-github-actions-selfhosted-random-1024]"
Flaky tests (3)

Postgres 17

Postgres 15

Code coverage* (full report)

  • functions: 32.9% (8618 of 26199 functions)
  • lines: 48.8% (72741 of 148933 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
7c7180a at 2025-02-20T19:50:24.640Z :recycle:
