diff --git a/Cargo.lock b/Cargo.lock
index 70a8af4e7d5e..2bd828367c83 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1321,6 +1321,7 @@ dependencies = [
  "clap",
  "comfy-table",
  "compute_api",
+ "futures",
  "humantime",
  "humantime-serde",
  "hyper 0.14.30",
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 9ed2fc5143ad..a6b7633edade 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -10,6 +10,7 @@ use std::sync::atomic::AtomicU32;
 use std::sync::atomic::Ordering;
 use std::sync::{Condvar, Mutex, RwLock};
 use std::thread;
+use std::time::Duration;
 use std::time::Instant;
 
 use anyhow::{Context, Result};
@@ -710,7 +711,7 @@ impl ComputeNode {
         info!("running initdb");
         let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
         Command::new(initdb_bin)
-            .args(["-D", pgdata])
+            .args(["--pgdata", pgdata])
             .output()
             .expect("cannot start initdb process");
@@ -1398,6 +1399,36 @@ LIMIT 100",
         }
         Ok(remote_ext_metrics)
     }
+
+    /// Waits until the current thread receives a state change notification and
+    /// the pageserver connection string has changed.
+    ///
+    /// The operation times out after the specified duration.
+    pub fn wait_timeout_while_pageserver_connstr_unchanged(&self, duration: Duration) {
+        let state = self.state.lock().unwrap();
+        let old_pageserver_connstr = state
+            .pspec
+            .as_ref()
+            .expect("spec must be set")
+            .pageserver_connstr
+            .clone();
+        let mut unchanged = true;
+        let _ = self
+            .state_changed
+            .wait_timeout_while(state, duration, |s| {
+                let pageserver_connstr = &s
+                    .pspec
+                    .as_ref()
+                    .expect("spec must be set")
+                    .pageserver_connstr;
+                unchanged = pageserver_connstr == &old_pageserver_connstr;
+                unchanged
+            })
+            .unwrap();
+        if !unchanged {
+            info!("Pageserver config changed");
+        }
+    }
 }
 
 pub fn forward_termination_signal() {
diff --git a/compute_tools/src/lsn_lease.rs b/compute_tools/src/lsn_lease.rs
index 7e5917c55fe3..3061d387a505 100644
--- a/compute_tools/src/lsn_lease.rs
+++ b/compute_tools/src/lsn_lease.rs
@@ -57,10 +57,10 @@ fn lsn_lease_bg_task(
             .max(valid_duration / 2);
 
         info!(
-            "Succeeded, sleeping for {} seconds",
+            "Request succeeded, sleeping for {} seconds",
             sleep_duration.as_secs()
         );
-        thread::sleep(sleep_duration);
+        compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
     }
 }
@@ -89,10 +89,7 @@ fn acquire_lsn_lease_with_retry(
         .map(|connstr| {
             let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr");
             if let Some(storage_auth_token) = &spec.storage_auth_token {
-                info!("Got storage auth token from spec file");
                 config.password(storage_auth_token.clone());
-            } else {
-                info!("Storage auth token not set");
             }
             config
         })
@@ -108,9 +105,11 @@
                 bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
             }
             Err(e) => {
-                warn!("Failed to acquire lsn lease: {e} (attempt {attempts}");
+                warn!("Failed to acquire lsn lease: {e} (attempt {attempts})");
 
-                thread::sleep(Duration::from_millis(retry_period_ms as u64));
+                compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis(
+                    retry_period_ms as u64,
+                ));
                 retry_period_ms *= 1.5;
                 retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
             }
diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml
index df87c181bf47..355eca0fe5a7 100644
--- a/control_plane/Cargo.toml
+++ b/control_plane/Cargo.toml
@@ -9,6 +9,7 @@ anyhow.workspace = true
 camino.workspace = true
 clap.workspace = true
 comfy-table.workspace = true
+futures.workspace = true
 humantime.workspace = true
 nix.workspace = true
 once_cell.workspace = true
diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index 92f609761abb..b6532ff2ac09 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -894,17 +894,27 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
             // to pass these on to postgres.
             let storage_controller = StorageController::from_env(env);
             let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
-            let pageservers = locate_result
-                .shards
-                .into_iter()
-                .map(|shard| {
-                    (
+            let pageservers = futures::future::try_join_all(
+                locate_result.shards.into_iter().map(|shard| async move {
+                    if let ComputeMode::Static(lsn) = endpoint.mode {
+                        // Initialize LSN leases for static computes.
+                        let conf = env.get_pageserver_conf(shard.node_id).unwrap();
+                        let pageserver = PageServerNode::from_env(env, conf);
+
+                        pageserver
+                            .http_client
+                            .timeline_init_lsn_lease(shard.shard_id, endpoint.timeline_id, lsn)
+                            .await?;
+                    }
+
+                    anyhow::Ok((
                         Host::parse(&shard.listen_pg_addr)
                             .expect("Storage controller reported bad hostname"),
                         shard.listen_pg_port,
-                    )
-                })
-                .collect::<Vec<_>>();
+                    ))
+                }),
+            )
+            .await?;
             let stripe_size = locate_result.shard_params.stripe_size;
             (pageservers, stripe_size)
diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs
index 0c0e67dff057..36e5e04c86dd 100644
--- a/control_plane/src/storage_controller.rs
+++ b/control_plane/src/storage_controller.rs
@@ -347,7 +347,7 @@ impl StorageController {
         if !tokio::fs::try_exists(&pg_data_path).await? {
             let initdb_args = [
-                "-D",
+                "--pgdata",
                 pg_data_path.as_ref(),
                 "--username",
                 &username(),
diff --git a/libs/postgres_ffi/wal_craft/src/lib.rs b/libs/postgres_ffi/wal_craft/src/lib.rs
index 5c0abda52274..9524a5149bbc 100644
--- a/libs/postgres_ffi/wal_craft/src/lib.rs
+++ b/libs/postgres_ffi/wal_craft/src/lib.rs
@@ -93,9 +93,9 @@ impl Conf {
         );
         let output = self
             .new_pg_command("initdb")?
-            .arg("-D")
+            .arg("--pgdata")
             .arg(&self.datadir)
-            .args(["-U", "postgres", "--no-instructions", "--no-sync"])
+            .args(["--username", "postgres", "--no-instructions", "--no-sync"])
             .output()?;
         debug!("initdb output: {:?}", output);
         ensure!(
diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs
index 2d95ac42e607..592f1ded0d0b 100644
--- a/pageserver/client/src/mgmt_api.rs
+++ b/pageserver/client/src/mgmt_api.rs
@@ -736,4 +736,22 @@ impl Client {
         .await
         .map_err(Error::ReceiveBody)
     }
+
+    pub async fn timeline_init_lsn_lease(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+        lsn: Lsn,
+    ) -> Result<LsnLease> {
+        let uri = format!(
+            "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/lsn_lease",
+            self.mgmt_api_endpoint,
+        );
+
+        self.request(Method::POST, &uri, LsnLeaseRequest { lsn })
+            .await?
+            .json()
+            .await
+            .map_err(Error::ReceiveBody)
+    }
 }
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index ba38120bf1e6..6f0402e7b0c8 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -824,7 +824,7 @@ async fn get_lsn_by_timestamp_handler(
     let lease = if with_lease {
         timeline
-            .make_lsn_lease(lsn, timeline.get_lsn_lease_length_for_ts(), &ctx)
+            .init_lsn_lease(lsn, timeline.get_lsn_lease_length_for_ts(), &ctx)
             .inspect_err(|_| {
                 warn!("fail to grant a lease to {}", lsn);
             })
@@ -1692,9 +1692,18 @@ async fn lsn_lease_handler(
     let timeline =
         active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
             .await?;
-    let result = timeline
-        .make_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx)
-        .map_err(|e| ApiError::InternalServerError(e.context("lsn lease http handler")))?;
+
+    let result = async {
+        timeline
+            .init_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx)
+            .map_err(|e| {
+                ApiError::InternalServerError(
+                    e.context(format!("invalid lsn lease request at {lsn}")),
+                )
+            })
+    }
+    .instrument(info_span!("init_lsn_lease", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
+    .await?;
 
     json_response(StatusCode::OK, result)
 }
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 98d718dde481..8fa6b9a7f0d4 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -833,7 +833,7 @@ impl PageServerHandler {
         set_tracing_field_shard_id(&timeline);
 
         let lease = timeline
-            .make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
+            .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
             .inspect_err(|e| {
                 warn!("{e}");
             })
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 2aebf4f99932..db88303f7bdb 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -21,6 +21,7 @@ use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use pageserver_api::models;
 use pageserver_api::models::AuxFilePolicy;
+use pageserver_api::models::LsnLease;
 use pageserver_api::models::TimelineArchivalState;
 use pageserver_api::models::TimelineState;
 use pageserver_api::models::TopTenantShardItem;
@@ -182,27 +183,54 @@ pub struct TenantSharedResources {
 pub(super) struct AttachedTenantConf {
     tenant_conf: TenantConfOpt,
     location: AttachedLocationConfig,
+    /// The deadline before which we are blocked from GC so that
+    /// leases have a chance to be renewed.
+    lsn_lease_deadline: Option<tokio::time::Instant>,
 }
 
 impl AttachedTenantConf {
     fn new(tenant_conf: TenantConfOpt, location: AttachedLocationConfig) -> Self {
+        // Sets a deadline before which we cannot proceed to GC due to lsn leases.
+        //
+        // We do this because the lease mapping is not persisted to disk. By delaying GC by the lease
+        // length, we guarantee that all the leases we granted before will have a chance to be renewed
+        // when we run GC for the first time after restart / transition from AttachedMulti to AttachedSingle.
+        let lsn_lease_deadline = if location.attach_mode == AttachmentMode::Single {
+            Some(
+                tokio::time::Instant::now()
+                    + tenant_conf
+                        .lsn_lease_length
+                        .unwrap_or(LsnLease::DEFAULT_LENGTH),
+            )
+        } else {
+            // We don't use `lsn_lease_deadline` to delay GC in AttachedMulti and AttachedStale
+            // because we don't do GC in these modes.
+            None
+        };
+
         Self {
             tenant_conf,
             location,
+            lsn_lease_deadline,
         }
     }
 
     fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
         match &location_conf.mode {
-            LocationMode::Attached(attach_conf) => Ok(Self {
-                tenant_conf: location_conf.tenant_conf,
-                location: *attach_conf,
-            }),
+            LocationMode::Attached(attach_conf) => {
+                Ok(Self::new(location_conf.tenant_conf, *attach_conf))
+            }
             LocationMode::Secondary(_) => {
                 anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
             }
         }
     }
+
+    fn is_gc_blocked_by_lsn_lease_deadline(&self) -> bool {
+        self.lsn_lease_deadline
+            .map(|d| tokio::time::Instant::now() < d)
+            .unwrap_or(false)
+    }
 }
 
 struct TimelinePreload {
     timeline_id: TimelineId,
@@ -1822,6 +1850,11 @@ impl Tenant {
                 info!("Skipping GC in location state {:?}", conf.location);
                 return Ok(GcResult::default());
             }
+
+            if conf.is_gc_blocked_by_lsn_lease_deadline() {
+                info!("Skipping GC because lsn lease deadline is not reached");
+                return Ok(GcResult::default());
+            }
         }
 
         let _guard = match self.gc_block.start().await {
@@ -2630,6 +2663,8 @@ impl Tenant {
             Arc::new(AttachedTenantConf {
                 tenant_conf: new_tenant_conf.clone(),
                 location: inner.location,
+                // Attached location is not changed, no need to update lsn lease deadline.
+                lsn_lease_deadline: inner.lsn_lease_deadline,
             })
         });
 
@@ -3887,9 +3922,9 @@ async fn run_initdb(
     let _permit = INIT_DB_SEMAPHORE.acquire().await;
 
     let initdb_command = tokio::process::Command::new(&initdb_bin_path)
-        .args(["-D", initdb_target_dir.as_ref()])
-        .args(["-U", &conf.superuser])
-        .args(["-E", "utf8"])
+        .args(["--pgdata", initdb_target_dir.as_ref()])
+        .args(["--username", &conf.superuser])
+        .args(["--encoding", "utf8"])
         .arg("--no-instructions")
         .arg("--no-sync")
         .env_clear()
@@ -4461,13 +4496,17 @@ mod tests {
         tline.freeze_and_flush().await.map_err(|e| e.into())
     }
 
-    #[tokio::test]
+    #[tokio::test(start_paused = true)]
     async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
         let (tenant, ctx) =
             TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")
                 .await?
                 .load()
                 .await;
+        // Advance to the lsn lease deadline so that GC is not blocked by
+        // initial transition into AttachedSingle.
+        tokio::time::advance(tenant.get_lsn_lease_length()).await;
+        tokio::time::resume();
         let tline = tenant
             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
             .await?;
@@ -7244,9 +7283,17 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
+    #[tokio::test(start_paused = true)]
     async fn test_lsn_lease() -> anyhow::Result<()> {
-        let (tenant, ctx) = TenantHarness::create("test_lsn_lease").await?.load().await;
+        let (tenant, ctx) = TenantHarness::create("test_lsn_lease")
+            .await
+            .unwrap()
+            .load()
+            .await;
+        // Advance to the lsn lease deadline so that GC is not blocked by
+        // initial transition into AttachedSingle.
+        tokio::time::advance(tenant.get_lsn_lease_length()).await;
+        tokio::time::resume();
 
         let key = Key::from_hex("010000000033333333444444445500000000").unwrap();
         let end_lsn = Lsn(0x100);
@@ -7274,24 +7321,33 @@
         let leased_lsns = [0x30, 0x50, 0x70];
         let mut leases = Vec::new();
-        let _: anyhow::Result<_> = leased_lsns.iter().try_for_each(|n| {
-            leases.push(timeline.make_lsn_lease(Lsn(*n), timeline.get_lsn_lease_length(), &ctx)?);
-            Ok(())
+        leased_lsns.iter().for_each(|n| {
+            leases.push(
+                timeline
+                    .init_lsn_lease(Lsn(*n), timeline.get_lsn_lease_length(), &ctx)
+                    .expect("lease request should succeed"),
+            );
         });
 
-        // Renewing with shorter lease should not change the lease.
-        let updated_lease_0 =
-            timeline.make_lsn_lease(Lsn(leased_lsns[0]), Duration::from_secs(0), &ctx)?;
-        assert_eq!(updated_lease_0.valid_until, leases[0].valid_until);
-
-        // Renewing with a long lease should renew lease with later expiration time.
-        let updated_lease_1 = timeline.make_lsn_lease(
-            Lsn(leased_lsns[1]),
-            timeline.get_lsn_lease_length() * 2,
-            &ctx,
-        )?;
+        let updated_lease_0 = timeline
+            .renew_lsn_lease(Lsn(leased_lsns[0]), Duration::from_secs(0), &ctx)
+            .expect("lease renewal should succeed");
+        assert_eq!(
+            updated_lease_0.valid_until, leases[0].valid_until,
+            "Renewing with a shorter lease should not change the lease."
+        );
 
-        assert!(updated_lease_1.valid_until > leases[1].valid_until);
+        let updated_lease_1 = timeline
+            .renew_lsn_lease(
+                Lsn(leased_lsns[1]),
+                timeline.get_lsn_lease_length() * 2,
+                &ctx,
+            )
+            .expect("lease renewal should succeed");
+        assert!(
+            updated_lease_1.valid_until > leases[1].valid_until,
+            "Renewing with a longer lease should renew the lease with a later expiration time."
+        );
 
         // Force set disk consistent lsn so we can get the cutoff at `end_lsn`.
         info!(
@@ -7308,7 +7364,8 @@
             &CancellationToken::new(),
             &ctx,
         )
-        .await?;
+        .await
+        .unwrap();
 
         // Keeping everything <= Lsn(0x80) b/c leases:
         // 0/10: initdb layer
@@ -7322,13 +7379,16 @@
         // Make lease on a already GC-ed LSN.
         // 0/80 does not have a valid lease + is below latest_gc_cutoff
        assert!(Lsn(0x80) < *timeline.get_latest_gc_cutoff_lsn());
-        let res = timeline.make_lsn_lease(Lsn(0x80), timeline.get_lsn_lease_length(), &ctx);
-        assert!(res.is_err());
+        timeline
+            .init_lsn_lease(Lsn(0x80), timeline.get_lsn_lease_length(), &ctx)
+            .expect_err("lease request on GC-ed LSN should fail");
 
         // Should still be able to renew a currently valid lease
         // Assumption: original lease to is still valid for 0/50.
-        let _ =
-            timeline.make_lsn_lease(Lsn(leased_lsns[1]), timeline.get_lsn_lease_length(), &ctx)?;
+        // (use `Timeline::init_lsn_lease` for testing so it always does validation)
+        timeline
+            .init_lsn_lease(Lsn(leased_lsns[1]), timeline.get_lsn_lease_length(), &ctx)
+            .expect("lease renewal with validation should succeed");
 
         Ok(())
     }
diff --git a/pageserver/src/tenant/gc_block.rs b/pageserver/src/tenant/gc_block.rs
index 1271d25b7659..f7a7836a129c 100644
--- a/pageserver/src/tenant/gc_block.rs
+++ b/pageserver/src/tenant/gc_block.rs
@@ -1,29 +1,12 @@
-use std::{collections::HashMap, time::Duration};
+use std::collections::HashMap;
 
-use super::remote_timeline_client::index::GcBlockingReason;
-use tokio::time::Instant;
 use utils::id::TimelineId;
 
-type TimelinesBlocked = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
+use super::remote_timeline_client::index::GcBlockingReason;
 
-#[derive(Default)]
-struct Storage {
-    timelines_blocked: TimelinesBlocked,
-    /// The deadline before which we are blocked from GC so that
-    /// leases have a chance to be renewed.
-    lsn_lease_deadline: Option<Instant>,
-}
+type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
 
-impl Storage {
-    fn is_blocked_by_lsn_lease_deadline(&self) -> bool {
-        self.lsn_lease_deadline
-            .map(|d| Instant::now() < d)
-            .unwrap_or(false)
-    }
-}
-
-/// GcBlock provides persistent (per-timeline) gc blocking and facilitates transient time based gc
-/// blocking.
+/// GcBlock provides persistent (per-timeline) gc blocking.
 #[derive(Default)]
 pub(crate) struct GcBlock {
     /// The timelines which have current reasons to block gc.
@@ -66,17 +49,6 @@ impl GcBlock {
         }
     }
 
-    /// Sets a deadline before which we cannot proceed to GC due to lsn lease.
-    ///
-    /// We do this as the leases mapping are not persisted to disk. By delaying GC by lease
-    /// length, we guarantee that all the leases we granted before will have a chance to renew
-    /// when we run GC for the first time after restart / transition from AttachedMulti to AttachedSingle.
-    pub(super) fn set_lsn_lease_deadline(&self, lsn_lease_length: Duration) {
-        let deadline = Instant::now() + lsn_lease_length;
-        let mut g = self.reasons.lock().unwrap();
-        g.lsn_lease_deadline = Some(deadline);
-    }
-
     /// Describe the current gc blocking reasons.
     ///
     /// TODO: make this json serializable.
@@ -102,7 +74,7 @@
     ) -> anyhow::Result<bool> {
         let (added, uploaded) = {
             let mut g = self.reasons.lock().unwrap();
-            let set = g.timelines_blocked.entry(timeline.timeline_id).or_default();
+            let set = g.entry(timeline.timeline_id).or_default();
             let added = set.insert(reason);
 
             // LOCK ORDER: intentionally hold the lock, see self.reasons.
@@ -133,7 +105,7 @@
         let (remaining_blocks, uploaded) = {
             let mut g = self.reasons.lock().unwrap();
-            match g.timelines_blocked.entry(timeline.timeline_id) {
+            match g.entry(timeline.timeline_id) {
                 Entry::Occupied(mut oe) => {
                     let set = oe.get_mut();
                     set.remove(reason);
@@ -147,7 +119,7 @@
                 }
             }
 
-            let remaining_blocks = g.timelines_blocked.len();
+            let remaining_blocks = g.len();
 
             // LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
             let uploaded = timeline
@@ -172,11 +144,11 @@
     pub(crate) fn before_delete(&self, timeline: &super::Timeline) {
         let unblocked = {
             let mut g = self.reasons.lock().unwrap();
-            if g.timelines_blocked.is_empty() {
+            if g.is_empty() {
                 return;
             }
 
-            g.timelines_blocked.remove(&timeline.timeline_id);
+            g.remove(&timeline.timeline_id);
 
             BlockingReasons::clean_and_summarize(g).is_none()
         };
@@ -187,11 +159,10 @@
     }
 
     /// Initialize with the non-deleted timelines of this tenant.
-    pub(crate) fn set_scanned(&self, scanned: TimelinesBlocked) {
+    pub(crate) fn set_scanned(&self, scanned: Storage) {
         let mut g = self.reasons.lock().unwrap();
-        assert!(g.timelines_blocked.is_empty());
-        g.timelines_blocked
-            .extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
+        assert!(g.is_empty());
+        g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
 
         if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
             tracing::info!(summary=?reasons, "initialized with gc blocked");
@@ -205,7 +176,6 @@ pub(super) struct Guard<'a> {
 
 #[derive(Debug)]
 pub(crate) struct BlockingReasons {
-    tenant_blocked_by_lsn_lease_deadline: bool,
     timelines: usize,
     reasons: enumset::EnumSet<GcBlockingReason>,
 }
@@ -214,8 +184,8 @@ impl std::fmt::Display for BlockingReasons {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "tenant_blocked_by_lsn_lease_deadline: {}, {} timelines block for {:?}",
-            self.tenant_blocked_by_lsn_lease_deadline, self.timelines, self.reasons
+            "{} timelines block for {:?}",
+            self.timelines, self.reasons
         )
     }
 }
@@ -223,15 +193,13 @@ impl BlockingReasons {
     fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<BlockingReasons> {
         let mut reasons = enumset::EnumSet::empty();
-        g.timelines_blocked.retain(|_key, value| {
+        g.retain(|_key, value| {
             reasons = reasons.union(*value);
             !value.is_empty()
         });
-        let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
-        if !g.timelines_blocked.is_empty() || blocked_by_lsn_lease_deadline {
+        if !g.is_empty() {
             Some(BlockingReasons {
-                tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
-                timelines: g.timelines_blocked.len(),
+                timelines: g.len(),
                 reasons,
             })
         } else {
@@ -240,17 +208,14 @@
     }
 
     fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<BlockingReasons> {
-        let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
-        if g.timelines_blocked.is_empty() && !blocked_by_lsn_lease_deadline {
+        if g.is_empty() {
             None
         } else {
             let reasons = g
-                .timelines_blocked
                 .values()
                 .fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
             Some(BlockingReasons {
-                tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
-                timelines: g.timelines_blocked.len(),
+                timelines: g.len(),
                 reasons,
             })
         }
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index e159ae186da4..f6249056d830 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -949,12 +949,6 @@ impl TenantManager {
             (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
                 match attach_conf.generation.cmp(&tenant.generation) {
                     Ordering::Equal => {
-                        if attach_conf.attach_mode == AttachmentMode::Single {
-                            tenant
-                                .gc_block
-                                .set_lsn_lease_deadline(tenant.get_lsn_lease_length());
-                        }
-
                         // A transition from Attached to Attached in the same generation, we may
                         // take our fast path and just provide the updated configuration
                         // to the tenant.
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index 3f0f8a21c8a5..547739e7734c 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -330,7 +330,6 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
         RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
 
     let mut first = true;
-    tenant.gc_block.set_lsn_lease_deadline(tenant.get_lsn_lease_length());
     loop {
         tokio::select! {
             _ = cancel.cancelled() => {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 157c6ab91ec5..2113a1d72600 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -66,6 +66,7 @@ use std::{
 use crate::{
     aux_file::AuxFileSizeEstimator,
     tenant::{
+        config::AttachmentMode,
         layer_map::{LayerMap, SearchResult},
         metadata::TimelineMetadata,
         storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc},
@@ -1324,16 +1325,38 @@ impl Timeline {
         Ok(())
     }
 
+    /// Initializes an LSN lease. The function will return an error if the requested LSN is less than the `latest_gc_cutoff_lsn`.
+    pub(crate) fn init_lsn_lease(
+        &self,
+        lsn: Lsn,
+        length: Duration,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<LsnLease> {
+        self.make_lsn_lease(lsn, length, true, ctx)
+    }
+
+    /// Renews a lease at a particular LSN. The requested LSN is not validated against the `latest_gc_cutoff_lsn` when we are in the grace period.
+    pub(crate) fn renew_lsn_lease(
+        &self,
+        lsn: Lsn,
+        length: Duration,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<LsnLease> {
+        self.make_lsn_lease(lsn, length, false, ctx)
+    }
+
     /// Obtains a temporary lease blocking garbage collection for the given LSN.
     ///
-    /// This function will error if the requesting LSN is less than the `latest_gc_cutoff_lsn` and there is also
-    /// no existing lease to renew. If there is an existing lease in the map, the lease will be renewed only if
-    /// the request extends the lease. The returned lease is therefore the maximum between the existing lease and
-    /// the requesting lease.
-    pub(crate) fn make_lsn_lease(
+    /// If we are in `AttachedSingle` mode and are not blocked by the lsn lease deadline, this function will error
+    /// if the requesting LSN is less than the `latest_gc_cutoff_lsn` and there is no existing request present.
+    ///
+    /// If there is an existing lease in the map, the lease will be renewed only if the request extends the lease.
+    /// The returned lease is therefore the maximum between the existing lease and the requesting lease.
+    fn make_lsn_lease(
         &self,
         lsn: Lsn,
         length: Duration,
+        init: bool,
         _ctx: &RequestContext,
     ) -> anyhow::Result<LsnLease> {
         let lease = {
@@ -1347,8 +1370,8 @@
 
             let entry = gc_info.leases.entry(lsn);
 
-            let lease = {
-                if let Entry::Occupied(mut occupied) = entry {
+            match entry {
+                Entry::Occupied(mut occupied) => {
                     let existing_lease = occupied.get_mut();
                     if valid_until > existing_lease.valid_until {
                         existing_lease.valid_until = valid_until;
@@ -1360,20 +1383,28 @@
                     }
 
                     existing_lease.clone()
-                } else {
-                    // Reject already GC-ed LSN (lsn < latest_gc_cutoff)
-                    let latest_gc_cutoff_lsn = self.get_latest_gc_cutoff_lsn();
-                    if lsn < *latest_gc_cutoff_lsn {
-                        bail!("tried to request a page version that was garbage collected. requested at {} gc cutoff {}", lsn, *latest_gc_cutoff_lsn);
+                }
+                Entry::Vacant(vacant) => {
+                    // Reject already GC-ed LSN (lsn < latest_gc_cutoff) if we are in AttachedSingle and
+                    // not blocked by the lsn lease deadline.
+                    let validate = {
+                        let conf = self.tenant_conf.load();
+                        conf.location.attach_mode == AttachmentMode::Single
+                            && !conf.is_gc_blocked_by_lsn_lease_deadline()
+                    };
+
+                    if init || validate {
+                        let latest_gc_cutoff_lsn = self.get_latest_gc_cutoff_lsn();
+                        if lsn < *latest_gc_cutoff_lsn {
+                            bail!("tried to request a page version that was garbage collected. requested at {} gc cutoff {}", lsn, *latest_gc_cutoff_lsn);
+                        }
                     }
 
                     let dt: DateTime<Utc> = valid_until.into();
                     info!("lease created, valid until {}", dt);
-                    entry.or_insert(LsnLease { valid_until }).clone()
+                    vacant.insert(LsnLease { valid_until }).clone()
                 }
-            };
-
-            lease
+            }
         };
 
         Ok(lease)
@@ -1950,8 +1981,6 @@ impl Timeline {
             .unwrap_or(self.conf.default_tenant_conf.lsn_lease_length)
     }
 
-    // TODO(yuchen): remove unused flag after implementing https://github.com/neondatabase/neon/issues/8072
-    #[allow(unused)]
     pub(crate) fn get_lsn_lease_length_for_ts(&self) -> Duration {
         let tenant_conf = self.tenant_conf.load();
         tenant_conf
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 70fe632f4947..70a038c9609e 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -3311,7 +3311,7 @@ def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init: bool = True)
         self.pg_bin = pg_bin
         self.running = False
         if init:
-            self.pg_bin.run_capture(["initdb", "-D", str(pgdatadir)])
+            self.pg_bin.run_capture(["initdb", "--pgdata", str(pgdatadir)])
         self.configure([f"port = {port}\n"])
 
     def enable_tls(self):
diff --git a/test_runner/regress/test_readonly_node.py b/test_runner/regress/test_readonly_node.py
index 5e8b8d38f7e2..b08fcc0da164 100644
--- a/test_runner/regress/test_readonly_node.py
+++ b/test_runner/regress/test_readonly_node.py
@@ -27,7 +27,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
     env.pageserver.allowed_errors.extend(
         [
             ".*basebackup .* failed: invalid basebackup lsn.*",
-            ".*page_service.*handle_make_lsn_lease.*.*tried to request a page version that was garbage collected",
+            ".*/lsn_lease.*invalid lsn lease request.*",
         ]
     )
@@ -108,7 +108,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
         assert cur.fetchone() == (1,)
 
     # Create node at pre-initdb lsn
-    with pytest.raises(Exception, match="invalid basebackup lsn"):
+    with pytest.raises(Exception, match="invalid lsn lease request"):
         # compute node startup with invalid LSN should fail
         env.endpoints.create_start(
             branch_name="main",
@@ -167,6 +167,23 @@ def generate_updates_on_main(
         )
         return last_flush_lsn
 
+    def trigger_gc_and_select(env: NeonEnv, ep_static: Endpoint):
+        """
+        Trigger GC manually on all pageservers. Then run a `SELECT` query.
+        """
+        for shard, ps in tenant_get_shards(env, env.initial_tenant):
+            client = ps.http_client()
+            gc_result = client.timeline_gc(shard, env.initial_timeline, 0)
+            log.info(f"{gc_result=}")
+
+            assert (
+                gc_result["layers_removed"] == 0
+            ), "No layers should be removed, old layers are guarded by leases."
+
+        with ep_static.cursor() as cur:
+            cur.execute("SELECT count(*) FROM t0")
+            assert cur.fetchone() == (ROW_COUNT,)
+
     # Insert some records on main branch
     with env.endpoints.create_start("main") as ep_main:
         with ep_main.cursor() as cur:
@@ -193,25 +210,31 @@ def generate_updates_on_main(
             generate_updates_on_main(env, ep_main, i, end=100)
 
-            # Trigger GC
-            for shard, ps in tenant_get_shards(env, env.initial_tenant):
-                client = ps.http_client()
-                gc_result = client.timeline_gc(shard, env.initial_timeline, 0)
-                log.info(f"{gc_result=}")
+            trigger_gc_and_select(env, ep_static)
 
-                assert (
-                    gc_result["layers_removed"] == 0
-                ), "No layers should be removed, old layers are guarded by leases."
+            # Trigger Pageserver restarts
+            for ps in env.pageservers:
+                ps.stop()
+                # Static compute should have at least one lease request failure due to the connection error.
+                time.sleep(LSN_LEASE_LENGTH / 2)
+                ps.start()
 
-            with ep_static.cursor() as cur:
-                cur.execute("SELECT count(*) FROM t0")
-                assert cur.fetchone() == (ROW_COUNT,)
+            trigger_gc_and_select(env, ep_static)
+
+            # Reconfigure pageservers
+            env.pageservers[0].stop()
+            env.storage_controller.node_configure(
+                env.pageservers[0].id, {"availability": "Offline"}
+            )
+            env.storage_controller.reconcile_until_idle()
+
+            trigger_gc_and_select(env, ep_static)
 
             # Do some update so we can increment latest_gc_cutoff
             generate_updates_on_main(env, ep_main, i, end=100)
 
             # Wait for the existing lease to expire.
-            time.sleep(LSN_LEASE_LENGTH)
+            time.sleep(LSN_LEASE_LENGTH + 1)
+
             # Now trigger GC again, layers should be removed.
             for shard, ps in tenant_get_shards(env, env.initial_tenant):
                 client = ps.http_client()
diff --git a/test_runner/regress/test_timeline_gc_blocking.py b/test_runner/regress/test_timeline_gc_blocking.py
index 765c72cf2a2f..ddfe9b911fd8 100644
--- a/test_runner/regress/test_timeline_gc_blocking.py
+++ b/test_runner/regress/test_timeline_gc_blocking.py
@@ -45,10 +45,7 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool
     tenant_after = http.tenant_status(env.initial_tenant)
     assert tenant_before != tenant_after
     gc_blocking = tenant_after["gc_blocking"]
-    assert (
-        gc_blocking
-        == "BlockingReasons { tenant_blocked_by_lsn_lease_deadline: false, timelines: 1, reasons: EnumSet(Manual) }"
-    )
+    assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }"
 
     wait_for_another_gc_round()
     pss.assert_log_contains(gc_skipped_line)
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index adfe292c242a..c75235a04be6 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -772,7 +772,7 @@ def sync_safekeepers(self) -> Lsn:
 
     def initdb(self):
         """Run initdb"""
-        args = ["initdb", "-U", "cloud_admin", "-D", self.pg_data_dir_path()]
+        args = ["initdb", "--username", "cloud_admin", "--pgdata", self.pg_data_dir_path()]
         self.pg_bin.run(args)
 
     def start(self):