diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index a634edb96b87..9dcc9709eb4c 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -602,7 +602,20 @@ jobs: strategy: fail-fast: false matrix: - version: [ v14, v15, v16, v17 ] + version: + # Much data was already generated on old PG versions with bullseye's + # libraries, the locales of which can cause data incompatibilities. + # However, new PG versions should check if they can be built on newer + # images, as that reduces the support burden of old and ancient + # distros. + - pg: v14 + debian: bullseye-slim + - pg: v15 + debian: bullseye-slim + - pg: v16 + debian: bullseye-slim + - pg: v17 + debian: bookworm-slim arch: [ x64, arm64 ] runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }} @@ -645,41 +658,46 @@ jobs: context: . build-args: | GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }} - PG_VERSION=${{ matrix.version }} + PG_VERSION=${{ matrix.version.pg }} BUILD_TAG=${{ needs.tag.outputs.build-tag }} TAG=${{ needs.build-build-tools-image.outputs.image-tag }} + DEBIAN_FLAVOR=${{ matrix.version.debian }} provenance: false push: true pull: true file: compute/Dockerfile.compute-node - cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version }}:cache-${{ matrix.arch }} - cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-node-{0}:cache-{1},mode=max', matrix.version, matrix.arch) || '' }} + cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version.pg }}:cache-${{ matrix.arch }} + cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-node-{0}:cache-{1},mode=max', matrix.version.pg, matrix.arch) || '' }} tags: | - neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }} + neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }} - name: Build neon extensions test image - if: matrix.version == 'v16' + if: matrix.version.pg == 'v16' uses: docker/build-push-action@v6 with: context: . build-args: | GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }} - PG_VERSION=${{ matrix.version }} + PG_VERSION=${{ matrix.version.pg }} BUILD_TAG=${{ needs.tag.outputs.build-tag }} TAG=${{ needs.build-build-tools-image.outputs.image-tag }} + DEBIAN_FLAVOR=${{ matrix.version.debian }} provenance: false push: true pull: true file: compute/Dockerfile.compute-node target: neon-pg-ext-test - cache-from: type=registry,ref=cache.neon.build/neon-test-extensions-${{ matrix.version }}:cache-${{ matrix.arch }} - cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon-test-extensions-{0}:cache-{1},mode=max', matrix.version, matrix.arch) || '' }} + cache-from: type=registry,ref=cache.neon.build/neon-test-extensions-${{ matrix.version.pg }}:cache-${{ matrix.arch }} + cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon-test-extensions-{0}:cache-{1},mode=max', matrix.version.pg, matrix.arch) || '' }} tags: | - neondatabase/neon-test-extensions-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}-${{ matrix.arch }} + neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{needs.tag.outputs.build-tag}}-${{ matrix.arch }} - name: Build compute-tools image # compute-tools are Postgres independent, so build it only once - if: matrix.version == 'v17' + # We pick 16, because that builds on debian 11 with older glibc (and is + # thus compatible with newer glibc), rather than 17 on Debian 12, as + # that isn't guaranteed to be compatible with Debian 11 + if: matrix.version.pg == 'v16' uses: docker/build-push-action@v6 with: target: compute-tools-image @@ -688,6 +706,7 @@ jobs: GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }} BUILD_TAG=${{ needs.tag.outputs.build-tag }} TAG=${{ needs.build-build-tools-image.outputs.image-tag }} + DEBIAN_FLAVOR=${{ matrix.version.debian }} provenance: false push: true pull: true diff --git a/.github/workflows/cloud-regress.yml b/.github/workflows/cloud-regress.yml index de6babdde39a..ecafe183f8a8 100644 --- a/.github/workflows/cloud-regress.yml +++ b/.github/workflows/cloud-regress.yml @@ -42,7 +42,7 @@ jobs: - name: Patch the test run: | cd "vendor/postgres-v${DEFAULT_PG_VERSION}" - patch -p1 < "../../patches/cloud_regress_pg${DEFAULT_PG_VERSION}.patch" + patch -p1 < "../../compute/patches/cloud_regress_pg${DEFAULT_PG_VERSION}.patch" - name: Generate a random password id: pwgen diff --git a/.github/workflows/trigger-e2e-tests.yml b/.github/workflows/trigger-e2e-tests.yml index f25c1051cd98..cad97645327b 100644 --- a/.github/workflows/trigger-e2e-tests.yml +++ b/.github/workflows/trigger-e2e-tests.yml @@ -102,7 +102,7 @@ jobs: # Default set of platforms to run e2e tests on platforms='["docker", "k8s"]' - # If the PR changes vendor/, pgxn/ or libs/vm_monitor/ directories, or Dockerfile.compute-node, add k8s-neonvm to the list of platforms. + # If the PR changes vendor/, pgxn/ or libs/vm_monitor/ directories, or compute/Dockerfile.compute-node, add k8s-neonvm to the list of platforms. # If the workflow run is not a pull request, add k8s-neonvm to the list. if [ "$GITHUB_EVENT_NAME" == "pull_request" ]; then for f in $(gh api "/repos/${GITHUB_REPOSITORY}/pulls/${PR_NUMBER}/files" --paginate --jq '.[].filename'); do diff --git a/Cargo.lock b/Cargo.lock index e4dbd8b33398..d0702e09d412 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1321,7 +1321,6 @@ dependencies = [ "clap", "comfy-table", "compute_api", - "git-version", "humantime", "humantime-serde", "hyper 0.14.30", @@ -3578,7 +3577,6 @@ dependencies = [ "anyhow", "camino", "clap", - "git-version", "humantime", "pageserver", "pageserver_api", @@ -3617,7 +3615,6 @@ dependencies = [ "enumset", "fail", "futures", - "git-version", "hex", "hex-literal", "humantime", @@ -3737,7 +3734,6 @@ dependencies = [ "clap", "criterion", "futures", - "git-version", "hex-literal", "itertools 0.10.5", "once_cell", @@ -4307,7 +4303,6 @@ dependencies = [ "fallible-iterator", "framed-websockets", "futures", - "git-version", "hashbrown 0.14.5", "hashlink", "hex", @@ -5139,7 +5134,6 @@ dependencies = [ "desim", "fail", "futures", - "git-version", "hex", "humantime", "hyper 0.14.30", @@ -5702,7 +5696,6 @@ dependencies = [ "futures", "futures-core", "futures-util", - "git-version", "humantime", "hyper 0.14.30", "metrics", @@ -5730,7 +5723,6 @@ dependencies = [ "diesel_migrations", "fail", "futures", - "git-version", "hex", "humantime", "hyper 0.14.30", @@ -5783,7 +5775,6 @@ dependencies = [ "either", "futures", "futures-util", - "git-version", "hex", "humantime", "itertools 0.10.5", @@ -6715,6 +6706,7 @@ dependencies = [ "criterion", "fail", "futures", + "git-version", "hex", "hex-literal", "humantime", diff --git a/compute/Dockerfile.compute-node b/compute/Dockerfile.compute-node index 18c68c116a94..2c647a669c28 100644 --- a/compute/Dockerfile.compute-node +++ b/compute/Dockerfile.compute-node @@ -3,13 +3,15 @@ ARG REPOSITORY=neondatabase ARG IMAGE=build-tools ARG TAG=pinned ARG BUILD_TAG +ARG DEBIAN_FLAVOR=bullseye-slim ######################################################################################### # # Layer "build-deps" # ######################################################################################### -FROM debian:bullseye-slim AS build-deps +FROM debian:$DEBIAN_FLAVOR AS build-deps +ARG DEBIAN_FLAVOR RUN apt update && \ apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \ zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libssl-dev \ @@ -1027,7 +1029,8 @@ RUN cd compute_tools && mold -run cargo build --locked --profile release-line-de # ######################################################################################### -FROM debian:bullseye-slim AS compute-tools-image +FROM debian:$DEBIAN_FLAVOR AS compute-tools-image +ARG DEBIAN_FLAVOR COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl @@ -1037,7 +1040,8 @@ COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compu # ######################################################################################### -FROM debian:bullseye-slim AS pgbouncer +FROM debian:$DEBIAN_FLAVOR AS pgbouncer +ARG DEBIAN_FLAVOR RUN set -e \ && apt-get update \ && apt-get install -y \ @@ -1179,7 +1183,9 @@ ENV PGDATABASE=postgres # Put it all together into the final image # ######################################################################################### -FROM debian:bullseye-slim +FROM debian:$DEBIAN_FLAVOR +ARG DEBIAN_FLAVOR +ENV DEBIAN_FLAVOR=$DEBIAN_FLAVOR # Add user postgres RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \ echo "postgres:test_console_pass" | chpasswd && \ @@ -1211,21 +1217,34 @@ COPY --chmod=0644 compute/etc/neon_collector_autoscaling.yml /etc/neon_collector # Create remote extension download directory RUN mkdir /usr/local/download_extensions && chown -R postgres:postgres /usr/local/download_extensions - # Install: # libreadline8 for psql -# libicu67, locales for collations (including ICU and plpgsql_check) # liblz4-1 for lz4 # libossp-uuid16 for extension ossp-uuid -# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS +# libgeos, libsfcgal1, and libprotobuf-c1 for PostGIS # libxml2, libxslt1.1 for xml2 # libzstd1 for zstd # libboost* for rdkit # ca-certificates for communicating with s3 by compute_ctl -RUN apt update && \ + + +RUN apt update && \ + case $DEBIAN_FLAVOR in \ + # Version-specific installs for Bullseye (PG14-PG16): + # libicu67, locales for collations (including ICU and plpgsql_check) + # libgdal28, libproj19 for PostGIS + bullseye*) \ + VERSION_INSTALLS="libicu67 libgdal28 libproj19"; \ + ;; \ + # Version-specific installs for Bookworm (PG17): + # libicu72, locales for collations (including ICU and plpgsql_check) + # libgdal32, libproj25 for PostGIS + bookworm*) \ + VERSION_INSTALLS="libicu72 libgdal32 libproj25"; \ + ;; \ + esac && \ apt install --no-install-recommends -y \ gdb \ - libicu67 \ liblz4-1 \ libreadline8 \ libboost-iostreams1.74.0 \ @@ -1234,8 +1253,6 @@ RUN apt update && \ libboost-system1.74.0 \ libossp-uuid16 \ libgeos-c1v5 \ - libgdal28 \ - libproj19 \ libprotobuf-c1 \ libsfcgal1 \ libxml2 \ @@ -1244,7 +1261,8 @@ RUN apt update && \ libcurl4-openssl-dev \ locales \ procps \ - ca-certificates && \ + ca-certificates \ + $VERSION_INSTALLS && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \ localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8 diff --git a/patches/cloud_regress_pg16.patch b/compute/patches/cloud_regress_pg16.patch similarity index 100% rename from patches/cloud_regress_pg16.patch rename to compute/patches/cloud_regress_pg16.patch diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index c185d20484a4..df87c181bf47 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -9,7 +9,6 @@ anyhow.workspace = true camino.workspace = true clap.workspace = true comfy-table.workspace = true -git-version.workspace = true humantime.workspace = true nix.workspace = true once_cell.workspace = true diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 2b714fbfbf10..0c0e67dff057 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -346,7 +346,14 @@ impl StorageController { let pg_log_path = pg_data_path.join("postgres.log"); if !tokio::fs::try_exists(&pg_data_path).await? { - let initdb_args = ["-D", pg_data_path.as_ref(), "--username", &username()]; + let initdb_args = [ + "-D", + pg_data_path.as_ref(), + "--username", + &username(), + "--no-sync", + "--no-instructions", + ]; tracing::info!( "Initializing storage controller database with args: {:?}", initdb_args diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 651fcda8db52..73d89699edb6 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -4,8 +4,8 @@ use std::{str::FromStr, time::Duration}; use clap::{Parser, Subcommand}; use pageserver_api::{ controller_api::{ - NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, ShardSchedulingPolicy, - TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest, + AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, + ShardSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest, }, models::{ EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary, @@ -339,7 +339,7 @@ async fn main() -> anyhow::Result<()> { listen_pg_port, listen_http_addr, listen_http_port, - availability_zone_id, + availability_zone_id: AvailabilityZone(availability_zone_id), }), ) .await?; diff --git a/docker-compose/README.md b/docker-compose/README.md index bd47805a6791..648e4ca030c6 100644 --- a/docker-compose/README.md +++ b/docker-compose/README.md @@ -2,8 +2,8 @@ # Example docker compose configuration The configuration in this directory is used for testing Neon docker images: it is -not intended for deploying a usable system. To run a development environment where -you can experiment with a minature Neon system, use `cargo neon` rather than container images. +not intended for deploying a usable system. To run a development environment where +you can experiment with a miniature Neon system, use `cargo neon` rather than container images. This configuration does not start the storage controller, because the controller needs a way to reconfigure running computes, and no such thing exists in this setup. diff --git a/docs/rfcs/038-independent-compute-release.md b/docs/rfcs/038-independent-compute-release.md new file mode 100644 index 000000000000..3deaf1e6fdfb --- /dev/null +++ b/docs/rfcs/038-independent-compute-release.md @@ -0,0 +1,343 @@ +# Independent compute release + +Created at: 2024-08-30. Author: Alexey Kondratov (@ololobus) + +## Summary + +This document proposes an approach to fully independent compute release flow. It attempts to +cover the following features: + +- Process is automated as much as possible to minimize human errors. +- Compute<->storage protocol compatibility is ensured. +- A transparent release history is available with an easy rollback strategy. +- Although not in the scope of this document, there is a viable way to extend the proposed release + flow to achieve the canary and/or blue-green deployment strategies. + +## Motivation + +Previously, the compute release was tightly coupled to the storage release. This meant that once +some storage nodes got restarted with a newer version, all new compute starts using these nodes +automatically got a new version. Thus, two releases happen in parallel, which increases the blast +radius and makes ownership fuzzy. + +Now, we practice a manual v0 independent compute release flow -- after getting a new compute release +image and tag, we pin it region by region using Admin UI. It's better, but it still has its own flaws: + +1. It's a simple but fairly manual process, as you need to click through a few pages. +2. It's prone to human errors, e.g., you could mistype or copy the wrong compute tag. +3. We now require an additional approval in the Admin UI, which partially solves the 2., + but also makes the whole process pretty annoying, as you constantly need to go back + and forth between two people. + +## Non-goals + +It's not the goal of this document to propose a design for some general-purpose release tool like Helm. +The document considers how the current compute fleet is orchestrated at Neon. Even if we later +decide to split the control plane further (e.g., introduce a separate compute controller), the proposed +release process shouldn't change much, i.e., the releases table and API will reside in +one of the parts. + +Achieving the canary and/or blue-green deploy strategies is out of the scope of this document. They +were kept in mind, though, so it's expected that the proposed approach will lay down the foundation +for implementing them in future iterations. + +## Impacted components + +Compute, control plane, CI, observability (some Grafana dashboards may require changes). + +## Prior art + +One of the very close examples is how Helm tracks [releases history](https://helm.sh/docs/helm/helm_history/). + +In the code: + +- [Release](https://github.com/helm/helm/blob/2b30cf4b61d587d3f7594102bb202b787b9918db/pkg/release/release.go#L20-L43) +- [Release info](https://github.com/helm/helm/blob/2b30cf4b61d587d3f7594102bb202b787b9918db/pkg/release/info.go#L24-L40) +- [Release status](https://github.com/helm/helm/blob/2b30cf4b61d587d3f7594102bb202b787b9918db/pkg/release/status.go#L18-L42) + +TL;DR it has several important attributes: + +- Revision -- unique release ID/primary key. It is not the same as the application version, + because the same version can be deployed several times, e.g., after a newer version rollback. +- App version -- version of the application chart/code. +- Config -- set of overrides to the default config of the application. +- Status -- current status of the release in the history. +- Timestamps -- tracks when a release was created and deployed. + +## Proposed implementation + +### Separate release branch + +We will use a separate release branch, `release-compute`, to have a clean history for releases and commits. +In order to avoid confusion with storage releases, we will use a different prefix for compute [git release +tags](https://github.com/neondatabase/neon/releases) -- `release-compute-XXXX`. We will use the same tag for +Docker images as well. The `neondatabase/compute-node-v16:release-compute-XXXX` looks longer and a bit redundant, +but it's better to have image and git tags in sync. + +Currently, control plane relies on the numeric compute and storage release versions to decide on compute->storage +compatibility. Once we implement this proposal, we should drop this code as release numbers will be completely +independent. The only constraint we want is that it must monotonically increase within the same release branch. + +### Compute config/settings manifest + +We will create a new sub-directory `compute` and file `compute/manifest.yaml` with a structure: + +```yaml +pg_settings: + # Common settings for primaries and secondaries of all versions. + common: + wal_log_hints: "off" + max_wal_size: "1024" + + per_version: + 14: + # Common settings for both replica and primary of version PG 14 + common: + shared_preload_libraries: "neon,pg_stat_statements,extension_x" + 15: + common: + shared_preload_libraries: "neon,pg_stat_statements,extension_x" + # Settings that should be applied only to + replica: + # Available only starting Postgres 15th + recovery_prefetch: "off" + # ... + 17: + common: + # For example, if third-party `extension_x` is not yet available for PG 17 + shared_preload_libraries: "neon,pg_stat_statements" + replica: + recovery_prefetch: "off" +``` + +**N.B.** Setting value should be a string with `on|off` for booleans and a number (as a string) +without units for all numeric settings. That's how the control plane currently operates. + +The priority of settings will be (a higher number is a higher priority): + +1. Any static and hard-coded settings in the control plane +2. `pg_settings->common` +3. Per-version `common` +4. Per-version `replica` +5. Any per-user/project/endpoint overrides in the control plane +6. Any dynamic setting calculated based on the compute size + +**N.B.** For simplicity, we do not do any custom logic for `shared_preload_libraries`, so it's completely +overridden if specified on some level. Make sure that you include all necessary extensions in it when you +do any overrides. + +**N.B.** There is a tricky question about what to do with custom compute image pinning we sometimes +do for particular projects and customers. That's usually some ad-hoc work and images are based on +the latest compute image, so it's relatively safe to assume that we could use settings from the latest compute +release. If for some reason that's not true, and further overrides are needed, it's also possible to do +on the project level together with pinning the image, so it's on-call/engineer/support responsibility to +ensure that compute starts with the specified custom image. The only real risk is that compute image will get +stale and settings from new releases will drift away, so eventually it will get something incompatible, +but i) this is some operational issue, as we do not want stale images anyway, and ii) base settings +receive something really new so rarely that the chance of this happening is very low. If we want to solve it completely, +then together with pinning the image we could also pin the matching release revision in the control plane. + +The compute team will own the content of `compute/manifest.yaml`. + +### Control plane: releases table + +In order to store information about releases, the control plane will use a table `compute_releases` with the following +schema: + +```sql +CREATE TABLE compute_releases ( + -- Unique release ID + -- N.B. Revision won't by synchronized across all regions, because all control planes are technically independent + -- services. We have the same situation with Helm releases as well because they could be deployed and rolled back + -- independently in different clusters. + revision BIGSERIAL PRIMARY KEY, + -- Numeric version of the compute image, e.g. 9057 + version BIGINT NOT NULL, + -- Compute image tag, e.g. `release-9057` + tag TEXT NOT NULL, + -- Current release status. Currently, it will be a simple enum + -- * `deployed` -- release is deployed and used for new compute starts. + -- Exactly one release can have this status at a time. + -- * `superseded` -- release has been replaced by a newer one. + -- But we can always extend it in the future when we need more statuses + -- for more complex deployment strategies. + status TEXT NOT NULL, + -- Any additional metadata for compute in the corresponding release + manifest JSONB NOT NULL, + -- Timestamp when release record was created in the control plane database + created_at TIMESTAMP NOT NULL DEFAULT now(), + -- Timestamp when release deployment was finished + deployed_at TIMESTAMP +); +``` + +We keep track of the old releases not only for the sake of audit, but also because we usually have ~30% of +old computes started using the image from one of the previous releases. Yet, when users want to reconfigure +them without restarting, the control plane needs to know what settings are applicable to them, so we also need +information about the previous releases that are readily available. There could be some other auxiliary info +needed as well: supported extensions, compute flags, etc. + +**N.B.** Here, we can end up in an ambiguous situation when the same compute image is deployed twice, e.g., +it was deployed once, then rolled back, and then deployed again, potentially with a different manifest. Yet, +we could've started some computes with the first deployment and some with the second. Thus, when we need to +look up the manifest for the compute by its image tag, we will see two records in the table with the same tag, +but different revision numbers. We can assume that this could happen only in case of rollbacks, so we +can just take the latest revision for the given tag. + +### Control plane: management API + +The control plane will implement new API methods to manage releases: + +1. `POST /management/api/v2/compute_releases` to create a new release. With payload + + ```json + { + "version": 9057, + "tag": "release-9057", + "manifest": {} + } + ``` + + and response + + ```json + { + "revision": 53, + "version": 9057, + "tag": "release-9057", + "status": "deployed", + "manifest": {}, + "created_at": "2024-08-15T15:52:01.0000Z", + "deployed_at": "2024-08-15T15:52:01.0000Z", + } + ``` + + Here, we can actually mix-in custom (remote) extensions metadata into the `manifest`, so that the control plane + will get information about all available extensions not bundled into compute image. The corresponding + workflow in `neondatabase/build-custom-extensions` should produce it as an artifact and make + it accessible to the workflow in the `neondatabase/infra`. See the complete release flow below. Doing that, + we put a constraint that new custom extension requires new compute release, which is good for the safety, + but is not exactly what we want operational-wise (we want to be able to deploy new extensions without new + images). Yet, it can be solved incrementally: v0 -- do not do anything with extensions at all; + v1 -- put them into the same manifest; v2 -- make them separate entities with their own lifecycle. + + **N.B.** This method is intended to be used in CI workflows, and CI/network can be flaky. It's reasonable + to assume that we could retry the request several times, even though it's already succeeded. Although it's + not a big deal to create several identical releases one-by-one, it's better to avoid it, so the control plane + should check if the latest release is identical and just return `304 Not Modified` in this case. + +2. `POST /management/api/v2/compute_releases/rollback` to rollback to any previously deployed release. With payload + including the revision of the release to rollback to: + + ```json + { + "revision": 52 + } + ``` + + Rollback marks the current release as `superseded` and creates a new release with all the same data as the + requested revision, but with a new revision number. + + This rollback API is not strictly needed, as we can just use `infra` repo workflow to deploy any + available tag. It's still nice to have for on-call and any urgent matters, for example, if we need + to rollback and GitHub is down. It's much easier to specify only the revision number vs. crafting + all the necessary data for the new release payload. + +### Compute->storage compatibility tests + +In order to safely release new compute versions independently from storage, we need to ensure that the currently +deployed storage is compatible with the new compute version. Currently, we maintain backward compatibility +in storage, but newer computes may require a newer storage version. + +Remote end-to-end (e2e) tests [already accept](https://github.com/neondatabase/cloud/blob/e3468d433e0d73d02b7d7e738d027f509b522408/.github/workflows/testing.yml#L43-L48) +`storage_image_tag` and `compute_image_tag` as separate inputs. That means that we could reuse e2e tests to ensure +compatibility between storage and compute: + +1. Pick the latest storage release tag and use it as `storage_image_tag`. +2. Pick a new compute tag built in the current compute release PR and use it as `compute_image_tag`. + Here, we should use a temporary ECR image tag, because the final tag will be known only after the release PR is merged. +3. Trigger e2e tests as usual. + +### Release flow + +```mermaid + sequenceDiagram + + actor oncall as Compute on-call person + participant neon as neondatabase/neon + + box private + participant cloud as neondatabase/cloud + participant exts as neondatabase/build-custom-extensions + participant infra as neondatabase/infra + end + + box cloud + participant preprod as Pre-prod control plane + participant prod as Production control plane + participant k8s as Compute k8s + end + + oncall ->> neon: Open release PR into release-compute + + activate neon + neon ->> cloud: CI: trigger e2e compatibility tests + activate cloud + cloud -->> neon: CI: e2e tests pass + deactivate cloud + neon ->> neon: CI: pass PR checks, get approvals + deactivate neon + + oncall ->> neon: Merge release PR into release-compute + + activate neon + neon ->> neon: CI: pass checks, build and push images + neon ->> exts: CI: trigger extensions build + activate exts + exts -->> neon: CI: extensions are ready + deactivate exts + neon ->> neon: CI: create release tag + neon ->> infra: Trigger release workflow using the produced tag + deactivate neon + + activate infra + infra ->> infra: CI: pass checks + infra ->> preprod: Release new compute image to pre-prod automatically
POST /management/api/v2/compute_releases + activate preprod + preprod -->> infra: 200 OK + deactivate preprod + + infra ->> infra: CI: wait for per-region production deploy approvals + oncall ->> infra: CI: approve deploys region by region + infra ->> k8s: Prewarm new compute image + infra ->> prod: POST /management/api/v2/compute_releases + activate prod + prod -->> infra: 200 OK + deactivate prod + deactivate infra +``` + +## Further work + +As briefly mentioned in other sections, eventually, we would like to use more complex deployment strategies. +For example, we can pass a fraction of the total compute starts that should use the new release. Then we can +mark the release as `partial` or `canary` and monitor its performance. If everything is fine, we can promote it +to `deployed` status. If not, we can roll back to the previous one. + +## Alternatives + +In theory, we can try using Helm as-is: + +1. Write a compute Helm chart. That will actually have only some config map, which the control plane can access and read. + N.B. We could reuse the control plane chart as well, but then it's not a fully independent release again and even more fuzzy. +2. The control plane will read it and start using the new compute version for new starts. + +Drawbacks: + +1. Helm releases work best if the workload is controlled by the Helm chart itself. Then you can have different + deployment strategies like rolling update or canary or blue/green deployments. At Neon, the compute starts are controlled + by control plane, so it makes it much more tricky. +2. Releases visibility will suffer, i.e. instead of a nice table in the control plane and Admin UI, we would need to use + `helm` cli and/or K8s UIs like K8sLens. +3. We do not restart all computes shortly after the new version release. This means that for some features and compatibility + purpose (see above) control plane may need some auxiliary info from the previous releases. diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index 40b7dbbbc2af..0ea30ce54f78 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet}; +use std::fmt::Display; use std::str::FromStr; use std::time::{Duration, Instant}; @@ -57,7 +58,7 @@ pub struct NodeRegisterRequest { pub listen_http_addr: String, pub listen_http_port: u16, - pub availability_zone_id: String, + pub availability_zone_id: AvailabilityZone, } #[derive(Serialize, Deserialize)] @@ -74,10 +75,19 @@ pub struct TenantPolicyRequest { pub scheduling: Option, } +#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct AvailabilityZone(pub String); + +impl Display for AvailabilityZone { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + #[derive(Serialize, Deserialize)] pub struct ShardsPreferredAzsRequest { #[serde(flatten)] - pub preferred_az_ids: HashMap, + pub preferred_az_ids: HashMap, } #[derive(Serialize, Deserialize)] diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index c9be53f0b0c0..45abda0ad85d 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -37,14 +37,11 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; /// ```mermaid /// stateDiagram-v2 /// -/// [*] --> Loading: spawn_load() /// [*] --> Attaching: spawn_attach() /// -/// Loading --> Activating: activate() /// Attaching --> Activating: activate() /// Activating --> Active: infallible /// -/// Loading --> Broken: load() failure /// Attaching --> Broken: attach() failure /// /// Active --> Stopping: set_stopping(), part of shutdown & detach @@ -68,10 +65,6 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; )] #[serde(tag = "slug", content = "data")] pub enum TenantState { - /// This tenant is being loaded from local disk. - /// - /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass. - Loading, /// This tenant is being attached to the pageserver. /// /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass. @@ -121,8 +114,6 @@ impl TenantState { // But, our attach task might still be fetching the remote timelines, etc. // So, return `Maybe` while Attaching, making Console wait for the attach task to finish. Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe, - // tenant mgr startup distinguishes attaching from loading via marker file. - Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached, // We only reach Active after successful load / attach. // So, call atttachment status Attached. Self::Active => Attached, @@ -191,10 +182,11 @@ impl LsnLease { } /// The only [`TenantState`] variants we could be `TenantState::Activating` from. +/// +/// XXX: We used to have more variants here, but now it's just one, which makes this rather +/// useless. Remove, once we've checked that there's no client code left that looks at this. #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub enum ActivatingFrom { - /// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`] - Loading, /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`] Attaching, } @@ -1562,11 +1554,8 @@ mod tests { #[test] fn tenantstatus_activating_serde() { - let states = [ - TenantState::Activating(ActivatingFrom::Loading), - TenantState::Activating(ActivatingFrom::Attaching), - ]; - let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]"; + let states = [TenantState::Activating(ActivatingFrom::Attaching)]; + let expected = "[{\"slug\":\"Activating\",\"data\":\"Attaching\"}]"; let actual = serde_json::to_string(&states).unwrap(); @@ -1581,13 +1570,7 @@ mod tests { fn tenantstatus_activating_strum() { // tests added, because we use these for metrics let examples = [ - (line!(), TenantState::Loading, "Loading"), (line!(), TenantState::Attaching, "Attaching"), - ( - line!(), - TenantState::Activating(ActivatingFrom::Loading), - "Activating", - ), ( line!(), TenantState::Activating(ActivatingFrom::Attaching), diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index f199b155540f..7d284a6fc567 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -19,6 +19,7 @@ bincode.workspace = true bytes.workspace = true camino.workspace = true chrono.workspace = true +git-version.workspace = true hex = { workspace = true, features = ["serde"] } humantime.workspace = true hyper = { workspace = true, features = ["full"] } diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 03fb36caf8b6..aacc1e1dd5e8 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -92,6 +92,10 @@ pub mod toml_edit_ext; pub mod circuit_breaker; +// Re-export used in macro. Avoids adding git-version as dep in target crates. +#[doc(hidden)] +pub use git_version; + /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages /// /// we have several cases: @@ -131,7 +135,7 @@ macro_rules! project_git_version { ($const_identifier:ident) => { // this should try GIT_VERSION first only then git_version::git_version! const $const_identifier: &::core::primitive::str = { - const __COMMIT_FROM_GIT: &::core::primitive::str = git_version::git_version! { + const __COMMIT_FROM_GIT: &::core::primitive::str = $crate::git_version::git_version! { prefix = "", fallback = "unknown", args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 0eb48d6823b8..f1fc3a86fe4b 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -27,7 +27,6 @@ crc32c.workspace = true either.workspace = true fail.workspace = true futures.workspace = true -git-version.workspace = true hex.workspace = true humantime.workspace = true humantime-serde.workspace = true diff --git a/pageserver/compaction/Cargo.toml b/pageserver/compaction/Cargo.toml index 52b58fc298ce..d4f89ac38a19 100644 --- a/pageserver/compaction/Cargo.toml +++ b/pageserver/compaction/Cargo.toml @@ -12,7 +12,6 @@ anyhow.workspace = true async-stream.workspace = true clap = { workspace = true, features = ["string"] } futures.workspace = true -git-version.workspace = true itertools.workspace = true once_cell.workspace = true pageserver_api.workspace = true diff --git a/pageserver/ctl/Cargo.toml b/pageserver/ctl/Cargo.toml index 9592002de131..a753f806a076 100644 --- a/pageserver/ctl/Cargo.toml +++ b/pageserver/ctl/Cargo.toml @@ -10,7 +10,6 @@ license.workspace = true anyhow.workspace = true camino.workspace = true clap = { workspace = true, features = ["string"] } -git-version.workspace = true humantime.workspace = true pageserver = { path = ".." } pageserver_api.workspace = true diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs index f6d1c35a8ce1..d0a967b9207e 100644 --- a/pageserver/src/control_plane_client.rs +++ b/pageserver/src/control_plane_client.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use futures::Future; use pageserver_api::{ - controller_api::NodeRegisterRequest, + controller_api::{AvailabilityZone, NodeRegisterRequest}, shard::TenantShardId, upcall_api::{ ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, @@ -148,10 +148,10 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient { .and_then(|jv| jv.as_str().map(|str| str.to_owned())); match az_id_from_metadata { - Some(az_id) => Some(az_id), + Some(az_id) => Some(AvailabilityZone(az_id)), None => { tracing::warn!("metadata.json does not contain an 'availability_zone_id' field"); - conf.availability_zone.clone() + conf.availability_zone.clone().map(AvailabilityZone) } } }; diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 162e8d1836ff..366bd8290340 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -3208,45 +3208,38 @@ pub(crate) mod tenant_throttling { impl TimelineGet { pub(crate) fn new(tenant_shard_id: &TenantShardId) -> Self { + let per_tenant_label_values = &[ + KIND, + &tenant_shard_id.tenant_id.to_string(), + &tenant_shard_id.shard_slug().to_string(), + ]; TimelineGet { count_accounted_start: { GlobalAndPerTenantIntCounter { global: COUNT_ACCOUNTED_START.with_label_values(&[KIND]), - per_tenant: COUNT_ACCOUNTED_START_PER_TENANT.with_label_values(&[ - KIND, - &tenant_shard_id.tenant_id.to_string(), - &tenant_shard_id.shard_slug().to_string(), - ]), + per_tenant: COUNT_ACCOUNTED_START_PER_TENANT + .with_label_values(per_tenant_label_values), } }, count_accounted_finish: { GlobalAndPerTenantIntCounter { global: COUNT_ACCOUNTED_FINISH.with_label_values(&[KIND]), - per_tenant: COUNT_ACCOUNTED_FINISH_PER_TENANT.with_label_values(&[ - KIND, - &tenant_shard_id.tenant_id.to_string(), - &tenant_shard_id.shard_slug().to_string(), - ]), + per_tenant: COUNT_ACCOUNTED_FINISH_PER_TENANT + .with_label_values(per_tenant_label_values), } }, wait_time: { GlobalAndPerTenantIntCounter { global: WAIT_USECS.with_label_values(&[KIND]), - per_tenant: WAIT_USECS_PER_TENANT.with_label_values(&[ - KIND, - &tenant_shard_id.tenant_id.to_string(), - &tenant_shard_id.shard_slug().to_string(), - ]), + per_tenant: WAIT_USECS_PER_TENANT + .with_label_values(per_tenant_label_values), } }, count_throttled: { GlobalAndPerTenantIntCounter { global: WAIT_COUNT.with_label_values(&[KIND]), - per_tenant: WAIT_COUNT_PER_TENANT.with_label_values(&[ - KIND, - &tenant_shard_id.tenant_id.to_string(), - &tenant_shard_id.shard_slug().to_string(), - ]), + per_tenant: WAIT_COUNT_PER_TENANT + .with_label_values(per_tenant_label_values), } }, } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 5ed63734f494..53cbaea621eb 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1968,9 +1968,6 @@ impl Tenant { TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => { panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state); } - TenantState::Loading => { - *current_state = TenantState::Activating(ActivatingFrom::Loading); - } TenantState::Attaching => { *current_state = TenantState::Activating(ActivatingFrom::Attaching); } @@ -2151,7 +2148,7 @@ impl Tenant { async fn set_stopping( &self, progress: completion::Barrier, - allow_transition_from_loading: bool, + _allow_transition_from_loading: bool, allow_transition_from_attaching: bool, ) -> Result<(), SetStoppingError> { let mut rx = self.state.subscribe(); @@ -2166,7 +2163,6 @@ impl Tenant { ); false } - TenantState::Loading => allow_transition_from_loading, TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true, }) .await @@ -2185,13 +2181,6 @@ impl Tenant { *current_state = TenantState::Stopping { progress }; true } - TenantState::Loading => { - if !allow_transition_from_loading { - unreachable!("3we ensured above that we're done with activation, and, there is no re-activation") - }; - *current_state = TenantState::Stopping { progress }; - true - } TenantState::Active => { // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines // are created after the transition to Stopping. That's harmless, as the Timelines @@ -2247,7 +2236,7 @@ impl Tenant { // The load & attach routines own the tenant state until it has reached `Active`. // So, wait until it's done. rx.wait_for(|state| match state { - TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => { + TenantState::Activating(_) | TenantState::Attaching => { info!( "waiting for {} to turn Active|Broken|Stopping", <&'static str>::from(state) @@ -2267,7 +2256,7 @@ impl Tenant { let reason = reason.to_string(); self.state.send_modify(|current_state| { match *current_state { - TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => { + TenantState::Activating(_) | TenantState::Attaching => { unreachable!("we ensured above that we're done with activation, and, there is no re-activation") } TenantState::Active => { @@ -2311,7 +2300,7 @@ impl Tenant { loop { let current_state = receiver.borrow_and_update().clone(); match current_state { - TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => { + TenantState::Attaching | TenantState::Activating(_) => { // in these states, there's a chance that we can reach ::Active self.activate_now(); match timeout_cancellable(timeout, &self.cancel, receiver.changed()).await { @@ -4144,7 +4133,7 @@ pub(crate) mod harness { let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager)); let tenant = Arc::new(Tenant::new( - TenantState::Loading, + TenantState::Attaching, self.conf, AttachedTenantConf::try_from(LocationConf::attached_single( TenantConfOpt::from(self.tenant_conf.clone()), diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 34f1b15138ec..2b212cfed5d7 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -39,7 +39,7 @@ use crate::tenant::disk_btree::{ use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ - BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, + BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadCoalesceMode, VectoredReadPlanner, }; use crate::tenant::PageReconstructError; @@ -1021,13 +1021,30 @@ impl DeltaLayerInner { continue; } }; - + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter().rev() { if Some(meta.meta.key) == ignore_key_with_err { continue; } + let blob_read = meta.read(&view).await; + let blob_read = match blob_read { + Ok(buf) => buf, + Err(e) => { + reconstruct_state.on_key_error( + meta.meta.key, + PageReconstructError::Other(anyhow!(e).context(format!( + "Failed to decompress blob from virtual file {}", + self.file.path, + ))), + ); + + ignore_key_with_err = Some(meta.meta.key); + continue; + } + }; + + let value = Value::des(&blob_read); - let value = Value::des(&blobs_buf.buf[meta.start..meta.end]); let value = match value { Ok(v) => v, Err(e) => { @@ -1243,21 +1260,21 @@ impl DeltaLayerInner { buf.reserve(read.size()); let res = reader.read_blobs(&read, buf, ctx).await?; + let view = BufView::new_slice(&res.buf); + for blob in res.blobs { let key = blob.meta.key; let lsn = blob.meta.lsn; - let data = &res.buf[blob.start..blob.end]; + + let data = blob.read(&view).await?; #[cfg(debug_assertions)] - Value::des(data) + Value::des(&data) .with_context(|| { format!( - "blob failed to deserialize for {}@{}, {}..{}: {:?}", - blob.meta.key, - blob.meta.lsn, - blob.start, - blob.end, - utils::Hex(data) + "blob failed to deserialize for {}: {:?}", + blob, + utils::Hex(&data) ) }) .unwrap(); @@ -1265,15 +1282,15 @@ impl DeltaLayerInner { // is it an image or will_init walrecord? // FIXME: this could be handled by threading the BlobRef to the // VectoredReadBuilder - let will_init = crate::repository::ValueBytes::will_init(data) + let will_init = crate::repository::ValueBytes::will_init(&data) .inspect_err(|_e| { #[cfg(feature = "testing")] - tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value"); + tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value"); }) .unwrap_or(false); per_blob_copy.clear(); - per_blob_copy.extend_from_slice(data); + per_blob_copy.extend_from_slice(&data); let (tmp, res) = writer .put_value_bytes( @@ -1538,8 +1555,11 @@ impl<'a> DeltaLayerIterator<'a> { .read_blobs(&plan, buf, self.ctx) .await?; let frozen_buf = blobs_buf.buf.freeze(); + let view = BufView::new_bytes(frozen_buf); for meta in blobs_buf.blobs.iter() { - let value = Value::des(&frozen_buf[meta.start..meta.end])?; + let blob_read = meta.read(&view).await?; + let value = Value::des(&blob_read)?; + next_batch.push_back((meta.meta.key, meta.meta.lsn, value)); } self.key_values_batch = next_batch; @@ -1916,9 +1936,13 @@ pub(crate) mod test { let blobs_buf = vectored_blob_reader .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx) .await?; + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { - let value = &blobs_buf.buf[meta.start..meta.end]; - assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]); + let value = meta.read(&view).await?; + assert_eq!( + &value[..], + &entries_meta.index[&(meta.meta.key, meta.meta.lsn)] + ); } buf = Some(blobs_buf.buf); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 5de2582ab79f..940d169db096 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -36,7 +36,8 @@ use crate::tenant::disk_btree::{ }; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ - BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner, + BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, + VectoredReadPlanner, }; use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; @@ -547,15 +548,15 @@ impl ImageLayerInner { let buf = BytesMut::with_capacity(buf_size); let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?; - let frozen_buf = blobs_buf.buf.freeze(); + let view = BufView::new_bytes(frozen_buf); for meta in blobs_buf.blobs.iter() { - let img_buf = frozen_buf.slice(meta.start..meta.end); + let img_buf = meta.read(&view).await?; key_count += 1; writer - .put_image(meta.meta.key, img_buf, ctx) + .put_image(meta.meta.key, img_buf.into_bytes(), ctx) .await .context(format!("Storing key {}", meta.meta.key))?; } @@ -602,13 +603,28 @@ impl ImageLayerInner { match res { Ok(blobs_buf) => { let frozen_buf = blobs_buf.buf.freeze(); - + let view = BufView::new_bytes(frozen_buf); for meta in blobs_buf.blobs.iter() { - let img_buf = frozen_buf.slice(meta.start..meta.end); + let img_buf = meta.read(&view).await; + + let img_buf = match img_buf { + Ok(img_buf) => img_buf, + Err(e) => { + reconstruct_state.on_key_error( + meta.meta.key, + PageReconstructError::Other(anyhow!(e).context(format!( + "Failed to decompress blob from virtual file {}", + self.file.path, + ))), + ); + + continue; + } + }; reconstruct_state.update_key( &meta.meta.key, self.lsn, - Value::Image(img_buf), + Value::Image(img_buf.into_bytes()), ); } } @@ -1025,10 +1041,15 @@ impl<'a> ImageLayerIterator<'a> { let blobs_buf = vectored_blob_reader .read_blobs(&plan, buf, self.ctx) .await?; - let frozen_buf: Bytes = blobs_buf.buf.freeze(); + let frozen_buf = blobs_buf.buf.freeze(); + let view = BufView::new_bytes(frozen_buf); for meta in blobs_buf.blobs.iter() { - let img_buf = frozen_buf.slice(meta.start..meta.end); - next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf))); + let img_buf = meta.read(&view).await?; + next_batch.push_back(( + meta.meta.key, + self.image_layer.lsn, + Value::Image(img_buf.into_bytes()), + )); } self.key_values_batch = next_batch; Ok(()) diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 341febb30ab9..3f0f8a21c8a5 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -481,8 +481,7 @@ async fn ingest_housekeeping_loop(tenant: Arc, cancel: CancellationToken let allowed_rps = tenant.timeline_get_throttle.steady_rps(); let delta = now - prev; info!( - n_seconds=%format_args!("{:.3}", - delta.as_secs_f64()), + n_seconds=%format_args!("{:.3}", delta.as_secs_f64()), count_accounted = count_accounted_finish, // don't break existing log scraping count_throttled, sum_throttled_usecs, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index c98efd5f7184..d301ba23eafa 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -112,7 +112,7 @@ use pageserver_api::reltag::RelTag; use pageserver_api::shard::ShardIndex; use postgres_connection::PgConnectionConfig; -use postgres_ffi::to_pg_timestamp; +use postgres_ffi::{to_pg_timestamp, v14::xlog_utils, WAL_SEGMENT_SIZE}; use utils::{ completion, generation::Generation, @@ -1337,6 +1337,10 @@ impl Timeline { _ctx: &RequestContext, ) -> anyhow::Result { let lease = { + // Normalize the requested LSN to be aligned, and move to the first record + // if it points to the beginning of the page (header). + let lsn = xlog_utils::normalize_lsn(lsn, WAL_SEGMENT_SIZE); + let mut gc_info = self.gc_info.write().unwrap(); let valid_until = SystemTime::now() + length; diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 2f6cb4d73a69..26c2861b9308 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -30,8 +30,8 @@ use crate::{ pgdatadir_mapping::CollectKeySpaceError, task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, tenant::{ - storage_layer::LayerVisibilityHint, tasks::BackgroundLoopKind, timeline::EvictionError, - LogicalSizeCalculationCause, Tenant, + size::CalculateSyntheticSizeError, storage_layer::LayerVisibilityHint, + tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant, }, }; @@ -557,6 +557,8 @@ impl Timeline { gather_result = gather => { match gather_result { Ok(_) => {}, + // It can happen sometimes that we hit this instead of the cancellation token firing above + Err(CalculateSyntheticSizeError::Cancelled) => {} Err(e) => { // We don't care about the result, but, if it failed, we should log it, // since consumption metric might be hitting the cached value and diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 553edf6d8b34..aa37a45898bd 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -16,8 +16,9 @@ //! Note that the vectored blob api does *not* go through the page cache. use std::collections::BTreeMap; +use std::ops::Deref; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use pageserver_api::key::Key; use tokio::io::AsyncWriteExt; use tokio_epoll_uring::BoundedBuf; @@ -35,11 +36,123 @@ pub struct BlobMeta { pub lsn: Lsn, } -/// Blob offsets into [`VectoredBlobsBuf::buf`] +/// A view into the vectored blobs read buffer. +#[derive(Clone, Debug)] +pub(crate) enum BufView<'a> { + Slice(&'a [u8]), + Bytes(bytes::Bytes), +} + +impl<'a> BufView<'a> { + /// Creates a new slice-based view on the blob. + pub fn new_slice(slice: &'a [u8]) -> Self { + Self::Slice(slice) + } + + /// Creates a new [`bytes::Bytes`]-based view on the blob. + pub fn new_bytes(bytes: bytes::Bytes) -> Self { + Self::Bytes(bytes) + } + + /// Convert the view into `Bytes`. + /// + /// If using slice as the underlying storage, the copy will be an O(n) operation. + pub fn into_bytes(self) -> Bytes { + match self { + BufView::Slice(slice) => Bytes::copy_from_slice(slice), + BufView::Bytes(bytes) => bytes, + } + } + + /// Creates a sub-view of the blob based on the range. + fn view(&self, range: std::ops::Range) -> Self { + match self { + BufView::Slice(slice) => BufView::Slice(&slice[range]), + BufView::Bytes(bytes) => BufView::Bytes(bytes.slice(range)), + } + } +} + +impl<'a> Deref for BufView<'a> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + match self { + BufView::Slice(slice) => slice, + BufView::Bytes(bytes) => bytes, + } + } +} + +impl<'a> AsRef<[u8]> for BufView<'a> { + fn as_ref(&self) -> &[u8] { + match self { + BufView::Slice(slice) => slice, + BufView::Bytes(bytes) => bytes.as_ref(), + } + } +} + +impl<'a> From<&'a [u8]> for BufView<'a> { + fn from(value: &'a [u8]) -> Self { + Self::new_slice(value) + } +} + +impl From for BufView<'_> { + fn from(value: Bytes) -> Self { + Self::new_bytes(value) + } +} + +/// Blob offsets into [`VectoredBlobsBuf::buf`]. The byte ranges is potentially compressed, +/// subject to [`VectoredBlob::compression_bits`]. pub struct VectoredBlob { - pub start: usize, - pub end: usize, + /// Blob metadata. pub meta: BlobMeta, + /// Start offset. + start: usize, + /// End offset. + end: usize, + /// Compression used on the the blob. + compression_bits: u8, +} + +impl VectoredBlob { + /// Reads a decompressed view of the blob. + pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result, std::io::Error> { + let view = buf.view(self.start..self.end); + + match self.compression_bits { + BYTE_UNCOMPRESSED => Ok(view), + BYTE_ZSTD => { + let mut decompressed_vec = Vec::new(); + let mut decoder = + async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec); + decoder.write_all(&view).await?; + decoder.flush().await?; + // Zero-copy conversion from `Vec` to `Bytes` + Ok(BufView::new_bytes(Bytes::from(decompressed_vec))) + } + bits => { + let error = std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}", self.meta.key, self.meta.lsn, self.start, self.end), + ); + Err(error) + } + } + } +} + +impl std::fmt::Display for VectoredBlob { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}@{}, {}..{}", + self.meta.key, self.meta.lsn, self.start, self.end + ) + } } /// Return type of [`VectoredBlobReader::read_blobs`] @@ -514,7 +627,7 @@ impl<'a> VectoredBlobReader<'a> { ); } - let mut buf = self + let buf = self .file .read_exact_at(buf.slice(0..read.size()), read.start, ctx) .await? @@ -529,9 +642,6 @@ impl<'a> VectoredBlobReader<'a> { // of a blob is implicit: the start of the next blob if one exists // or the end of the read. - // Some scratch space, put here for reusing the allocation - let mut decompressed_vec = Vec::new(); - for (blob_start, meta) in blobs_at { let blob_start_in_buf = blob_start - start_offset; let first_len_byte = buf[blob_start_in_buf as usize]; @@ -557,35 +667,14 @@ impl<'a> VectoredBlobReader<'a> { ) }; - let start_raw = blob_start_in_buf + size_length; - let end_raw = start_raw + blob_size; - let (start, end); - if compression_bits == BYTE_UNCOMPRESSED { - start = start_raw as usize; - end = end_raw as usize; - } else if compression_bits == BYTE_ZSTD { - let mut decoder = - async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec); - decoder - .write_all(&buf[start_raw as usize..end_raw as usize]) - .await?; - decoder.flush().await?; - start = buf.len(); - buf.extend_from_slice(&decompressed_vec); - end = buf.len(); - decompressed_vec.clear(); - } else { - let error = std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("invalid compression byte {compression_bits:x}"), - ); - return Err(error); - } + let start = (blob_start_in_buf + size_length) as usize; + let end = start + blob_size as usize; metas.push(VectoredBlob { start, end, meta: *meta, + compression_bits, }); } @@ -1020,8 +1109,13 @@ mod tests { let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?; assert_eq!(result.blobs.len(), 1); let read_blob = &result.blobs[0]; - let read_buf = &result.buf[read_blob.start..read_blob.end]; - assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}"); + let view = BufView::new_slice(&result.buf); + let read_buf = read_blob.read(&view).await?; + assert_eq!( + &blob[..], + &read_buf[..], + "mismatch for idx={idx} at offset={offset}" + ); buf = result.buf; } Ok(()) diff --git a/pgxn/neon/neon_perf_counters.c b/pgxn/neon/neon_perf_counters.c index 3e86d5b26276..de653826c019 100644 --- a/pgxn/neon/neon_perf_counters.c +++ b/pgxn/neon/neon_perf_counters.c @@ -32,7 +32,7 @@ NeonPerfCountersShmemSize(void) return size; } -bool +void NeonPerfCountersShmemInit(void) { bool found; diff --git a/pgxn/neon/neon_perf_counters.h b/pgxn/neon/neon_perf_counters.h index ae35e8c3a515..02163ada5571 100644 --- a/pgxn/neon/neon_perf_counters.h +++ b/pgxn/neon/neon_perf_counters.h @@ -105,7 +105,7 @@ extern neon_per_backend_counters *neon_per_backend_counters_shared; extern void inc_getpage_wait(uint64 latency); extern Size NeonPerfCountersShmemSize(void); -extern bool NeonPerfCountersShmemInit(void); +extern void NeonPerfCountersShmemInit(void); #endif /* NEON_PERF_COUNTERS_H */ diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 6703eb06eb29..501ce050e071 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -29,7 +29,6 @@ dashmap.workspace = true env_logger.workspace = true framed-websockets.workspace = true futures.workspace = true -git-version.workspace = true hashbrown.workspace = true hashlink.workspace = true hex.workspace = true diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index daf21c70b045..67f32b3cc08b 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -21,7 +21,6 @@ chrono.workspace = true clap = { workspace = true, features = ["derive"] } crc32c.workspace = true fail.workspace = true -git-version.workspace = true hex.workspace = true humantime.workspace = true hyper.workspace = true diff --git a/storage_broker/Cargo.toml b/storage_broker/Cargo.toml index 82ec0aa272e3..5359f586e49d 100644 --- a/storage_broker/Cargo.toml +++ b/storage_broker/Cargo.toml @@ -15,7 +15,6 @@ const_format.workspace = true futures.workspace = true futures-core.workspace = true futures-util.workspace = true -git-version.workspace = true humantime.workspace = true hyper = { workspace = true, features = ["full"] } once_cell.workspace = true diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index a96d64e09670..9ed0501026dc 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -20,7 +20,6 @@ chrono.workspace = true clap.workspace = true fail.workspace = true futures.workspace = true -git-version.workspace = true hex.workspace = true hyper.workspace = true humantime.workspace = true diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index 95e4a469ac95..4dd8badd0391 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -515,7 +515,7 @@ async fn handle_tenant_timeline_passthrough( tracing::info!("Proxying request for tenant {} ({})", tenant_id, path); // Find the node that holds shard zero - let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id)?; + let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?; // Callers will always pass an unsharded tenant ID. Before proxying, we must // rewrite this to a shard-aware shard zero ID. @@ -545,10 +545,10 @@ async fn handle_tenant_timeline_passthrough( let _timer = latency.start_timer(labels.clone()); let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref()); - let resp = client.get_raw(path).await.map_err(|_e| - // FIXME: give APiError a proper Unavailable variant. We return 503 here because - // if we can't successfully send a request to the pageserver, we aren't available. - ApiError::ShuttingDown)?; + let resp = client.get_raw(path).await.map_err(|e| + // We return 503 here because if we can't successfully send a request to the pageserver, + // either we aren't available or the pageserver is unavailable. + ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?; if !resp.status().is_success() { let error_counter = &METRICS_REGISTRY @@ -557,6 +557,19 @@ async fn handle_tenant_timeline_passthrough( error_counter.inc(labels); } + // Transform 404 into 503 if we raced with a migration + if resp.status() == reqwest::StatusCode::NOT_FOUND { + // Look up node again: if we migrated it will be different + let (new_node, _tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?; + if new_node.get_id() != node.get_id() { + // Rather than retry here, send the client a 503 to prompt a retry: this matches + // the pageserver's use of 503, and all clients calling this API should retry on 503. + return Err(ApiError::ResourceUnavailable( + format!("Pageserver {node} returned 404, was migrated to {new_node}").into(), + )); + } + } + // We have a reqest::Response, would like a http::Response let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?); for (k, v) in resp.headers() { diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs index cb9ce10d230a..4cc9b0070dc7 100644 --- a/storage_controller/src/node.rs +++ b/storage_controller/src/node.rs @@ -2,8 +2,8 @@ use std::{str::FromStr, time::Duration}; use pageserver_api::{ controller_api::{ - NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy, - TenantLocateResponseShard, + AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, + NodeSchedulingPolicy, TenantLocateResponseShard, }, shard::TenantShardId, }; @@ -36,7 +36,7 @@ pub(crate) struct Node { listen_pg_addr: String, listen_pg_port: u16, - availability_zone_id: String, + availability_zone_id: AvailabilityZone, // This cancellation token means "stop any RPCs in flight to this node, and don't start // any more". It is not related to process shutdown. @@ -64,8 +64,8 @@ impl Node { } #[allow(unused)] - pub(crate) fn get_availability_zone_id(&self) -> &str { - self.availability_zone_id.as_str() + pub(crate) fn get_availability_zone_id(&self) -> &AvailabilityZone { + &self.availability_zone_id } pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy { @@ -181,7 +181,7 @@ impl Node { listen_http_port: u16, listen_pg_addr: String, listen_pg_port: u16, - availability_zone_id: String, + availability_zone_id: AvailabilityZone, ) -> Self { Self { id, @@ -204,7 +204,7 @@ impl Node { listen_http_port: self.listen_http_port as i32, listen_pg_addr: self.listen_pg_addr.clone(), listen_pg_port: self.listen_pg_port as i32, - availability_zone_id: self.availability_zone_id.clone(), + availability_zone_id: self.availability_zone_id.0.clone(), } } @@ -219,7 +219,7 @@ impl Node { listen_http_port: np.listen_http_port as u16, listen_pg_addr: np.listen_pg_addr, listen_pg_port: np.listen_pg_port as u16, - availability_zone_id: np.availability_zone_id, + availability_zone_id: AvailabilityZone(np.availability_zone_id), cancel: CancellationToken::new(), } } diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 1dc1040d9637..14cc51240d10 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -9,6 +9,7 @@ use diesel::pg::PgConnection; use diesel::prelude::*; use diesel::Connection; use itertools::Itertools; +use pageserver_api::controller_api::AvailabilityZone; use pageserver_api::controller_api::MetadataHealthRecord; use pageserver_api::controller_api::ShardSchedulingPolicy; use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy}; @@ -667,8 +668,8 @@ impl Persistence { pub(crate) async fn set_tenant_shard_preferred_azs( &self, - preferred_azs: Vec<(TenantShardId, String)>, - ) -> DatabaseResult> { + preferred_azs: Vec<(TenantShardId, AvailabilityZone)>, + ) -> DatabaseResult> { use crate::schema::tenant_shards::dsl::*; self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| { @@ -679,7 +680,7 @@ impl Persistence { .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) - .set(preferred_az_id.eq(preferred_az)) + .set(preferred_az_id.eq(preferred_az.0.clone())) .execute(conn)?; if updated == 1 { diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs index 750bcd7c0138..2c42da404355 100644 --- a/storage_controller/src/reconciler.rs +++ b/storage_controller/src/reconciler.rs @@ -463,7 +463,7 @@ impl Reconciler { for (timeline_id, baseline_lsn) in &baseline { match latest.get(timeline_id) { Some(latest_lsn) => { - tracing::info!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}"); + tracing::info!(timeline_id = %timeline_id, "🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}"); if latest_lsn < baseline_lsn { any_behind = true; } @@ -541,6 +541,8 @@ impl Reconciler { } } + pausable_failpoint!("reconciler-live-migrate-pre-generation-inc"); + // Increment generation before attaching to new pageserver self.generation = Some( self.persistence @@ -617,6 +619,8 @@ impl Reconciler { }, ); + pausable_failpoint!("reconciler-live-migrate-post-detach"); + tracing::info!("🔁 Switching to AttachedSingle mode on node {dest_ps}",); let dest_final_conf = build_location_config( &self.shard, diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs index 1cb1fb104d60..2414d95eb89b 100644 --- a/storage_controller/src/scheduler.rs +++ b/storage_controller/src/scheduler.rs @@ -1,6 +1,6 @@ use crate::{node::Node, tenant_shard::TenantShard}; use itertools::Itertools; -use pageserver_api::models::PageserverUtilization; +use pageserver_api::{controller_api::AvailabilityZone, models::PageserverUtilization}; use serde::Serialize; use std::{collections::HashMap, fmt::Debug}; use utils::{http::error::ApiError, id::NodeId}; @@ -32,6 +32,8 @@ pub(crate) struct SchedulerNode { shard_count: usize, /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`]. attached_shard_count: usize, + /// Availability zone id in which the node resides + az: AvailabilityZone, /// Whether this node is currently elegible to have new shards scheduled (this is derived /// from a node's availability state and scheduling policy). @@ -42,6 +44,7 @@ pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized { fn generate( node_id: &NodeId, node: &mut SchedulerNode, + preferred_az: &Option, context: &ScheduleContext, ) -> Option; fn is_overloaded(&self) -> bool; @@ -62,6 +65,72 @@ impl ShardTag for SecondaryShardTag { type Score = NodeSecondarySchedulingScore; } +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +enum AzMatch { + Yes, + No, + Unknown, +} + +impl AzMatch { + fn new(node_az: &AvailabilityZone, shard_preferred_az: Option<&AvailabilityZone>) -> Self { + match shard_preferred_az { + Some(preferred_az) if preferred_az == node_az => Self::Yes, + Some(_preferred_az) => Self::No, + None => Self::Unknown, + } + } +} + +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +struct AttachmentAzMatch(AzMatch); + +impl Ord for AttachmentAzMatch { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // Lower scores indicate a more suitable node. + // Note that we prefer a node for which we don't have + // info to a node which we are certain doesn't match the + // preferred AZ of the shard. + let az_match_score = |az_match: &AzMatch| match az_match { + AzMatch::Yes => 0, + AzMatch::Unknown => 1, + AzMatch::No => 2, + }; + + az_match_score(&self.0).cmp(&az_match_score(&other.0)) + } +} + +impl PartialOrd for AttachmentAzMatch { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +struct SecondaryAzMatch(AzMatch); + +impl Ord for SecondaryAzMatch { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // Lower scores indicate a more suitable node. + // For secondary locations we wish to avoid the preferred AZ + // of the shard. + let az_match_score = |az_match: &AzMatch| match az_match { + AzMatch::No => 0, + AzMatch::Unknown => 1, + AzMatch::Yes => 2, + }; + + az_match_score(&self.0).cmp(&az_match_score(&other.0)) + } +} + +impl PartialOrd for SecondaryAzMatch { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + /// Scheduling score of a given node for shard attachments. /// Lower scores indicate more suitable nodes. /// Ordering is given by member declaration order (top to bottom). @@ -70,6 +139,10 @@ pub(crate) struct NodeAttachmentSchedulingScore { /// The number of shards belonging to the tenant currently being /// scheduled that are attached to this node. affinity_score: AffinityScore, + /// Flag indicating whether this node matches the preferred AZ + /// of the shard. For equal affinity scores, nodes in the matching AZ + /// are considered first. + az_match: AttachmentAzMatch, /// Size of [`ScheduleContext::attached_nodes`] for the current node. /// This normally tracks the number of attached shards belonging to the /// tenant being scheduled that are already on this node. @@ -87,6 +160,7 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore { fn generate( node_id: &NodeId, node: &mut SchedulerNode, + preferred_az: &Option, context: &ScheduleContext, ) -> Option { let utilization = match &mut node.may_schedule { @@ -102,6 +176,7 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore { .get(node_id) .copied() .unwrap_or(AffinityScore::FREE), + az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())), attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0), utilization_score: utilization.cached_score(), total_attached_shard_count: node.attached_shard_count, @@ -123,6 +198,11 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore { /// Ordering is given by member declaration order (top to bottom). #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] pub(crate) struct NodeSecondarySchedulingScore { + /// Flag indicating whether this node matches the preferred AZ + /// of the shard. For secondary locations we wish to avoid nodes in. + /// the preferred AZ of the shard, since that's where the attached location + /// should be scheduled and having the secondary in the same AZ is bad for HA. + az_match: SecondaryAzMatch, /// The number of shards belonging to the tenant currently being /// scheduled that are attached to this node. affinity_score: AffinityScore, @@ -139,6 +219,7 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore { fn generate( node_id: &NodeId, node: &mut SchedulerNode, + preferred_az: &Option, context: &ScheduleContext, ) -> Option { let utilization = match &mut node.may_schedule { @@ -149,6 +230,7 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore { }; Some(Self { + az_match: SecondaryAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())), affinity_score: context .nodes .get(node_id) @@ -179,6 +261,7 @@ impl PartialEq for SchedulerNode { may_schedule_matches && self.shard_count == other.shard_count && self.attached_shard_count == other.attached_shard_count + && self.az == other.az } } @@ -293,6 +376,7 @@ impl Scheduler { shard_count: 0, attached_shard_count: 0, may_schedule: node.may_schedule(), + az: node.get_availability_zone_id().clone(), }, ); } @@ -319,6 +403,7 @@ impl Scheduler { shard_count: 0, attached_shard_count: 0, may_schedule: node.may_schedule(), + az: node.get_availability_zone_id().clone(), }, ); } @@ -497,6 +582,7 @@ impl Scheduler { shard_count: 0, attached_shard_count: 0, may_schedule: node.may_schedule(), + az: node.get_availability_zone_id().clone(), }); } } @@ -542,6 +628,7 @@ impl Scheduler { fn compute_node_scores( &mut self, hard_exclude: &[NodeId], + preferred_az: &Option, context: &ScheduleContext, ) -> Vec where @@ -553,7 +640,7 @@ impl Scheduler { if hard_exclude.contains(k) { None } else { - Score::generate(k, v, context) + Score::generate(k, v, preferred_az, context) } }) .collect() @@ -571,13 +658,15 @@ impl Scheduler { pub(crate) fn schedule_shard( &mut self, hard_exclude: &[NodeId], + preferred_az: &Option, context: &ScheduleContext, ) -> Result { if self.nodes.is_empty() { return Err(ScheduleError::NoPageservers); } - let mut scores = self.compute_node_scores::(hard_exclude, context); + let mut scores = + self.compute_node_scores::(hard_exclude, preferred_az, context); // Exclude nodes whose utilization is critically high, if there are alternatives available. This will // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example @@ -634,6 +723,12 @@ impl Scheduler { Ok(node_id) } + /// Selects any available node. This is suitable for performing background work (e.g. S3 + /// deletions). + pub(crate) fn any_available_node(&mut self) -> Result { + self.schedule_shard::(&[], &None, &ScheduleContext::default()) + } + /// Unit test access to internal state #[cfg(test)] pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize { @@ -650,13 +745,22 @@ impl Scheduler { pub(crate) mod test_utils { use crate::node::Node; - use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization}; + use pageserver_api::{ + controller_api::{AvailabilityZone, NodeAvailability}, + models::utilization::test_utilization, + }; use std::collections::HashMap; use utils::id::NodeId; + /// Test helper: synthesize the requested number of nodes, all in active state. /// /// Node IDs start at one. - pub(crate) fn make_test_nodes(n: u64) -> HashMap { + /// + /// The `azs` argument specifies the list of availability zones which will be assigned + /// to nodes in round-robin fashion. If empy, a default AZ is assigned. + pub(crate) fn make_test_nodes(n: u64, azs: &[AvailabilityZone]) -> HashMap { + let mut az_iter = azs.iter().cycle(); + (1..n + 1) .map(|i| { (NodeId(i), { @@ -666,7 +770,10 @@ pub(crate) mod test_utils { 80 + i as u16, format!("pghost-{i}"), 5432 + i as u16, - "test-az".to_string(), + az_iter + .next() + .cloned() + .unwrap_or(AvailabilityZone("test-az".to_string())), ); node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0))); assert!(node.is_available()); @@ -686,7 +793,7 @@ mod tests { use crate::tenant_shard::IntentState; #[test] fn scheduler_basic() -> anyhow::Result<()> { - let nodes = test_utils::make_test_nodes(2); + let nodes = test_utils::make_test_nodes(2, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut t1_intent = IntentState::new(); @@ -694,9 +801,9 @@ mod tests { let context = ScheduleContext::default(); - let scheduled = scheduler.schedule_shard::(&[], &context)?; + let scheduled = scheduler.schedule_shard::(&[], &None, &context)?; t1_intent.set_attached(&mut scheduler, Some(scheduled)); - let scheduled = scheduler.schedule_shard::(&[], &context)?; + let scheduled = scheduler.schedule_shard::(&[], &None, &context)?; t2_intent.set_attached(&mut scheduler, Some(scheduled)); assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1); @@ -705,8 +812,11 @@ mod tests { assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1); assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1); - let scheduled = - scheduler.schedule_shard::(&t1_intent.all_pageservers(), &context)?; + let scheduled = scheduler.schedule_shard::( + &t1_intent.all_pageservers(), + &None, + &context, + )?; t1_intent.push_secondary(&mut scheduler, scheduled); assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1); @@ -746,7 +856,7 @@ mod tests { #[test] /// Test the PageserverUtilization's contribution to scheduling algorithm fn scheduler_utilization() { - let mut nodes = test_utils::make_test_nodes(3); + let mut nodes = test_utils::make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); // Need to keep these alive because they contribute to shard counts via RAII @@ -761,7 +871,7 @@ mod tests { context: &ScheduleContext, ) { let scheduled = scheduler - .schedule_shard::(&[], context) + .schedule_shard::(&[], &None, context) .unwrap(); let mut intent = IntentState::new(); intent.set_attached(scheduler, Some(scheduled)); @@ -870,4 +980,98 @@ mod tests { intent.clear(&mut scheduler); } } + + #[test] + /// A simple test that showcases AZ-aware scheduling and its interaction with + /// affinity scores. + fn az_scheduling() { + let az_a_tag = AvailabilityZone("az-a".to_string()); + let az_b_tag = AvailabilityZone("az-b".to_string()); + + let nodes = test_utils::make_test_nodes(3, &[az_a_tag.clone(), az_b_tag.clone()]); + let mut scheduler = Scheduler::new(nodes.values()); + + // Need to keep these alive because they contribute to shard counts via RAII + let mut scheduled_intents = Vec::new(); + + let mut context = ScheduleContext::default(); + + fn assert_scheduler_chooses( + expect_node: NodeId, + preferred_az: Option, + scheduled_intents: &mut Vec, + scheduler: &mut Scheduler, + context: &mut ScheduleContext, + ) { + let scheduled = scheduler + .schedule_shard::(&[], &preferred_az, context) + .unwrap(); + let mut intent = IntentState::new(); + intent.set_attached(scheduler, Some(scheduled)); + scheduled_intents.push(intent); + assert_eq!(scheduled, expect_node); + + context.avoid(&[scheduled]); + } + + assert_scheduler_chooses::( + NodeId(1), + Some(az_a_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + // Node 2 and 3 have affinity score equal to 0, but node 3 + // is in "az-a" so we prefer that. + assert_scheduler_chooses::( + NodeId(3), + Some(az_a_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + // Node 2 is not in "az-a", but it has the lowest affinity so we prefer that. + assert_scheduler_chooses::( + NodeId(2), + Some(az_a_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + // Avoid nodes in "az-a" for the secondary location. + assert_scheduler_chooses::( + NodeId(2), + Some(az_a_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + // Avoid nodes in "az-b" for the secondary location. + // Nodes 1 and 3 are identically loaded, so prefer the lowest node id. + assert_scheduler_chooses::( + NodeId(1), + Some(az_b_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + // Avoid nodes in "az-b" for the secondary location. + // Node 3 has lower affinity score than 1, so prefer that. + assert_scheduler_chooses::( + NodeId(3), + Some(az_b_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + for mut intent in scheduled_intents { + intent.clear(&mut scheduler); + } + } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 5555505b81d9..a5e012968475 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -26,7 +26,7 @@ use crate::{ ShardGenerationState, TenantFilter, }, reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder}, - scheduler::{AttachedShardTag, MaySchedule, ScheduleContext, ScheduleError, ScheduleMode}, + scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode}, tenant_shard::{ MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization, ScheduleOptimizationAction, @@ -1265,6 +1265,8 @@ impl Service { #[cfg(feature = "testing")] { + use pageserver_api::controller_api::AvailabilityZone; + // Hack: insert scheduler state for all nodes referenced by shards, as compatibility // tests only store the shards, not the nodes. The nodes will be loaded shortly // after when pageservers start up and register. @@ -1282,7 +1284,7 @@ impl Service { 123, "".to_string(), 123, - "test_az".to_string(), + AvailabilityZone("test_az".to_string()), ); scheduler.node_upsert(&node); @@ -2099,7 +2101,7 @@ impl Service { let az_id = locked .nodes .get(&resp.node_id) - .map(|n| n.get_availability_zone_id().to_string())?; + .map(|n| n.get_availability_zone_id().clone())?; Some((resp.shard_id, az_id)) }) @@ -2629,8 +2631,7 @@ impl Service { let scheduler = &mut locked.scheduler; // Right now we only perform the operation on a single node without parallelization // TODO fan out the operation to multiple nodes for better performance - let node_id = - scheduler.schedule_shard::(&[], &ScheduleContext::default())?; + let node_id = scheduler.any_available_node()?; let node = locked .nodes .get(&node_id) @@ -2816,8 +2817,7 @@ impl Service { // Pick an arbitrary node to use for remote deletions (does not have to be where the tenant // was attached, just has to be able to see the S3 content) - let node_id = - scheduler.schedule_shard::(&[], &ScheduleContext::default())?; + let node_id = scheduler.any_available_node()?; let node = nodes .get(&node_id) .expect("Pageservers may not be deleted while lock is active"); @@ -3508,34 +3508,66 @@ impl Service { /// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this /// function looks up and returns node. If the tenant isn't found, returns Err(ApiError::NotFound) - pub(crate) fn tenant_shard0_node( + pub(crate) async fn tenant_shard0_node( &self, tenant_id: TenantId, ) -> Result<(Node, TenantShardId), ApiError> { - let locked = self.inner.read().unwrap(); - let Some((tenant_shard_id, shard)) = locked - .tenants - .range(TenantShardId::tenant_range(tenant_id)) - .next() - else { - return Err(ApiError::NotFound( - anyhow::anyhow!("Tenant {tenant_id} not found").into(), - )); + // Look up in-memory state and maybe use the node from there. + { + let locked = self.inner.read().unwrap(); + let Some((tenant_shard_id, shard)) = locked + .tenants + .range(TenantShardId::tenant_range(tenant_id)) + .next() + else { + return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant {tenant_id} not found").into(), + )); + }; + + let Some(intent_node_id) = shard.intent.get_attached() else { + tracing::warn!( + tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), + "Shard not scheduled (policy {:?}), cannot generate pass-through URL", + shard.policy + ); + return Err(ApiError::Conflict( + "Cannot call timeline API on non-attached tenant".to_string(), + )); + }; + + if shard.reconciler.is_none() { + // Optimization: while no reconcile is in flight, we may trust our in-memory state + // to tell us which pageserver to use. Otherwise we will fall through and hit the database + let Some(node) = locked.nodes.get(intent_node_id) else { + // This should never happen + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Shard refers to nonexistent node" + ))); + }; + return Ok((node.clone(), *tenant_shard_id)); + } }; - // TODO: should use the ID last published to compute_hook, rather than the intent: the intent might - // point to somewhere we haven't attached yet. - let Some(node_id) = shard.intent.get_attached() else { - tracing::warn!( - tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), - "Shard not scheduled (policy {:?}), cannot generate pass-through URL", - shard.policy - ); - return Err(ApiError::Conflict( - "Cannot call timeline API on non-attached tenant".to_string(), + // Look up the latest attached pageserver location from the database + // generation state: this will reflect the progress of any ongoing migration. + // Note that it is not guaranteed to _stay_ here, our caller must still handle + // the case where they call through to the pageserver and get a 404. + let db_result = self.persistence.tenant_generations(tenant_id).await?; + let Some(ShardGenerationState { + tenant_shard_id, + generation: _, + generation_pageserver: Some(node_id), + }) = db_result.first() + else { + // This can happen if we raced with a tenant deletion or a shard split. On a retry + // the caller will either succeed (shard split case), get a proper 404 (deletion case), + // or a conflict response (case where tenant was detached in background) + return Err(ApiError::ResourceUnavailable( + "Shard {} not found in database, or is not attached".into(), )); }; - + let locked = self.inner.read().unwrap(); let Some(node) = locked.nodes.get(node_id) else { // This should never happen return Err(ApiError::InternalServerError(anyhow::anyhow!( @@ -4481,7 +4513,7 @@ impl Service { let az_id = locked .nodes .get(node_id) - .map(|n| n.get_availability_zone_id().to_string())?; + .map(|n| n.get_availability_zone_id().clone())?; Some((*tid, az_id)) }) diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index 1f5eb423be8b..afc89eae0073 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -15,7 +15,7 @@ use crate::{ service::ReconcileResultRequest, }; use pageserver_api::controller_api::{ - NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, + AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, }; use pageserver_api::{ models::{LocationConfig, LocationConfigMode, TenantConfig}, @@ -146,7 +146,7 @@ pub(crate) struct TenantShard { // We should attempt to schedule this shard in the provided AZ to // decrease chances of cross-AZ compute. - preferred_az_id: Option, + preferred_az_id: Option, } #[derive(Default, Clone, Debug, Serialize)] @@ -540,14 +540,22 @@ impl TenantShard { Ok((true, promote_secondary)) } else { // Pick a fresh node: either we had no secondaries or none were schedulable - let node_id = - scheduler.schedule_shard::(&self.intent.secondary, context)?; + let node_id = scheduler.schedule_shard::( + &self.intent.secondary, + &self.preferred_az_id, + context, + )?; tracing::debug!("Selected {} as attached", node_id); self.intent.set_attached(scheduler, Some(node_id)); Ok((true, node_id)) } } + #[instrument(skip_all, fields( + tenant_id=%self.tenant_shard_id.tenant_id, + shard_id=%self.tenant_shard_id.shard_slug(), + sequence=%self.sequence + ))] pub(crate) fn schedule( &mut self, scheduler: &mut Scheduler, @@ -617,8 +625,11 @@ impl TenantShard { let mut used_pageservers = vec![attached_node_id]; while self.intent.secondary.len() < secondary_count { - let node_id = scheduler - .schedule_shard::(&used_pageservers, context)?; + let node_id = scheduler.schedule_shard::( + &used_pageservers, + &self.preferred_az_id, + context, + )?; self.intent.push_secondary(scheduler, node_id); used_pageservers.push(node_id); modified = true; @@ -631,7 +642,11 @@ impl TenantShard { modified = true; } else if self.intent.secondary.is_empty() { // Populate secondary by scheduling a fresh node - let node_id = scheduler.schedule_shard::(&[], context)?; + let node_id = scheduler.schedule_shard::( + &[], + &self.preferred_az_id, + context, + )?; self.intent.push_secondary(scheduler, node_id); modified = true; } @@ -810,6 +825,7 @@ impl TenantShard { // with lower utilization. let Ok(candidate_node) = scheduler.schedule_shard::( &self.intent.all_pageservers(), + &self.preferred_az_id, schedule_context, ) else { // A scheduling error means we have no possible candidate replacements @@ -1308,7 +1324,7 @@ impl TenantShard { pending_compute_notification: false, delayed_reconcile: false, scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(), - preferred_az_id: tsp.preferred_az_id, + preferred_az_id: tsp.preferred_az_id.map(AvailabilityZone), }) } @@ -1324,15 +1340,15 @@ impl TenantShard { config: serde_json::to_string(&self.config).unwrap(), splitting: SplitState::default(), scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(), - preferred_az_id: self.preferred_az_id.clone(), + preferred_az_id: self.preferred_az_id.as_ref().map(|az| az.0.clone()), } } - pub(crate) fn preferred_az(&self) -> Option<&str> { - self.preferred_az_id.as_deref() + pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> { + self.preferred_az_id.as_ref() } - pub(crate) fn set_preferred_az(&mut self, preferred_az_id: String) { + pub(crate) fn set_preferred_az(&mut self, preferred_az_id: AvailabilityZone) { self.preferred_az_id = Some(preferred_az_id); } } @@ -1345,6 +1361,7 @@ pub(crate) mod tests { controller_api::NodeAvailability, shard::{ShardCount, ShardNumber}, }; + use rand::{rngs::StdRng, SeedableRng}; use utils::id::TenantId; use crate::scheduler::test_utils::make_test_nodes; @@ -1373,7 +1390,11 @@ pub(crate) mod tests { ) } - fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec { + fn make_test_tenant( + policy: PlacementPolicy, + shard_count: ShardCount, + preferred_az: Option, + ) -> Vec { let tenant_id = TenantId::generate(); (0..shard_count.count()) @@ -1385,7 +1406,7 @@ pub(crate) mod tests { shard_number, shard_count, }; - TenantShard::new( + let mut ts = TenantShard::new( tenant_shard_id, ShardIdentity::new( shard_number, @@ -1394,7 +1415,13 @@ pub(crate) mod tests { ) .unwrap(), policy.clone(), - ) + ); + + if let Some(az) = &preferred_az { + ts.set_preferred_az(az.clone()); + } + + ts }) .collect() } @@ -1405,7 +1432,7 @@ pub(crate) mod tests { fn tenant_ha_scheduling() -> anyhow::Result<()> { // Start with three nodes. Our tenant will only use two. The third one is // expected to remain unused. - let mut nodes = make_test_nodes(3); + let mut nodes = make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut context = ScheduleContext::default(); @@ -1457,7 +1484,7 @@ pub(crate) mod tests { #[test] fn intent_from_observed() -> anyhow::Result<()> { - let nodes = make_test_nodes(3); + let nodes = make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1)); @@ -1507,7 +1534,7 @@ pub(crate) mod tests { #[test] fn scheduling_mode() -> anyhow::Result<()> { - let nodes = make_test_nodes(3); + let nodes = make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1)); @@ -1532,7 +1559,7 @@ pub(crate) mod tests { #[test] fn optimize_attachment() -> anyhow::Result<()> { - let nodes = make_test_nodes(3); + let nodes = make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1)); @@ -1599,7 +1626,7 @@ pub(crate) mod tests { #[test] fn optimize_secondary() -> anyhow::Result<()> { - let nodes = make_test_nodes(4); + let nodes = make_test_nodes(4, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1)); @@ -1698,14 +1725,14 @@ pub(crate) mod tests { /// that it converges. #[test] fn optimize_add_nodes() -> anyhow::Result<()> { - let nodes = make_test_nodes(4); + let nodes = make_test_nodes(4, &[]); // Only show the scheduler a couple of nodes let mut scheduler = Scheduler::new([].iter()); scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap()); scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap()); - let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4)); + let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None); let mut schedule_context = ScheduleContext::default(); for shard in &mut shards { assert!(shard @@ -1754,16 +1781,16 @@ pub(crate) mod tests { fn initial_scheduling_is_optimal() -> anyhow::Result<()> { use itertools::Itertools; - let nodes = make_test_nodes(2); + let nodes = make_test_nodes(2, &[]); let mut scheduler = Scheduler::new([].iter()); scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap()); scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap()); - let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4)); + let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None); let a_context = Rc::new(RefCell::new(ScheduleContext::default())); - let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4)); + let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None); let b_context = Rc::new(RefCell::new(ScheduleContext::default())); let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone())); @@ -1788,4 +1815,147 @@ pub(crate) mod tests { Ok(()) } + + #[test] + fn random_az_shard_scheduling() -> anyhow::Result<()> { + use rand::seq::SliceRandom; + + for seed in 0..50 { + eprintln!("Running test with seed {seed}"); + let mut rng = StdRng::seed_from_u64(seed); + + let az_a_tag = AvailabilityZone("az-a".to_string()); + let az_b_tag = AvailabilityZone("az-b".to_string()); + let azs = [az_a_tag, az_b_tag]; + let nodes = make_test_nodes(4, &azs); + let mut shards_per_az: HashMap = HashMap::new(); + + let mut scheduler = Scheduler::new([].iter()); + for node in nodes.values() { + scheduler.node_upsert(node); + } + + let mut shards = Vec::default(); + let mut contexts = Vec::default(); + let mut az_picker = azs.iter().cycle().cloned(); + for i in 0..100 { + let az = az_picker.next().unwrap(); + let shard_count = i % 4 + 1; + *shards_per_az.entry(az.clone()).or_default() += shard_count; + + let tenant_shards = make_test_tenant( + PlacementPolicy::Attached(1), + ShardCount::new(shard_count.try_into().unwrap()), + Some(az), + ); + let context = Rc::new(RefCell::new(ScheduleContext::default())); + + contexts.push(context.clone()); + let with_ctx = tenant_shards + .into_iter() + .map(|shard| (shard, context.clone())); + for shard_with_ctx in with_ctx { + shards.push(shard_with_ctx); + } + } + + shards.shuffle(&mut rng); + + #[derive(Default, Debug)] + struct NodeStats { + attachments: u32, + secondaries: u32, + } + + let mut node_stats: HashMap = HashMap::default(); + let mut attachments_in_wrong_az = 0; + let mut secondaries_in_wrong_az = 0; + + for (shard, context) in &mut shards { + let context = &mut *context.borrow_mut(); + shard.schedule(&mut scheduler, context).unwrap(); + + let attached_node = shard.intent.get_attached().unwrap(); + let stats = node_stats.entry(attached_node).or_default(); + stats.attachments += 1; + + let secondary_node = *shard.intent.get_secondary().first().unwrap(); + let stats = node_stats.entry(secondary_node).or_default(); + stats.secondaries += 1; + + let attached_node_az = nodes + .get(&attached_node) + .unwrap() + .get_availability_zone_id(); + let secondary_node_az = nodes + .get(&secondary_node) + .unwrap() + .get_availability_zone_id(); + let preferred_az = shard.preferred_az().unwrap(); + + if attached_node_az != preferred_az { + eprintln!( + "{} attachment was scheduled in AZ {} but preferred AZ {}", + shard.tenant_shard_id, attached_node_az, preferred_az + ); + attachments_in_wrong_az += 1; + } + + if secondary_node_az == preferred_az { + eprintln!( + "{} secondary was scheduled in AZ {} which matches preference", + shard.tenant_shard_id, attached_node_az + ); + secondaries_in_wrong_az += 1; + } + } + + let mut violations = Vec::default(); + + if attachments_in_wrong_az > 0 { + violations.push(format!( + "{} attachments scheduled to the incorrect AZ", + attachments_in_wrong_az + )); + } + + if secondaries_in_wrong_az > 0 { + violations.push(format!( + "{} secondaries scheduled to the incorrect AZ", + secondaries_in_wrong_az + )); + } + + eprintln!( + "attachments_in_wrong_az={} secondaries_in_wrong_az={}", + attachments_in_wrong_az, secondaries_in_wrong_az + ); + + for (node_id, stats) in &node_stats { + let node_az = nodes.get(node_id).unwrap().get_availability_zone_id(); + let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2; + let allowed_attachment_load = + (ideal_attachment_load - 1)..(ideal_attachment_load + 2); + + if !allowed_attachment_load.contains(&stats.attachments) { + violations.push(format!( + "Found {} attachments on node {}, but expected {}", + stats.attachments, node_id, ideal_attachment_load + )); + } + + eprintln!( + "{}: attachments={} secondaries={} ideal_attachment_load={}", + node_id, stats.attachments, stats.secondaries, ideal_attachment_load + ); + } + + assert!(violations.is_empty(), "{violations:?}"); + + for (mut shard, _ctx) in shards { + shard.intent.clear(&mut scheduler); + } + } + Ok(()) + } } diff --git a/storage_scrubber/Cargo.toml b/storage_scrubber/Cargo.toml index f9987662b9f5..a1b5b0b12f19 100644 --- a/storage_scrubber/Cargo.toml +++ b/storage_scrubber/Cargo.toml @@ -8,7 +8,6 @@ license.workspace = true aws-sdk-s3.workspace = true either.workspace = true anyhow.workspace = true -git-version.workspace = true hex.workspace = true humantime.workspace = true serde.workspace = true diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 55c1423ed0d5..201eb1087ded 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2553,7 +2553,7 @@ def poll_node_status( desired_availability: Optional[PageserverAvailability], desired_scheduling_policy: Optional[PageserverSchedulingPolicy], max_attempts: int, - backoff: int, + backoff: float, ): """ Poll the node status until it reaches 'desired_scheduling_policy' and 'desired_availability' @@ -2948,7 +2948,7 @@ def start( self.id ): self.env.storage_controller.poll_node_status( - self.id, PageserverAvailability.ACTIVE, None, max_attempts=20, backoff=1 + self.id, PageserverAvailability.ACTIVE, None, max_attempts=200, backoff=0.1 ) return self @@ -4617,7 +4617,8 @@ def scrubber_cli( "REGION": s3_storage.bucket_region, "BUCKET": s3_storage.bucket_name, "BUCKET_PREFIX": s3_storage.prefix_in_bucket, - "RUST_LOG": "DEBUG", + "RUST_LOG": "INFO", + "PAGESERVER_DISABLE_FILE_LOGGING": "1", } env.update(s3_storage.access_env_vars()) @@ -4637,10 +4638,8 @@ def scrubber_cli( (output_path, stdout, status_code) = subprocess_capture( self.log_dir, args, - echo_stderr=True, - echo_stdout=True, env=env, - check=False, + check=True, capture_stdout=True, timeout=timeout, ) diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py index 35e0c0decb26..be8f70bb7076 100644 --- a/test_runner/regress/test_hot_standby.py +++ b/test_runner/regress/test_hot_standby.py @@ -198,9 +198,6 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool): def run_pgbench(connstr: str, pg_bin: PgBin): log.info(f"Start a pgbench workload on pg {connstr}") - # s10 is about 150MB of data. In debug mode init takes about 15s on SSD. - pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", "-s10", connstr]) - log.info("pgbench init done") pg_bin.run_capture(["pgbench", "-T60", connstr]) @@ -247,9 +244,15 @@ def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): log.info( f"primary connstr is {primary.connstr()}, secondary connstr {secondary.connstr()}" ) + + # s10 is about 150MB of data. In debug mode init takes about 15s on SSD. + pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", "-s10", primary.connstr()]) + log.info("pgbench init done in primary") + t = threading.Thread(target=run_pgbench, args=(primary.connstr(), pg_bin)) t.start() - # Wait until pgbench_accounts is created + filled on replica *and* + + # Wait until we see that the pgbench_accounts is created + filled on replica *and* # index is created. Otherwise index creation would conflict with # read queries and hs feedback won't save us. wait_until(60, 1.0, partial(pgbench_accounts_initialized, secondary)) diff --git a/test_runner/regress/test_lfc_resize.py b/test_runner/regress/test_lfc_resize.py index cb0b30d9c6e8..0f791e924707 100644 --- a/test_runner/regress/test_lfc_resize.py +++ b/test_runner/regress/test_lfc_resize.py @@ -10,11 +10,11 @@ from fixtures.neon_fixtures import NeonEnv, PgBin -# -# Test branching, when a transaction is in prepared state -# @pytest.mark.timeout(600) def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin): + """ + Test resizing the Local File Cache + """ env = neon_simple_env endpoint = env.endpoints.create_start( "main", @@ -32,27 +32,48 @@ def run_pgbench(connstr: str): pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr]) pg_bin.run_capture(["pgbench", "-c10", f"-T{n_resize}", "-Mprepared", "-S", connstr]) - thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True) + # Initializing the pgbench database can be very slow, especially on debug builds. + connstr = endpoint.connstr(options="-cstatement_timeout=300s") + + thread = threading.Thread(target=run_pgbench, args=(connstr,), daemon=True) thread.start() conn = endpoint.connect() cur = conn.cursor() - for _ in range(n_resize): + # For as long as pgbench is running, twiddle the LFC size once a second. + # Note that we launch this immediately, already while the "pgbench -i" + # initialization step is still running. That's quite a different workload + # than the actual pgbench benchamark run, so this gives us coverage of both. + while thread.is_alive(): size = random.randint(1, 512) cur.execute(f"alter system set neon.file_cache_size_limit='{size}MB'") cur.execute("select pg_reload_conf()") time.sleep(1) + thread.join() + # At the end, set it at 100 MB, and perform a final check that the disk usage + # of the file is in that ballbark. + # + # We retry the check a few times, because it might take a while for the + # system to react to changing the setting and shrinking the file. cur.execute("alter system set neon.file_cache_size_limit='100MB'") cur.execute("select pg_reload_conf()") + nretries = 10 + while True: + lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache" + lfc_file_size = os.path.getsize(lfc_file_path) + res = subprocess.run( + ["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True + ) + lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0] + log.info(f"Size of LFC file {lfc_file_size}, blocks {lfc_file_blocks}") + assert lfc_file_size <= 512 * 1024 * 1024 - thread.join() + if int(lfc_file_blocks) <= 128 * 1024 or nretries == 0: + break + + nretries = nretries - 1 + time.sleep(1) - lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache" - lfc_file_size = os.path.getsize(lfc_file_path) - res = subprocess.run(["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True) - lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0] - log.info(f"Size of LFC file {lfc_file_size}, blocks {lfc_file_blocks}") - assert lfc_file_size <= 512 * 1024 * 1024 assert int(lfc_file_blocks) <= 128 * 1024 diff --git a/test_runner/regress/test_readonly_node.py b/test_runner/regress/test_readonly_node.py index 347fc3a04ddb..5e8b8d38f7e2 100644 --- a/test_runner/regress/test_readonly_node.py +++ b/test_runner/regress/test_readonly_node.py @@ -122,6 +122,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): Test static endpoint is protected from GC by acquiring and renewing lsn leases. """ + LSN_LEASE_LENGTH = 8 neon_env_builder.num_pageservers = 2 # GC is manual triggered. env = neon_env_builder.init_start( @@ -139,7 +140,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): "image_creation_threshold": "1", "image_layer_creation_check_threshold": "0", # Short lease length to fit test. - "lsn_lease_length": "3s", + "lsn_lease_length": f"{LSN_LEASE_LENGTH}s", }, initial_tenant_shard_count=2, ) @@ -170,10 +171,14 @@ def generate_updates_on_main( with env.endpoints.create_start("main") as ep_main: with ep_main.cursor() as cur: cur.execute("CREATE TABLE t0(v0 int primary key, v1 text)") - lsn = None + lsn = Lsn(0) for i in range(2): lsn = generate_updates_on_main(env, ep_main, i) + # Round down to the closest LSN on page boundary (unnormalized). + XLOG_BLCKSZ = 8192 + lsn = Lsn((int(lsn) // XLOG_BLCKSZ) * XLOG_BLCKSZ) + with env.endpoints.create_start( branch_name="main", endpoint_id="static", @@ -183,7 +188,8 @@ def generate_updates_on_main( cur.execute("SELECT count(*) FROM t0") assert cur.fetchone() == (ROW_COUNT,) - time.sleep(3) + # Wait for static compute to renew lease at least once. + time.sleep(LSN_LEASE_LENGTH / 2) generate_updates_on_main(env, ep_main, i, end=100) @@ -204,8 +210,9 @@ def generate_updates_on_main( # Do some update so we can increment latest_gc_cutoff generate_updates_on_main(env, ep_main, i, end=100) + # Wait for the existing lease to expire. + time.sleep(LSN_LEASE_LENGTH) # Now trigger GC again, layers should be removed. - time.sleep(4) for shard, ps in tenant_get_shards(env, env.initial_tenant): client = ps.http_client() gc_result = client.timeline_gc(shard, env.initial_timeline, 0) diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 4106efd4f9cc..3861f0b82274 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -4,6 +4,7 @@ import time from collections import defaultdict from datetime import datetime, timezone +from enum import Enum from typing import Any, Dict, List, Optional, Set, Tuple, Union import pytest @@ -2466,6 +2467,87 @@ def has_hit_migration_failpoint(): raise +class MigrationFailpoints(Enum): + # While only the origin is attached + PRE_GENERATION_INC = "reconciler-live-migrate-pre-generation-inc" + # While both locations are attached + POST_NOTIFY = "reconciler-live-migrate-post-notify" + # While only the destination is attached + POST_DETACH = "reconciler-live-migrate-post-detach" + + +@pytest.mark.parametrize( + "migration_failpoint", + [ + MigrationFailpoints.PRE_GENERATION_INC, + MigrationFailpoints.POST_NOTIFY, + MigrationFailpoints.POST_DETACH, + ], +) +def test_storage_controller_proxy_during_migration( + neon_env_builder: NeonEnvBuilder, migration_failpoint: MigrationFailpoints +): + """ + If we send a proxied GET request to the controller during a migration, it should route + the request to whichever pageserver was most recently issued a generation. + + Reproducer for https://github.com/neondatabase/neon/issues/9062 + """ + neon_env_builder.num_pageservers = 2 + neon_env_builder.enable_pageserver_remote_storage(s3_storage()) + env = neon_env_builder.init_configs() + env.start() + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + env.neon_cli.create_tenant(tenant_id, timeline_id) + + # Activate a failpoint that will cause live migration to get stuck _after_ the generation has been issued + # to the new pageserver: this should result in requests routed to the new pageserver. + env.storage_controller.configure_failpoints((migration_failpoint.value, "pause")) + + origin_pageserver = env.get_tenant_pageserver(tenant_id) + dest_ps_id = [p.id for p in env.pageservers if p.id != origin_pageserver.id][0] + + try: + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + migrate_fut = executor.submit( + env.storage_controller.tenant_shard_migrate, + TenantShardId(tenant_id, 0, 0), + dest_ps_id, + ) + + def has_hit_migration_failpoint(): + expr = f"at failpoint {str(migration_failpoint.value)}" + log.info(expr) + assert env.storage_controller.log_contains(expr) + + wait_until(10, 1, has_hit_migration_failpoint) + + # This request should be routed to whichever pageserver holds the highest generation + tenant_info = env.storage_controller.pageserver_api().tenant_status( + tenant_id, + ) + + if migration_failpoint in ( + MigrationFailpoints.POST_NOTIFY, + MigrationFailpoints.POST_DETACH, + ): + # We expect request to land on the destination + assert tenant_info["generation"] == 2 + elif migration_failpoint == MigrationFailpoints.PRE_GENERATION_INC: + # We expect request to land on the origin + assert tenant_info["generation"] == 1 + + # Eventually migration completes + env.storage_controller.configure_failpoints((migration_failpoint.value, "off")) + migrate_fut.result() + except: + # Always disable 'pause' failpoints, even on failure, to avoid hanging in shutdown + env.storage_controller.configure_failpoints((migration_failpoint.value, "off")) + raise + + @run_only_on_default_postgres("this is like a 'unit test' against storcon db") def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): env = neon_env_builder.init_configs()