diff --git a/Cargo.lock b/Cargo.lock index 64231ed11cf5..f6e3f9ddb1d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4009,7 +4009,7 @@ dependencies = [ [[package]] name = "postgres" version = "0.19.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" dependencies = [ "bytes", "fallible-iterator", @@ -4022,7 +4022,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" dependencies = [ "base64 0.20.0", "byteorder", @@ -4041,7 +4041,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" dependencies = [ "bytes", "fallible-iterator", @@ -6227,7 +6227,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.7" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" dependencies = [ "async-trait", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index 8207726caaf2..706d742f1bae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -203,21 +203,10 @@ env_logger = "0.10" log = "0.4" ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed - -# We want to use the 'neon' branch for these, but there's currently one -# incompatible change on the branch. See: -# -# - PR #8076 which contained changes that depended on the new changes in -# the rust-postgres crate, and -# - PR #8654 which reverted those changes and made the code in proxy incompatible -# with the tip of the 'neon' branch again. -# -# When those proxy changes are re-applied (see PR #8747), we can switch using -# the tip of the 'neon' branch again. 
-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" } -postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" } -postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" } -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" } +postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } +postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } +postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } ## Local libraries compute_api = { version = "0.1", path = "./libs/compute_api/" } @@ -255,7 +244,7 @@ tonic-build = "0.12" [patch.crates-io] # Needed to get `tokio-postgres-rustls` to depend on our fork. -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } ################# Binary contents sections diff --git a/libs/pageserver_api/src/record.rs b/libs/pageserver_api/src/record.rs index 5c3f3deb826e..bb62b35d36cd 100644 --- a/libs/pageserver_api/src/record.rs +++ b/libs/pageserver_api/src/record.rs @@ -41,6 +41,11 @@ pub enum NeonWalRecord { file_path: String, content: Option, }, + // Truncate visibility map page + TruncateVisibilityMap { + trunc_byte: usize, + trunc_offs: usize, + }, /// A testing record for unit testing purposes. It supports append data to an existing image, or clear it. 
#[cfg(feature = "testing")] diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index 497d011d7a20..e343473d7732 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -243,8 +243,11 @@ const FSM_LEAF_NODES_PER_PAGE: usize = FSM_NODES_PER_PAGE - FSM_NON_LEAF_NODES_P pub const SLOTS_PER_FSM_PAGE: u32 = FSM_LEAF_NODES_PER_PAGE as u32; /* From visibilitymap.c */ -pub const VM_HEAPBLOCKS_PER_PAGE: u32 = - (BLCKSZ as usize - SIZEOF_PAGE_HEADER_DATA) as u32 * (8 / 2); // MAPSIZE * (BITS_PER_BYTE / BITS_PER_HEAPBLOCK) + +pub const VM_MAPSIZE: usize = BLCKSZ as usize - MAXALIGN_SIZE_OF_PAGE_HEADER_DATA; +pub const VM_BITS_PER_HEAPBLOCK: usize = 2; +pub const VM_HEAPBLOCKS_PER_BYTE: usize = 8 / VM_BITS_PER_HEAPBLOCK; +pub const VM_HEAPBLOCKS_PER_PAGE: usize = VM_MAPSIZE * VM_HEAPBLOCKS_PER_BYTE; /* From origin.c */ pub const REPLICATION_STATE_MAGIC: u32 = 0x1257DADE; diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index dde9c5dd0b9a..ab170679ba57 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -324,6 +324,7 @@ impl From for ApiError { .into_boxed_str(), ), a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()), + Cancelled => ApiError::ResourceUnavailable("shutting down".into()), Other(e) => ApiError::InternalServerError(e), } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index e1763f418619..6af7222f3ff7 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -759,6 +759,9 @@ pub enum DeleteTimelineError { #[error("Timeline deletion is already in progress")] AlreadyInProgress(Arc>), + #[error("Cancelled")] + Cancelled, + #[error(transparent)] Other(#[from] anyhow::Error), } @@ -769,6 +772,7 @@ impl Debug for DeleteTimelineError { Self::NotFound => write!(f, "NotFound"), Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(), Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(), + Self::Cancelled => f.debug_tuple("Cancelled").finish(), Self::Other(e) => f.debug_tuple("Other").field(e).finish(), } } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 600583f6b50f..94f42c782782 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -243,7 +243,7 @@ use self::index::IndexPart; use super::metadata::MetadataUpdate; use super::storage_layer::{Layer, LayerName, ResidentLayer}; use super::upload_queue::{NotInitialized, SetDeletedFlagProgress}; -use super::Generation; +use super::{DeleteTimelineError, Generation}; pub(crate) use download::{ download_index_part, download_tenant_manifest, is_temp_download_file, @@ -1550,15 +1550,17 @@ impl RemoteTimelineClient { /// Prerequisites: UploadQueue should be in stopped state and deleted_at should be successfuly set. /// The function deletes layer files one by one, then lists the prefix to see if we leaked something /// deletes leaked files if any and proceeds with deletion of index file at the end. 
- pub(crate) async fn delete_all(self: &Arc) -> anyhow::Result<()> { + pub(crate) async fn delete_all(self: &Arc) -> Result<(), DeleteTimelineError> { debug_assert_current_span_has_tenant_and_timeline_id(); let layers: Vec = { let mut locked = self.upload_queue.lock().unwrap(); - let stopped = locked.stopped_mut()?; + let stopped = locked.stopped_mut().map_err(DeleteTimelineError::Other)?; if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) { - anyhow::bail!("deleted_at is not set") + return Err(DeleteTimelineError::Other(anyhow::anyhow!( + "deleted_at is not set" + ))); } debug_assert!(stopped.upload_queue_for_deletion.no_pending_work()); @@ -1593,7 +1595,10 @@ impl RemoteTimelineClient { }; let layer_deletion_count = layers.len(); - self.deletion_queue_client.push_immediate(layers).await?; + self.deletion_queue_client + .push_immediate(layers) + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; // Delete the initdb.tar.zst, which is not always present, but deletion attempts of // inexistant objects are not considered errors. @@ -1601,7 +1606,8 @@ impl RemoteTimelineClient { remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &self.timeline_id); self.deletion_queue_client .push_immediate(vec![initdb_path]) - .await?; + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; // Do not delete index part yet, it is needed for possible retry. If we remove it first // and retry will arrive to different pageserver there wont be any traces of it on remote storage @@ -1609,7 +1615,9 @@ impl RemoteTimelineClient { // Execute all pending deletions, so that when we proceed to do a listing below, we aren't // taking the burden of listing all the layers that we already know we should delete. - self.flush_deletion_queue().await?; + self.flush_deletion_queue() + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; let cancel = shutdown_token(); @@ -1672,28 +1680,32 @@ impl RemoteTimelineClient { if !remaining_layers.is_empty() { self.deletion_queue_client .push_immediate(remaining_layers) - .await?; + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; } fail::fail_point!("timeline-delete-before-index-delete", |_| { - Err(anyhow::anyhow!( + Err(DeleteTimelineError::Other(anyhow::anyhow!( "failpoint: timeline-delete-before-index-delete" - ))? + )))? }); debug!("enqueuing index part deletion"); self.deletion_queue_client .push_immediate([latest_index].to_vec()) - .await?; + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; // Timeline deletion is rare and we have probably emitted a reasonably number of objects: wait // for a flush to a persistent deletion list so that we may be sure deletion will occur. - self.flush_deletion_queue().await?; + self.flush_deletion_queue() + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; fail::fail_point!("timeline-delete-after-index-delete", |_| { - Err(anyhow::anyhow!( + Err(DeleteTimelineError::Other(anyhow::anyhow!( "failpoint: timeline-delete-after-index-delete" - ))? + )))? }); info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json"); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 311546659df9..2bc14ec3172c 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5043,7 +5043,7 @@ impl Timeline { // 1. Is it newer than GC horizon cutoff point? 
if l.get_lsn_range().end > space_cutoff { - debug!( + info!( "keeping {} because it's newer than space_cutoff {}", l.layer_name(), space_cutoff, @@ -5054,7 +5054,7 @@ impl Timeline { // 2. It is newer than PiTR cutoff point? if l.get_lsn_range().end > time_cutoff { - debug!( + info!( "keeping {} because it's newer than time_cutoff {}", l.layer_name(), time_cutoff, @@ -5073,7 +5073,7 @@ impl Timeline { for retain_lsn in &retain_lsns { // start_lsn is inclusive if &l.get_lsn_range().start <= retain_lsn { - debug!( + info!( "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}", l.layer_name(), retain_lsn, @@ -5088,7 +5088,7 @@ impl Timeline { if let Some(lsn) = &max_lsn_with_valid_lease { // keep if layer start <= any of the lease if &l.get_lsn_range().start <= lsn { - debug!( + info!( "keeping {} because there is a valid lease preventing GC at {}", l.layer_name(), lsn, @@ -5120,13 +5120,13 @@ impl Timeline { if !layers .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff)) { - debug!("keeping {} because it is the latest layer", l.layer_name()); + info!("keeping {} because it is the latest layer", l.layer_name()); result.layers_not_updated += 1; continue 'outer; } // We didn't find any reason to keep this file, so remove it. - debug!( + info!( "garbage collecting {} is_dropped: xx is_incremental: {}", l.layer_name(), l.is_incremental(), diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index a5368e9bea65..13a8dfa51a2e 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -5,6 +5,7 @@ use std::{ use anyhow::Context; use pageserver_api::{models::TimelineState, shard::TenantShardId}; +use remote_storage::DownloadError; use tokio::sync::OwnedMutexGuard; use tracing::{error, info, info_span, instrument, Instrument}; use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint}; @@ -16,7 +17,7 @@ use crate::{ metadata::TimelineMetadata, remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient}, CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, Tenant, - TimelineOrOffloaded, + TenantManifestError, TimelineOrOffloaded, }, virtual_file::MaybeFatalIo, }; @@ -110,13 +111,6 @@ pub(super) async fn delete_local_timeline_directory( info!("finished deleting layer files, releasing locks"); } -/// Removes remote layers and an index file after them. -async fn delete_remote_layers_and_index( - remote_client: &Arc, -) -> anyhow::Result<()> { - remote_client.delete_all().await.context("delete_all") -} - /// It is important that this gets called when DeletionGuard is being held. 
/// For more context see comments in [`DeleteTimelineFlow::prepare`] async fn remove_maybe_offloaded_timeline_from_tenant( @@ -222,11 +216,24 @@ impl DeleteTimelineFlow { None => { let remote_client = tenant .build_timeline_client(timeline.timeline_id(), tenant.remote_storage.clone()); - let result = remote_client + let result = match remote_client .download_index_file(&tenant.cancel) .instrument(info_span!("download_index_file")) .await - .map_err(|e| DeleteTimelineError::Other(anyhow::anyhow!("error: {:?}", e)))?; + { + Ok(r) => r, + Err(DownloadError::NotFound) => { + // Deletion is already complete + tracing::info!("Timeline already deleted in remote storage"); + return Ok(()); + } + Err(e) => { + return Err(DeleteTimelineError::Other(anyhow::anyhow!( + "error: {:?}", + e + ))); + } + }; let index_part = match result { MaybeDeletedIndexPart::Deleted(p) => { tracing::info!("Timeline already set as deleted in remote index"); @@ -407,7 +414,12 @@ impl DeleteTimelineFlow { "timeline_delete", async move { if let Err(err) = Self::background(guard, conf, &tenant, &timeline, remote_client).await { - error!("Error: {err:#}"); + // Only log as an error if it's not a cancellation. + if matches!(err, DeleteTimelineError::Cancelled) { + info!("Shutdown during timeline deletion"); + }else { + error!("Error: {err:#}"); + } if let TimelineOrOffloaded::Timeline(timeline) = timeline { timeline.set_broken(format!("{err:#}")) } @@ -439,7 +451,7 @@ impl DeleteTimelineFlow { Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))? }); - delete_remote_layers_and_index(&remote_client).await?; + remote_client.delete_all().await?; pausable_failpoint!("in_progress_delete"); @@ -450,10 +462,10 @@ impl DeleteTimelineFlow { // So indeed, the tenant manifest might refer to an offloaded timeline which has already been deleted. // However, we handle this case in tenant loading code so the next time we attach, the issue is // resolved. - tenant - .store_tenant_manifest() - .await - .map_err(|e| DeleteTimelineError::Other(anyhow::anyhow!(e)))?; + tenant.store_tenant_manifest().await.map_err(|e| match e { + TenantManifestError::Cancelled => DeleteTimelineError::Cancelled, + _ => DeleteTimelineError::Other(e.into()), + })?; *guard = Self::Finished; diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index c3ccd8a2e4e3..84e553f33012 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -587,11 +587,29 @@ impl WalIngest { forknum: VISIBILITYMAP_FORKNUM, }; - let mut vm_page_no = blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE; - if blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 { - // Tail of last remaining vm page has to be zeroed. - // We are not precise here and instead of digging in VM bitmap format just clear the whole page. - modification.put_rel_page_image_zero(rel, vm_page_no)?; + // last remaining block, byte, and bit + let mut vm_page_no = blkno / (pg_constants::VM_HEAPBLOCKS_PER_PAGE as u32); + let trunc_byte = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_PAGE + / pg_constants::VM_HEAPBLOCKS_PER_BYTE; + let trunc_offs = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_BYTE + * pg_constants::VM_BITS_PER_HEAPBLOCK; + + // Unless the new size is exactly at a visibility map page boundary, the + // tail bits in the last remaining map page, representing truncated heap + // blocks, need to be cleared. This is not only tidy, but also necessary + // because we don't get a chance to clear the bits if the heap is extended + // again. 
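// For illustration only (assuming the usual BLCKSZ = 8192 and a MAXALIGN'ed
// 24-byte page header): VM_MAPSIZE = 8168, so one VM page covers
// 8168 * 4 = 32672 heap blocks. Truncating the heap to blkno = 1001 then gives
// vm_page_no = 0, trunc_byte = (1001 % 32672) / 4 = 250 and
// trunc_offs = (1001 % 4) * 2 = 2, i.e. only the two low bits of map byte 250
// (the VM bits of heap block 1000) survive, and bytes 251.. are cleared.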
+ if (trunc_byte != 0 || trunc_offs != 0) + && self.shard.is_key_local(&rel_block_to_key(rel, vm_page_no)) + { + modification.put_rel_wal_record( + rel, + vm_page_no, + NeonWalRecord::TruncateVisibilityMap { + trunc_byte, + trunc_offs, + }, + )?; vm_page_no += 1; } let nblocks = get_relsize(modification, rel, ctx).await?; diff --git a/pageserver/src/walredo/apply_neon.rs b/pageserver/src/walredo/apply_neon.rs index 78601d87af04..d62e325310dc 100644 --- a/pageserver/src/walredo/apply_neon.rs +++ b/pageserver/src/walredo/apply_neon.rs @@ -42,6 +42,34 @@ pub(crate) fn apply_in_neon( } => { anyhow::bail!("tried to pass postgres wal record to neon WAL redo"); } + // + // Code copied from PostgreSQL `visibilitymap_prepare_truncate` function in `visibilitymap.c` + // + NeonWalRecord::TruncateVisibilityMap { + trunc_byte, + trunc_offs, + } => { + // sanity check that this is modifying the correct relation + let (rel, _) = key.to_rel_block().context("invalid record")?; + assert!( + rel.forknum == VISIBILITYMAP_FORKNUM, + "TruncateVisibilityMap record on unexpected rel {}", + rel + ); + let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..]; + map[*trunc_byte + 1..].fill(0u8); + /*---- + * Mask out the unwanted bits of the last remaining byte. + * + * ((1 << 0) - 1) = 00000000 + * ((1 << 1) - 1) = 00000001 + * ... + * ((1 << 6) - 1) = 00111111 + * ((1 << 7) - 1) = 01111111 + *---- + */ + map[*trunc_byte] &= (1 << *trunc_offs) - 1; + } NeonWalRecord::ClearVisibilityMapFlags { new_heap_blkno, old_heap_blkno, diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 11f372bceb1c..c3bb6cd12c26 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -127,23 +127,29 @@ pub struct PhysicalStorage { /// - doesn't point to the end of the segment file: Option, - /// When false, we have just initialized storage using the LSN from find_end_of_wal(). - /// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular, - /// there can be a case with unexpected .partial file. + /// When true, WAL truncation potentially has been interrupted and we need + /// to finish it before allowing WAL writes; see truncate_wal for details. + /// In this case [`write_lsn`] can be less than actually written WAL on + /// disk. In particular, there can be a case with unexpected .partial file. /// /// Imagine the following: /// - 000000010000000000000001 - /// - it was fully written, but the last record is split between 2 segments - /// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in the end of this segment - /// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were initialized to 0/1FFFFF0 + /// - it was fully written, but the last record is split between 2 + /// segments + /// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in + /// the end of this segment + /// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were + /// initialized to 0/1FFFFF0 /// - 000000010000000000000002.partial - /// - it has only 1 byte written, which is not enough to make a full WAL record + /// - it has only 1 byte written, which is not enough to make a full WAL + /// record /// - /// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal(). - /// This flag will be set to true after the first truncate_wal() call. + /// Partial segment 002 has no WAL records, and it will be removed by the + /// next truncate_wal(). 
This flag will be set to true after the first + /// truncate_wal() call. /// /// [`write_lsn`]: Self::write_lsn - is_truncated_after_restart: bool, + pending_wal_truncation: bool, } impl PhysicalStorage { @@ -208,7 +214,7 @@ impl PhysicalStorage { flush_record_lsn: flush_lsn, decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000), file: None, - is_truncated_after_restart: false, + pending_wal_truncation: true, }) } @@ -405,6 +411,13 @@ impl Storage for PhysicalStorage { startpos ); } + if self.pending_wal_truncation { + bail!( + "write_wal called with pending WAL truncation, write_lsn={}, startpos={}", + self.write_lsn, + startpos + ); + } let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?; // WAL is written, updating write metrics @@ -479,15 +492,34 @@ impl Storage for PhysicalStorage { ); } - // Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on - // disk (this happens on each connect). - if self.is_truncated_after_restart + // Quick exit if nothing to do and we know that the state is clean to + // avoid writing up to 16 MiB of zeros on disk (this happens on each + // connect). + if !self.pending_wal_truncation && end_pos == self.write_lsn && end_pos == self.flush_record_lsn { return Ok(()); } + // Atomicity: we start with LSNs reset because once on disk deletion is + // started it can't be reversed. However, we might crash/error in the + // middle, leaving garbage above the truncation point. In theory, + // concatenated with previous records it might form bogus WAL (though + // very unlikely in practice because CRC would guard from that). To + // protect, set pending_wal_truncation flag before beginning: it means + // truncation must be retried and WAL writes are prohibited until it + // succeeds. Flag is also set on boot because we don't know if the last + // state was clean. + // + // Protocol (HandleElected before first AppendRequest) ensures we'll + // always try to ensure clean truncation before any writes. 
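// Illustrative (hypothetical) sequence: the flag is set, the in-memory LSNs
// are reset, and the process crashes while the on-disk truncation is only
// partially done. On restart, pending_wal_truncation starts out as true, so
// write_wal() refuses to append anything until a subsequent truncate_wal()
// call reruns the cleanup and clears the flag.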
+ self.pending_wal_truncation = true; + + self.write_lsn = end_pos; + self.write_record_lsn = end_pos; + self.flush_record_lsn = end_pos; + // Close previously opened file, if any if let Some(unflushed_file) = self.file.take() { self.fdatasync_file(&unflushed_file).await?; @@ -513,11 +545,7 @@ impl Storage for PhysicalStorage { fs::rename(wal_file_path, wal_file_partial_path).await?; } - // Update LSNs - self.write_lsn = end_pos; - self.write_record_lsn = end_pos; - self.flush_record_lsn = end_pos; - self.is_truncated_after_restart = true; + self.pending_wal_truncation = false; Ok(()) } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index e3a147bc0603..446c476b99c5 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -3642,6 +3642,7 @@ impl Service { match res { Ok(ok) => Ok(ok), Err(mgmt_api::Error::ApiError(StatusCode::CONFLICT, _)) => Ok(StatusCode::CONFLICT), + Err(mgmt_api::Error::ApiError(StatusCode::SERVICE_UNAVAILABLE, msg)) => Err(ApiError::ResourceUnavailable(msg.into())), Err(e) => { Err( ApiError::InternalServerError(anyhow::anyhow!( @@ -6355,6 +6356,19 @@ impl Service { // Pick the biggest tenant to split first top_n.sort_by_key(|i| i.resident_size); + + // Filter out tenants in a prohibiting scheduling mode + { + let locked = self.inner.read().unwrap(); + top_n.retain(|i| { + if let Some(shard) = locked.tenants.get(&i.id) { + matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) + } else { + false + } + }); + } + let Some(split_candidate) = top_n.into_iter().next() else { tracing::debug!("No split-elegible shards found"); return; diff --git a/test_runner/fixtures/neon_api.py b/test_runner/fixtures/neon_api.py index 89c1f324b421..9de6681bebc9 100644 --- a/test_runner/fixtures/neon_api.py +++ b/test_runner/fixtures/neon_api.py @@ -5,6 +5,8 @@ import requests +from fixtures.log_helper import log + if TYPE_CHECKING: from typing import Any, Literal, Optional @@ -30,7 +32,11 @@ def __request(self, method: str | bytes, endpoint: str, **kwargs: Any) -> reques kwargs["headers"] = {} kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}" - return requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs) + resp = requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs) + log.debug("%s %s returned a %d: %s", method, endpoint, resp.status_code, resp.text) + resp.raise_for_status() + + return resp def create_project( self, @@ -66,8 +72,6 @@ def create_project( json=data, ) - assert resp.status_code == 201 - return cast("dict[str, Any]", resp.json()) def get_project_details(self, project_id: str) -> dict[str, Any]: @@ -79,7 +83,7 @@ def get_project_details(self, project_id: str) -> dict[str, Any]: "Content-Type": "application/json", }, ) - assert resp.status_code == 200 + return cast("dict[str, Any]", resp.json()) def delete_project( @@ -95,8 +99,6 @@ def delete_project( }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def start_endpoint( @@ -112,8 +114,6 @@ def start_endpoint( }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def suspend_endpoint( @@ -129,8 +129,6 @@ def suspend_endpoint( }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def restart_endpoint( @@ -146,8 +144,6 @@ def restart_endpoint( }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def create_endpoint( @@ -178,8 +174,6 @@ def create_endpoint( json=data, 
) - assert resp.status_code == 201 - return cast("dict[str, Any]", resp.json()) def get_connection_uri( @@ -206,8 +200,6 @@ def get_connection_uri( }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def get_branches(self, project_id: str) -> dict[str, Any]: @@ -219,8 +211,6 @@ def get_branches(self, project_id: str) -> dict[str, Any]: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def get_endpoints(self, project_id: str) -> dict[str, Any]: @@ -232,8 +222,6 @@ def get_endpoints(self, project_id: str) -> dict[str, Any]: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def get_operations(self, project_id: str) -> dict[str, Any]: @@ -246,8 +234,6 @@ def get_operations(self, project_id: str) -> dict[str, Any]: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def wait_for_operation_to_finish(self, project_id: str): diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 990db1aed01a..205a47a9d5d7 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2379,6 +2379,17 @@ def __init__(self, env: NeonEnv, id: int, port: PageserverPort, az_id: str): # # The entries in the list are regular experessions. self.allowed_errors: list[str] = list(DEFAULT_PAGESERVER_ALLOWED_ERRORS) + # Store persistent failpoints that should be reapplied on each start + self._persistent_failpoints: dict[str, str] = {} + + def add_persistent_failpoint(self, name: str, action: str): + """ + Add a failpoint that will be automatically reapplied each time the pageserver starts. + The failpoint will be set immediately if the pageserver is running. + """ + self._persistent_failpoints[name] = action + if self.running: + self.http_client().configure_failpoints([(name, action)]) def timeline_dir( self, @@ -2446,6 +2457,15 @@ def start( """ assert self.running is False + if self._persistent_failpoints: + # Tests shouldn't use this mechanism _and_ set FAILPOINTS explicitly + assert extra_env_vars is None or "FAILPOINTS" not in extra_env_vars + if extra_env_vars is None: + extra_env_vars = {} + extra_env_vars["FAILPOINTS"] = ",".join( + f"{k}={v}" for (k, v) in self._persistent_failpoints.items() + ) + storage = self.env.pageserver_remote_storage if isinstance(storage, S3Storage): s3_env_vars = storage.access_env_vars() @@ -4522,7 +4542,7 @@ def pytest_addoption(parser: Parser): SMALL_DB_FILE_NAME_REGEX: re.Pattern[str] = re.compile( - r"config-v1|heatmap-v1|metadata|.+\.(?:toml|pid|json|sql|conf)" + r"config-v1|heatmap-v1|tenant-manifest|metadata|.+\.(?:toml|pid|json|sql|conf)" ) diff --git a/test_runner/performance/test_logical_replication.py b/test_runner/performance/test_logical_replication.py index 050c09c1e549..9d653d1a1ef8 100644 --- a/test_runner/performance/test_logical_replication.py +++ b/test_runner/performance/test_logical_replication.py @@ -1,6 +1,8 @@ from __future__ import annotations import time +from collections.abc import Iterator +from contextlib import contextmanager from typing import TYPE_CHECKING, cast import psycopg2 @@ -18,7 +20,7 @@ from fixtures.benchmark_fixture import NeonBenchmarker from fixtures.neon_api import NeonApiEndpoint from fixtures.neon_fixtures import NeonEnv, PgBin, VanillaPostgres - from psycopg2.extensions import cursor + from psycopg2.extensions import connection, cursor @pytest.mark.timeout(1000) @@ -292,6 +294,48 @@ def test_snap_files( then runs pgbench inserts while 
generating large numbers of snapfiles. Then restarts the node and tries to peek the replication changes. """ + + @contextmanager + def replication_slot(conn: connection, slot_name: str) -> Iterator[None]: + """ + Make sure that the replication slot doesn't outlive the test. Normally + we wouldn't want this behavior, but since the test creates and drops + the replication slot, we do. + + We've had problems in the past where this slot sticking around caused + issues with the publisher retaining WAL during the execution of the + other benchmarks in this suite. + """ + + def __drop_replication_slot(c: cursor) -> None: + c.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 + FROM pg_replication_slots + WHERE slot_name = %(slot_name)s + ) THEN + PERFORM pg_drop_replication_slot(%(slot_name)s); + END IF; + END $$; + """, + {"slot_name": slot_name}, + ) + + with conn.cursor() as c: + __drop_replication_slot(c) + c.execute( + "SELECT pg_create_logical_replication_slot(%(slot_name)s, 'test_decoding')", + {"slot_name": slot_name}, + ) + + yield + + with conn.cursor() as c: + __drop_replication_slot(c) + test_duration_min = 60 test_interval_min = 5 pgbench_duration = f"-T{test_duration_min * 60 * 2}" @@ -314,48 +358,35 @@ def test_snap_files( conn = psycopg2.connect(connstr) conn.autocommit = True - with conn.cursor() as cur: - cur.execute( - """ - DO $$ - BEGIN - IF EXISTS ( - SELECT 1 - FROM pg_replication_slots - WHERE slot_name = 'slotter' - ) THEN - PERFORM pg_drop_replication_slot('slotter'); - END IF; - END $$; - """ + with replication_slot(conn, "slotter"): + workload = pg_bin.run_nonblocking( + ["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env ) - cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')") + try: + start = time.time() + prev_measurement = time.time() + while time.time() - start < test_duration_min * 60: + conn = psycopg2.connect(connstr) + conn.autocommit = True + + with conn.cursor() as cur: + cur.execute( + "SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s" + ) + check_pgbench_still_running(workload) + cur.execute( + "SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())" + ) + + conn.close() + + # Measure storage + if time.time() - prev_measurement > test_interval_min * 60: + storage = benchmark_project_pub.get_synthetic_storage_size() + zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER) + prev_measurement = time.time() + time.sleep(test_interval_min * 60 / 3) + finally: + workload.terminate() conn.close() - - workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env) - try: - start = time.time() - prev_measurement = time.time() - while time.time() - start < test_duration_min * 60: - conn = psycopg2.connect(connstr) - conn.autocommit = True - - with conn.cursor() as cur: - cur.execute( - "SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s" - ) - check_pgbench_still_running(workload) - cur.execute("SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())") - - conn.close() - - # Measure storage - if time.time() - prev_measurement > test_interval_min * 60: - storage = benchmark_project_pub.get_synthetic_storage_size() - zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER) - prev_measurement = time.time() - time.sleep(test_interval_min * 60 / 3) - - finally: - workload.terminate() diff --git a/test_runner/regress/test_readonly_node.py 
b/test_runner/regress/test_readonly_node.py index f257f0853bb9..826136d5f93f 100644 --- a/test_runner/regress/test_readonly_node.py +++ b/test_runner/regress/test_readonly_node.py @@ -122,6 +122,7 @@ def test_readonly_node(neon_simple_env: NeonEnv): ) +@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/9754") def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): """ Test static endpoint is protected from GC by acquiring and renewing lsn leases. diff --git a/test_runner/regress/test_timeline_archive.py b/test_runner/regress/test_timeline_archive.py index 61786c3b3e05..c3d5aea0b63c 100644 --- a/test_runner/regress/test_timeline_archive.py +++ b/test_runner/regress/test_timeline_archive.py @@ -1,10 +1,14 @@ from __future__ import annotations import json +import random +import threading +import time from typing import Optional import pytest -from fixtures.common_types import TenantId, TimelineArchivalState, TimelineId +import requests +from fixtures.common_types import TenantId, TenantShardId, TimelineArchivalState, TimelineId from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnvBuilder, @@ -17,8 +21,9 @@ list_prefix, wait_until_tenant_active, ) +from fixtures.pg_version import PgVersion from fixtures.remote_storage import S3Storage, s3_storage -from fixtures.utils import wait_until +from fixtures.utils import run_only_on_default_postgres, wait_until from mypy_boto3_s3.type_defs import ( ObjectTypeDef, ) @@ -384,6 +389,258 @@ def child_offloaded(): ) +@run_only_on_default_postgres("this test isn't sensitive to the contents of timelines") +def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder): + """ + A general consistency check on archival/offload timeline state, and its intersection + with tenant migrations and timeline deletions. 
+ """ + + # Offloading is off by default at time of writing: remove this line when it's on by default + neon_env_builder.pageserver_config_override = "timeline_offloading = true" + neon_env_builder.enable_pageserver_remote_storage(s3_storage()) + + # We will exercise migrations, so need multiple pageservers + neon_env_builder.num_pageservers = 2 + + env = neon_env_builder.init_start( + initial_tenant_conf={ + "compaction_period": "1s", + } + ) + tenant_id = env.initial_tenant + tenant_shard_id = TenantShardId(tenant_id, 0, 0) + + # Unavailable pageservers during timeline CRUD operations can be logged as errors on the storage controller + env.storage_controller.allowed_errors.append(".*error sending request.*") + + for ps in env.pageservers: + # We will do unclean restarts, which results in these messages when cleaning up files + ps.allowed_errors.extend( + [ + ".*removing local file.*because it has unexpected length.*", + ".*__temp.*", + # FIXME: there are still anyhow::Error paths in timeline creation/deletion which + # generate 500 results when called during shutdown + ".*InternalServerError.*", + # FIXME: there are still anyhow::Error paths in timeline deletion that generate + # log lines at error severity + ".*delete_timeline.*Error", + ] + ) + + class TimelineState: + def __init__(self): + self.timeline_id = TimelineId.generate() + self.created = False + self.archived = False + self.offloaded = False + self.deleted = False + + controller_ps_api = env.storage_controller.pageserver_api() + + shutdown = threading.Event() + + violations = [] + + timelines_deleted = [] + + def list_timelines(tenant_id) -> tuple[set[TimelineId], set[TimelineId]]: + """Get the list of active and offloaded TimelineId""" + listing = controller_ps_api.timeline_and_offloaded_list(tenant_id) + active_ids = set([TimelineId(t["timeline_id"]) for t in listing.timelines]) + offloaded_ids = set([TimelineId(t["timeline_id"]) for t in listing.offloaded]) + + return (active_ids, offloaded_ids) + + def timeline_objects(tenant_shard_id, timeline_id): + response = list_prefix( + env.pageserver_remote_storage, # type: ignore + prefix="/".join( + ( + "tenants", + str(tenant_shard_id), + "timelines", + str(timeline_id), + ) + ) + + "/", + ) + + return [k["Key"] for k in response.get("Contents", [])] + + def worker(): + """ + Background thread which drives timeline lifecycle operations, and checks that between steps + it obeys invariants. This should detect errors in pageserver persistence and in errors in + concurrent operations on different timelines when it is run many times in parallel. + """ + state = TimelineState() + + # Jitter worker startup, we're not interested in exercising lots of concurrent creations + # as we know that's I/O bound. + shutdown.wait(random.random() * 10) + + while not shutdown.is_set(): + # A little wait between actions to jitter out the API calls rather than having them + # all queue up at once + shutdown.wait(random.random()) + + try: + if not state.created: + log.info(f"Creating timeline {state.timeline_id}") + controller_ps_api.timeline_create( + PgVersion.NOT_SET, tenant_id=tenant_id, new_timeline_id=state.timeline_id + ) + state.created = True + + if ( + timeline_objects( + tenant_shard_id=tenant_shard_id, timeline_id=state.timeline_id + ) + == [] + ): + msg = f"Timeline {state.timeline_id} unexpectedly not present in remote storage" + violations.append(msg) + + elif state.deleted: + # Try to confirm its deletion completed. 
+ # Deleted timeline should not appear in listing API, either as offloaded or active + (active_ids, offloaded_ids) = list_timelines(tenant_id) + if state.timeline_id in active_ids or state.timeline_id in offloaded_ids: + msg = f"Timeline {state.timeline_id} appeared in listing after deletion was acked" + violations.append(msg) + raise RuntimeError(msg) + + objects = timeline_objects(tenant_shard_id, state.timeline_id) + if len(objects) == 0: + log.info(f"Confirmed deletion of timeline {state.timeline_id}") + timelines_deleted.append(state.timeline_id) + state = TimelineState() # A new timeline ID to create on next iteration + else: + # Deletion of objects doesn't have to be synchronous, we will keep polling + log.info(f"Timeline {state.timeline_id} objects still exist: {objects}") + shutdown.wait(random.random()) + else: + # The main lifetime of a timeline: proceed active->archived->offloaded->deleted + if not state.archived: + log.info(f"Archiving timeline {state.timeline_id}") + controller_ps_api.timeline_archival_config( + tenant_id, state.timeline_id, TimelineArchivalState.ARCHIVED + ) + state.archived = True + elif state.archived and not state.offloaded: + log.info(f"Waiting for offload of timeline {state.timeline_id}") + # Wait for offload: this should happen fast because we configured a short compaction interval + while not shutdown.is_set(): + (active_ids, offloaded_ids) = list_timelines(tenant_id) + if state.timeline_id in active_ids: + log.info(f"Timeline {state.timeline_id} is still active") + shutdown.wait(0.5) + elif state.timeline_id in offloaded_ids: + log.info(f"Timeline {state.timeline_id} is now offloaded") + state.offloaded = True + break + else: + # Timeline is neither offloaded nor active, this is unexpected: the pageserver + # should ensure that the timeline appears in either the offloaded list or main list + msg = f"Timeline {state.timeline_id} disappeared!" + violations.append(msg) + raise RuntimeError(msg) + elif state.offloaded: + # Once it's offloaded it should only be in offloaded or deleted state: check + # it didn't revert back to active. This tests that the manfiest is doing its + # job to suppress loading of offloaded timelines as active. + (active_ids, offloaded_ids) = list_timelines(tenant_id) + if state.timeline_id in active_ids: + msg = f"Timeline {state.timeline_id} is active, should be offloaded or deleted" + violations.append(msg) + raise RuntimeError(msg) + + log.info(f"Deleting timeline {state.timeline_id}") + controller_ps_api.timeline_delete(tenant_id, state.timeline_id) + state.deleted = True + else: + raise RuntimeError("State should be unreachable") + except PageserverApiException as e: + # This is expected: we are injecting chaos, API calls will sometimes fail. + # TODO: can we narrow this to assert we are getting friendly 503s? 
+ log.info(f"Iteration error, will retry: {e}") + shutdown.wait(random.random()) + except requests.exceptions.RetryError as e: + # Retryable error repeated more times than `requests` is configured to tolerate, this + # is expected when a pageserver remains unavailable for a couple seconds + log.info(f"Iteration error, will retry: {e}") + shutdown.wait(random.random()) + except Exception as e: + log.warning( + f"Unexpected worker exception (current timeline {state.timeline_id}): {e}" + ) + else: + # In the non-error case, use a jitterd but small wait, we want to keep + # a high rate of operations going + shutdown.wait(random.random() * 0.1) + + n_workers = 4 + threads = [] + for _i in range(0, n_workers): + t = threading.Thread(target=worker) + t.start() + threads.append(t) + + # Set delay failpoints so that deletions and migrations take some time, and have a good + # chance to interact with other concurrent timeline mutations. + env.storage_controller.configure_failpoints( + [("reconciler-live-migrate-pre-await-lsn", "sleep(1)")] + ) + for ps in env.pageservers: + ps.add_persistent_failpoint("in_progress_delete", "sleep(1)") + + # Generate some chaos, while our workers are trying to complete their timeline operations + rng = random.Random() + try: + chaos_rounds = 48 + for _i in range(0, chaos_rounds): + action = rng.choice([0, 1]) + if action == 0: + # Pick a random pageserver to gracefully restart + pageserver = rng.choice(env.pageservers) + + # Whether to use a graceful shutdown or SIGKILL + immediate = random.choice([True, False]) + log.info(f"Restarting pageserver {pageserver.id}, immediate={immediate}") + + t1 = time.time() + pageserver.restart(immediate=immediate) + restart_duration = time.time() - t1 + + # Make sure we're up for as long as we spent restarting, to ensure operations can make progress + log.info(f"Staying alive for {restart_duration}s") + time.sleep(restart_duration) + else: + # Migrate our tenant between pageservers + origin_ps = env.get_tenant_pageserver(tenant_shard_id) + dest_ps = rng.choice([ps for ps in env.pageservers if ps.id != origin_ps.id]) + log.info(f"Migrating {tenant_shard_id} {origin_ps.id}->{dest_ps.id}") + env.storage_controller.tenant_shard_migrate( + tenant_shard_id=tenant_shard_id, dest_ps_id=dest_ps.id + ) + + log.info(f"Full timeline lifecycles so far: {len(timelines_deleted)}") + finally: + shutdown.set() + + for thread in threads: + thread.join() + + # Sanity check that during our run we did exercise some full timeline lifecycles, in case + # one of our workers got stuck + assert len(timelines_deleted) > 10 + + # That no invariant-violations were reported by workers + assert violations == [] + + @pytest.mark.parametrize("with_intermediary", [False, True]) @pytest.mark.parametrize( "offload_child", diff --git a/test_runner/regress/test_vm_truncate.py b/test_runner/regress/test_vm_truncate.py new file mode 100644 index 000000000000..43b4f2d8b10b --- /dev/null +++ b/test_runner/regress/test_vm_truncate.py @@ -0,0 +1,33 @@ +from fixtures.neon_fixtures import NeonEnv + + +# +# Test that VM is properly truncated +# +def test_vm_truncate(neon_simple_env: NeonEnv): + env = neon_simple_env + + endpoint = env.endpoints.create_start("main") + con = endpoint.connect() + cur = con.cursor() + cur.execute("CREATE EXTENSION neon_test_utils") + cur.execute("CREATE EXTENSION pageinspect") + + cur.execute( + "create table t(pk integer primary key, counter integer default 0, filler text default repeat('?', 200))" + ) + cur.execute("insert into t (pk) values 
(generate_series(1,1000))") + cur.execute("delete from t where pk>10") + cur.execute("vacuum t") # truncates the relation, including its VM and FSM + # get image of the first block of the VM excluding the page header. It's expected + # to still be in the buffer cache. + # ignore page header (24 bytes, 48 - it's hex representation) + cur.execute("select substr(encode(get_raw_page('t', 'vm', 0), 'hex'), 48)") + pg_bitmap = cur.fetchall()[0][0] + # flush shared buffers + cur.execute("SELECT clear_buffer_cache()") + # now download the first block of the VM from the pageserver ... + cur.execute("select substr(encode(get_raw_page('t', 'vm', 0), 'hex'), 48)") + ps_bitmap = cur.fetchall()[0][0] + # and check that content of bitmaps are equal, i.e. PS is producing the same VM page as Postgres + assert pg_bitmap == ps_bitmap diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index ae4018a8849c..d6773987eab1 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -58,7 +58,7 @@ num-integer = { version = "0.1", features = ["i128"] } num-traits = { version = "0.2", features = ["i128", "libm"] } once_cell = { version = "1" } parquet = { version = "53", default-features = false, features = ["zstd"] } -postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2", default-features = false, features = ["with-serde_json-1"] } +postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", default-features = false, features = ["with-serde_json-1"] } prost = { version = "0.13", features = ["prost-derive"] } rand = { version = "0.8", features = ["small_rng"] } regex = { version = "1" } @@ -78,7 +78,7 @@ sync_wrapper = { version = "0.1", default-features = false, features = ["futures tikv-jemalloc-sys = { version = "0.5" } time = { version = "0.3", features = ["macros", "serde-well-known"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] } -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2", features = ["with-serde_json-1"] } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", features = ["with-serde_json-1"] } tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] } tokio-stream = { version = "0.1", features = ["net"] } tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
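As a sanity check on the redo logic added in pageserver/src/walredo/apply_neon.rs, here is a minimal standalone sketch (not part of the patch; the constants and helper name are assumed for illustration) of the tail-bit clearing that TruncateVisibilityMap performs on the map portion of a visibility-map page:

// Mirrors visibilitymap_prepare_truncate(): everything after `trunc_byte` is
// zeroed, and only the low `trunc_offs` bits of `trunc_byte` itself survive.
fn truncate_vm_tail(map: &mut [u8], trunc_byte: usize, trunc_offs: usize) {
    map[trunc_byte + 1..].fill(0u8);
    map[trunc_byte] &= (1u8 << trunc_offs) - 1;
}

fn main() {
    // 8168 map bytes, all-visible everywhere (assuming BLCKSZ = 8192 and a
    // 24-byte page header).
    let mut map = vec![0xffu8; 8168];
    // Truncate the heap to 1001 blocks: byte 250, bit offset 2, as computed in
    // walingest.rs above.
    truncate_vm_tail(&mut map, 250, 2);
    assert_eq!(map[250], 0b0000_0011); // heap block 1000 keeps its two bits
    assert!(map[251..].iter().all(|&b| b == 0)); // everything past the cut-off is cleared
}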