# safekeeper: consider partial uploads when pulling timeline (#8628)
## Problem
The control file records the id of the safekeeper that uploaded the partial
segment it references. Previously, when a snapshot of the control file was
sent to another sk, that partial segment would eventually be gc-ed by the
receiving sk. This is incorrect because the original sk might still need
the segment later.

## Summary of Changes
When sending a snapshot and the control file contains an uploaded
segment:
* Create a copy of the segment in S3 with the destination sk's node id in
  the object name
* Tweak the streamed control file to point to the object created in the
  previous step

Note that the snapshot endpoint now has to know the id of the requestor,
so the API has been extended to include the node id of the destination
sk. A minimal sketch of the resulting object-name rewrite is included
below.
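
For illustration only (not code from this commit): a minimal sketch of the
object-name rewrite applied to the uploaded partial segment. The segment name
below is made up; the only convention taken from the diff is the
`_sk<node_id>.partial` suffix that `replace_uploaded_segment` rewrites.

```rust
/// Sketch: swap the source safekeeper's node id embedded in the partial
/// segment name for the destination's, producing the object name that the
/// streamed control file will point to.
fn rename_partial_for_destination(name: &str, source: u64, destination: u64) -> String {
    name.replace(
        &format!("_sk{source}"),
        &format!("_sk{destination}"),
    )
}

fn main() {
    // Hypothetical object name; real names come from the control file's
    // partial_backup state.
    let original = "000000010000000000000002_5_sk1.partial";
    let for_destination = rename_partial_for_destination(original, 1, 3);
    assert_eq!(for_destination, "000000010000000000000002_5_sk3.partial");
    // The source's object stays put; the destination's control file will
    // reference the copy under the new name.
    println!("{original} -> {for_destination}");
}
```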

Closes #8542
VladLazar authored Aug 15, 2024
1 parent 168913b commit fef77b0
Showing 8 changed files with 327 additions and 42 deletions.
42 changes: 25 additions & 17 deletions safekeeper/src/control_file.rs
@@ -164,6 +164,30 @@ impl Deref for FileStorage {
}
}

impl TimelinePersistentState {
pub(crate) fn write_to_buf(&self) -> Result<Vec<u8>> {
let mut buf: Vec<u8> = Vec::new();
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;

if self.eviction_state == EvictionState::Present {
// temp hack for forward compatibility
const PREV_FORMAT_VERSION: u32 = 8;
let prev = downgrade_v9_to_v8(self);
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
prev.ser_into(&mut buf)?;
} else {
// otherwise, we write the current format version
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
self.ser_into(&mut buf)?;
}

// calculate checksum before resize
let checksum = crc32c::crc32c(&buf);
buf.extend_from_slice(&checksum.to_le_bytes());
Ok(buf)
}
}

#[async_trait::async_trait]
impl Storage for FileStorage {
/// Persists state durably to the underlying storage.
@@ -180,24 +204,8 @@ impl Storage for FileStorage {
&control_partial_path
)
})?;
let mut buf: Vec<u8> = Vec::new();
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;

if s.eviction_state == EvictionState::Present {
// temp hack for forward compatibility
const PREV_FORMAT_VERSION: u32 = 8;
let prev = downgrade_v9_to_v8(s);
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
prev.ser_into(&mut buf)?;
} else {
// otherwise, we write the current format version
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
s.ser_into(&mut buf)?;
}

// calculate checksum before resize
let checksum = crc32c::crc32c(&buf);
buf.extend_from_slice(&checksum.to_le_bytes());
let buf: Vec<u8> = s.write_to_buf()?;

control_partial.write_all(&buf).await.with_context(|| {
format!(
7 changes: 4 additions & 3 deletions safekeeper/src/http/client.rs
@@ -10,7 +10,7 @@
use reqwest::{IntoUrl, Method, StatusCode};
use utils::{
http::error::HttpErrorBody,
id::{TenantId, TimelineId},
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
};

@@ -97,10 +97,11 @@ impl Client {
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
stream_to: NodeId,
) -> Result<reqwest::Response> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/snapshot",
self.mgmt_api_endpoint, tenant_id, timeline_id
"{}/v1/tenant/{}/timeline/{}/snapshot/{}",
self.mgmt_api_endpoint, tenant_id, timeline_id, stream_to.0
);
self.get(&uri).await
}
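
For illustration only (not part of this commit): a hedged sketch of calling the
extended client method. The `Client` import path and `NodeId(3)` are
assumptions; only the `snapshot(tenant_id, timeline_id, stream_to)` signature
and the URL shape come from the diff above.

```rust
use safekeeper::http::client::Client; // import path assumed
use utils::id::{NodeId, TenantId, TimelineId};

/// Sketch: request a snapshot from the source safekeeper, passing the node id
/// of the destination safekeeper so the source can prepare a per-destination
/// copy of its uploaded partial segment.
async fn request_snapshot_example(
    client: &Client,
    tenant_id: TenantId,
    timeline_id: TimelineId,
) {
    // Resulting URL:
    //   {mgmt_api_endpoint}/v1/tenant/{tenant_id}/timeline/{timeline_id}/snapshot/3
    match client.snapshot(tenant_id, timeline_id, NodeId(3)).await {
        Ok(response) => tracing::info!("snapshot stream started: {}", response.status()),
        Err(err) => tracing::error!("snapshot request failed: {err:#}"),
    }
}
```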
11 changes: 9 additions & 2 deletions safekeeper/src/http/routes.rs
@@ -205,6 +205,7 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo

/// Stream tar archive with all timeline data.
async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let destination = parse_request_param(&request, "destination_id")?;
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
@@ -225,7 +226,13 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
// so create the chan and write to it in another task.
let (tx, rx) = mpsc::channel(1);

task::spawn(pull_timeline::stream_snapshot(tli, tx));
let conf = get_conf(&request);
task::spawn(pull_timeline::stream_snapshot(
tli,
conf.my_id,
destination,
tx,
));

let rx_stream = ReceiverStream::new(rx);
let body = Body::wrap_stream(rx_stream);
@@ -565,7 +572,7 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
request_span(r, tenant_delete_handler)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot",
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
|r| request_span(r, timeline_snapshot_handler),
)
.post("/v1/pull_timeline", |r| {
64 changes: 49 additions & 15 deletions safekeeper/src/pull_timeline.rs
@@ -11,13 +11,8 @@ use std::{
io::{self, ErrorKind},
sync::Arc,
};
use tokio::{
fs::{File, OpenOptions},
io::AsyncWrite,
sync::mpsc,
task,
};
use tokio_tar::{Archive, Builder};
use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task};
use tokio_tar::{Archive, Builder, Header};
use tokio_util::{
io::{CopyToBytes, SinkWriter},
sync::PollSender,
@@ -32,22 +27,29 @@ use crate::{
routes::TimelineStatus,
},
safekeeper::Term,
state::TimelinePersistentState,
timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError, WalResidentTimeline},
wal_backup,
wal_storage::{self, open_wal_file, Storage},
GlobalTimelines, SafeKeeperConf,
};
use utils::{
crashsafe::{durable_rename, fsync_async_opt},
id::{TenantId, TenantTimelineId, TimelineId},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
logging::SecretString,
lsn::Lsn,
pausable_failpoint,
};

/// Stream tar archive of timeline to tx.
#[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
pub async fn stream_snapshot(tli: WalResidentTimeline, tx: mpsc::Sender<Result<Bytes>>) {
if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await {
pub async fn stream_snapshot(
tli: WalResidentTimeline,
source: NodeId,
destination: NodeId,
tx: mpsc::Sender<Result<Bytes>>,
) {
if let Err(e) = stream_snapshot_guts(tli, source, destination, tx.clone()).await {
// Error type/contents don't matter as they can't reach the client
// (hyper likely doesn't do anything with it), but http stream will be
// prematurely terminated. It would be nice to try to send the error in
@@ -81,6 +83,8 @@ impl Drop for SnapshotContext {

pub async fn stream_snapshot_guts(
tli: WalResidentTimeline,
source: NodeId,
destination: NodeId,
tx: mpsc::Sender<Result<Bytes>>,
) -> Result<()> {
// tokio-tar wants Write implementor, but we have mpsc tx <Result<Bytes>>;
@@ -104,7 +108,7 @@ pub async fn stream_snapshot_guts(
// which is also likely suboptimal.
let mut ar = Builder::new_non_terminated(pinned_writer);

let bctx = tli.start_snapshot(&mut ar).await?;
let bctx = tli.start_snapshot(&mut ar, source, destination).await?;
pausable_failpoint!("sk-snapshot-after-list-pausable");

let tli_dir = tli.get_timeline_dir();
@@ -158,13 +162,43 @@ impl WalResidentTimeline {
async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
&self,
ar: &mut tokio_tar::Builder<W>,
source: NodeId,
destination: NodeId,
) -> Result<SnapshotContext> {
let mut shared_state = self.write_shared_state().await;
let wal_seg_size = shared_state.get_wal_seg_size();

let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME);
let mut cf = File::open(cf_path).await?;
ar.append_file(CONTROL_FILE_NAME, &mut cf).await?;
let mut control_store = TimelinePersistentState::clone(shared_state.sk.state());
// Modify the partial segment of the in-memory copy for the control file to
// point to the destination safekeeper.
let replace = control_store
.partial_backup
.replace_uploaded_segment(source, destination)?;

if let Some(replace) = replace {
// The deserialized control file has an uploaded partial. We upload a copy
// of it to object storage for the destination safekeeper and send an updated
// control file in the snapshot.
tracing::info!(
"Replacing uploaded partial segment in in-mem control file: {replace:?}"
);

let remote_timeline_path = wal_backup::remote_timeline_path(&self.tli.ttid)?;
wal_backup::copy_partial_segment(
&replace.previous.remote_path(&remote_timeline_path),
&replace.current.remote_path(&remote_timeline_path),
)
.await?;
}

let buf = control_store
.write_to_buf()
.with_context(|| "failed to serialize control store")?;
let mut header = Header::new_gnu();
header.set_size(buf.len().try_into().expect("never breaches u64"));
ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
.await
.with_context(|| "failed to append to archive")?;

// We need to stream since the oldest segment someone (s3 or pageserver)
// still needs. This duplicates calc_horizon_lsn logic.
@@ -342,7 +376,7 @@ async fn pull_timeline(
let client = Client::new(host.clone(), sk_auth_token.clone());
// Request stream with basebackup archive.
let bb_resp = client
.snapshot(status.tenant_id, status.timeline_id)
.snapshot(status.tenant_id, status.timeline_id, conf.my_id)
.await?;

// Make Stream of Bytes from it...
10 changes: 10 additions & 0 deletions safekeeper/src/wal_backup.rs
@@ -483,6 +483,16 @@ pub(crate) async fn backup_partial_segment(
.await
}

pub(crate) async fn copy_partial_segment(
source: &RemotePath,
destination: &RemotePath,
) -> Result<()> {
let storage = get_configured_remote_storage();
let cancel = CancellationToken::new();

storage.copy_object(source, destination, &cancel).await
}

pub async fn read_object(
file_path: &RemotePath,
offset: u64,
57 changes: 55 additions & 2 deletions safekeeper/src/wal_backup_partial.rs
@@ -17,14 +17,13 @@
//! file. Code updates state in the control file before doing any S3 operations.
//! This way control file stores information about all potentially existing
//! remote partial segments and can clean them up after uploading a newer version.

use camino::Utf8PathBuf;
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use remote_storage::RemotePath;
use serde::{Deserialize, Serialize};

use tracing::{debug, error, info, instrument, warn};
use utils::lsn::Lsn;
use utils::{id::NodeId, lsn::Lsn};

use crate::{
metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
@@ -82,6 +81,12 @@ pub struct State {
pub segments: Vec<PartialRemoteSegment>,
}

#[derive(Debug)]
pub(crate) struct ReplaceUploadedSegment {
pub(crate) previous: PartialRemoteSegment,
pub(crate) current: PartialRemoteSegment,
}

impl State {
/// Find an Uploaded segment. There should be only one Uploaded segment at a time.
pub(crate) fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
@@ -90,6 +95,54 @@ impl State {
.find(|seg| seg.status == UploadStatus::Uploaded)
.cloned()
}

/// Replace the name of the Uploaded segment (if one exists) so that it matches
/// the `destination` safekeeper. Returns a description of the change (or `None`
/// if there is no uploaded segment), wrapped in `anyhow::Result`.
pub(crate) fn replace_uploaded_segment(
&mut self,
source: NodeId,
destination: NodeId,
) -> anyhow::Result<Option<ReplaceUploadedSegment>> {
let current = self
.segments
.iter_mut()
.find(|seg| seg.status == UploadStatus::Uploaded);

let current = match current {
Some(some) => some,
None => {
return anyhow::Ok(None);
}
};

// Sanity check that the partial segment we are replacing belongs
// to the `source` SK.
if !current
.name
.ends_with(format!("sk{}.partial", source.0).as_str())
{
anyhow::bail!(
"Partial segment name ({}) doesn't match self node id ({})",
current.name,
source
);
}

let previous = current.clone();

let new_name = current.name.replace(
format!("_sk{}", source.0).as_str(),
format!("_sk{}", destination.0).as_str(),
);

current.name = new_name;

anyhow::Ok(Some(ReplaceUploadedSegment {
previous,
current: current.clone(),
}))
}
}

struct PartialBackup {
23 changes: 21 additions & 2 deletions test_runner/fixtures/neon_fixtures.py
@@ -67,6 +67,7 @@
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
LocalFsStorage,
MockS3Server,
RemoteStorage,
RemoteStorageKind,
@@ -4425,14 +4426,32 @@ def data_dir(self) -> Path:
def timeline_dir(self, tenant_id, timeline_id) -> Path:
return self.data_dir / str(tenant_id) / str(timeline_id)

def list_uploaded_segments(self, tenant_id: TenantId, timeline_id: TimelineId):
tline_path = (
self.env.repo_dir
/ "local_fs_remote_storage"
/ "safekeeper"
/ str(tenant_id)
/ str(timeline_id)
)
assert isinstance(self.env.safekeepers_remote_storage, LocalFsStorage)
return self._list_segments_in_dir(
tline_path, lambda name: ".metadata" not in name and ".___temp" not in name
)

def list_segments(self, tenant_id, timeline_id) -> List[str]:
"""
Get list of segment names of the given timeline.
"""
tli_dir = self.timeline_dir(tenant_id, timeline_id)
return self._list_segments_in_dir(
tli_dir, lambda name: not name.startswith("safekeeper.control")
)

def _list_segments_in_dir(self, path: Path, keep_filter: Callable[[str], bool]) -> list[str]:
segments = []
for _, _, filenames in os.walk(tli_dir):
segments.extend([f for f in filenames if not f.startswith("safekeeper.control")])
for _, _, filenames in os.walk(path):
segments.extend([f for f in filenames if keep_filter(f)])
segments.sort()
return segments


1 comment on commit fef77b0

@github-actions


2248 tests run: 2168 passed, 1 failed, 79 skipped (full report)


Failures on Postgres 14

  • test_basebackup_with_high_slru_count[github-actions-selfhosted-10-13-30]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_basebackup_with_high_slru_count[release-pg14-github-actions-selfhosted-10-13-30]"
Flaky tests (1)

Postgres 16

  • test_layer_bloating: debug

Code coverage* (full report)

  • functions: 32.4% (7220 of 22263 functions)
  • lines: 50.4% (58300 of 115720 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
fef77b0 at 2024-08-15T10:02:17.137Z :recycle:
