From a65d4379309e29a23f9e3544988712b33a89a75a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 24 Sep 2024 15:05:07 +0200 Subject: [PATCH 01/20] chore(#9077): cleanups & code dedup (#9082) Punted from https://github.com/neondatabase/neon/pull/9077 --- pageserver/src/metrics.rs | 33 +++++++++++++-------------------- pageserver/src/tenant/tasks.rs | 3 +-- 2 files changed, 14 insertions(+), 22 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 162e8d1836ff..366bd8290340 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -3208,45 +3208,38 @@ pub(crate) mod tenant_throttling { impl TimelineGet { pub(crate) fn new(tenant_shard_id: &TenantShardId) -> Self { + let per_tenant_label_values = &[ + KIND, + &tenant_shard_id.tenant_id.to_string(), + &tenant_shard_id.shard_slug().to_string(), + ]; TimelineGet { count_accounted_start: { GlobalAndPerTenantIntCounter { global: COUNT_ACCOUNTED_START.with_label_values(&[KIND]), - per_tenant: COUNT_ACCOUNTED_START_PER_TENANT.with_label_values(&[ - KIND, - &tenant_shard_id.tenant_id.to_string(), - &tenant_shard_id.shard_slug().to_string(), - ]), + per_tenant: COUNT_ACCOUNTED_START_PER_TENANT + .with_label_values(per_tenant_label_values), } }, count_accounted_finish: { GlobalAndPerTenantIntCounter { global: COUNT_ACCOUNTED_FINISH.with_label_values(&[KIND]), - per_tenant: COUNT_ACCOUNTED_FINISH_PER_TENANT.with_label_values(&[ - KIND, - &tenant_shard_id.tenant_id.to_string(), - &tenant_shard_id.shard_slug().to_string(), - ]), + per_tenant: COUNT_ACCOUNTED_FINISH_PER_TENANT + .with_label_values(per_tenant_label_values), } }, wait_time: { GlobalAndPerTenantIntCounter { global: WAIT_USECS.with_label_values(&[KIND]), - per_tenant: WAIT_USECS_PER_TENANT.with_label_values(&[ - KIND, - &tenant_shard_id.tenant_id.to_string(), - &tenant_shard_id.shard_slug().to_string(), - ]), + per_tenant: WAIT_USECS_PER_TENANT + .with_label_values(per_tenant_label_values), } }, count_throttled: { GlobalAndPerTenantIntCounter { global: WAIT_COUNT.with_label_values(&[KIND]), - per_tenant: WAIT_COUNT_PER_TENANT.with_label_values(&[ - KIND, - &tenant_shard_id.tenant_id.to_string(), - &tenant_shard_id.shard_slug().to_string(), - ]), + per_tenant: WAIT_COUNT_PER_TENANT + .with_label_values(per_tenant_label_values), } }, } diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 341febb30ab9..3f0f8a21c8a5 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -481,8 +481,7 @@ async fn ingest_housekeeping_loop(tenant: Arc, cancel: CancellationToken let allowed_rps = tenant.timeline_get_throttle.steady_rps(); let delta = now - prev; info!( - n_seconds=%format_args!("{:.3}", - delta.as_secs_f64()), + n_seconds=%format_args!("{:.3}", delta.as_secs_f64()), count_accounted = count_accounted_finish, // don't break existing log scraping count_throttled, sum_throttled_usecs, From b224a5a37734d05ffc88143750352eb318cba90d Mon Sep 17 00:00:00 2001 From: a-masterov <72613290+a-masterov@users.noreply.github.com> Date: Tue, 24 Sep 2024 15:13:18 +0200 Subject: [PATCH 02/20] Move the patch to compute (#9120) ## Problem All the other patches were moved to the compute directory, and only one was left in the patches subdirectory in the root directory. ## Summary of changes The patch was moved to the compute directory as others --- .github/workflows/cloud-regress.yml | 2 +- {patches => compute/patches}/cloud_regress_pg16.patch | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename {patches => compute/patches}/cloud_regress_pg16.patch (100%) diff --git a/.github/workflows/cloud-regress.yml b/.github/workflows/cloud-regress.yml index de6babdde39a..ecafe183f8a8 100644 --- a/.github/workflows/cloud-regress.yml +++ b/.github/workflows/cloud-regress.yml @@ -42,7 +42,7 @@ jobs: - name: Patch the test run: | cd "vendor/postgres-v${DEFAULT_PG_VERSION}" - patch -p1 < "../../patches/cloud_regress_pg${DEFAULT_PG_VERSION}.patch" + patch -p1 < "../../compute/patches/cloud_regress_pg${DEFAULT_PG_VERSION}.patch" - name: Generate a random password id: pwgen diff --git a/patches/cloud_regress_pg16.patch b/compute/patches/cloud_regress_pg16.patch similarity index 100% rename from patches/cloud_regress_pg16.patch rename to compute/patches/cloud_regress_pg16.patch From 70fe0075192d5bc4cbfec5f472ca466d0df477b9 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 24 Sep 2024 16:41:59 +0300 Subject: [PATCH 03/20] test: Make test_hot_standby_feedback more forgiving of slow initialization (#9113) Don't start waiting for the index to appear in the secondary until it has been created in the primary. Before, if the "pgbench -i" step took more than 60 s, we would give up. There was a flaky test failure along those lines at: https://neon-github-public-dev.s3.amazonaws.com/reports/pr-9105/10997477941/index.html#suites/950eff205b552e248417890b8b8f189e/73cf4b5648fa6f74/ Hopefully, this avoids such failures in the future. --- test_runner/regress/test_hot_standby.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py index 35e0c0decb26..be8f70bb7076 100644 --- a/test_runner/regress/test_hot_standby.py +++ b/test_runner/regress/test_hot_standby.py @@ -198,9 +198,6 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool): def run_pgbench(connstr: str, pg_bin: PgBin): log.info(f"Start a pgbench workload on pg {connstr}") - # s10 is about 150MB of data. In debug mode init takes about 15s on SSD. - pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", "-s10", connstr]) - log.info("pgbench init done") pg_bin.run_capture(["pgbench", "-T60", connstr]) @@ -247,9 +244,15 @@ def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): log.info( f"primary connstr is {primary.connstr()}, secondary connstr {secondary.connstr()}" ) + + # s10 is about 150MB of data. In debug mode init takes about 15s on SSD. + pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", "-s10", primary.connstr()]) + log.info("pgbench init done in primary") + t = threading.Thread(target=run_pgbench, args=(primary.connstr(), pg_bin)) t.start() - # Wait until pgbench_accounts is created + filled on replica *and* + + # Wait until we see that the pgbench_accounts is created + filled on replica *and* # index is created. Otherwise index creation would conflict with # read queries and hs feedback won't save us. wait_until(60, 1.0, partial(pgbench_accounts_initialized, secondary)) From 589594c2e1447632b28d31ec69602782ce4634d7 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 23 Sep 2024 20:48:41 +0300 Subject: [PATCH 04/20] test: Skip fsync when initdb'ing the storage controller db After initdb, we configure it with "fsync=off" anyway. --- control_plane/src/storage_controller.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 2b714fbfbf10..0c0e67dff057 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -346,7 +346,14 @@ impl StorageController { let pg_log_path = pg_data_path.join("postgres.log"); if !tokio::fs::try_exists(&pg_data_path).await? { - let initdb_args = ["-D", pg_data_path.as_ref(), "--username", &username()]; + let initdb_args = [ + "-D", + pg_data_path.as_ref(), + "--username", + &username(), + "--no-sync", + "--no-instructions", + ]; tracing::info!( "Initializing storage controller database with args: {:?}", initdb_args From 2f7cecaf6a92e29df0a576b793820899e889ba81 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 23 Sep 2024 20:48:43 +0300 Subject: [PATCH 05/20] test: Poll pageserver availability more aggressively at test startup Even with the 100 ms interval, on my laptop the pageserver always becomes available on second attempt, so this saves about 900 ms at every test startup. --- test_runner/fixtures/neon_fixtures.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 55c1423ed0d5..8c178ae63a50 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2553,7 +2553,7 @@ def poll_node_status( desired_availability: Optional[PageserverAvailability], desired_scheduling_policy: Optional[PageserverSchedulingPolicy], max_attempts: int, - backoff: int, + backoff: float, ): """ Poll the node status until it reaches 'desired_scheduling_policy' and 'desired_availability' @@ -2948,7 +2948,7 @@ def start( self.id ): self.env.storage_controller.poll_node_status( - self.id, PageserverAvailability.ACTIVE, None, max_attempts=20, backoff=1 + self.id, PageserverAvailability.ACTIVE, None, max_attempts=200, backoff=0.1 ) return self From 4f67b0225bb946c32f5b9c8d1d96eafbb05295ca Mon Sep 17 00:00:00 2001 From: Yuchen Liang <70461588+yliang412@users.noreply.github.com> Date: Tue, 24 Sep 2024 12:41:38 -0400 Subject: [PATCH 06/20] pageserver: handle decompression outside vectored `read_blobs` (#8942) Part of #8130. ## Problem Currently, decompression is performed within the `read_blobs` implementation and the decompressed blob will be appended to the end of the `BytesMut` buffer. We will lose this flexibility of extending the buffer when we switch to using our own dio-aligned buffer (WIP in https://github.com/neondatabase/neon/pull/8730). To facilitate the adoption of aligned buffer, we need to refactor the code to perform decompression outside `read_blobs`. ## Summary of changes - `VectoredBlobReader::read_blobs` will return `VectoredBlob` without performing decompression and appending decompressed blob. It becomes the caller's responsibility to decompress the buffer. - Added a new `BufView` type that functions as `Cow`. - Perform decompression within `VectoredBlob::read` so that people don't have to explicitly thinking about compression when using the reader interface. Signed-off-by: Yuchen Liang --- .../src/tenant/storage_layer/delta_layer.rs | 58 +++++-- .../src/tenant/storage_layer/image_layer.rs | 41 +++-- pageserver/src/tenant/vectored_blob_io.rs | 162 ++++++++++++++---- 3 files changed, 200 insertions(+), 61 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 34f1b15138ec..2b212cfed5d7 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -39,7 +39,7 @@ use crate::tenant::disk_btree::{ use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ - BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, + BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadCoalesceMode, VectoredReadPlanner, }; use crate::tenant::PageReconstructError; @@ -1021,13 +1021,30 @@ impl DeltaLayerInner { continue; } }; - + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter().rev() { if Some(meta.meta.key) == ignore_key_with_err { continue; } + let blob_read = meta.read(&view).await; + let blob_read = match blob_read { + Ok(buf) => buf, + Err(e) => { + reconstruct_state.on_key_error( + meta.meta.key, + PageReconstructError::Other(anyhow!(e).context(format!( + "Failed to decompress blob from virtual file {}", + self.file.path, + ))), + ); + + ignore_key_with_err = Some(meta.meta.key); + continue; + } + }; + + let value = Value::des(&blob_read); - let value = Value::des(&blobs_buf.buf[meta.start..meta.end]); let value = match value { Ok(v) => v, Err(e) => { @@ -1243,21 +1260,21 @@ impl DeltaLayerInner { buf.reserve(read.size()); let res = reader.read_blobs(&read, buf, ctx).await?; + let view = BufView::new_slice(&res.buf); + for blob in res.blobs { let key = blob.meta.key; let lsn = blob.meta.lsn; - let data = &res.buf[blob.start..blob.end]; + + let data = blob.read(&view).await?; #[cfg(debug_assertions)] - Value::des(data) + Value::des(&data) .with_context(|| { format!( - "blob failed to deserialize for {}@{}, {}..{}: {:?}", - blob.meta.key, - blob.meta.lsn, - blob.start, - blob.end, - utils::Hex(data) + "blob failed to deserialize for {}: {:?}", + blob, + utils::Hex(&data) ) }) .unwrap(); @@ -1265,15 +1282,15 @@ impl DeltaLayerInner { // is it an image or will_init walrecord? // FIXME: this could be handled by threading the BlobRef to the // VectoredReadBuilder - let will_init = crate::repository::ValueBytes::will_init(data) + let will_init = crate::repository::ValueBytes::will_init(&data) .inspect_err(|_e| { #[cfg(feature = "testing")] - tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value"); + tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value"); }) .unwrap_or(false); per_blob_copy.clear(); - per_blob_copy.extend_from_slice(data); + per_blob_copy.extend_from_slice(&data); let (tmp, res) = writer .put_value_bytes( @@ -1538,8 +1555,11 @@ impl<'a> DeltaLayerIterator<'a> { .read_blobs(&plan, buf, self.ctx) .await?; let frozen_buf = blobs_buf.buf.freeze(); + let view = BufView::new_bytes(frozen_buf); for meta in blobs_buf.blobs.iter() { - let value = Value::des(&frozen_buf[meta.start..meta.end])?; + let blob_read = meta.read(&view).await?; + let value = Value::des(&blob_read)?; + next_batch.push_back((meta.meta.key, meta.meta.lsn, value)); } self.key_values_batch = next_batch; @@ -1916,9 +1936,13 @@ pub(crate) mod test { let blobs_buf = vectored_blob_reader .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx) .await?; + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { - let value = &blobs_buf.buf[meta.start..meta.end]; - assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]); + let value = meta.read(&view).await?; + assert_eq!( + &value[..], + &entries_meta.index[&(meta.meta.key, meta.meta.lsn)] + ); } buf = Some(blobs_buf.buf); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 5de2582ab79f..940d169db096 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -36,7 +36,8 @@ use crate::tenant::disk_btree::{ }; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ - BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner, + BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, + VectoredReadPlanner, }; use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; @@ -547,15 +548,15 @@ impl ImageLayerInner { let buf = BytesMut::with_capacity(buf_size); let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?; - let frozen_buf = blobs_buf.buf.freeze(); + let view = BufView::new_bytes(frozen_buf); for meta in blobs_buf.blobs.iter() { - let img_buf = frozen_buf.slice(meta.start..meta.end); + let img_buf = meta.read(&view).await?; key_count += 1; writer - .put_image(meta.meta.key, img_buf, ctx) + .put_image(meta.meta.key, img_buf.into_bytes(), ctx) .await .context(format!("Storing key {}", meta.meta.key))?; } @@ -602,13 +603,28 @@ impl ImageLayerInner { match res { Ok(blobs_buf) => { let frozen_buf = blobs_buf.buf.freeze(); - + let view = BufView::new_bytes(frozen_buf); for meta in blobs_buf.blobs.iter() { - let img_buf = frozen_buf.slice(meta.start..meta.end); + let img_buf = meta.read(&view).await; + + let img_buf = match img_buf { + Ok(img_buf) => img_buf, + Err(e) => { + reconstruct_state.on_key_error( + meta.meta.key, + PageReconstructError::Other(anyhow!(e).context(format!( + "Failed to decompress blob from virtual file {}", + self.file.path, + ))), + ); + + continue; + } + }; reconstruct_state.update_key( &meta.meta.key, self.lsn, - Value::Image(img_buf), + Value::Image(img_buf.into_bytes()), ); } } @@ -1025,10 +1041,15 @@ impl<'a> ImageLayerIterator<'a> { let blobs_buf = vectored_blob_reader .read_blobs(&plan, buf, self.ctx) .await?; - let frozen_buf: Bytes = blobs_buf.buf.freeze(); + let frozen_buf = blobs_buf.buf.freeze(); + let view = BufView::new_bytes(frozen_buf); for meta in blobs_buf.blobs.iter() { - let img_buf = frozen_buf.slice(meta.start..meta.end); - next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf))); + let img_buf = meta.read(&view).await?; + next_batch.push_back(( + meta.meta.key, + self.image_layer.lsn, + Value::Image(img_buf.into_bytes()), + )); } self.key_values_batch = next_batch; Ok(()) diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 553edf6d8b34..aa37a45898bd 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -16,8 +16,9 @@ //! Note that the vectored blob api does *not* go through the page cache. use std::collections::BTreeMap; +use std::ops::Deref; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use pageserver_api::key::Key; use tokio::io::AsyncWriteExt; use tokio_epoll_uring::BoundedBuf; @@ -35,11 +36,123 @@ pub struct BlobMeta { pub lsn: Lsn, } -/// Blob offsets into [`VectoredBlobsBuf::buf`] +/// A view into the vectored blobs read buffer. +#[derive(Clone, Debug)] +pub(crate) enum BufView<'a> { + Slice(&'a [u8]), + Bytes(bytes::Bytes), +} + +impl<'a> BufView<'a> { + /// Creates a new slice-based view on the blob. + pub fn new_slice(slice: &'a [u8]) -> Self { + Self::Slice(slice) + } + + /// Creates a new [`bytes::Bytes`]-based view on the blob. + pub fn new_bytes(bytes: bytes::Bytes) -> Self { + Self::Bytes(bytes) + } + + /// Convert the view into `Bytes`. + /// + /// If using slice as the underlying storage, the copy will be an O(n) operation. + pub fn into_bytes(self) -> Bytes { + match self { + BufView::Slice(slice) => Bytes::copy_from_slice(slice), + BufView::Bytes(bytes) => bytes, + } + } + + /// Creates a sub-view of the blob based on the range. + fn view(&self, range: std::ops::Range) -> Self { + match self { + BufView::Slice(slice) => BufView::Slice(&slice[range]), + BufView::Bytes(bytes) => BufView::Bytes(bytes.slice(range)), + } + } +} + +impl<'a> Deref for BufView<'a> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + match self { + BufView::Slice(slice) => slice, + BufView::Bytes(bytes) => bytes, + } + } +} + +impl<'a> AsRef<[u8]> for BufView<'a> { + fn as_ref(&self) -> &[u8] { + match self { + BufView::Slice(slice) => slice, + BufView::Bytes(bytes) => bytes.as_ref(), + } + } +} + +impl<'a> From<&'a [u8]> for BufView<'a> { + fn from(value: &'a [u8]) -> Self { + Self::new_slice(value) + } +} + +impl From for BufView<'_> { + fn from(value: Bytes) -> Self { + Self::new_bytes(value) + } +} + +/// Blob offsets into [`VectoredBlobsBuf::buf`]. The byte ranges is potentially compressed, +/// subject to [`VectoredBlob::compression_bits`]. pub struct VectoredBlob { - pub start: usize, - pub end: usize, + /// Blob metadata. pub meta: BlobMeta, + /// Start offset. + start: usize, + /// End offset. + end: usize, + /// Compression used on the the blob. + compression_bits: u8, +} + +impl VectoredBlob { + /// Reads a decompressed view of the blob. + pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result, std::io::Error> { + let view = buf.view(self.start..self.end); + + match self.compression_bits { + BYTE_UNCOMPRESSED => Ok(view), + BYTE_ZSTD => { + let mut decompressed_vec = Vec::new(); + let mut decoder = + async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec); + decoder.write_all(&view).await?; + decoder.flush().await?; + // Zero-copy conversion from `Vec` to `Bytes` + Ok(BufView::new_bytes(Bytes::from(decompressed_vec))) + } + bits => { + let error = std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}", self.meta.key, self.meta.lsn, self.start, self.end), + ); + Err(error) + } + } + } +} + +impl std::fmt::Display for VectoredBlob { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}@{}, {}..{}", + self.meta.key, self.meta.lsn, self.start, self.end + ) + } } /// Return type of [`VectoredBlobReader::read_blobs`] @@ -514,7 +627,7 @@ impl<'a> VectoredBlobReader<'a> { ); } - let mut buf = self + let buf = self .file .read_exact_at(buf.slice(0..read.size()), read.start, ctx) .await? @@ -529,9 +642,6 @@ impl<'a> VectoredBlobReader<'a> { // of a blob is implicit: the start of the next blob if one exists // or the end of the read. - // Some scratch space, put here for reusing the allocation - let mut decompressed_vec = Vec::new(); - for (blob_start, meta) in blobs_at { let blob_start_in_buf = blob_start - start_offset; let first_len_byte = buf[blob_start_in_buf as usize]; @@ -557,35 +667,14 @@ impl<'a> VectoredBlobReader<'a> { ) }; - let start_raw = blob_start_in_buf + size_length; - let end_raw = start_raw + blob_size; - let (start, end); - if compression_bits == BYTE_UNCOMPRESSED { - start = start_raw as usize; - end = end_raw as usize; - } else if compression_bits == BYTE_ZSTD { - let mut decoder = - async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec); - decoder - .write_all(&buf[start_raw as usize..end_raw as usize]) - .await?; - decoder.flush().await?; - start = buf.len(); - buf.extend_from_slice(&decompressed_vec); - end = buf.len(); - decompressed_vec.clear(); - } else { - let error = std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("invalid compression byte {compression_bits:x}"), - ); - return Err(error); - } + let start = (blob_start_in_buf + size_length) as usize; + let end = start + blob_size as usize; metas.push(VectoredBlob { start, end, meta: *meta, + compression_bits, }); } @@ -1020,8 +1109,13 @@ mod tests { let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?; assert_eq!(result.blobs.len(), 1); let read_blob = &result.blobs[0]; - let read_buf = &result.buf[read_blob.start..read_blob.end]; - assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}"); + let view = BufView::new_slice(&result.buf); + let read_buf = read_blob.read(&view).await?; + assert_eq!( + &blob[..], + &read_buf[..], + "mismatch for idx={idx} at offset={offset}" + ); buf = result.buf; } Ok(()) From c47f355ec1d35401d227f02518c24bb19d051085 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Tue, 24 Sep 2024 19:28:56 +0200 Subject: [PATCH 07/20] Catch Cancelled and don't print a warning for it (#9121) In the `imitate_synthetic_size_calculation_worker` function, we might obtain the `Cancelled` error variant instead of hitting the cancellation token based path. Therefore, catch `Cancelled` and handle it analogously to the cancellation case. Fixes #8886. --- pageserver/src/tenant/timeline/eviction_task.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 2f6cb4d73a69..26c2861b9308 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -30,8 +30,8 @@ use crate::{ pgdatadir_mapping::CollectKeySpaceError, task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, tenant::{ - storage_layer::LayerVisibilityHint, tasks::BackgroundLoopKind, timeline::EvictionError, - LogicalSizeCalculationCause, Tenant, + size::CalculateSyntheticSizeError, storage_layer::LayerVisibilityHint, + tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant, }, }; @@ -557,6 +557,8 @@ impl Timeline { gather_result = gather => { match gather_result { Ok(_) => {}, + // It can happen sometimes that we hit this instead of the cancellation token firing above + Err(CalculateSyntheticSizeError::Cancelled) => {} Err(e) => { // We don't care about the result, but, if it failed, we should log it, // since consumption metric might be hitting the cached value and From 523cf71721128ad6f58bfce3952fb33fe0086a8c Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Tue, 24 Sep 2024 19:11:31 +0100 Subject: [PATCH 08/20] Fix compiler warnings on macOS (#9128) ## Problem Compilation of neon extension on macOS produces a warning ``` pgxn/neon/neon_perf_counters.c:50:1: error: non-void function does not return a value [-Werror,-Wreturn-type] ``` ## Summary of changes - Change the return type of `NeonPerfCountersShmemInit` to void --- pgxn/neon/neon_perf_counters.c | 2 +- pgxn/neon/neon_perf_counters.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pgxn/neon/neon_perf_counters.c b/pgxn/neon/neon_perf_counters.c index 3e86d5b26276..de653826c019 100644 --- a/pgxn/neon/neon_perf_counters.c +++ b/pgxn/neon/neon_perf_counters.c @@ -32,7 +32,7 @@ NeonPerfCountersShmemSize(void) return size; } -bool +void NeonPerfCountersShmemInit(void) { bool found; diff --git a/pgxn/neon/neon_perf_counters.h b/pgxn/neon/neon_perf_counters.h index ae35e8c3a515..02163ada5571 100644 --- a/pgxn/neon/neon_perf_counters.h +++ b/pgxn/neon/neon_perf_counters.h @@ -105,7 +105,7 @@ extern neon_per_backend_counters *neon_per_backend_counters_shared; extern void inc_getpage_wait(uint64 latency); extern Size NeonPerfCountersShmemSize(void); -extern bool NeonPerfCountersShmemInit(void); +extern void NeonPerfCountersShmemInit(void); #endif /* NEON_PERF_COUNTERS_H */ From af5c54ed14f34dfee477659af39628c0d7ec3502 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 24 Sep 2024 23:38:16 +0300 Subject: [PATCH 09/20] test: Make test_lfc_resize more robust (#9117) 1. Increase statement_timeout. It defaults to 120 s, which is not quite enough on slow or busy systems with debug build. On my laptop, the index creation takes about 100 s. On buildfarm, we've seen failures, e.g: https://neon-github-public-dev.s3.amazonaws.com/reports/pr-9084/10997888708/index.html#suites/821f97908a487f1d7d3a2a4dd1571e99/db1834bddfe8c5b9/ 2. Keep twiddling the LFC size through the whole test. Before, we would do it for the first 10 seconds, but that only covers a small part of the pgbench initialization phase. Change the loop so that the pgbench run time determines how long the test runs, and we keep changing the LFC for the whole time. In the passing, also fix bogus test description, copy-pasted from a completely unrelated test. --- test_runner/regress/test_lfc_resize.py | 45 +++++++++++++++++++------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/test_runner/regress/test_lfc_resize.py b/test_runner/regress/test_lfc_resize.py index cb0b30d9c6e8..0f791e924707 100644 --- a/test_runner/regress/test_lfc_resize.py +++ b/test_runner/regress/test_lfc_resize.py @@ -10,11 +10,11 @@ from fixtures.neon_fixtures import NeonEnv, PgBin -# -# Test branching, when a transaction is in prepared state -# @pytest.mark.timeout(600) def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin): + """ + Test resizing the Local File Cache + """ env = neon_simple_env endpoint = env.endpoints.create_start( "main", @@ -32,27 +32,48 @@ def run_pgbench(connstr: str): pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr]) pg_bin.run_capture(["pgbench", "-c10", f"-T{n_resize}", "-Mprepared", "-S", connstr]) - thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True) + # Initializing the pgbench database can be very slow, especially on debug builds. + connstr = endpoint.connstr(options="-cstatement_timeout=300s") + + thread = threading.Thread(target=run_pgbench, args=(connstr,), daemon=True) thread.start() conn = endpoint.connect() cur = conn.cursor() - for _ in range(n_resize): + # For as long as pgbench is running, twiddle the LFC size once a second. + # Note that we launch this immediately, already while the "pgbench -i" + # initialization step is still running. That's quite a different workload + # than the actual pgbench benchamark run, so this gives us coverage of both. + while thread.is_alive(): size = random.randint(1, 512) cur.execute(f"alter system set neon.file_cache_size_limit='{size}MB'") cur.execute("select pg_reload_conf()") time.sleep(1) + thread.join() + # At the end, set it at 100 MB, and perform a final check that the disk usage + # of the file is in that ballbark. + # + # We retry the check a few times, because it might take a while for the + # system to react to changing the setting and shrinking the file. cur.execute("alter system set neon.file_cache_size_limit='100MB'") cur.execute("select pg_reload_conf()") + nretries = 10 + while True: + lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache" + lfc_file_size = os.path.getsize(lfc_file_path) + res = subprocess.run( + ["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True + ) + lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0] + log.info(f"Size of LFC file {lfc_file_size}, blocks {lfc_file_blocks}") + assert lfc_file_size <= 512 * 1024 * 1024 - thread.join() + if int(lfc_file_blocks) <= 128 * 1024 or nretries == 0: + break + + nretries = nretries - 1 + time.sleep(1) - lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache" - lfc_file_size = os.path.getsize(lfc_file_path) - res = subprocess.run(["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True) - lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0] - log.info(f"Size of LFC file {lfc_file_size}, blocks {lfc_file_blocks}") - assert lfc_file_size <= 512 * 1024 * 1024 assert int(lfc_file_blocks) <= 128 * 1024 From 5cbf5b45ae337cc643812a2e6bb76e6eb79142e4 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 24 Sep 2024 23:58:54 +0300 Subject: [PATCH 10/20] Remove TenantState::Loading (#9118) The last real use was removed in commit de90bf4663. It was still used in a few unit tests, but they can use Attaching too. --- libs/pageserver_api/src/models.rs | 27 +++++---------------------- pageserver/src/tenant.rs | 21 +++++---------------- 2 files changed, 10 insertions(+), 38 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index c9be53f0b0c0..45abda0ad85d 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -37,14 +37,11 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; /// ```mermaid /// stateDiagram-v2 /// -/// [*] --> Loading: spawn_load() /// [*] --> Attaching: spawn_attach() /// -/// Loading --> Activating: activate() /// Attaching --> Activating: activate() /// Activating --> Active: infallible /// -/// Loading --> Broken: load() failure /// Attaching --> Broken: attach() failure /// /// Active --> Stopping: set_stopping(), part of shutdown & detach @@ -68,10 +65,6 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; )] #[serde(tag = "slug", content = "data")] pub enum TenantState { - /// This tenant is being loaded from local disk. - /// - /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass. - Loading, /// This tenant is being attached to the pageserver. /// /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass. @@ -121,8 +114,6 @@ impl TenantState { // But, our attach task might still be fetching the remote timelines, etc. // So, return `Maybe` while Attaching, making Console wait for the attach task to finish. Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe, - // tenant mgr startup distinguishes attaching from loading via marker file. - Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached, // We only reach Active after successful load / attach. // So, call atttachment status Attached. Self::Active => Attached, @@ -191,10 +182,11 @@ impl LsnLease { } /// The only [`TenantState`] variants we could be `TenantState::Activating` from. +/// +/// XXX: We used to have more variants here, but now it's just one, which makes this rather +/// useless. Remove, once we've checked that there's no client code left that looks at this. #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub enum ActivatingFrom { - /// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`] - Loading, /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`] Attaching, } @@ -1562,11 +1554,8 @@ mod tests { #[test] fn tenantstatus_activating_serde() { - let states = [ - TenantState::Activating(ActivatingFrom::Loading), - TenantState::Activating(ActivatingFrom::Attaching), - ]; - let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]"; + let states = [TenantState::Activating(ActivatingFrom::Attaching)]; + let expected = "[{\"slug\":\"Activating\",\"data\":\"Attaching\"}]"; let actual = serde_json::to_string(&states).unwrap(); @@ -1581,13 +1570,7 @@ mod tests { fn tenantstatus_activating_strum() { // tests added, because we use these for metrics let examples = [ - (line!(), TenantState::Loading, "Loading"), (line!(), TenantState::Attaching, "Attaching"), - ( - line!(), - TenantState::Activating(ActivatingFrom::Loading), - "Activating", - ), ( line!(), TenantState::Activating(ActivatingFrom::Attaching), diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 5ed63734f494..53cbaea621eb 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1968,9 +1968,6 @@ impl Tenant { TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => { panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state); } - TenantState::Loading => { - *current_state = TenantState::Activating(ActivatingFrom::Loading); - } TenantState::Attaching => { *current_state = TenantState::Activating(ActivatingFrom::Attaching); } @@ -2151,7 +2148,7 @@ impl Tenant { async fn set_stopping( &self, progress: completion::Barrier, - allow_transition_from_loading: bool, + _allow_transition_from_loading: bool, allow_transition_from_attaching: bool, ) -> Result<(), SetStoppingError> { let mut rx = self.state.subscribe(); @@ -2166,7 +2163,6 @@ impl Tenant { ); false } - TenantState::Loading => allow_transition_from_loading, TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true, }) .await @@ -2185,13 +2181,6 @@ impl Tenant { *current_state = TenantState::Stopping { progress }; true } - TenantState::Loading => { - if !allow_transition_from_loading { - unreachable!("3we ensured above that we're done with activation, and, there is no re-activation") - }; - *current_state = TenantState::Stopping { progress }; - true - } TenantState::Active => { // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines // are created after the transition to Stopping. That's harmless, as the Timelines @@ -2247,7 +2236,7 @@ impl Tenant { // The load & attach routines own the tenant state until it has reached `Active`. // So, wait until it's done. rx.wait_for(|state| match state { - TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => { + TenantState::Activating(_) | TenantState::Attaching => { info!( "waiting for {} to turn Active|Broken|Stopping", <&'static str>::from(state) @@ -2267,7 +2256,7 @@ impl Tenant { let reason = reason.to_string(); self.state.send_modify(|current_state| { match *current_state { - TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => { + TenantState::Activating(_) | TenantState::Attaching => { unreachable!("we ensured above that we're done with activation, and, there is no re-activation") } TenantState::Active => { @@ -2311,7 +2300,7 @@ impl Tenant { loop { let current_state = receiver.borrow_and_update().clone(); match current_state { - TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => { + TenantState::Attaching | TenantState::Activating(_) => { // in these states, there's a chance that we can reach ::Active self.activate_now(); match timeout_cancellable(timeout, &self.cancel, receiver.changed()).await { @@ -4144,7 +4133,7 @@ pub(crate) mod harness { let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager)); let tenant = Arc::new(Tenant::new( - TenantState::Loading, + TenantState::Attaching, self.conf, AttachedTenantConf::try_from(LocationConf::attached_single( TenantConfOpt::from(self.tenant_conf.clone()), From 938b163b42d614ecc747931e35380b27bf6e1e62 Mon Sep 17 00:00:00 2001 From: Damian972 <25445518+Damian972@users.noreply.github.com> Date: Wed, 25 Sep 2024 00:05:23 +0200 Subject: [PATCH 11/20] chore(docker-compose): fix typo in readme (#9133) Typo in the readme inside docker-compose folder ## Summary of changes - Update the readme --- docker-compose/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose/README.md b/docker-compose/README.md index bd47805a6791..648e4ca030c6 100644 --- a/docker-compose/README.md +++ b/docker-compose/README.md @@ -2,8 +2,8 @@ # Example docker compose configuration The configuration in this directory is used for testing Neon docker images: it is -not intended for deploying a usable system. To run a development environment where -you can experiment with a minature Neon system, use `cargo neon` rather than container images. +not intended for deploying a usable system. To run a development environment where +you can experiment with a miniature Neon system, use `cargo neon` rather than container images. This configuration does not start the storage controller, because the controller needs a way to reconfigure running computes, and no such thing exists in this setup. From 5f2f31e87933be05bd93a239ddc66764ff877546 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Tue, 24 Sep 2024 18:33:03 -0400 Subject: [PATCH 12/20] fix(test): storage scrubber should only log to stdout with info (#9067) As @koivunej mentioned in the storage channel, for regress test, we don't need to create a log file for the scrubber, and we should reduce noisy logs. ## Summary of changes * Disable log file creation for storage scrubber * Only log at info level --------- Signed-off-by: Alex Chi Z --- test_runner/fixtures/neon_fixtures.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 8c178ae63a50..201eb1087ded 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4617,7 +4617,8 @@ def scrubber_cli( "REGION": s3_storage.bucket_region, "BUCKET": s3_storage.bucket_name, "BUCKET_PREFIX": s3_storage.prefix_in_bucket, - "RUST_LOG": "DEBUG", + "RUST_LOG": "INFO", + "PAGESERVER_DISABLE_FILE_LOGGING": "1", } env.update(s3_storage.access_env_vars()) @@ -4637,10 +4638,8 @@ def scrubber_cli( (output_path, stdout, status_code) = subprocess_capture( self.log_dir, args, - echo_stderr=True, - echo_stdout=True, env=env, - check=False, + check=True, capture_stdout=True, timeout=timeout, ) From a26cc29d92c8626750a994cb7d50fb796214f51e Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 25 Sep 2024 10:16:06 +0100 Subject: [PATCH 13/20] storcon: add tags to scheduler logs (#9127) We log something at info level each time we schedule a shard to a non-secondary location. Might as well have context for it. --- storage_controller/src/tenant_shard.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index 1f5eb423be8b..eccde0e3ab60 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -548,6 +548,11 @@ impl TenantShard { } } + #[instrument(skip_all, fields( + tenant_id=%self.tenant_shard_id.tenant_id, + shard_id=%self.tenant_shard_id.shard_slug(), + sequence=%self.sequence + ))] pub(crate) fn schedule( &mut self, scheduler: &mut Scheduler, From 7dcfcccf7cf03e65dfea6b7b5884f34da1686660 Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Wed, 25 Sep 2024 14:38:35 +0200 Subject: [PATCH 14/20] Re-export git-version from utils and remove as direct dep (#9138) --- Cargo.lock | 10 +--------- control_plane/Cargo.toml | 1 - libs/utils/Cargo.toml | 1 + libs/utils/src/lib.rs | 6 +++++- pageserver/Cargo.toml | 1 - pageserver/compaction/Cargo.toml | 1 - pageserver/ctl/Cargo.toml | 1 - proxy/Cargo.toml | 1 - safekeeper/Cargo.toml | 1 - storage_broker/Cargo.toml | 1 - storage_controller/Cargo.toml | 1 - storage_scrubber/Cargo.toml | 1 - 12 files changed, 7 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4dbd8b33398..d0702e09d412 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1321,7 +1321,6 @@ dependencies = [ "clap", "comfy-table", "compute_api", - "git-version", "humantime", "humantime-serde", "hyper 0.14.30", @@ -3578,7 +3577,6 @@ dependencies = [ "anyhow", "camino", "clap", - "git-version", "humantime", "pageserver", "pageserver_api", @@ -3617,7 +3615,6 @@ dependencies = [ "enumset", "fail", "futures", - "git-version", "hex", "hex-literal", "humantime", @@ -3737,7 +3734,6 @@ dependencies = [ "clap", "criterion", "futures", - "git-version", "hex-literal", "itertools 0.10.5", "once_cell", @@ -4307,7 +4303,6 @@ dependencies = [ "fallible-iterator", "framed-websockets", "futures", - "git-version", "hashbrown 0.14.5", "hashlink", "hex", @@ -5139,7 +5134,6 @@ dependencies = [ "desim", "fail", "futures", - "git-version", "hex", "humantime", "hyper 0.14.30", @@ -5702,7 +5696,6 @@ dependencies = [ "futures", "futures-core", "futures-util", - "git-version", "humantime", "hyper 0.14.30", "metrics", @@ -5730,7 +5723,6 @@ dependencies = [ "diesel_migrations", "fail", "futures", - "git-version", "hex", "humantime", "hyper 0.14.30", @@ -5783,7 +5775,6 @@ dependencies = [ "either", "futures", "futures-util", - "git-version", "hex", "humantime", "itertools 0.10.5", @@ -6715,6 +6706,7 @@ dependencies = [ "criterion", "fail", "futures", + "git-version", "hex", "hex-literal", "humantime", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index c185d20484a4..df87c181bf47 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -9,7 +9,6 @@ anyhow.workspace = true camino.workspace = true clap.workspace = true comfy-table.workspace = true -git-version.workspace = true humantime.workspace = true nix.workspace = true once_cell.workspace = true diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index f199b155540f..7d284a6fc567 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -19,6 +19,7 @@ bincode.workspace = true bytes.workspace = true camino.workspace = true chrono.workspace = true +git-version.workspace = true hex = { workspace = true, features = ["serde"] } humantime.workspace = true hyper = { workspace = true, features = ["full"] } diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 03fb36caf8b6..aacc1e1dd5e8 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -92,6 +92,10 @@ pub mod toml_edit_ext; pub mod circuit_breaker; +// Re-export used in macro. Avoids adding git-version as dep in target crates. +#[doc(hidden)] +pub use git_version; + /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages /// /// we have several cases: @@ -131,7 +135,7 @@ macro_rules! project_git_version { ($const_identifier:ident) => { // this should try GIT_VERSION first only then git_version::git_version! const $const_identifier: &::core::primitive::str = { - const __COMMIT_FROM_GIT: &::core::primitive::str = git_version::git_version! { + const __COMMIT_FROM_GIT: &::core::primitive::str = $crate::git_version::git_version! { prefix = "", fallback = "unknown", args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 0eb48d6823b8..f1fc3a86fe4b 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -27,7 +27,6 @@ crc32c.workspace = true either.workspace = true fail.workspace = true futures.workspace = true -git-version.workspace = true hex.workspace = true humantime.workspace = true humantime-serde.workspace = true diff --git a/pageserver/compaction/Cargo.toml b/pageserver/compaction/Cargo.toml index 52b58fc298ce..d4f89ac38a19 100644 --- a/pageserver/compaction/Cargo.toml +++ b/pageserver/compaction/Cargo.toml @@ -12,7 +12,6 @@ anyhow.workspace = true async-stream.workspace = true clap = { workspace = true, features = ["string"] } futures.workspace = true -git-version.workspace = true itertools.workspace = true once_cell.workspace = true pageserver_api.workspace = true diff --git a/pageserver/ctl/Cargo.toml b/pageserver/ctl/Cargo.toml index 9592002de131..a753f806a076 100644 --- a/pageserver/ctl/Cargo.toml +++ b/pageserver/ctl/Cargo.toml @@ -10,7 +10,6 @@ license.workspace = true anyhow.workspace = true camino.workspace = true clap = { workspace = true, features = ["string"] } -git-version.workspace = true humantime.workspace = true pageserver = { path = ".." } pageserver_api.workspace = true diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 6703eb06eb29..501ce050e071 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -29,7 +29,6 @@ dashmap.workspace = true env_logger.workspace = true framed-websockets.workspace = true futures.workspace = true -git-version.workspace = true hashbrown.workspace = true hashlink.workspace = true hex.workspace = true diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index daf21c70b045..67f32b3cc08b 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -21,7 +21,6 @@ chrono.workspace = true clap = { workspace = true, features = ["derive"] } crc32c.workspace = true fail.workspace = true -git-version.workspace = true hex.workspace = true humantime.workspace = true hyper.workspace = true diff --git a/storage_broker/Cargo.toml b/storage_broker/Cargo.toml index 82ec0aa272e3..5359f586e49d 100644 --- a/storage_broker/Cargo.toml +++ b/storage_broker/Cargo.toml @@ -15,7 +15,6 @@ const_format.workspace = true futures.workspace = true futures-core.workspace = true futures-util.workspace = true -git-version.workspace = true humantime.workspace = true hyper = { workspace = true, features = ["full"] } once_cell.workspace = true diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index a96d64e09670..9ed0501026dc 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -20,7 +20,6 @@ chrono.workspace = true clap.workspace = true fail.workspace = true futures.workspace = true -git-version.workspace = true hex.workspace = true hyper.workspace = true humantime.workspace = true diff --git a/storage_scrubber/Cargo.toml b/storage_scrubber/Cargo.toml index f9987662b9f5..a1b5b0b12f19 100644 --- a/storage_scrubber/Cargo.toml +++ b/storage_scrubber/Cargo.toml @@ -8,7 +8,6 @@ license.workspace = true aws-sdk-s3.workspace = true either.workspace = true anyhow.workspace = true -git-version.workspace = true hex.workspace = true humantime.workspace = true serde.workspace = true From 2cf47b1477d281a868deab4914aceb53e37a22e9 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 25 Sep 2024 14:31:04 +0100 Subject: [PATCH 15/20] storcon: do az aware scheduling (#9083) ## Problem Storage controller didn't previously consider AZ locality between compute and pageservers when scheduling nodes. Control plane has this feature, and, since we are migrating tenants away from it, we need feature parity to avoid perf degradations. ## Summary of changes The change itself is fairly simple: 1. Thread az info into the scheduler 2. Add an extra member to the scheduling scores Step (2) deserves some more discussion. Let's break it down by the shard type being scheduled: **Attached Shards** We wish for attached shards of a tenant to end up in the preferred AZ of the tenant since that is where the compute is like to be. The AZ member for `NodeAttachmentSchedulingScore` has been placed below the affinity score (so it's got the second biggest weight for picking the node). The rationale for going below the affinity score is to avoid having all shards of a single tenant placed on the same node in 2 node regions, since that would mean that one tenant can drive the general workload of an entire pageserver. I'm not 100% sure this is the right decision, so open to discussing hoisting the AZ up to first place. **Secondary Shards** We wish for secondary shards of a tenant to be scheduled in a different AZ from the preferred one for HA purposes. The AZ member for `NodeSecondarySchedulingScore` has been placed first, so nodes in different AZs from the preferred one will always be considered first. On small clusters, this can mean that all the secondaries of a tenant are scheduled to the same pageserver, but secondaries don't use up as many resources as the attached location, so IMO the argument made for attached shards doesn't hold. Related: https://github.com/neondatabase/neon/issues/8848 --- control_plane/storcon_cli/src/main.rs | 6 +- libs/pageserver_api/src/controller_api.rs | 14 +- pageserver/src/control_plane_client.rs | 6 +- storage_controller/src/node.rs | 16 +- storage_controller/src/persistence.rs | 7 +- storage_controller/src/scheduler.rs | 230 ++++++++++++++++++++-- storage_controller/src/service.rs | 16 +- storage_controller/src/tenant_shard.rs | 215 +++++++++++++++++--- 8 files changed, 445 insertions(+), 65 deletions(-) diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 651fcda8db52..73d89699edb6 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -4,8 +4,8 @@ use std::{str::FromStr, time::Duration}; use clap::{Parser, Subcommand}; use pageserver_api::{ controller_api::{ - NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, ShardSchedulingPolicy, - TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest, + AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, + ShardSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest, }, models::{ EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary, @@ -339,7 +339,7 @@ async fn main() -> anyhow::Result<()> { listen_pg_port, listen_http_addr, listen_http_port, - availability_zone_id, + availability_zone_id: AvailabilityZone(availability_zone_id), }), ) .await?; diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index 40b7dbbbc2af..0ea30ce54f78 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet}; +use std::fmt::Display; use std::str::FromStr; use std::time::{Duration, Instant}; @@ -57,7 +58,7 @@ pub struct NodeRegisterRequest { pub listen_http_addr: String, pub listen_http_port: u16, - pub availability_zone_id: String, + pub availability_zone_id: AvailabilityZone, } #[derive(Serialize, Deserialize)] @@ -74,10 +75,19 @@ pub struct TenantPolicyRequest { pub scheduling: Option, } +#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct AvailabilityZone(pub String); + +impl Display for AvailabilityZone { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + #[derive(Serialize, Deserialize)] pub struct ShardsPreferredAzsRequest { #[serde(flatten)] - pub preferred_az_ids: HashMap, + pub preferred_az_ids: HashMap, } #[derive(Serialize, Deserialize)] diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs index f6d1c35a8ce1..d0a967b9207e 100644 --- a/pageserver/src/control_plane_client.rs +++ b/pageserver/src/control_plane_client.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use futures::Future; use pageserver_api::{ - controller_api::NodeRegisterRequest, + controller_api::{AvailabilityZone, NodeRegisterRequest}, shard::TenantShardId, upcall_api::{ ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, @@ -148,10 +148,10 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient { .and_then(|jv| jv.as_str().map(|str| str.to_owned())); match az_id_from_metadata { - Some(az_id) => Some(az_id), + Some(az_id) => Some(AvailabilityZone(az_id)), None => { tracing::warn!("metadata.json does not contain an 'availability_zone_id' field"); - conf.availability_zone.clone() + conf.availability_zone.clone().map(AvailabilityZone) } } }; diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs index cb9ce10d230a..4cc9b0070dc7 100644 --- a/storage_controller/src/node.rs +++ b/storage_controller/src/node.rs @@ -2,8 +2,8 @@ use std::{str::FromStr, time::Duration}; use pageserver_api::{ controller_api::{ - NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy, - TenantLocateResponseShard, + AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, + NodeSchedulingPolicy, TenantLocateResponseShard, }, shard::TenantShardId, }; @@ -36,7 +36,7 @@ pub(crate) struct Node { listen_pg_addr: String, listen_pg_port: u16, - availability_zone_id: String, + availability_zone_id: AvailabilityZone, // This cancellation token means "stop any RPCs in flight to this node, and don't start // any more". It is not related to process shutdown. @@ -64,8 +64,8 @@ impl Node { } #[allow(unused)] - pub(crate) fn get_availability_zone_id(&self) -> &str { - self.availability_zone_id.as_str() + pub(crate) fn get_availability_zone_id(&self) -> &AvailabilityZone { + &self.availability_zone_id } pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy { @@ -181,7 +181,7 @@ impl Node { listen_http_port: u16, listen_pg_addr: String, listen_pg_port: u16, - availability_zone_id: String, + availability_zone_id: AvailabilityZone, ) -> Self { Self { id, @@ -204,7 +204,7 @@ impl Node { listen_http_port: self.listen_http_port as i32, listen_pg_addr: self.listen_pg_addr.clone(), listen_pg_port: self.listen_pg_port as i32, - availability_zone_id: self.availability_zone_id.clone(), + availability_zone_id: self.availability_zone_id.0.clone(), } } @@ -219,7 +219,7 @@ impl Node { listen_http_port: np.listen_http_port as u16, listen_pg_addr: np.listen_pg_addr, listen_pg_port: np.listen_pg_port as u16, - availability_zone_id: np.availability_zone_id, + availability_zone_id: AvailabilityZone(np.availability_zone_id), cancel: CancellationToken::new(), } } diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 1dc1040d9637..14cc51240d10 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -9,6 +9,7 @@ use diesel::pg::PgConnection; use diesel::prelude::*; use diesel::Connection; use itertools::Itertools; +use pageserver_api::controller_api::AvailabilityZone; use pageserver_api::controller_api::MetadataHealthRecord; use pageserver_api::controller_api::ShardSchedulingPolicy; use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy}; @@ -667,8 +668,8 @@ impl Persistence { pub(crate) async fn set_tenant_shard_preferred_azs( &self, - preferred_azs: Vec<(TenantShardId, String)>, - ) -> DatabaseResult> { + preferred_azs: Vec<(TenantShardId, AvailabilityZone)>, + ) -> DatabaseResult> { use crate::schema::tenant_shards::dsl::*; self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| { @@ -679,7 +680,7 @@ impl Persistence { .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) - .set(preferred_az_id.eq(preferred_az)) + .set(preferred_az_id.eq(preferred_az.0.clone())) .execute(conn)?; if updated == 1 { diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs index 1cb1fb104d60..2414d95eb89b 100644 --- a/storage_controller/src/scheduler.rs +++ b/storage_controller/src/scheduler.rs @@ -1,6 +1,6 @@ use crate::{node::Node, tenant_shard::TenantShard}; use itertools::Itertools; -use pageserver_api::models::PageserverUtilization; +use pageserver_api::{controller_api::AvailabilityZone, models::PageserverUtilization}; use serde::Serialize; use std::{collections::HashMap, fmt::Debug}; use utils::{http::error::ApiError, id::NodeId}; @@ -32,6 +32,8 @@ pub(crate) struct SchedulerNode { shard_count: usize, /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`]. attached_shard_count: usize, + /// Availability zone id in which the node resides + az: AvailabilityZone, /// Whether this node is currently elegible to have new shards scheduled (this is derived /// from a node's availability state and scheduling policy). @@ -42,6 +44,7 @@ pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized { fn generate( node_id: &NodeId, node: &mut SchedulerNode, + preferred_az: &Option, context: &ScheduleContext, ) -> Option; fn is_overloaded(&self) -> bool; @@ -62,6 +65,72 @@ impl ShardTag for SecondaryShardTag { type Score = NodeSecondarySchedulingScore; } +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +enum AzMatch { + Yes, + No, + Unknown, +} + +impl AzMatch { + fn new(node_az: &AvailabilityZone, shard_preferred_az: Option<&AvailabilityZone>) -> Self { + match shard_preferred_az { + Some(preferred_az) if preferred_az == node_az => Self::Yes, + Some(_preferred_az) => Self::No, + None => Self::Unknown, + } + } +} + +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +struct AttachmentAzMatch(AzMatch); + +impl Ord for AttachmentAzMatch { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // Lower scores indicate a more suitable node. + // Note that we prefer a node for which we don't have + // info to a node which we are certain doesn't match the + // preferred AZ of the shard. + let az_match_score = |az_match: &AzMatch| match az_match { + AzMatch::Yes => 0, + AzMatch::Unknown => 1, + AzMatch::No => 2, + }; + + az_match_score(&self.0).cmp(&az_match_score(&other.0)) + } +} + +impl PartialOrd for AttachmentAzMatch { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +struct SecondaryAzMatch(AzMatch); + +impl Ord for SecondaryAzMatch { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // Lower scores indicate a more suitable node. + // For secondary locations we wish to avoid the preferred AZ + // of the shard. + let az_match_score = |az_match: &AzMatch| match az_match { + AzMatch::No => 0, + AzMatch::Unknown => 1, + AzMatch::Yes => 2, + }; + + az_match_score(&self.0).cmp(&az_match_score(&other.0)) + } +} + +impl PartialOrd for SecondaryAzMatch { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + /// Scheduling score of a given node for shard attachments. /// Lower scores indicate more suitable nodes. /// Ordering is given by member declaration order (top to bottom). @@ -70,6 +139,10 @@ pub(crate) struct NodeAttachmentSchedulingScore { /// The number of shards belonging to the tenant currently being /// scheduled that are attached to this node. affinity_score: AffinityScore, + /// Flag indicating whether this node matches the preferred AZ + /// of the shard. For equal affinity scores, nodes in the matching AZ + /// are considered first. + az_match: AttachmentAzMatch, /// Size of [`ScheduleContext::attached_nodes`] for the current node. /// This normally tracks the number of attached shards belonging to the /// tenant being scheduled that are already on this node. @@ -87,6 +160,7 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore { fn generate( node_id: &NodeId, node: &mut SchedulerNode, + preferred_az: &Option, context: &ScheduleContext, ) -> Option { let utilization = match &mut node.may_schedule { @@ -102,6 +176,7 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore { .get(node_id) .copied() .unwrap_or(AffinityScore::FREE), + az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())), attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0), utilization_score: utilization.cached_score(), total_attached_shard_count: node.attached_shard_count, @@ -123,6 +198,11 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore { /// Ordering is given by member declaration order (top to bottom). #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] pub(crate) struct NodeSecondarySchedulingScore { + /// Flag indicating whether this node matches the preferred AZ + /// of the shard. For secondary locations we wish to avoid nodes in. + /// the preferred AZ of the shard, since that's where the attached location + /// should be scheduled and having the secondary in the same AZ is bad for HA. + az_match: SecondaryAzMatch, /// The number of shards belonging to the tenant currently being /// scheduled that are attached to this node. affinity_score: AffinityScore, @@ -139,6 +219,7 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore { fn generate( node_id: &NodeId, node: &mut SchedulerNode, + preferred_az: &Option, context: &ScheduleContext, ) -> Option { let utilization = match &mut node.may_schedule { @@ -149,6 +230,7 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore { }; Some(Self { + az_match: SecondaryAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())), affinity_score: context .nodes .get(node_id) @@ -179,6 +261,7 @@ impl PartialEq for SchedulerNode { may_schedule_matches && self.shard_count == other.shard_count && self.attached_shard_count == other.attached_shard_count + && self.az == other.az } } @@ -293,6 +376,7 @@ impl Scheduler { shard_count: 0, attached_shard_count: 0, may_schedule: node.may_schedule(), + az: node.get_availability_zone_id().clone(), }, ); } @@ -319,6 +403,7 @@ impl Scheduler { shard_count: 0, attached_shard_count: 0, may_schedule: node.may_schedule(), + az: node.get_availability_zone_id().clone(), }, ); } @@ -497,6 +582,7 @@ impl Scheduler { shard_count: 0, attached_shard_count: 0, may_schedule: node.may_schedule(), + az: node.get_availability_zone_id().clone(), }); } } @@ -542,6 +628,7 @@ impl Scheduler { fn compute_node_scores( &mut self, hard_exclude: &[NodeId], + preferred_az: &Option, context: &ScheduleContext, ) -> Vec where @@ -553,7 +640,7 @@ impl Scheduler { if hard_exclude.contains(k) { None } else { - Score::generate(k, v, context) + Score::generate(k, v, preferred_az, context) } }) .collect() @@ -571,13 +658,15 @@ impl Scheduler { pub(crate) fn schedule_shard( &mut self, hard_exclude: &[NodeId], + preferred_az: &Option, context: &ScheduleContext, ) -> Result { if self.nodes.is_empty() { return Err(ScheduleError::NoPageservers); } - let mut scores = self.compute_node_scores::(hard_exclude, context); + let mut scores = + self.compute_node_scores::(hard_exclude, preferred_az, context); // Exclude nodes whose utilization is critically high, if there are alternatives available. This will // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example @@ -634,6 +723,12 @@ impl Scheduler { Ok(node_id) } + /// Selects any available node. This is suitable for performing background work (e.g. S3 + /// deletions). + pub(crate) fn any_available_node(&mut self) -> Result { + self.schedule_shard::(&[], &None, &ScheduleContext::default()) + } + /// Unit test access to internal state #[cfg(test)] pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize { @@ -650,13 +745,22 @@ impl Scheduler { pub(crate) mod test_utils { use crate::node::Node; - use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization}; + use pageserver_api::{ + controller_api::{AvailabilityZone, NodeAvailability}, + models::utilization::test_utilization, + }; use std::collections::HashMap; use utils::id::NodeId; + /// Test helper: synthesize the requested number of nodes, all in active state. /// /// Node IDs start at one. - pub(crate) fn make_test_nodes(n: u64) -> HashMap { + /// + /// The `azs` argument specifies the list of availability zones which will be assigned + /// to nodes in round-robin fashion. If empy, a default AZ is assigned. + pub(crate) fn make_test_nodes(n: u64, azs: &[AvailabilityZone]) -> HashMap { + let mut az_iter = azs.iter().cycle(); + (1..n + 1) .map(|i| { (NodeId(i), { @@ -666,7 +770,10 @@ pub(crate) mod test_utils { 80 + i as u16, format!("pghost-{i}"), 5432 + i as u16, - "test-az".to_string(), + az_iter + .next() + .cloned() + .unwrap_or(AvailabilityZone("test-az".to_string())), ); node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0))); assert!(node.is_available()); @@ -686,7 +793,7 @@ mod tests { use crate::tenant_shard::IntentState; #[test] fn scheduler_basic() -> anyhow::Result<()> { - let nodes = test_utils::make_test_nodes(2); + let nodes = test_utils::make_test_nodes(2, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut t1_intent = IntentState::new(); @@ -694,9 +801,9 @@ mod tests { let context = ScheduleContext::default(); - let scheduled = scheduler.schedule_shard::(&[], &context)?; + let scheduled = scheduler.schedule_shard::(&[], &None, &context)?; t1_intent.set_attached(&mut scheduler, Some(scheduled)); - let scheduled = scheduler.schedule_shard::(&[], &context)?; + let scheduled = scheduler.schedule_shard::(&[], &None, &context)?; t2_intent.set_attached(&mut scheduler, Some(scheduled)); assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1); @@ -705,8 +812,11 @@ mod tests { assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1); assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1); - let scheduled = - scheduler.schedule_shard::(&t1_intent.all_pageservers(), &context)?; + let scheduled = scheduler.schedule_shard::( + &t1_intent.all_pageservers(), + &None, + &context, + )?; t1_intent.push_secondary(&mut scheduler, scheduled); assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1); @@ -746,7 +856,7 @@ mod tests { #[test] /// Test the PageserverUtilization's contribution to scheduling algorithm fn scheduler_utilization() { - let mut nodes = test_utils::make_test_nodes(3); + let mut nodes = test_utils::make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); // Need to keep these alive because they contribute to shard counts via RAII @@ -761,7 +871,7 @@ mod tests { context: &ScheduleContext, ) { let scheduled = scheduler - .schedule_shard::(&[], context) + .schedule_shard::(&[], &None, context) .unwrap(); let mut intent = IntentState::new(); intent.set_attached(scheduler, Some(scheduled)); @@ -870,4 +980,98 @@ mod tests { intent.clear(&mut scheduler); } } + + #[test] + /// A simple test that showcases AZ-aware scheduling and its interaction with + /// affinity scores. + fn az_scheduling() { + let az_a_tag = AvailabilityZone("az-a".to_string()); + let az_b_tag = AvailabilityZone("az-b".to_string()); + + let nodes = test_utils::make_test_nodes(3, &[az_a_tag.clone(), az_b_tag.clone()]); + let mut scheduler = Scheduler::new(nodes.values()); + + // Need to keep these alive because they contribute to shard counts via RAII + let mut scheduled_intents = Vec::new(); + + let mut context = ScheduleContext::default(); + + fn assert_scheduler_chooses( + expect_node: NodeId, + preferred_az: Option, + scheduled_intents: &mut Vec, + scheduler: &mut Scheduler, + context: &mut ScheduleContext, + ) { + let scheduled = scheduler + .schedule_shard::(&[], &preferred_az, context) + .unwrap(); + let mut intent = IntentState::new(); + intent.set_attached(scheduler, Some(scheduled)); + scheduled_intents.push(intent); + assert_eq!(scheduled, expect_node); + + context.avoid(&[scheduled]); + } + + assert_scheduler_chooses::( + NodeId(1), + Some(az_a_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + // Node 2 and 3 have affinity score equal to 0, but node 3 + // is in "az-a" so we prefer that. + assert_scheduler_chooses::( + NodeId(3), + Some(az_a_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + // Node 2 is not in "az-a", but it has the lowest affinity so we prefer that. + assert_scheduler_chooses::( + NodeId(2), + Some(az_a_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + // Avoid nodes in "az-a" for the secondary location. + assert_scheduler_chooses::( + NodeId(2), + Some(az_a_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + // Avoid nodes in "az-b" for the secondary location. + // Nodes 1 and 3 are identically loaded, so prefer the lowest node id. + assert_scheduler_chooses::( + NodeId(1), + Some(az_b_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + // Avoid nodes in "az-b" for the secondary location. + // Node 3 has lower affinity score than 1, so prefer that. + assert_scheduler_chooses::( + NodeId(3), + Some(az_b_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + for mut intent in scheduled_intents { + intent.clear(&mut scheduler); + } + } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 5555505b81d9..6a11e9650ce9 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -26,7 +26,7 @@ use crate::{ ShardGenerationState, TenantFilter, }, reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder}, - scheduler::{AttachedShardTag, MaySchedule, ScheduleContext, ScheduleError, ScheduleMode}, + scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode}, tenant_shard::{ MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization, ScheduleOptimizationAction, @@ -1265,6 +1265,8 @@ impl Service { #[cfg(feature = "testing")] { + use pageserver_api::controller_api::AvailabilityZone; + // Hack: insert scheduler state for all nodes referenced by shards, as compatibility // tests only store the shards, not the nodes. The nodes will be loaded shortly // after when pageservers start up and register. @@ -1282,7 +1284,7 @@ impl Service { 123, "".to_string(), 123, - "test_az".to_string(), + AvailabilityZone("test_az".to_string()), ); scheduler.node_upsert(&node); @@ -2099,7 +2101,7 @@ impl Service { let az_id = locked .nodes .get(&resp.node_id) - .map(|n| n.get_availability_zone_id().to_string())?; + .map(|n| n.get_availability_zone_id().clone())?; Some((resp.shard_id, az_id)) }) @@ -2629,8 +2631,7 @@ impl Service { let scheduler = &mut locked.scheduler; // Right now we only perform the operation on a single node without parallelization // TODO fan out the operation to multiple nodes for better performance - let node_id = - scheduler.schedule_shard::(&[], &ScheduleContext::default())?; + let node_id = scheduler.any_available_node()?; let node = locked .nodes .get(&node_id) @@ -2816,8 +2817,7 @@ impl Service { // Pick an arbitrary node to use for remote deletions (does not have to be where the tenant // was attached, just has to be able to see the S3 content) - let node_id = - scheduler.schedule_shard::(&[], &ScheduleContext::default())?; + let node_id = scheduler.any_available_node()?; let node = nodes .get(&node_id) .expect("Pageservers may not be deleted while lock is active"); @@ -4481,7 +4481,7 @@ impl Service { let az_id = locked .nodes .get(node_id) - .map(|n| n.get_availability_zone_id().to_string())?; + .map(|n| n.get_availability_zone_id().clone())?; Some((*tid, az_id)) }) diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index eccde0e3ab60..afc89eae0073 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -15,7 +15,7 @@ use crate::{ service::ReconcileResultRequest, }; use pageserver_api::controller_api::{ - NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, + AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, }; use pageserver_api::{ models::{LocationConfig, LocationConfigMode, TenantConfig}, @@ -146,7 +146,7 @@ pub(crate) struct TenantShard { // We should attempt to schedule this shard in the provided AZ to // decrease chances of cross-AZ compute. - preferred_az_id: Option, + preferred_az_id: Option, } #[derive(Default, Clone, Debug, Serialize)] @@ -540,8 +540,11 @@ impl TenantShard { Ok((true, promote_secondary)) } else { // Pick a fresh node: either we had no secondaries or none were schedulable - let node_id = - scheduler.schedule_shard::(&self.intent.secondary, context)?; + let node_id = scheduler.schedule_shard::( + &self.intent.secondary, + &self.preferred_az_id, + context, + )?; tracing::debug!("Selected {} as attached", node_id); self.intent.set_attached(scheduler, Some(node_id)); Ok((true, node_id)) @@ -622,8 +625,11 @@ impl TenantShard { let mut used_pageservers = vec![attached_node_id]; while self.intent.secondary.len() < secondary_count { - let node_id = scheduler - .schedule_shard::(&used_pageservers, context)?; + let node_id = scheduler.schedule_shard::( + &used_pageservers, + &self.preferred_az_id, + context, + )?; self.intent.push_secondary(scheduler, node_id); used_pageservers.push(node_id); modified = true; @@ -636,7 +642,11 @@ impl TenantShard { modified = true; } else if self.intent.secondary.is_empty() { // Populate secondary by scheduling a fresh node - let node_id = scheduler.schedule_shard::(&[], context)?; + let node_id = scheduler.schedule_shard::( + &[], + &self.preferred_az_id, + context, + )?; self.intent.push_secondary(scheduler, node_id); modified = true; } @@ -815,6 +825,7 @@ impl TenantShard { // with lower utilization. let Ok(candidate_node) = scheduler.schedule_shard::( &self.intent.all_pageservers(), + &self.preferred_az_id, schedule_context, ) else { // A scheduling error means we have no possible candidate replacements @@ -1313,7 +1324,7 @@ impl TenantShard { pending_compute_notification: false, delayed_reconcile: false, scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(), - preferred_az_id: tsp.preferred_az_id, + preferred_az_id: tsp.preferred_az_id.map(AvailabilityZone), }) } @@ -1329,15 +1340,15 @@ impl TenantShard { config: serde_json::to_string(&self.config).unwrap(), splitting: SplitState::default(), scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(), - preferred_az_id: self.preferred_az_id.clone(), + preferred_az_id: self.preferred_az_id.as_ref().map(|az| az.0.clone()), } } - pub(crate) fn preferred_az(&self) -> Option<&str> { - self.preferred_az_id.as_deref() + pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> { + self.preferred_az_id.as_ref() } - pub(crate) fn set_preferred_az(&mut self, preferred_az_id: String) { + pub(crate) fn set_preferred_az(&mut self, preferred_az_id: AvailabilityZone) { self.preferred_az_id = Some(preferred_az_id); } } @@ -1350,6 +1361,7 @@ pub(crate) mod tests { controller_api::NodeAvailability, shard::{ShardCount, ShardNumber}, }; + use rand::{rngs::StdRng, SeedableRng}; use utils::id::TenantId; use crate::scheduler::test_utils::make_test_nodes; @@ -1378,7 +1390,11 @@ pub(crate) mod tests { ) } - fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec { + fn make_test_tenant( + policy: PlacementPolicy, + shard_count: ShardCount, + preferred_az: Option, + ) -> Vec { let tenant_id = TenantId::generate(); (0..shard_count.count()) @@ -1390,7 +1406,7 @@ pub(crate) mod tests { shard_number, shard_count, }; - TenantShard::new( + let mut ts = TenantShard::new( tenant_shard_id, ShardIdentity::new( shard_number, @@ -1399,7 +1415,13 @@ pub(crate) mod tests { ) .unwrap(), policy.clone(), - ) + ); + + if let Some(az) = &preferred_az { + ts.set_preferred_az(az.clone()); + } + + ts }) .collect() } @@ -1410,7 +1432,7 @@ pub(crate) mod tests { fn tenant_ha_scheduling() -> anyhow::Result<()> { // Start with three nodes. Our tenant will only use two. The third one is // expected to remain unused. - let mut nodes = make_test_nodes(3); + let mut nodes = make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut context = ScheduleContext::default(); @@ -1462,7 +1484,7 @@ pub(crate) mod tests { #[test] fn intent_from_observed() -> anyhow::Result<()> { - let nodes = make_test_nodes(3); + let nodes = make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1)); @@ -1512,7 +1534,7 @@ pub(crate) mod tests { #[test] fn scheduling_mode() -> anyhow::Result<()> { - let nodes = make_test_nodes(3); + let nodes = make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1)); @@ -1537,7 +1559,7 @@ pub(crate) mod tests { #[test] fn optimize_attachment() -> anyhow::Result<()> { - let nodes = make_test_nodes(3); + let nodes = make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1)); @@ -1604,7 +1626,7 @@ pub(crate) mod tests { #[test] fn optimize_secondary() -> anyhow::Result<()> { - let nodes = make_test_nodes(4); + let nodes = make_test_nodes(4, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1)); @@ -1703,14 +1725,14 @@ pub(crate) mod tests { /// that it converges. #[test] fn optimize_add_nodes() -> anyhow::Result<()> { - let nodes = make_test_nodes(4); + let nodes = make_test_nodes(4, &[]); // Only show the scheduler a couple of nodes let mut scheduler = Scheduler::new([].iter()); scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap()); scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap()); - let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4)); + let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None); let mut schedule_context = ScheduleContext::default(); for shard in &mut shards { assert!(shard @@ -1759,16 +1781,16 @@ pub(crate) mod tests { fn initial_scheduling_is_optimal() -> anyhow::Result<()> { use itertools::Itertools; - let nodes = make_test_nodes(2); + let nodes = make_test_nodes(2, &[]); let mut scheduler = Scheduler::new([].iter()); scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap()); scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap()); - let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4)); + let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None); let a_context = Rc::new(RefCell::new(ScheduleContext::default())); - let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4)); + let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None); let b_context = Rc::new(RefCell::new(ScheduleContext::default())); let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone())); @@ -1793,4 +1815,147 @@ pub(crate) mod tests { Ok(()) } + + #[test] + fn random_az_shard_scheduling() -> anyhow::Result<()> { + use rand::seq::SliceRandom; + + for seed in 0..50 { + eprintln!("Running test with seed {seed}"); + let mut rng = StdRng::seed_from_u64(seed); + + let az_a_tag = AvailabilityZone("az-a".to_string()); + let az_b_tag = AvailabilityZone("az-b".to_string()); + let azs = [az_a_tag, az_b_tag]; + let nodes = make_test_nodes(4, &azs); + let mut shards_per_az: HashMap = HashMap::new(); + + let mut scheduler = Scheduler::new([].iter()); + for node in nodes.values() { + scheduler.node_upsert(node); + } + + let mut shards = Vec::default(); + let mut contexts = Vec::default(); + let mut az_picker = azs.iter().cycle().cloned(); + for i in 0..100 { + let az = az_picker.next().unwrap(); + let shard_count = i % 4 + 1; + *shards_per_az.entry(az.clone()).or_default() += shard_count; + + let tenant_shards = make_test_tenant( + PlacementPolicy::Attached(1), + ShardCount::new(shard_count.try_into().unwrap()), + Some(az), + ); + let context = Rc::new(RefCell::new(ScheduleContext::default())); + + contexts.push(context.clone()); + let with_ctx = tenant_shards + .into_iter() + .map(|shard| (shard, context.clone())); + for shard_with_ctx in with_ctx { + shards.push(shard_with_ctx); + } + } + + shards.shuffle(&mut rng); + + #[derive(Default, Debug)] + struct NodeStats { + attachments: u32, + secondaries: u32, + } + + let mut node_stats: HashMap = HashMap::default(); + let mut attachments_in_wrong_az = 0; + let mut secondaries_in_wrong_az = 0; + + for (shard, context) in &mut shards { + let context = &mut *context.borrow_mut(); + shard.schedule(&mut scheduler, context).unwrap(); + + let attached_node = shard.intent.get_attached().unwrap(); + let stats = node_stats.entry(attached_node).or_default(); + stats.attachments += 1; + + let secondary_node = *shard.intent.get_secondary().first().unwrap(); + let stats = node_stats.entry(secondary_node).or_default(); + stats.secondaries += 1; + + let attached_node_az = nodes + .get(&attached_node) + .unwrap() + .get_availability_zone_id(); + let secondary_node_az = nodes + .get(&secondary_node) + .unwrap() + .get_availability_zone_id(); + let preferred_az = shard.preferred_az().unwrap(); + + if attached_node_az != preferred_az { + eprintln!( + "{} attachment was scheduled in AZ {} but preferred AZ {}", + shard.tenant_shard_id, attached_node_az, preferred_az + ); + attachments_in_wrong_az += 1; + } + + if secondary_node_az == preferred_az { + eprintln!( + "{} secondary was scheduled in AZ {} which matches preference", + shard.tenant_shard_id, attached_node_az + ); + secondaries_in_wrong_az += 1; + } + } + + let mut violations = Vec::default(); + + if attachments_in_wrong_az > 0 { + violations.push(format!( + "{} attachments scheduled to the incorrect AZ", + attachments_in_wrong_az + )); + } + + if secondaries_in_wrong_az > 0 { + violations.push(format!( + "{} secondaries scheduled to the incorrect AZ", + secondaries_in_wrong_az + )); + } + + eprintln!( + "attachments_in_wrong_az={} secondaries_in_wrong_az={}", + attachments_in_wrong_az, secondaries_in_wrong_az + ); + + for (node_id, stats) in &node_stats { + let node_az = nodes.get(node_id).unwrap().get_availability_zone_id(); + let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2; + let allowed_attachment_load = + (ideal_attachment_load - 1)..(ideal_attachment_load + 2); + + if !allowed_attachment_load.contains(&stats.attachments) { + violations.push(format!( + "Found {} attachments on node {}, but expected {}", + stats.attachments, node_id, ideal_attachment_load + )); + } + + eprintln!( + "{}: attachments={} secondaries={} ideal_attachment_load={}", + node_id, stats.attachments, stats.secondaries, ideal_attachment_load + ); + } + + assert!(violations.is_empty(), "{violations:?}"); + + for (mut shard, _ctx) in shards { + shard.intent.clear(&mut scheduler); + } + } + Ok(()) + } } From 4b711caf5edb808a6bfbe69dc6a1cbe9a7ff70a6 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 25 Sep 2024 14:56:39 +0100 Subject: [PATCH 16/20] storage controller: make proxying of GETs to pageservers more robust (#9065) ## Problem These commits are split off from https://github.com/neondatabase/neon/pull/8971/commits where I was fixing this to make a better scale test pass -- Vlad also independently recognized these issues with cloudbench in https://github.com/neondatabase/neon/issues/9062. 1. The storage controller proxies GET requests to pageservers based on their intent, not the ground truth of where they're really attached. 2. Proxied requests can race with scheduling to tenants, resulting in 404 responses if the request hits the wrong pageserver. Closes: https://github.com/neondatabase/neon/issues/9062 ## Summary of changes 1. If a shard has a running reconciler, then use the database generation_pageserver to decide who to proxy the request to 2. If such a request gets a 404 response and its scheduled node has changed since the request was dispatched. --- storage_controller/src/http.rs | 23 ++++-- storage_controller/src/reconciler.rs | 4 + storage_controller/src/service.rs | 74 ++++++++++++----- .../regress/test_storage_controller.py | 82 +++++++++++++++++++ 4 files changed, 157 insertions(+), 26 deletions(-) diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index 95e4a469ac95..4dd8badd0391 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -515,7 +515,7 @@ async fn handle_tenant_timeline_passthrough( tracing::info!("Proxying request for tenant {} ({})", tenant_id, path); // Find the node that holds shard zero - let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id)?; + let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?; // Callers will always pass an unsharded tenant ID. Before proxying, we must // rewrite this to a shard-aware shard zero ID. @@ -545,10 +545,10 @@ async fn handle_tenant_timeline_passthrough( let _timer = latency.start_timer(labels.clone()); let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref()); - let resp = client.get_raw(path).await.map_err(|_e| - // FIXME: give APiError a proper Unavailable variant. We return 503 here because - // if we can't successfully send a request to the pageserver, we aren't available. - ApiError::ShuttingDown)?; + let resp = client.get_raw(path).await.map_err(|e| + // We return 503 here because if we can't successfully send a request to the pageserver, + // either we aren't available or the pageserver is unavailable. + ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?; if !resp.status().is_success() { let error_counter = &METRICS_REGISTRY @@ -557,6 +557,19 @@ async fn handle_tenant_timeline_passthrough( error_counter.inc(labels); } + // Transform 404 into 503 if we raced with a migration + if resp.status() == reqwest::StatusCode::NOT_FOUND { + // Look up node again: if we migrated it will be different + let (new_node, _tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?; + if new_node.get_id() != node.get_id() { + // Rather than retry here, send the client a 503 to prompt a retry: this matches + // the pageserver's use of 503, and all clients calling this API should retry on 503. + return Err(ApiError::ResourceUnavailable( + format!("Pageserver {node} returned 404, was migrated to {new_node}").into(), + )); + } + } + // We have a reqest::Response, would like a http::Response let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?); for (k, v) in resp.headers() { diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs index 750bcd7c0138..93b1c80566bd 100644 --- a/storage_controller/src/reconciler.rs +++ b/storage_controller/src/reconciler.rs @@ -541,6 +541,8 @@ impl Reconciler { } } + pausable_failpoint!("reconciler-live-migrate-pre-generation-inc"); + // Increment generation before attaching to new pageserver self.generation = Some( self.persistence @@ -617,6 +619,8 @@ impl Reconciler { }, ); + pausable_failpoint!("reconciler-live-migrate-post-detach"); + tracing::info!("🔁 Switching to AttachedSingle mode on node {dest_ps}",); let dest_final_conf = build_location_config( &self.shard, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 6a11e9650ce9..a5e012968475 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -3508,34 +3508,66 @@ impl Service { /// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this /// function looks up and returns node. If the tenant isn't found, returns Err(ApiError::NotFound) - pub(crate) fn tenant_shard0_node( + pub(crate) async fn tenant_shard0_node( &self, tenant_id: TenantId, ) -> Result<(Node, TenantShardId), ApiError> { - let locked = self.inner.read().unwrap(); - let Some((tenant_shard_id, shard)) = locked - .tenants - .range(TenantShardId::tenant_range(tenant_id)) - .next() - else { - return Err(ApiError::NotFound( - anyhow::anyhow!("Tenant {tenant_id} not found").into(), - )); + // Look up in-memory state and maybe use the node from there. + { + let locked = self.inner.read().unwrap(); + let Some((tenant_shard_id, shard)) = locked + .tenants + .range(TenantShardId::tenant_range(tenant_id)) + .next() + else { + return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant {tenant_id} not found").into(), + )); + }; + + let Some(intent_node_id) = shard.intent.get_attached() else { + tracing::warn!( + tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), + "Shard not scheduled (policy {:?}), cannot generate pass-through URL", + shard.policy + ); + return Err(ApiError::Conflict( + "Cannot call timeline API on non-attached tenant".to_string(), + )); + }; + + if shard.reconciler.is_none() { + // Optimization: while no reconcile is in flight, we may trust our in-memory state + // to tell us which pageserver to use. Otherwise we will fall through and hit the database + let Some(node) = locked.nodes.get(intent_node_id) else { + // This should never happen + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Shard refers to nonexistent node" + ))); + }; + return Ok((node.clone(), *tenant_shard_id)); + } }; - // TODO: should use the ID last published to compute_hook, rather than the intent: the intent might - // point to somewhere we haven't attached yet. - let Some(node_id) = shard.intent.get_attached() else { - tracing::warn!( - tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), - "Shard not scheduled (policy {:?}), cannot generate pass-through URL", - shard.policy - ); - return Err(ApiError::Conflict( - "Cannot call timeline API on non-attached tenant".to_string(), + // Look up the latest attached pageserver location from the database + // generation state: this will reflect the progress of any ongoing migration. + // Note that it is not guaranteed to _stay_ here, our caller must still handle + // the case where they call through to the pageserver and get a 404. + let db_result = self.persistence.tenant_generations(tenant_id).await?; + let Some(ShardGenerationState { + tenant_shard_id, + generation: _, + generation_pageserver: Some(node_id), + }) = db_result.first() + else { + // This can happen if we raced with a tenant deletion or a shard split. On a retry + // the caller will either succeed (shard split case), get a proper 404 (deletion case), + // or a conflict response (case where tenant was detached in background) + return Err(ApiError::ResourceUnavailable( + "Shard {} not found in database, or is not attached".into(), )); }; - + let locked = self.inner.read().unwrap(); let Some(node) = locked.nodes.get(node_id) else { // This should never happen return Err(ApiError::InternalServerError(anyhow::anyhow!( diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 4106efd4f9cc..3861f0b82274 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -4,6 +4,7 @@ import time from collections import defaultdict from datetime import datetime, timezone +from enum import Enum from typing import Any, Dict, List, Optional, Set, Tuple, Union import pytest @@ -2466,6 +2467,87 @@ def has_hit_migration_failpoint(): raise +class MigrationFailpoints(Enum): + # While only the origin is attached + PRE_GENERATION_INC = "reconciler-live-migrate-pre-generation-inc" + # While both locations are attached + POST_NOTIFY = "reconciler-live-migrate-post-notify" + # While only the destination is attached + POST_DETACH = "reconciler-live-migrate-post-detach" + + +@pytest.mark.parametrize( + "migration_failpoint", + [ + MigrationFailpoints.PRE_GENERATION_INC, + MigrationFailpoints.POST_NOTIFY, + MigrationFailpoints.POST_DETACH, + ], +) +def test_storage_controller_proxy_during_migration( + neon_env_builder: NeonEnvBuilder, migration_failpoint: MigrationFailpoints +): + """ + If we send a proxied GET request to the controller during a migration, it should route + the request to whichever pageserver was most recently issued a generation. + + Reproducer for https://github.com/neondatabase/neon/issues/9062 + """ + neon_env_builder.num_pageservers = 2 + neon_env_builder.enable_pageserver_remote_storage(s3_storage()) + env = neon_env_builder.init_configs() + env.start() + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + env.neon_cli.create_tenant(tenant_id, timeline_id) + + # Activate a failpoint that will cause live migration to get stuck _after_ the generation has been issued + # to the new pageserver: this should result in requests routed to the new pageserver. + env.storage_controller.configure_failpoints((migration_failpoint.value, "pause")) + + origin_pageserver = env.get_tenant_pageserver(tenant_id) + dest_ps_id = [p.id for p in env.pageservers if p.id != origin_pageserver.id][0] + + try: + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + migrate_fut = executor.submit( + env.storage_controller.tenant_shard_migrate, + TenantShardId(tenant_id, 0, 0), + dest_ps_id, + ) + + def has_hit_migration_failpoint(): + expr = f"at failpoint {str(migration_failpoint.value)}" + log.info(expr) + assert env.storage_controller.log_contains(expr) + + wait_until(10, 1, has_hit_migration_failpoint) + + # This request should be routed to whichever pageserver holds the highest generation + tenant_info = env.storage_controller.pageserver_api().tenant_status( + tenant_id, + ) + + if migration_failpoint in ( + MigrationFailpoints.POST_NOTIFY, + MigrationFailpoints.POST_DETACH, + ): + # We expect request to land on the destination + assert tenant_info["generation"] == 2 + elif migration_failpoint == MigrationFailpoints.PRE_GENERATION_INC: + # We expect request to land on the origin + assert tenant_info["generation"] == 1 + + # Eventually migration completes + env.storage_controller.configure_failpoints((migration_failpoint.value, "off")) + migrate_fut.result() + except: + # Always disable 'pause' failpoints, even on failure, to avoid hanging in shutdown + env.storage_controller.configure_failpoints((migration_failpoint.value, "off")) + raise + + @run_only_on_default_postgres("this is like a 'unit test' against storcon db") def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): env = neon_env_builder.init_configs() From 518f598e2d7e056c711694592c50737136eb8f38 Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Wed, 25 Sep 2024 16:24:09 +0200 Subject: [PATCH 17/20] docs(rfc): Independent compute release flow (#8881) Related to https://github.com/neondatabase/cloud/issues/11698 --- docs/rfcs/038-independent-compute-release.md | 343 +++++++++++++++++++ 1 file changed, 343 insertions(+) create mode 100644 docs/rfcs/038-independent-compute-release.md diff --git a/docs/rfcs/038-independent-compute-release.md b/docs/rfcs/038-independent-compute-release.md new file mode 100644 index 000000000000..3deaf1e6fdfb --- /dev/null +++ b/docs/rfcs/038-independent-compute-release.md @@ -0,0 +1,343 @@ +# Independent compute release + +Created at: 2024-08-30. Author: Alexey Kondratov (@ololobus) + +## Summary + +This document proposes an approach to fully independent compute release flow. It attempts to +cover the following features: + +- Process is automated as much as possible to minimize human errors. +- Compute<->storage protocol compatibility is ensured. +- A transparent release history is available with an easy rollback strategy. +- Although not in the scope of this document, there is a viable way to extend the proposed release + flow to achieve the canary and/or blue-green deployment strategies. + +## Motivation + +Previously, the compute release was tightly coupled to the storage release. This meant that once +some storage nodes got restarted with a newer version, all new compute starts using these nodes +automatically got a new version. Thus, two releases happen in parallel, which increases the blast +radius and makes ownership fuzzy. + +Now, we practice a manual v0 independent compute release flow -- after getting a new compute release +image and tag, we pin it region by region using Admin UI. It's better, but it still has its own flaws: + +1. It's a simple but fairly manual process, as you need to click through a few pages. +2. It's prone to human errors, e.g., you could mistype or copy the wrong compute tag. +3. We now require an additional approval in the Admin UI, which partially solves the 2., + but also makes the whole process pretty annoying, as you constantly need to go back + and forth between two people. + +## Non-goals + +It's not the goal of this document to propose a design for some general-purpose release tool like Helm. +The document considers how the current compute fleet is orchestrated at Neon. Even if we later +decide to split the control plane further (e.g., introduce a separate compute controller), the proposed +release process shouldn't change much, i.e., the releases table and API will reside in +one of the parts. + +Achieving the canary and/or blue-green deploy strategies is out of the scope of this document. They +were kept in mind, though, so it's expected that the proposed approach will lay down the foundation +for implementing them in future iterations. + +## Impacted components + +Compute, control plane, CI, observability (some Grafana dashboards may require changes). + +## Prior art + +One of the very close examples is how Helm tracks [releases history](https://helm.sh/docs/helm/helm_history/). + +In the code: + +- [Release](https://github.com/helm/helm/blob/2b30cf4b61d587d3f7594102bb202b787b9918db/pkg/release/release.go#L20-L43) +- [Release info](https://github.com/helm/helm/blob/2b30cf4b61d587d3f7594102bb202b787b9918db/pkg/release/info.go#L24-L40) +- [Release status](https://github.com/helm/helm/blob/2b30cf4b61d587d3f7594102bb202b787b9918db/pkg/release/status.go#L18-L42) + +TL;DR it has several important attributes: + +- Revision -- unique release ID/primary key. It is not the same as the application version, + because the same version can be deployed several times, e.g., after a newer version rollback. +- App version -- version of the application chart/code. +- Config -- set of overrides to the default config of the application. +- Status -- current status of the release in the history. +- Timestamps -- tracks when a release was created and deployed. + +## Proposed implementation + +### Separate release branch + +We will use a separate release branch, `release-compute`, to have a clean history for releases and commits. +In order to avoid confusion with storage releases, we will use a different prefix for compute [git release +tags](https://github.com/neondatabase/neon/releases) -- `release-compute-XXXX`. We will use the same tag for +Docker images as well. The `neondatabase/compute-node-v16:release-compute-XXXX` looks longer and a bit redundant, +but it's better to have image and git tags in sync. + +Currently, control plane relies on the numeric compute and storage release versions to decide on compute->storage +compatibility. Once we implement this proposal, we should drop this code as release numbers will be completely +independent. The only constraint we want is that it must monotonically increase within the same release branch. + +### Compute config/settings manifest + +We will create a new sub-directory `compute` and file `compute/manifest.yaml` with a structure: + +```yaml +pg_settings: + # Common settings for primaries and secondaries of all versions. + common: + wal_log_hints: "off" + max_wal_size: "1024" + + per_version: + 14: + # Common settings for both replica and primary of version PG 14 + common: + shared_preload_libraries: "neon,pg_stat_statements,extension_x" + 15: + common: + shared_preload_libraries: "neon,pg_stat_statements,extension_x" + # Settings that should be applied only to + replica: + # Available only starting Postgres 15th + recovery_prefetch: "off" + # ... + 17: + common: + # For example, if third-party `extension_x` is not yet available for PG 17 + shared_preload_libraries: "neon,pg_stat_statements" + replica: + recovery_prefetch: "off" +``` + +**N.B.** Setting value should be a string with `on|off` for booleans and a number (as a string) +without units for all numeric settings. That's how the control plane currently operates. + +The priority of settings will be (a higher number is a higher priority): + +1. Any static and hard-coded settings in the control plane +2. `pg_settings->common` +3. Per-version `common` +4. Per-version `replica` +5. Any per-user/project/endpoint overrides in the control plane +6. Any dynamic setting calculated based on the compute size + +**N.B.** For simplicity, we do not do any custom logic for `shared_preload_libraries`, so it's completely +overridden if specified on some level. Make sure that you include all necessary extensions in it when you +do any overrides. + +**N.B.** There is a tricky question about what to do with custom compute image pinning we sometimes +do for particular projects and customers. That's usually some ad-hoc work and images are based on +the latest compute image, so it's relatively safe to assume that we could use settings from the latest compute +release. If for some reason that's not true, and further overrides are needed, it's also possible to do +on the project level together with pinning the image, so it's on-call/engineer/support responsibility to +ensure that compute starts with the specified custom image. The only real risk is that compute image will get +stale and settings from new releases will drift away, so eventually it will get something incompatible, +but i) this is some operational issue, as we do not want stale images anyway, and ii) base settings +receive something really new so rarely that the chance of this happening is very low. If we want to solve it completely, +then together with pinning the image we could also pin the matching release revision in the control plane. + +The compute team will own the content of `compute/manifest.yaml`. + +### Control plane: releases table + +In order to store information about releases, the control plane will use a table `compute_releases` with the following +schema: + +```sql +CREATE TABLE compute_releases ( + -- Unique release ID + -- N.B. Revision won't by synchronized across all regions, because all control planes are technically independent + -- services. We have the same situation with Helm releases as well because they could be deployed and rolled back + -- independently in different clusters. + revision BIGSERIAL PRIMARY KEY, + -- Numeric version of the compute image, e.g. 9057 + version BIGINT NOT NULL, + -- Compute image tag, e.g. `release-9057` + tag TEXT NOT NULL, + -- Current release status. Currently, it will be a simple enum + -- * `deployed` -- release is deployed and used for new compute starts. + -- Exactly one release can have this status at a time. + -- * `superseded` -- release has been replaced by a newer one. + -- But we can always extend it in the future when we need more statuses + -- for more complex deployment strategies. + status TEXT NOT NULL, + -- Any additional metadata for compute in the corresponding release + manifest JSONB NOT NULL, + -- Timestamp when release record was created in the control plane database + created_at TIMESTAMP NOT NULL DEFAULT now(), + -- Timestamp when release deployment was finished + deployed_at TIMESTAMP +); +``` + +We keep track of the old releases not only for the sake of audit, but also because we usually have ~30% of +old computes started using the image from one of the previous releases. Yet, when users want to reconfigure +them without restarting, the control plane needs to know what settings are applicable to them, so we also need +information about the previous releases that are readily available. There could be some other auxiliary info +needed as well: supported extensions, compute flags, etc. + +**N.B.** Here, we can end up in an ambiguous situation when the same compute image is deployed twice, e.g., +it was deployed once, then rolled back, and then deployed again, potentially with a different manifest. Yet, +we could've started some computes with the first deployment and some with the second. Thus, when we need to +look up the manifest for the compute by its image tag, we will see two records in the table with the same tag, +but different revision numbers. We can assume that this could happen only in case of rollbacks, so we +can just take the latest revision for the given tag. + +### Control plane: management API + +The control plane will implement new API methods to manage releases: + +1. `POST /management/api/v2/compute_releases` to create a new release. With payload + + ```json + { + "version": 9057, + "tag": "release-9057", + "manifest": {} + } + ``` + + and response + + ```json + { + "revision": 53, + "version": 9057, + "tag": "release-9057", + "status": "deployed", + "manifest": {}, + "created_at": "2024-08-15T15:52:01.0000Z", + "deployed_at": "2024-08-15T15:52:01.0000Z", + } + ``` + + Here, we can actually mix-in custom (remote) extensions metadata into the `manifest`, so that the control plane + will get information about all available extensions not bundled into compute image. The corresponding + workflow in `neondatabase/build-custom-extensions` should produce it as an artifact and make + it accessible to the workflow in the `neondatabase/infra`. See the complete release flow below. Doing that, + we put a constraint that new custom extension requires new compute release, which is good for the safety, + but is not exactly what we want operational-wise (we want to be able to deploy new extensions without new + images). Yet, it can be solved incrementally: v0 -- do not do anything with extensions at all; + v1 -- put them into the same manifest; v2 -- make them separate entities with their own lifecycle. + + **N.B.** This method is intended to be used in CI workflows, and CI/network can be flaky. It's reasonable + to assume that we could retry the request several times, even though it's already succeeded. Although it's + not a big deal to create several identical releases one-by-one, it's better to avoid it, so the control plane + should check if the latest release is identical and just return `304 Not Modified` in this case. + +2. `POST /management/api/v2/compute_releases/rollback` to rollback to any previously deployed release. With payload + including the revision of the release to rollback to: + + ```json + { + "revision": 52 + } + ``` + + Rollback marks the current release as `superseded` and creates a new release with all the same data as the + requested revision, but with a new revision number. + + This rollback API is not strictly needed, as we can just use `infra` repo workflow to deploy any + available tag. It's still nice to have for on-call and any urgent matters, for example, if we need + to rollback and GitHub is down. It's much easier to specify only the revision number vs. crafting + all the necessary data for the new release payload. + +### Compute->storage compatibility tests + +In order to safely release new compute versions independently from storage, we need to ensure that the currently +deployed storage is compatible with the new compute version. Currently, we maintain backward compatibility +in storage, but newer computes may require a newer storage version. + +Remote end-to-end (e2e) tests [already accept](https://github.com/neondatabase/cloud/blob/e3468d433e0d73d02b7d7e738d027f509b522408/.github/workflows/testing.yml#L43-L48) +`storage_image_tag` and `compute_image_tag` as separate inputs. That means that we could reuse e2e tests to ensure +compatibility between storage and compute: + +1. Pick the latest storage release tag and use it as `storage_image_tag`. +2. Pick a new compute tag built in the current compute release PR and use it as `compute_image_tag`. + Here, we should use a temporary ECR image tag, because the final tag will be known only after the release PR is merged. +3. Trigger e2e tests as usual. + +### Release flow + +```mermaid + sequenceDiagram + + actor oncall as Compute on-call person + participant neon as neondatabase/neon + + box private + participant cloud as neondatabase/cloud + participant exts as neondatabase/build-custom-extensions + participant infra as neondatabase/infra + end + + box cloud + participant preprod as Pre-prod control plane + participant prod as Production control plane + participant k8s as Compute k8s + end + + oncall ->> neon: Open release PR into release-compute + + activate neon + neon ->> cloud: CI: trigger e2e compatibility tests + activate cloud + cloud -->> neon: CI: e2e tests pass + deactivate cloud + neon ->> neon: CI: pass PR checks, get approvals + deactivate neon + + oncall ->> neon: Merge release PR into release-compute + + activate neon + neon ->> neon: CI: pass checks, build and push images + neon ->> exts: CI: trigger extensions build + activate exts + exts -->> neon: CI: extensions are ready + deactivate exts + neon ->> neon: CI: create release tag + neon ->> infra: Trigger release workflow using the produced tag + deactivate neon + + activate infra + infra ->> infra: CI: pass checks + infra ->> preprod: Release new compute image to pre-prod automatically
POST /management/api/v2/compute_releases + activate preprod + preprod -->> infra: 200 OK + deactivate preprod + + infra ->> infra: CI: wait for per-region production deploy approvals + oncall ->> infra: CI: approve deploys region by region + infra ->> k8s: Prewarm new compute image + infra ->> prod: POST /management/api/v2/compute_releases + activate prod + prod -->> infra: 200 OK + deactivate prod + deactivate infra +``` + +## Further work + +As briefly mentioned in other sections, eventually, we would like to use more complex deployment strategies. +For example, we can pass a fraction of the total compute starts that should use the new release. Then we can +mark the release as `partial` or `canary` and monitor its performance. If everything is fine, we can promote it +to `deployed` status. If not, we can roll back to the previous one. + +## Alternatives + +In theory, we can try using Helm as-is: + +1. Write a compute Helm chart. That will actually have only some config map, which the control plane can access and read. + N.B. We could reuse the control plane chart as well, but then it's not a fully independent release again and even more fuzzy. +2. The control plane will read it and start using the new compute version for new starts. + +Drawbacks: + +1. Helm releases work best if the workload is controlled by the Helm chart itself. Then you can have different + deployment strategies like rolling update or canary or blue/green deployments. At Neon, the compute starts are controlled + by control plane, so it makes it much more tricky. +2. Releases visibility will suffer, i.e. instead of a nice table in the control plane and Admin UI, we would need to use + `helm` cli and/or K8s UIs like K8sLens. +3. We do not restart all computes shortly after the new version release. This means that for some features and compatibility + purpose (see above) control plane may need some auxiliary info from the previous releases. From c4f5736d5a1077c4e6b4b26478006e874a789c57 Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Wed, 25 Sep 2024 16:50:05 +0200 Subject: [PATCH 18/20] Build images for PG17 using Debian 12 "Bookworm" (#9132) This increases the support window of the OS used for PG17 by 2 years compared to the previous usage of Debian 11 "Bullseye". --- .github/workflows/build_and_test.yml | 41 +++++++++++++++++------- .github/workflows/trigger-e2e-tests.yml | 2 +- compute/Dockerfile.compute-node | 42 ++++++++++++++++++------- 3 files changed, 61 insertions(+), 24 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index a634edb96b87..9dcc9709eb4c 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -602,7 +602,20 @@ jobs: strategy: fail-fast: false matrix: - version: [ v14, v15, v16, v17 ] + version: + # Much data was already generated on old PG versions with bullseye's + # libraries, the locales of which can cause data incompatibilities. + # However, new PG versions should check if they can be built on newer + # images, as that reduces the support burden of old and ancient + # distros. + - pg: v14 + debian: bullseye-slim + - pg: v15 + debian: bullseye-slim + - pg: v16 + debian: bullseye-slim + - pg: v17 + debian: bookworm-slim arch: [ x64, arm64 ] runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }} @@ -645,41 +658,46 @@ jobs: context: . build-args: | GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }} - PG_VERSION=${{ matrix.version }} + PG_VERSION=${{ matrix.version.pg }} BUILD_TAG=${{ needs.tag.outputs.build-tag }} TAG=${{ needs.build-build-tools-image.outputs.image-tag }} + DEBIAN_FLAVOR=${{ matrix.version.debian }} provenance: false push: true pull: true file: compute/Dockerfile.compute-node - cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version }}:cache-${{ matrix.arch }} - cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-node-{0}:cache-{1},mode=max', matrix.version, matrix.arch) || '' }} + cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version.pg }}:cache-${{ matrix.arch }} + cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-node-{0}:cache-{1},mode=max', matrix.version.pg, matrix.arch) || '' }} tags: | - neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }} + neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }} - name: Build neon extensions test image - if: matrix.version == 'v16' + if: matrix.version.pg == 'v16' uses: docker/build-push-action@v6 with: context: . build-args: | GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }} - PG_VERSION=${{ matrix.version }} + PG_VERSION=${{ matrix.version.pg }} BUILD_TAG=${{ needs.tag.outputs.build-tag }} TAG=${{ needs.build-build-tools-image.outputs.image-tag }} + DEBIAN_FLAVOR=${{ matrix.version.debian }} provenance: false push: true pull: true file: compute/Dockerfile.compute-node target: neon-pg-ext-test - cache-from: type=registry,ref=cache.neon.build/neon-test-extensions-${{ matrix.version }}:cache-${{ matrix.arch }} - cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon-test-extensions-{0}:cache-{1},mode=max', matrix.version, matrix.arch) || '' }} + cache-from: type=registry,ref=cache.neon.build/neon-test-extensions-${{ matrix.version.pg }}:cache-${{ matrix.arch }} + cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon-test-extensions-{0}:cache-{1},mode=max', matrix.version.pg, matrix.arch) || '' }} tags: | - neondatabase/neon-test-extensions-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}-${{ matrix.arch }} + neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{needs.tag.outputs.build-tag}}-${{ matrix.arch }} - name: Build compute-tools image # compute-tools are Postgres independent, so build it only once - if: matrix.version == 'v17' + # We pick 16, because that builds on debian 11 with older glibc (and is + # thus compatible with newer glibc), rather than 17 on Debian 12, as + # that isn't guaranteed to be compatible with Debian 11 + if: matrix.version.pg == 'v16' uses: docker/build-push-action@v6 with: target: compute-tools-image @@ -688,6 +706,7 @@ jobs: GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }} BUILD_TAG=${{ needs.tag.outputs.build-tag }} TAG=${{ needs.build-build-tools-image.outputs.image-tag }} + DEBIAN_FLAVOR=${{ matrix.version.debian }} provenance: false push: true pull: true diff --git a/.github/workflows/trigger-e2e-tests.yml b/.github/workflows/trigger-e2e-tests.yml index f25c1051cd98..cad97645327b 100644 --- a/.github/workflows/trigger-e2e-tests.yml +++ b/.github/workflows/trigger-e2e-tests.yml @@ -102,7 +102,7 @@ jobs: # Default set of platforms to run e2e tests on platforms='["docker", "k8s"]' - # If the PR changes vendor/, pgxn/ or libs/vm_monitor/ directories, or Dockerfile.compute-node, add k8s-neonvm to the list of platforms. + # If the PR changes vendor/, pgxn/ or libs/vm_monitor/ directories, or compute/Dockerfile.compute-node, add k8s-neonvm to the list of platforms. # If the workflow run is not a pull request, add k8s-neonvm to the list. if [ "$GITHUB_EVENT_NAME" == "pull_request" ]; then for f in $(gh api "/repos/${GITHUB_REPOSITORY}/pulls/${PR_NUMBER}/files" --paginate --jq '.[].filename'); do diff --git a/compute/Dockerfile.compute-node b/compute/Dockerfile.compute-node index 18c68c116a94..2c647a669c28 100644 --- a/compute/Dockerfile.compute-node +++ b/compute/Dockerfile.compute-node @@ -3,13 +3,15 @@ ARG REPOSITORY=neondatabase ARG IMAGE=build-tools ARG TAG=pinned ARG BUILD_TAG +ARG DEBIAN_FLAVOR=bullseye-slim ######################################################################################### # # Layer "build-deps" # ######################################################################################### -FROM debian:bullseye-slim AS build-deps +FROM debian:$DEBIAN_FLAVOR AS build-deps +ARG DEBIAN_FLAVOR RUN apt update && \ apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \ zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libssl-dev \ @@ -1027,7 +1029,8 @@ RUN cd compute_tools && mold -run cargo build --locked --profile release-line-de # ######################################################################################### -FROM debian:bullseye-slim AS compute-tools-image +FROM debian:$DEBIAN_FLAVOR AS compute-tools-image +ARG DEBIAN_FLAVOR COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl @@ -1037,7 +1040,8 @@ COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compu # ######################################################################################### -FROM debian:bullseye-slim AS pgbouncer +FROM debian:$DEBIAN_FLAVOR AS pgbouncer +ARG DEBIAN_FLAVOR RUN set -e \ && apt-get update \ && apt-get install -y \ @@ -1179,7 +1183,9 @@ ENV PGDATABASE=postgres # Put it all together into the final image # ######################################################################################### -FROM debian:bullseye-slim +FROM debian:$DEBIAN_FLAVOR +ARG DEBIAN_FLAVOR +ENV DEBIAN_FLAVOR=$DEBIAN_FLAVOR # Add user postgres RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \ echo "postgres:test_console_pass" | chpasswd && \ @@ -1211,21 +1217,34 @@ COPY --chmod=0644 compute/etc/neon_collector_autoscaling.yml /etc/neon_collector # Create remote extension download directory RUN mkdir /usr/local/download_extensions && chown -R postgres:postgres /usr/local/download_extensions - # Install: # libreadline8 for psql -# libicu67, locales for collations (including ICU and plpgsql_check) # liblz4-1 for lz4 # libossp-uuid16 for extension ossp-uuid -# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS +# libgeos, libsfcgal1, and libprotobuf-c1 for PostGIS # libxml2, libxslt1.1 for xml2 # libzstd1 for zstd # libboost* for rdkit # ca-certificates for communicating with s3 by compute_ctl -RUN apt update && \ + + +RUN apt update && \ + case $DEBIAN_FLAVOR in \ + # Version-specific installs for Bullseye (PG14-PG16): + # libicu67, locales for collations (including ICU and plpgsql_check) + # libgdal28, libproj19 for PostGIS + bullseye*) \ + VERSION_INSTALLS="libicu67 libgdal28 libproj19"; \ + ;; \ + # Version-specific installs for Bookworm (PG17): + # libicu72, locales for collations (including ICU and plpgsql_check) + # libgdal32, libproj25 for PostGIS + bookworm*) \ + VERSION_INSTALLS="libicu72 libgdal32 libproj25"; \ + ;; \ + esac && \ apt install --no-install-recommends -y \ gdb \ - libicu67 \ liblz4-1 \ libreadline8 \ libboost-iostreams1.74.0 \ @@ -1234,8 +1253,6 @@ RUN apt update && \ libboost-system1.74.0 \ libossp-uuid16 \ libgeos-c1v5 \ - libgdal28 \ - libproj19 \ libprotobuf-c1 \ libsfcgal1 \ libxml2 \ @@ -1244,7 +1261,8 @@ RUN apt update && \ libcurl4-openssl-dev \ locales \ procps \ - ca-certificates && \ + ca-certificates \ + $VERSION_INSTALLS && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \ localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8 From c5972389aafe51ea77b958f1d192d103d9ff7e6b Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 25 Sep 2024 15:54:41 +0100 Subject: [PATCH 19/20] storcon: include timeline ID in LSN waiting logs (#9141) ## Problem Hard to tell which timeline is holding the migration. ## Summary of Changes Add timeline id to log. --- storage_controller/src/reconciler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs index 93b1c80566bd..2c42da404355 100644 --- a/storage_controller/src/reconciler.rs +++ b/storage_controller/src/reconciler.rs @@ -463,7 +463,7 @@ impl Reconciler { for (timeline_id, baseline_lsn) in &baseline { match latest.get(timeline_id) { Some(latest_lsn) => { - tracing::info!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}"); + tracing::info!(timeline_id = %timeline_id, "🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}"); if latest_lsn < baseline_lsn { any_behind = true; } From d447f49bc33c7207bd1659eccfc5d892300dd57a Mon Sep 17 00:00:00 2001 From: Yuchen Liang <70461588+yliang412@users.noreply.github.com> Date: Wed, 25 Sep 2024 10:57:38 -0400 Subject: [PATCH 20/20] fix(pageserver): handle lsn lease requests for unnormalized lsns (#9137) Fixes https://github.com/neondatabase/neon/issues/9098. ## Problem See https://github.com/neondatabase/neon/issues/9098#issuecomment-2372484969. ### Related A similar problem happened with branch creation, which was discussed [here](https://github.com/neondatabase/neon/pull/2143#issuecomment-1199969052) and fixed by https://github.com/neondatabase/neon/pull/2529. ## Summary of changes - Normalize the lsn on pageserver side upon lsn lease request, stores the normalized LSN. Signed-off-by: Yuchen Liang --- pageserver/src/tenant/timeline.rs | 6 +++++- test_runner/regress/test_readonly_node.py | 15 +++++++++++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index c98efd5f7184..d301ba23eafa 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -112,7 +112,7 @@ use pageserver_api::reltag::RelTag; use pageserver_api::shard::ShardIndex; use postgres_connection::PgConnectionConfig; -use postgres_ffi::to_pg_timestamp; +use postgres_ffi::{to_pg_timestamp, v14::xlog_utils, WAL_SEGMENT_SIZE}; use utils::{ completion, generation::Generation, @@ -1337,6 +1337,10 @@ impl Timeline { _ctx: &RequestContext, ) -> anyhow::Result { let lease = { + // Normalize the requested LSN to be aligned, and move to the first record + // if it points to the beginning of the page (header). + let lsn = xlog_utils::normalize_lsn(lsn, WAL_SEGMENT_SIZE); + let mut gc_info = self.gc_info.write().unwrap(); let valid_until = SystemTime::now() + length; diff --git a/test_runner/regress/test_readonly_node.py b/test_runner/regress/test_readonly_node.py index 347fc3a04ddb..5e8b8d38f7e2 100644 --- a/test_runner/regress/test_readonly_node.py +++ b/test_runner/regress/test_readonly_node.py @@ -122,6 +122,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): Test static endpoint is protected from GC by acquiring and renewing lsn leases. """ + LSN_LEASE_LENGTH = 8 neon_env_builder.num_pageservers = 2 # GC is manual triggered. env = neon_env_builder.init_start( @@ -139,7 +140,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): "image_creation_threshold": "1", "image_layer_creation_check_threshold": "0", # Short lease length to fit test. - "lsn_lease_length": "3s", + "lsn_lease_length": f"{LSN_LEASE_LENGTH}s", }, initial_tenant_shard_count=2, ) @@ -170,10 +171,14 @@ def generate_updates_on_main( with env.endpoints.create_start("main") as ep_main: with ep_main.cursor() as cur: cur.execute("CREATE TABLE t0(v0 int primary key, v1 text)") - lsn = None + lsn = Lsn(0) for i in range(2): lsn = generate_updates_on_main(env, ep_main, i) + # Round down to the closest LSN on page boundary (unnormalized). + XLOG_BLCKSZ = 8192 + lsn = Lsn((int(lsn) // XLOG_BLCKSZ) * XLOG_BLCKSZ) + with env.endpoints.create_start( branch_name="main", endpoint_id="static", @@ -183,7 +188,8 @@ def generate_updates_on_main( cur.execute("SELECT count(*) FROM t0") assert cur.fetchone() == (ROW_COUNT,) - time.sleep(3) + # Wait for static compute to renew lease at least once. + time.sleep(LSN_LEASE_LENGTH / 2) generate_updates_on_main(env, ep_main, i, end=100) @@ -204,8 +210,9 @@ def generate_updates_on_main( # Do some update so we can increment latest_gc_cutoff generate_updates_on_main(env, ep_main, i, end=100) + # Wait for the existing lease to expire. + time.sleep(LSN_LEASE_LENGTH) # Now trigger GC again, layers should be removed. - time.sleep(4) for shard, ps in tenant_get_shards(env, env.initial_tenant): client = ps.http_client() gc_result = client.timeline_gc(shard, env.initial_timeline, 0)